The Particular Service Platform has built-in support for metrics, which are captured by the NServiceBus.Metrics package. These metrics can be exposed to an observability backend using OpenTelemetry. In versions of NServiceBus prior to 9.1, there was no native OpenTelemetry support for the following metrics:
- Critical time
- Processing time
- Handler time
- Retries
Starting from version 9.1, NServiceBus natively exposes these metrics via OpenTelemetry.
For earlier versions, these metrics can be exposed via OpenTelemetry by using a shim.
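This sample shows how to set up a shim that exposes NServiceBus.Metrics in an OpenTelemetry format and exports them to Azure Application Insights. The shim records critical time, processing time, retries, and messages sent to the error queue; it does not emit handler time.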
Prerequisites
This sample requires an Azure Application Insights connection string.
Although the sample uses Azure Application Insights, the solution itself does not require an Azure message transport. This example uses the Learning Transport but could be modified to run on any transport.
Running the sample
- Create an Application Insights resource in Azure
- Copy the connection string from the Azure portal dashboard into the sample
- Start the sample endpoint
- Press any key to send a message, or ESC to quit
Reviewing meters
Navigate to Monitoring → Metrics on the Azure portal dashboard for the configured Application Insights instance to start creating graphs.
It may take a few minutes for the meter data to appear in Azure. Meters will only appear on the dashboard once they have reported at least one value.
Code walk-through
First, OpenTelemetry instrumentation is enabled on the endpoint:
// Create the endpoint configuration and switch on its OpenTelemetry instrumentation.
EndpointConfiguration endpointConfiguration = new(endpointName);
endpointConfiguration.EnableOpenTelemetry();
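Next, a meter provider is configured to collect the `NServiceBus.Core` meter and export it to Azure Monitor and to the console: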
// Build a meter provider that listens to the "NServiceBus.Core" meter and
// forwards its measurements to Azure Monitor (Application Insights) and the console.
var meterProvider = Sdk.CreateMeterProviderBuilder()
    .SetResourceBuilder(resourceBuilder)
    .AddMeter("NServiceBus.Core")
    .AddAzureMonitorMetricExporter(options => options.ConnectionString = appInsightsConnectionString)
    .AddConsoleExporter()
    .Build();
The shim
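The shim is an NServiceBus feature that records the same measurements as NServiceBus.Metrics on a `System.Diagnostics.Metrics` meter named `NServiceBus.Core`, which OpenTelemetry then collects.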
/// <summary>
/// NServiceBus feature that records processing time, critical time, immediate/delayed retries,
/// and messages sent to the error queue on a <see cref="Meter"/> named "NServiceBus.Core",
/// so that an OpenTelemetry meter provider subscribed to that meter can export them.
/// </summary>
/// <remarks>
/// NOTE(review): NServiceBus 9.1+ exposes these metrics natively; presumably this shim is only
/// meant for earlier versions. Confirm before enabling it alongside native metrics, since both
/// would use the same meter name.
/// </remarks>
class EmitNServiceBusMetrics : Feature
{
    public EmitNServiceBusMetrics()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        if (context.Settings.GetOrDefault<bool>("Endpoint.SendOnly"))
        {
            // there are no metrics relevant for send only endpoints yet
            return;
        }

        var queueName = context.LocalQueueAddress().BaseAddress;
        var discriminator = context.InstanceSpecificQueueAddress()?.Discriminator;

        // The RecoverabilitySettings(SettingsHolder) constructor is non-public, so it is located via
        // reflection. This lets the feature subscribe to the recoverability notifications below.
        // Fail with a descriptive error (rather than a NullReferenceException) if the constructor
        // does not exist in the NServiceBus version in use.
        var recoverabilitySettingsConstructor = typeof(RecoverabilitySettings).GetConstructor(
            BindingFlags.NonPublic | BindingFlags.Instance,
            null,
            new[] { typeof(SettingsHolder) },
            null)
            ?? throw new InvalidOperationException(
                "Unable to find the non-public RecoverabilitySettings(SettingsHolder) constructor; the metrics shim is not compatible with this version of NServiceBus.");
        var recoverabilitySettings = (RecoverabilitySettings)recoverabilitySettingsConstructor.Invoke(
            new object[] { (SettingsHolder)context.Settings });

        recoverabilitySettings.Immediate(i => i.OnMessageBeingRetried((m, _) => RecordRetry(m.Headers, queueName, discriminator, true)));
        recoverabilitySettings.Delayed(d => d.OnMessageBeingRetried((m, _) => RecordRetry(m.Headers, queueName, discriminator, false)));
        recoverabilitySettings.Failed(f => f.OnMessageSentToErrorQueue((m, _) => RecordFailure(m.Headers, queueName, discriminator)));

        context.Pipeline.OnReceivePipelineCompleted((e, _) =>
        {
            // A missing message type leaves messageType null; CreateTags then omits that tag.
            e.TryGetMessageType(out var messageType);
            var tags = CreateTags(queueName, discriminator, messageType);

            ProcessingTime.Record((e.CompletedAt - e.StartedAt).TotalMilliseconds, tags);

            // Critical time is measured from the deliver-at time when one is available,
            // otherwise from the time the message was sent.
            if (e.TryGetDeliverAt(out DateTimeOffset startTime) || e.TryGetTimeSent(out startTime))
            {
                CriticalTime.Record((e.CompletedAt - startTime).TotalMilliseconds, tags);
            }

            return Task.CompletedTask;
        });
    }

    /// <summary>
    /// Increments the immediate or delayed retry counter, and always increments the total retry counter.
    /// </summary>
    /// <param name="headers">Headers of the message being retried; used to read the message type.</param>
    /// <param name="queueName">Local queue name, emitted as a tag.</param>
    /// <param name="discriminator">Instance discriminator, emitted as a tag when not blank.</param>
    /// <param name="immediate"><c>true</c> for an immediate retry, <c>false</c> for a delayed retry.</param>
    static Task RecordRetry(Dictionary<string, string> headers, string queueName, string discriminator, bool immediate)
    {
        headers.TryGetMessageType(out var messageType);
        var tags = CreateTags(queueName, discriminator, messageType);

        if (immediate)
        {
            ImmediateRetries.Add(1, tags);
        }
        else
        {
            DelayedRetries.Add(1, tags);
        }

        Retries.Add(1, tags);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Increments the counter of messages moved to the error queue.
    /// </summary>
    static Task RecordFailure(Dictionary<string, string> headers, string queueName, string discriminator)
    {
        headers.TryGetMessageType(out var messageType);
        MessageSentToErrorQueue.Add(1, CreateTags(queueName, discriminator, messageType));
        return Task.CompletedTask;
    }

    /// <summary>
    /// Builds the tag set for a measurement. The queue name is always present; the discriminator
    /// and message type are added only when they are not null, empty, or whitespace.
    /// </summary>
    static TagList CreateTags(string queueName, string discriminator, string messageType)
    {
        var tags = new TagList { { Tags.QueueName, queueName } };

        if (!string.IsNullOrWhiteSpace(discriminator))
        {
            tags.Add(Tags.EndpointDiscriminator, discriminator);
        }

        if (!string.IsNullOrWhiteSpace(messageType))
        {
            tags.Add(Tags.MessageType, messageType);
        }

        return tags;
    }

    // Declared before the instruments below because static initializers run in textual order.
    static readonly Meter NServiceBusMeter = new Meter("NServiceBus.Core", "0.1.0");

    public static readonly Counter<long> ImmediateRetries =
        NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.immediate", description: "Number of immediate retries performed by the endpoint.");

    // Kept for backward compatibility with the original (misspelled) field name; refers to the same instrument.
    [Obsolete("Use ImmediateRetries instead.")]
    public static readonly Counter<long> ImmedidateRetries = ImmediateRetries;

    public static readonly Counter<long> DelayedRetries =
        NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.delayed", description: "Number of delayed retries performed by the endpoint.");

    public static readonly Counter<long> Retries =
        NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.retries", description: "Number of retries performed by the endpoint.");

    public static readonly Counter<long> MessageSentToErrorQueue =
        NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.error", description: "Number of messages sent to the error queue.");

    public static readonly Histogram<double> ProcessingTime =
        NServiceBusMeter.CreateHistogram<double>("nservicebus.messaging.processing_time", "ms", "The time in milliseconds between when the message was pulled from the queue until processed by the endpoint.");

    public static readonly Histogram<double> CriticalTime =
        NServiceBusMeter.CreateHistogram<double>("nservicebus.messaging.critical_time", "ms", "The time in milliseconds between when the message was sent until processed by the endpoint.");

    /// <summary>Tag (dimension) names attached to every measurement recorded by this shim.</summary>
    public static class Tags
    {
        public const string EndpointDiscriminator = "nservicebus.discriminator";
        public const string QueueName = "nservicebus.queue";
        public const string MessageType = "nservicebus.message_type";
    }
}
The shim passes the queue name (and, when available, the endpoint discriminator and message type) as custom dimensions, which allows filtering the graphs in Application Insights. Multi-dimensional metrics are not enabled by default; see the Azure Monitor documentation for instructions on how to enable this feature.