Azure Application Insights (App Insights) provides monitoring and alerting capabilities that can be used to monitor the health of NServiceBus endpoints.
This sample shows how to capture NServiceBus OpenTelemetry traces and export them to App Insights. The sample simulates message load as well as a 10% failure rate on processing messages.
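As a rough illustration of the simulated failure rate, the sketch below throws on about one call in ten. `FailureSimulator` and its `Process` method are hypothetical names used only for this illustration; the sample's actual handler may implement the failure differently.

```csharp
using System;

// Hypothetical stand-in for the failure logic inside a message handler.
public static class FailureSimulator
{
    // Throws for roughly `failureRate` of all calls.
    // The default of 0.10 mirrors the 10% failure rate described above.
    public static void Process(Random random, double failureRate = 0.10)
    {
        if (random.NextDouble() < failureRate)
        {
            throw new InvalidOperationException("Simulated processing failure.");
        }
    }
}
```

When a handler throws like this, the endpoint's recoverability settings determine whether the message is retried or moved to the error queue.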
Prerequisites
This sample requires an App Insights connection string.
Although the sample uses Azure Application Insights, the solution itself does not require an Azure message transport. This example uses the Learning Transport but could be modified to run on any transport.
Running the sample
- Create an Application Insights resource in Azure
- Copy the connection string from the Azure portal dashboard into the sample
- Start the sample endpoint
- Press any key to send a message, or ESC to quit
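Instead of pasting the connection string into the source, it can be read from the environment. The following is a minimal sketch, assuming the value is stored in the `APPLICATIONINSIGHTS_CONNECTION_STRING` environment variable; `AppInsightsSettings` is a hypothetical helper and not part of the sample.

```csharp
using System;

// Hypothetical helper; the sample itself may store the connection string differently.
public static class AppInsightsSettings
{
    // Assumed variable name, chosen for this sketch.
    public const string VariableName = "APPLICATIONINSIGHTS_CONNECTION_STRING";

    // Returns the connection string or fails fast with a descriptive error.
    public static string GetConnectionString()
    {
        var value = Environment.GetEnvironmentVariable(VariableName);
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException(
                $"Set the {VariableName} environment variable to the connection string copied from the Azure portal.");
        }

        return value;
    }
}
```

Failing fast at startup avoids running the endpoint with exporters that silently send nothing.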
Reviewing traces
- On the Azure portal dashboard, open the Investigate → Performance panel
- Drill into the samples
- Review the custom properties
Reviewing meters
Navigate to Monitoring → Metrics on the Azure portal dashboard for the configured Application Insights instance to start creating graphs.
It may take a few minutes for the meter data to appear in Azure. Meters only appear on the dashboard once they have reported at least one value.
Message processing counters
To monitor the rate of messages being fetched from the queuing system, processed successfully, retried, and failed for the endpoint use:
- nservicebus.messaging.fetches
- nservicebus.messaging.successes
- nservicebus.messaging.failures
Recoverability
To monitor recoverability metrics use:
- nservicebus.recoverability.immediate_retries
- nservicebus.recoverability.delayed_retries
- nservicebus.recoverability.retries
- nservicebus.recoverability.sent_to_error
Critical time and processing time
To monitor critical time and processing time (in milliseconds) for successfully processed messages use:
- nservicebus.messaging.processingtime
- nservicebus.messaging.criticaltime
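The difference between the two values is easiest to see with concrete timestamps. The sketch below assumes processing time runs from when the message is pulled from the queue until processing completes, and critical time runs from when the message was sent until processing completes. `TimingExample` and its method names are hypothetical.

```csharp
using System;

// Hypothetical helper illustrating how the two durations relate.
public static class TimingExample
{
    // Time spent processing: pulled from the queue until completed.
    public static double ProcessingTimeMs(DateTimeOffset startedAt, DateTimeOffset completedAt)
        => (completedAt - startedAt).TotalMilliseconds;

    // End-to-end latency: sent by the producer until completed by the consumer.
    public static double CriticalTimeMs(DateTimeOffset timeSent, DateTimeOffset completedAt)
        => (completedAt - timeSent).TotalMilliseconds;
}
```

For a message sent at 12:00:00.000, pulled from the queue at 12:00:00.200, and completed at 12:00:00.250, the processing time is 50 ms and the critical time is 250 ms. The 200 ms difference is time spent in transit and waiting in the queue.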
Code walk-through
The OpenTelemetry instrumentation is enabled on the endpoint.
var endpointConfiguration = new EndpointConfiguration(endpointName);
endpointConfiguration.EnableOpenTelemetry();
Tracing
The endpoint configures an OpenTelemetry trace provider that includes the NServiceBus.Core source and exports collected traces to Azure Monitor.
var traceProvider = Sdk.CreateTracerProviderBuilder()
    .SetResourceBuilder(resourceBuilder)
    .AddSource("NServiceBus.Core")
    .AddAzureMonitorTraceExporter(o => o.ConnectionString = appInsightsConnectionString)
    .AddConsoleExporter()
    .Build();
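The source name passed to `AddSource` matters because activities are only collected from sources that a listener has subscribed to. The sketch below demonstrates this filtering with the `System.Diagnostics` types alone, without the OpenTelemetry SDK. `SourceFilterDemo` is a hypothetical class, and the `ActivitySource` instances it creates are stand-ins rather than the ones NServiceBus creates internally.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical demo: only activities from the subscribed source name are captured.
public static class SourceFilterDemo
{
    public static List<string> Capture(string subscribedSourceName)
    {
        var captured = new List<string>();

        using var listener = new ActivityListener
        {
            // Plays the role of AddSource(...): subscribe by source name.
            ShouldListenTo = source => source.Name == subscribedSourceName,
            Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
                ActivitySamplingResult.AllDataAndRecorded,
            ActivityStopped = activity => captured.Add(activity.OperationName)
        };
        ActivitySource.AddActivityListener(listener);

        // Stand-in sources created only for this demonstration.
        using var nsbSource = new ActivitySource("NServiceBus.Core");
        using var otherSource = new ActivitySource("Some.Other.Library");

        // StartActivity returns null when nobody listens to the source.
        using (nsbSource.StartActivity("process message")) { }
        using (otherSource.StartActivity("unrelated work")) { }

        return captured;
    }
}
```

Calling `SourceFilterDemo.Capture("NServiceBus.Core")` captures only the "process message" activity, while subscribing to a name that no source uses captures nothing.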
Meters
The endpoint configures an OpenTelemetry meter provider that includes the NServiceBus.Core meter and exports metric data to Azure Monitor.
var telemetryConfiguration = TelemetryConfiguration.CreateDefault();
telemetryConfiguration.ConnectionString = appInsightsConnectionString;
var telemetryClient = new TelemetryClient(telemetryConfiguration);
telemetryClient.Context.GlobalProperties["Endpoint"] = endpointName;
var meterProvider = Sdk.CreateMeterProviderBuilder()
    .SetResourceBuilder(resourceBuilder)
    .AddMeter("NServiceBus.Core")
    .AddAzureMonitorMetricExporter(o => o.ConnectionString = appInsightsConnectionString)
    .AddConsoleExporter()
    .Build();
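Because meter data can take a few minutes to show up in Azure, it can help to confirm locally that instruments on the expected meter are emitting values. The sketch below uses `MeterListener` from `System.Diagnostics.Metrics` to subscribe by meter name, which is the same filtering that `AddMeter` configures. `MeterFilterDemo` is a hypothetical class, and the meters it creates are stand-ins for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Hypothetical demo: totals counter values from the subscribed meter only.
public static class MeterFilterDemo
{
    public static Dictionary<string, long> Capture(string subscribedMeterName)
    {
        var totals = new Dictionary<string, long>();

        using var listener = new MeterListener();
        listener.InstrumentPublished = (instrument, l) =>
        {
            // Plays the role of AddMeter(...): subscribe by meter name.
            if (instrument.Meter.Name == subscribedMeterName)
            {
                l.EnableMeasurementEvents(instrument);
            }
        };
        listener.SetMeasurementEventCallback<long>((instrument, value, tags, state) =>
        {
            totals[instrument.Name] = totals.GetValueOrDefault(instrument.Name) + value;
        });
        listener.Start();

        // Stand-in meters and counters created only for this demonstration.
        using var meter = new Meter("NServiceBus.Core");
        var fetches = meter.CreateCounter<long>("nservicebus.messaging.fetches");
        using var otherMeter = new Meter("Some.Other.Library");
        var otherCounter = otherMeter.CreateCounter<long>("other.counter");

        fetches.Add(1);
        fetches.Add(1);
        otherCounter.Add(5);

        return totals;
    }
}
```

Subscribing to "NServiceBus.Core" yields a single entry for `nservicebus.messaging.fetches` with a total of 2, and the counter on the other meter is ignored.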
Critical time and processing time
Critical time and processing time captured by the metrics package are not yet supported in OpenTelemetry's native format (using System.Diagnostics), so a shim is required to expose them as OpenTelemetry metrics.
class EmitNServiceBusMetrics : Feature
{
    public EmitNServiceBusMetrics()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        if (context.Settings.GetOrDefault<bool>("Endpoint.SendOnly"))
        {
            // there are no metrics relevant for send only endpoints yet
            return;
        }

        var queueName = context.LocalQueueAddress().BaseAddress;
        var discriminator = context.InstanceSpecificQueueAddress()?.Discriminator;

        // RecoverabilitySettings is created through its non-public constructor
        // so the shim can subscribe to the retry and error queue notifications.
        var recoverabilitySettings = (RecoverabilitySettings)typeof(RecoverabilitySettings).GetConstructor(
            BindingFlags.NonPublic | BindingFlags.Instance,
            null, [typeof(SettingsHolder)],
            null).Invoke([(SettingsHolder)context.Settings]);

        recoverabilitySettings.Immediate(i => i.OnMessageBeingRetried((m, _) => RecordRetry(m.Headers, queueName, discriminator, true)));
        recoverabilitySettings.Delayed(d => d.OnMessageBeingRetried((m, _) => RecordRetry(m.Headers, queueName, discriminator, false)));
        recoverabilitySettings.Failed(f => f.OnMessageSentToErrorQueue((m, _) => RecordFailure(m.Headers, queueName, discriminator)));

        // Record processing time and critical time when the receive pipeline completes.
        context.Pipeline.OnReceivePipelineCompleted((e, _) =>
        {
            e.TryGetMessageType(out var messageType);
            var tags = CreateTags(queueName, discriminator, messageType);

            ProcessingTime.Record((e.CompletedAt - e.StartedAt).TotalMilliseconds, tags);

            // Use the requested delivery time when present, otherwise the time the message was sent.
            if (e.TryGetDeliverAt(out DateTimeOffset startTime) || e.TryGetTimeSent(out startTime))
            {
                CriticalTime.Record((e.CompletedAt - startTime).TotalMilliseconds, tags);
            }

            return Task.CompletedTask;
        });
    }

    static Task RecordRetry(Dictionary<string, string> headers, string queueName, string discriminator, bool immediate)
    {
        headers.TryGetMessageType(out var messageType);
        var tags = CreateTags(queueName, discriminator, messageType);

        if (immediate)
        {
            ImmedidateRetries.Add(1, tags);
        }
        else
        {
            DelayedRetries.Add(1, tags);
        }

        Retries.Add(1, tags);
        return Task.CompletedTask;
    }

    static Task RecordFailure(Dictionary<string, string> headers, string queueName, string discriminator)
    {
        headers.TryGetMessageType(out var messageType);
        MessageSentToErrorQueue.Add(1, CreateTags(queueName, discriminator, messageType));
        return Task.CompletedTask;
    }

    static TagList CreateTags(string queueName, string discriminator, string messageType)
    {
        var tags = new TagList(
        [
            new(Tags.QueueName, queueName)
        ]);

        if (!string.IsNullOrWhiteSpace(discriminator))
        {
            tags.Add(Tags.EndpointDiscriminator, discriminator);
        }

        if (!string.IsNullOrWhiteSpace(messageType))
        {
            tags.Add(Tags.MessageType, messageType);
        }

        return tags;
    }

    static readonly Meter NServiceBusMeter = new Meter("NServiceBus.Core", "0.1.0");

    public static readonly Counter<long> ImmedidateRetries =
        NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.immediate_retries", description: "Number of immediate retries performed by the endpoint.");

    public static readonly Counter<long> DelayedRetries =
        NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.delayed_retries", description: "Number of delayed retries performed by the endpoint.");

    public static readonly Counter<long> Retries =
        NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.retries", description: "Number of retries performed by the endpoint.");

    public static readonly Counter<long> MessageSentToErrorQueue =
        NServiceBusMeter.CreateCounter<long>("nservicebus.recoverability.moved_to_error", description: "Number of messages sent to the error queue.");

    public static readonly Histogram<double> ProcessingTime =
        NServiceBusMeter.CreateHistogram<double>("nservicebus.messaging.processingtime", "ms", "The time in milliseconds between when the message was pulled from the queue until processed by the endpoint.");

    public static readonly Histogram<double> CriticalTime =
        NServiceBusMeter.CreateHistogram<double>("nservicebus.messaging.criticaltime", "ms", "The time in milliseconds between when the message was sent until processed by the endpoint.");

    public static class Tags
    {
        public const string EndpointDiscriminator = "nservicebus.discriminator";
        public const string QueueName = "nservicebus.queue";
        public const string MessageType = "nservicebus.message_type";
    }
}
The shim passes QueueName as a custom dimension, which allows filtering the graphs in Application Insights. Multi-dimensional metrics are not enabled by default. Check the Azure Monitor documentation for instructions on how to enable this feature.
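To show what a dimension looks like at the measurement level, the sketch below records histogram values tagged with `nservicebus.queue` and groups them by that tag in a `MeterListener` callback. `DimensionDemo`, the `Demo.Meter` meter, the `demo.processingtime` instrument, and the queue names are all hypothetical and exist only for this illustration; Application Insights performs comparable grouping once multi-dimensional metrics are enabled.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;

// Hypothetical demo: groups recorded values by the queue-name tag.
public static class DimensionDemo
{
    const string QueueTag = "nservicebus.queue";

    public static Dictionary<string, List<double>> RecordAndGroupByQueue()
    {
        var byQueue = new Dictionary<string, List<double>>();

        using var listener = new MeterListener();
        listener.InstrumentPublished = (instrument, l) =>
        {
            if (instrument.Meter.Name == "Demo.Meter")
            {
                l.EnableMeasurementEvents(instrument);
            }
        };
        listener.SetMeasurementEventCallback<double>((instrument, value, tags, state) =>
        {
            // Each measurement carries its tags; these become the dimensions.
            foreach (var tag in tags)
            {
                if (tag.Key == QueueTag && tag.Value is string queue)
                {
                    if (!byQueue.TryGetValue(queue, out var values))
                    {
                        values = new List<double>();
                        byQueue[queue] = values;
                    }

                    values.Add(value);
                }
            }
        });
        listener.Start();

        using var meter = new Meter("Demo.Meter");
        var processingTime = meter.CreateHistogram<double>("demo.processingtime", "ms");

        Record(processingTime, 12.5, "Sales");
        Record(processingTime, 20.0, "Sales");
        Record(processingTime, 7.0, "Billing");

        return byQueue;
    }

    // Records one value tagged with the queue name, similar in spirit to the shim's CreateTags.
    static void Record(Histogram<double> histogram, double value, string queueName)
    {
        var tags = new TagList();
        tags.Add(QueueTag, queueName);
        histogram.Record(value, tags);
    }
}
```

The result contains two groups: "Sales" with the values 12.5 and 20.0, and "Billing" with the value 7.0.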