Getting Started
Architecture
NServiceBus
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

Use a shim to expose NServiceBus.Metrics via OpenTelemetry

Component: NServiceBus
NuGet Package: NServiceBus (9.x)

The Particular Service Platform has built-in support for metrics, which are captured by the NServiceBus.Metrics package. These metrics can be exposed to an observability backend using OpenTelemetry. In versions of NServiceBus prior to 9.1, there was no native OpenTelemetry support for the following metrics:

  • Critical time
  • Processing time
  • Handler time
  • Retries

It is possible to expose these metrics via OpenTelemetry by using a shim.

This sample shows how to set up a shim that exposes NServiceBus.Metrics in an OpenTelemetry format, and exports them to App Insights.

Prerequisites

This sample requires an App Insights connection string.

Running the sample

  1. Create an Application Insights resource in Azure
  2. Copy the connection string from the Azure portal dashboard into the sample
  3. Start the sample endpoint
  4. Press any key to send a message, or ESC to quit

Reviewing meters

Navigate to Monitoring → Metrics on the Azure portal dashboard for the configured Application Insights instance to start creating graphs.

Code walk-through

The OpenTelemetry instrumentation is enabled on the endpoint.

// Create the configuration for the endpoint, then turn on its OpenTelemetry instrumentation.
EndpointConfiguration endpointConfiguration = new(endpointName);
endpointConfiguration.EnableOpenTelemetry();

Next, metrics are enabled.

// Configure the metrics pipeline step by step: attach the resource description and
// subscribe to every meter whose name matches "NServiceBus.Core*".
var metricsBuilder = Sdk.CreateMeterProviderBuilder();
metricsBuilder = metricsBuilder.SetResourceBuilder(resourceBuilder);
metricsBuilder = metricsBuilder.AddMeter("NServiceBus.Core*");

// Two exporters are registered: Azure Monitor (using the App Insights connection string)
// and the console.
metricsBuilder = metricsBuilder.AddAzureMonitorMetricExporter(options =>
{
    options.ConnectionString = appInsightsConnectionString;
});
metricsBuilder = metricsBuilder.AddConsoleExporter();

var meterProvider = metricsBuilder.Build();

The shim

The shim is responsible for converting NServiceBus.Metrics to OpenTelemetry metrics.

/// <summary>
/// NServiceBus feature acting as a shim: it subscribes to recoverability and receive-pipeline
/// notifications and records them on instruments of the "NServiceBus.Core" meter, so that they
/// can be collected by any listener subscribed to that meter.
/// </summary>
class EmitNServiceBusMetrics : Feature
{
    /// <summary>
    /// Marks the feature as enabled by default.
    /// </summary>
    public EmitNServiceBusMetrics()
    {
        EnableByDefault();
    }

    /// <summary>
    /// Hooks retry, failure and receive-pipeline-completed notifications up to the meter instruments.
    /// Does nothing when the "Endpoint.SendOnly" setting is true.
    /// </summary>
    /// <param name="context">The feature configuration context supplied to the feature.</param>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the non-public <c>RecoverabilitySettings(SettingsHolder)</c> constructor cannot be found.
    /// </exception>
    protected override void Setup(FeatureConfigurationContext context)
    {
        if (context.Settings.GetOrDefault<bool>("Endpoint.SendOnly"))
        {
            // there are no metrics relevant for send only endpoints yet
            return;
        }

        var queueName = context.LocalQueueAddress().BaseAddress;
        // The discriminator stays null when no instance-specific queue address is available.
        var discriminator = context.InstanceSpecificQueueAddress()?.Discriminator;

        var recoverabilitySettings = CreateRecoverabilitySettings((SettingsHolder)context.Settings);

        recoverabilitySettings.Immediate(i => i.OnMessageBeingRetried((m, _) => RecordRetry(m.Headers, queueName, discriminator, true)));
        recoverabilitySettings.Delayed(d => d.OnMessageBeingRetried((m, _) => RecordRetry(m.Headers, queueName, discriminator, false)));
        recoverabilitySettings.Failed(f => f.OnMessageSentToErrorQueue((m, _) => RecordFailure(m.Headers, queueName, discriminator)));

        context.Pipeline.OnReceivePipelineCompleted((e, _) =>
        {
            // The result is deliberately ignored: CreateTags skips a null/blank message type.
            e.TryGetMessageType(out var messageType);

            var tags = CreateTags(queueName, discriminator, messageType);

            ProcessingTime.Record((e.CompletedAt - e.StartedAt).TotalMilliseconds, tags);

            // Prefer the deliver-at time as the starting point, falling back to the time sent;
            // when neither can be obtained no critical time is recorded.
            if (e.TryGetDeliverAt(out DateTimeOffset startTime) || e.TryGetTimeSent(out startTime))
            {
                CriticalTime.Record((e.CompletedAt - startTime).TotalMilliseconds, tags);
            }

            return Task.CompletedTask;
        });
    }

    /// <summary>
    /// Creates a <c>RecoverabilitySettings</c> instance over the given settings by invoking its
    /// non-public constructor through reflection.
    /// </summary>
    /// <param name="settings">The settings holder passed to the constructor.</param>
    /// <returns>The constructed recoverability settings.</returns>
    /// <exception cref="InvalidOperationException">Thrown when the expected constructor does not exist.</exception>
    static RecoverabilitySettings CreateRecoverabilitySettings(SettingsHolder settings)
    {
        var constructor = typeof(RecoverabilitySettings).GetConstructor(
            BindingFlags.NonPublic | BindingFlags.Instance,
            null, new Type[] { typeof(SettingsHolder) },
            null);

        // GetConstructor returns null when no matching constructor exists; fail with a clear
        // message instead of a NullReferenceException on Invoke.
        if (constructor is null)
        {
            throw new InvalidOperationException(
                $"Could not find a non-public {nameof(RecoverabilitySettings)} constructor accepting a {nameof(SettingsHolder)}. " +
                "The shim relies on this constructor to register recoverability notifications.");
        }

        return (RecoverabilitySettings)constructor.Invoke(new object[] { settings });
    }

    /// <summary>
    /// Counts one retry on the immediate or delayed counter, and on the overall retries counter.
    /// </summary>
    /// <param name="headers">Headers of the message being retried; used to look up the message type.</param>
    /// <param name="queueName">Value for the queue name tag.</param>
    /// <param name="discriminator">Value for the discriminator tag; omitted when null or blank.</param>
    /// <param name="immediate"><c>true</c> for an immediate retry, <c>false</c> for a delayed retry.</param>
    /// <returns>A completed task.</returns>
    static Task RecordRetry(Dictionary<string, string> headers, string queueName, string discriminator, bool immediate)
    {
        headers.TryGetMessageType(out var messageType);

        var tags = CreateTags(queueName, discriminator, messageType);

        if (immediate)
        {
            ImmediateRetries.Add(1, tags);
        }
        else
        {
            DelayedRetries.Add(1, tags);
        }
        Retries.Add(1, tags);

        return Task.CompletedTask;
    }

    /// <summary>
    /// Counts one message sent to the error queue.
    /// </summary>
    /// <param name="headers">Headers of the failed message; used to look up the message type.</param>
    /// <param name="queueName">Value for the queue name tag.</param>
    /// <param name="discriminator">Value for the discriminator tag; omitted when null or blank.</param>
    /// <returns>A completed task.</returns>
    static Task RecordFailure(Dictionary<string, string> headers, string queueName, string discriminator)
    {
        headers.TryGetMessageType(out var messageType);

        MessageSentToErrorQueue.Add(1, CreateTags(queueName, discriminator, messageType));

        return Task.CompletedTask;
    }

    /// <summary>
    /// Builds the tag list attached to every measurement. The queue name tag is always present;
    /// the discriminator and message type tags are added only when they are not null or whitespace.
    /// </summary>
    static TagList CreateTags(string queueName, string discriminator, string messageType)
    {
        var tags = new TagList(new KeyValuePair<string, object>[] { new(Tags.QueueName, queueName) });

        if (!string.IsNullOrWhiteSpace(discriminator))
        {
            tags.Add(Tags.EndpointDiscriminator, discriminator);
        }

        if (!string.IsNullOrWhiteSpace(messageType))
        {
            tags.Add(Tags.MessageType, messageType);
        }

        return tags;
    }

    // Must be declared before the instruments below: static field initializers run in textual order.
    static readonly Meter NServiceBusMeter = new Meter("NServiceBus.Core", "0.1.0");

    /// <summary>Counter incremented for every immediate retry.</summary>
    public static readonly Counter<long> ImmediateRetries =
        NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.immediate", description: "Number of immediate retries performed by the endpoint.");

    /// <summary>
    /// Misspelled alias of <see cref="ImmediateRetries"/>, kept so existing references keep compiling.
    /// Refers to the same counter instance.
    /// </summary>
    public static readonly Counter<long> ImmedidateRetries = ImmediateRetries;

    /// <summary>Counter incremented for every delayed retry.</summary>
    public static readonly Counter<long> DelayedRetries =
        NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.delayed", description: "Number of delayed retries performed by the endpoint.");

    /// <summary>Counter incremented for every retry, immediate or delayed.</summary>
    public static readonly Counter<long> Retries =
        NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.retries", description: "Number of retries performed by the endpoint.");

    /// <summary>Counter incremented for every message sent to the error queue.</summary>
    public static readonly Counter<long> MessageSentToErrorQueue =
        NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.error", description: "Number of messages sent to the error queue.");

    /// <summary>Histogram of processing times, recorded in milliseconds.</summary>
    public static readonly Histogram<double> ProcessingTime =
        NServiceBusMeter.CreateHistogram<double>("nservicebus.messaging.processing_time", "ms", "The time in milliseconds between when the message was pulled from the queue until processed by the endpoint.");

    /// <summary>Histogram of critical times, recorded in milliseconds.</summary>
    public static readonly Histogram<double> CriticalTime =
        NServiceBusMeter.CreateHistogram<double>("nservicebus.messaging.critical_time", "ms", "The time in milliseconds between when the message was sent until processed by the endpoint.");

    /// <summary>Tag keys attached to the recorded measurements.</summary>
    public static class Tags
    {
        public const string EndpointDiscriminator = "nservicebus.discriminator";
        public const string QueueName = "nservicebus.queue";
        public const string MessageType = "nservicebus.message_type";
    }
}

Related Articles

  • OpenTelemetry
    Observability of NServiceBus endpoints with OpenTelemetry.