Getting Started
Architecture
Particular Service Platform
NServiceBus
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

Monitoring NServiceBus endpoints with Application Insights

Component: NServiceBus
NuGet Package: NServiceBus (9.x)

Azure Application Insights (App Insights) provides monitoring and alerting capabilities that can be leveraged to monitor the health of NServiceBus endpoints.

This sample shows how to capture NServiceBus OpenTelemetry traces and export them to App Insights. The sample simulates message load as well as a 10% failure rate on processing messages.

Prerequisites

This sample requires an App Insights connection string.

Running the sample

  1. Create an Application Insights resource in Azure
  2. Copy the connection string from the Azure portal dashboard into the sample
  3. Start the sample endpoint
  4. Press any key to send a message, or ESC to quit

Reviewing traces

  1. On the Azure portal dashboard, open the InvestigatePerformance panel
  2. Drill into the samples
  3. Review the custom properties

Timeline view of a trace in Application Insights

Reviewing meters

Navigate to MonitoringMetrics on the Azure portal dashboard for the configured Application Insight instance to start creating graphs.

Message processing counters

To monitor the rate of messages being fetched from the queuing system, processed successfully, retried, and failed for the endpoint use:

  • nservicebus.messaging.fetches
  • nservicebus.messaging.successes
  • nservicebus.messaging.failures

Graph showing fetched, success, and failed counters in Application Insights

Recoverability

To monitor recoverability metrics use:

  • nservicebus.recoverability.immediate_retries
  • nservicebus.recoverability.delayed_retries
  • nservicebus.recoverability.retries
  • nservicebus.recoverability.sent_to_error

Graph showing recoverability metrics in Application Insights

Critical time and processing time

To monitor critical time and processing time (in milliseconds) for successfully processed messages use:

  • nservicebus.messaging.processingtime
  • nservicebus.messaging.criticaltime

Graph showing processing time and critical time metrics in Application Insights

Code walk-through

The OpenTelemetry instrumentation is enabled on the endpoint.

var endpointConfiguration = new EndpointConfiguration(endpointName);
endpointConfiguration.EnableOpenTelemetry();

Tracing

The endpoint configures an OpenTelemetry trace provider that includes the NServiceBus.Core source and exports collected traces to Azure Monitor.

var traceProvider = Sdk.CreateTracerProviderBuilder()
    .SetResourceBuilder(resourceBuilder)
    .AddSource("NServiceBus.Core")
    .AddAzureMonitorTraceExporter(o => o.ConnectionString = appInsightsConnectionString)
    .AddConsoleExporter()
    .Build();

Meters

The endpoint configures an OpenTelemetry meter provider that includes the NServiceBus.Core meter and exports metric data to Azure Monitor.

var meterProvider = Sdk.CreateMeterProviderBuilder()
    .SetResourceBuilder(resourceBuilder)
    .AddMeter("NServiceBus.Core")
    .AddAzureMonitorMetricExporter(o => o.ConnectionString = appInsightsConnectionString)
    .AddConsoleExporter()
    .Build();

Critical time and processing time

Critical time and processing time captured by the metrics package are not yet supported in OpenTelemetry's native format (using System.Diagnostics), so a shim is required to expose them as OpenTelemetry metrics.

class EmitNServiceBusMetrics : Feature
{
    public EmitNServiceBusMetrics()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        if(context.Settings.GetOrDefault<bool>("Endpoint.SendOnly"))
        {
            // there are no metrics relevant for send only endpoints yet
            return;
        }

        var queueName = context.LocalQueueAddress().BaseAddress;
        var discriminator = context.InstanceSpecificQueueAddress()?.Discriminator;

        var recoverabilitySettings = (RecoverabilitySettings)typeof(RecoverabilitySettings).GetConstructor(
              BindingFlags.NonPublic | BindingFlags.Instance,
              null, [typeof(SettingsHolder)],
              null).Invoke([(SettingsHolder)context.Settings]);

        recoverabilitySettings.Immediate(i => i.OnMessageBeingRetried((m, _) => RecordRetry(m.Headers, queueName, discriminator, true)));
        recoverabilitySettings.Delayed(d => d.OnMessageBeingRetried((m, _) => RecordRetry(m.Headers, queueName, discriminator, false)));
        recoverabilitySettings.Failed(f => f.OnMessageSentToErrorQueue((m, _) => RecordFailure(m.Headers, queueName, discriminator)));

        context.Pipeline.OnReceivePipelineCompleted((e, _) =>
        {
            e.TryGetMessageType(out var messageType);

            var tags = CreateTags(queueName, discriminator, messageType);

            ProcessingTime.Record((e.CompletedAt - e.StartedAt).TotalMilliseconds, tags);

            if (e.TryGetDeliverAt(out DateTimeOffset startTime) || e.TryGetTimeSent(out startTime))
            {
                CriticalTime.Record((e.CompletedAt - startTime).TotalMilliseconds, tags);
            }

            return Task.CompletedTask;
        });
    }

    static Task RecordRetry(Dictionary<string, string> headers, string queueName, string discriminator, bool immediate)
    {
        headers.TryGetMessageType(out var messageType);

        var tags = CreateTags(queueName, discriminator, messageType);

        if (immediate)
        {
            ImmedidateRetries.Add(1, tags);
        }
        else
        {
            DelayedRetries.Add(1, tags);
        }
        Retries.Add(1, tags);

        return Task.CompletedTask;
    }

    static Task RecordFailure(Dictionary<string, string> headers, string queueName, string discriminator)
    {
        headers.TryGetMessageType(out var messageType);

        MessageSentToErrorQueue.Add(1, CreateTags(queueName, discriminator, messageType));

        return Task.CompletedTask;
    }

    static TagList CreateTags(string queueName, string discriminator, string messageType)
    {
        var tags = new TagList(
        [
            new(Tags.QueueName, queueName)
        ]);

        if (!string.IsNullOrWhiteSpace(discriminator))
        {
            tags.Add(Tags.EndpointDiscriminator, discriminator);
        }

        if (!string.IsNullOrWhiteSpace(messageType))
        {
            tags.Add(Tags.MessageType, messageType);
        }

        return tags;
    }

    static readonly Meter NServiceBusMeter = new Meter("NServiceBus.Core", "0.1.0");

    public static readonly Counter<long> ImmedidateRetries =
        NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.immediate_retries", description: "Number of immediate retries performed by the endpoint.");

    public static readonly Counter<long> DelayedRetries =
        NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.delayed_retries", description: "Number of delayed retries performed by the endpoint.");

    public static readonly Counter<long> Retries =
        NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.retries", description: "Number of retries performed by the endpoint.");

    public static readonly Counter<long> MessageSentToErrorQueue =
        NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.moved_to_error", description: "Number of messages sent to the error queue.");

    public static readonly Histogram<double> ProcessingTime =
        NServiceBusMeter.CreateHistogram<double>("nservicebus.messaging.processingtime", "ms", "The time in milliseconds between when the message was pulled from the queue until processed by the endpoint.");

    public static readonly Histogram<double> CriticalTime =
        NServiceBusMeter.CreateHistogram<double>("nservicebus.messaging.criticaltime", "ms", "The time in milliseconds between when the message was sent until processed by the endpoint.");

    public static class Tags
    {
        public const string EndpointDiscriminator = "nservicebus.discriminator";
        public const string QueueName = "nservicebus.queue";
        public const string MessageType = "nservicebus.message_type";
    }
}

Related Articles

  • OpenTelemetry
    Observability of NServiceBus endpoints with OpenTelemetry.