Monitoring NServiceBus endpoints with Application Insights

Component: NServiceBus
NuGet Package: NServiceBus (8.x)

Azure Application Insights (App Insights) provides monitoring and alerting capabilities that can be leveraged to monitor the health of NServiceBus endpoints.

This sample shows how to capture NServiceBus OpenTelemetry traces and export them to App Insights. The sample simulates message load as well as a 10% failure rate on processing messages.
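A failure rate like this can be simulated by throwing an exception for roughly one message in ten. The following is a minimal, self-contained sketch of that idea and not the sample's actual handler. `ShouldFail` and `SimulateProcessing` are hypothetical names, and the seeded `Random` is an assumption made only to keep the run repeatable.

```csharp
using System;

// Hypothetical helper: returns true for roughly 10% of calls.
static bool ShouldFail(Random random) => random.Next(10) == 0;

// Simulates processing a batch of messages and counts how many would throw.
static int SimulateProcessing(Random random, int messageCount)
{
    var failures = 0;
    for (var i = 0; i < messageCount; i++)
    {
        try
        {
            if (ShouldFail(random))
            {
                // In a real handler, an unhandled exception triggers recoverability.
                throw new InvalidOperationException("Simulated failure");
            }
        }
        catch (InvalidOperationException)
        {
            failures++;
        }
    }
    return failures;
}

var failed = SimulateProcessing(new Random(42), 10_000);
Console.WriteLine($"{failed} of 10000 simulated messages failed");
```

In an endpoint the exception would not be caught locally; it would propagate so that the retries and error-queue counters described later have something to report.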

Prerequisites

This sample requires an App Insights connection string.

Although the sample uses Azure Application Insights, the solution itself does not require an Azure message transport. This example uses the Learning Transport but could be modified to run on any transport.

Running the sample

  1. Create an Application Insights resource in Azure
  2. Copy the connection string from the Azure portal dashboard into the sample
  3. Start the sample endpoint
  4. Press any key to send a message, or ESC to quit
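As an alternative to pasting the connection string into the source in step 2, it could be read from configuration at startup. The sketch below assumes an environment variable named `APPLICATIONINSIGHTS_CONNECTION_STRING`; that name and the `RequireSetting` helper are illustrative choices, not part of the sample.

```csharp
using System;

// Assumption: the connection string is supplied through this environment variable.
const string VariableName = "APPLICATIONINSIGHTS_CONNECTION_STRING";

// Hypothetical helper: fails fast with a readable message when the value is missing.
static string RequireSetting(string name, Func<string, string> lookup)
{
    var value = lookup(name);
    if (string.IsNullOrWhiteSpace(value))
    {
        throw new InvalidOperationException(
            $"Set the {name} environment variable before starting the endpoint.");
    }
    return value;
}

try
{
    var appInsightsConnectionString =
        RequireSetting(VariableName, Environment.GetEnvironmentVariable);
    Console.WriteLine($"Connection string found ({appInsightsConnectionString.Length} characters).");
}
catch (InvalidOperationException e)
{
    Console.WriteLine(e.Message);
}
```

Passing the lookup as a delegate keeps the helper easy to exercise without touching the real environment.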

Reviewing traces

  1. On the Azure portal dashboard, open the Investigate → Performance panel
  2. Drill into the samples
  3. Review the custom properties

Timeline view of a trace in Application Insights

Reviewing meters

Navigate to Monitoring → Metrics on the Azure portal dashboard for the configured Application Insights instance to start creating graphs.

It may take a few minutes for the meter data to populate to Azure. Meters will only appear on the dashboard once they have reported at least one value.

Message processing counters

To monitor the rate at which the endpoint fetches messages from the queuing system, processes them successfully, and fails to process them, use:

  • nservicebus.messaging.fetches
  • nservicebus.messaging.successes
  • nservicebus.messaging.failures

Graph showing fetched, success, and failed counters in Application Insights

Recoverability

To monitor recoverability metrics use:

  • nservicebus.recoverability.immediate_retries
  • nservicebus.recoverability.delayed_retries
  • nservicebus.recoverability.retries
  • nservicebus.recoverability.moved_to_error

Graph showing recoverability metrics in Application Insights

Critical time and processing time

To monitor critical time and processing time (in milliseconds) for successfully processed messages use:

  • nservicebus.messaging.processingtime
  • nservicebus.messaging.criticaltime

Graph showing processing time and critical time metrics in Application Insights
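The difference between the two values is easiest to see with concrete timestamps. The sketch below mirrors the arithmetic used by the shim in the code walk-through: processing time runs from the start of processing to completion, while critical time runs from when the message was sent to completion. The timestamps are made-up values for illustration.

```csharp
using System;

// Made-up timestamps: the message waits 2 seconds in the queue,
// then takes half a second to process.
var timeSent = new DateTimeOffset(2024, 1, 1, 12, 0, 0, TimeSpan.Zero);
var startedAt = timeSent.AddMilliseconds(2000);
var completedAt = startedAt.AddMilliseconds(500);

// Processing time: from pulling the message off the queue until it is processed.
var processingTimeMs = (completedAt - startedAt).TotalMilliseconds;

// Critical time: from sending the message until it is processed.
var criticalTimeMs = (completedAt - timeSent).TotalMilliseconds;

Console.WriteLine($"Processing time: {processingTimeMs} ms"); // 500 ms
Console.WriteLine($"Critical time: {criticalTimeMs} ms");     // 2500 ms
```

A critical time that grows while processing time stays flat suggests that messages are waiting in the queue rather than being handled slowly.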

Code walk-through

The OpenTelemetry instrumentation is enabled on the endpoint.

var endpointConfiguration = new EndpointConfiguration(endpointName);
endpointConfiguration.EnableOpenTelemetry();

Tracing

The endpoint configures an OpenTelemetry trace provider that includes the NServiceBus.Core source and exports collected traces to Azure Monitor.

var traceProvider = Sdk.CreateTracerProviderBuilder()
    .SetResourceBuilder(resourceBuilder)
    .AddSource("NServiceBus.Core")
    .AddAzureMonitorTraceExporter(o => o.ConnectionString = appInsightsConnectionString)
    .AddConsoleExporter()
    .Build();
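To check locally that a source named NServiceBus.Core produces activities before looking for them in Azure, an `ActivityListener` from `System.Diagnostics` can be attached. This is a sketch under assumptions: the `standInSource` and the "process message" operation name are hypothetical stand-ins so the snippet runs on its own, and they do not reflect the activity names that NServiceBus emits.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

var stoppedActivities = new List<string>();

// Listen only to the source that the trace provider above subscribes to.
using var listener = new ActivityListener
{
    ShouldListenTo = source => source.Name == "NServiceBus.Core",
    Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
    ActivityStopped = activity => stoppedActivities.Add(activity.DisplayName)
};
ActivitySource.AddActivityListener(listener);

// Stand-in for the source that the endpoint would create; for demonstration only.
using var standInSource = new ActivitySource("NServiceBus.Core");
using (var activity = standInSource.StartActivity("process message"))
{
    // StartActivity returns null when nothing is listening, hence the null check.
    activity?.SetTag("example.tag", "value");
}

Console.WriteLine($"Captured: {string.Join(", ", stoppedActivities)}");
```

In a real endpoint only the listener part would be needed, because the endpoint itself produces the activities once OpenTelemetry is enabled.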

Meters

The endpoint configures an OpenTelemetry meter provider that includes the NServiceBus.Core meter and exports metric data to Azure Monitor.

var telemetryConfiguration = TelemetryConfiguration.CreateDefault();
telemetryConfiguration.ConnectionString = appInsightsConnectionString;
var telemetryClient = new TelemetryClient(telemetryConfiguration);
telemetryClient.Context.GlobalProperties["Endpoint"] = endpointName;

var meterProvider = Sdk.CreateMeterProviderBuilder()
    .SetResourceBuilder(resourceBuilder)
    .AddMeter("NServiceBus.Core")
    .AddAzureMonitorMetricExporter(o => o.ConnectionString = appInsightsConnectionString)
    .AddConsoleExporter()
    .Build();

Critical time and processing time

Critical time and processing time captured by the metrics package are not yet supported in OpenTelemetry's native format (using System.Diagnostics), so a shim is required to expose them as OpenTelemetry metrics.

class EmitNServiceBusMetrics : Feature
{
    public EmitNServiceBusMetrics()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        if (context.Settings.GetOrDefault<bool>("Endpoint.SendOnly"))
        {
            // there are no metrics relevant for send only endpoints yet
            return;
        }

        var queueName = context.LocalQueueAddress().BaseAddress;
        var discriminator = context.InstanceSpecificQueueAddress()?.Discriminator;

        var recoverabilitySettings = (RecoverabilitySettings)typeof(RecoverabilitySettings).GetConstructor(
              BindingFlags.NonPublic | BindingFlags.Instance,
              null, new Type[] { typeof(SettingsHolder) },
              null).Invoke(new object[] { (SettingsHolder)context.Settings });

        recoverabilitySettings.Immediate(i => i.OnMessageBeingRetried((m, _) => RecordRetry(m.Headers, queueName, discriminator, true)));
        recoverabilitySettings.Delayed(d => d.OnMessageBeingRetried((m, _) => RecordRetry(m.Headers, queueName, discriminator, false)));
        recoverabilitySettings.Failed(f => f.OnMessageSentToErrorQueue((m, _) => RecordFailure(m.Headers, queueName, discriminator)));

        context.Pipeline.OnReceivePipelineCompleted((e, _) =>
        {
            e.TryGetMessageType(out var messageType);

            var tags = CreateTags(queueName, discriminator, messageType);

            ProcessingTime.Record((e.CompletedAt - e.StartedAt).TotalMilliseconds, tags);

            if (e.TryGetDeliverAt(out DateTimeOffset startTime) || e.TryGetTimeSent(out startTime))
            {
                CriticalTime.Record((e.CompletedAt - startTime).TotalMilliseconds, tags);
            }

            return Task.CompletedTask;
        });
    }

    static Task RecordRetry(Dictionary<string, string> headers, string queueName, string discriminator, bool immediate)
    {
        headers.TryGetMessageType(out var messageType);

        var tags = CreateTags(queueName, discriminator, messageType);

        if (immediate)
        {
            ImmediateRetries.Add(1, tags);
        }
        else
        {
            DelayedRetries.Add(1, tags);
        }
        // Every retry, immediate or delayed, also counts toward the combined counter
        Retries.Add(1, tags);

        return Task.CompletedTask;
    }

    static Task RecordFailure(Dictionary<string, string> headers, string queueName, string discriminator)
    {
        headers.TryGetMessageType(out var messageType);

        MessageSentToErrorQueue.Add(1, CreateTags(queueName, discriminator, messageType));

        return Task.CompletedTask;
    }

    static TagList CreateTags(string queueName, string discriminator, string messageType)
    {
        // The queue name is always present; the other tags are added only when available
        var tags = new TagList(new KeyValuePair<string, object>[] { new(Tags.QueueName, queueName) });

        if (!string.IsNullOrWhiteSpace(discriminator))
        {
            tags.Add(Tags.EndpointDiscriminator, discriminator);
        }

        if (!string.IsNullOrWhiteSpace(messageType))
        {
            tags.Add(Tags.MessageType, messageType);
        }

        return tags;
    }

    static readonly Meter NServiceBusMeter = new Meter("NServiceBus.Core", "0.1.0");

    public static readonly Counter<long> ImmediateRetries =
        NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.immediate_retries", description: "Number of immediate retries performed by the endpoint.");

    public static readonly Counter<long> DelayedRetries =
        NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.delayed_retries", description: "Number of delayed retries performed by the endpoint.");

    public static readonly Counter<long> Retries =
        NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.retries", description: "Number of retries performed by the endpoint.");

    public static readonly Counter<long> MessageSentToErrorQueue =
        NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.moved_to_error", description: "Number of messages sent to the error queue.");

    public static readonly Histogram<double> ProcessingTime =
        NServiceBusMeter.CreateHistogram<double>("nservicebus.messaging.processingtime", "ms", "The time in milliseconds between when the message was pulled from the queue until processed by the endpoint.");

    public static readonly Histogram<double> CriticalTime =
        NServiceBusMeter.CreateHistogram<double>("nservicebus.messaging.criticaltime", "ms", "The time in milliseconds between when the message was sent until processed by the endpoint.");

    public static class Tags
    {
        public const string EndpointDiscriminator = "nservicebus.discriminator";
        public const string QueueName = "nservicebus.queue";
        public const string MessageType = "nservicebus.message_type";
    }
}

The shim tags every measurement with the queue name and, when available, the endpoint discriminator and the message type. These tags surface as custom dimensions, which allows filtering the graphs in Application Insights. Multi-dimensional metrics are not enabled by default. Check the Azure Monitor documentation for instructions on how to enable this feature.
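
To see how a tag turns into a dimension that can be filtered or grouped on, the measurements can be observed locally with a `MeterListener` from `System.Diagnostics.Metrics`. The sketch below groups retry counts by the `nservicebus.queue` tag. The `standInMeter` and the queue names "Sales" and "Billing" are assumptions made so the snippet runs without an endpoint.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

var retriesPerQueue = new Dictionary<string, long>();

using var listener = new MeterListener();
listener.InstrumentPublished = (instrument, l) =>
{
    // Subscribe only to instruments created on the meter used by the shim.
    if (instrument.Meter.Name == "NServiceBus.Core")
    {
        l.EnableMeasurementEvents(instrument);
    }
};
listener.SetMeasurementEventCallback<long>((instrument, measurement, tags, state) =>
{
    // Group each measurement by the queue name tag, like a dimension filter would.
    foreach (var tag in tags)
    {
        if (tag.Key == "nservicebus.queue" && tag.Value is string queue)
        {
            retriesPerQueue.TryGetValue(queue, out var current);
            retriesPerQueue[queue] = current + measurement;
        }
    }
});
listener.Start();

// Stand-in meter and counter; in the sample these are created by the shim.
using var standInMeter = new Meter("NServiceBus.Core", "0.1.0");
var retries = standInMeter.CreateCounter<long>("nservicebus.recoverability.retries");

retries.Add(1, new KeyValuePair<string, object>("nservicebus.queue", "Sales"));
retries.Add(1, new KeyValuePair<string, object>("nservicebus.queue", "Sales"));
retries.Add(1, new KeyValuePair<string, object>("nservicebus.queue", "Billing"));

foreach (var entry in retriesPerQueue)
{
    Console.WriteLine($"{entry.Key}: {entry.Value}");
}
```

The same listener can also serve as a quick local check that the counters report values, since meter data can take a few minutes to appear in Azure.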

Related Articles

  • OpenTelemetry
    Observability of NServiceBus endpoints with OpenTelemetry.