Getting Started
Architecture
NServiceBus
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

Dispatch notification pipeline extension

Component: NServiceBus
NuGet Package: NServiceBus (9.x)

Introduction

This sample shows how to extend the NServiceBus message processing pipeline with custom behavior to add notifications whenever a message is dispatched to the underlying transport.

Code walk-through

The solution contains a single endpoint with the dispatch notifications turned on. Dispatch notifications are handled by classes that implement the following interface:

interface IDispatchNotifier
{
    Task Notify(IEnumerable<TransportOperation> operations);
}

The sample endpoint contains a dispatch notifier that writes the details of dispatch operations to the console:

class SampleDispatchNotifier :
    IDispatchNotifier
{
    public async Task Notify(IEnumerable<TransportOperation> operations)
    {
        foreach (var operation in operations)
        {
            await Console.Out.WriteLineAsync($"Dispatched {operation.Message.MessageId} to {Read(operation.AddressTag)}");
        }
    }

    static string Read(AddressTag addressTag)
    {
        return addressTag switch
        {
            UnicastAddressTag u => $"Unicast: {u.Destination}",
            MulticastAddressTag m => $"Multicast: {m.MessageType}",
            _ => throw new ArgumentException(message: "addressTag is not a recognized address type", paramName: nameof(addressTag))
        };
    }
}

An instance of this notifier is added to the endpoint:

var endpointConfiguration = new EndpointConfiguration("Samples.DispatchNotification");
endpointConfiguration.UseTransport(new LearningTransport());
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
endpointConfiguration.NotifyDispatch(new SampleDispatchNotifier());

This enables the underlying feature and adds the notifier to a list which is tracked in the config settings:

static class ConfigExtensions
{
    public static void NotifyDispatch(this EndpointConfiguration endpointConfiguration, IDispatchNotifier watch)
    {
        var settings = endpointConfiguration.GetSettings();
        settings.EnableFeatureByDefault<DispatchNotificationFeature>();
        settings.GetOrCreate<List<IDispatchNotifier>>().Add(watch);
    }
}

The feature (if enabled) is called during the endpoint startup:

class DispatchNotificationFeature :
    Feature
{
    protected override void Setup(FeatureConfigurationContext context)
    {
        var watches = context.Settings.Get<List<IDispatchNotifier>>();
        var behavior = new DispatchNotificationBehavior(watches);
        context.Pipeline.Register(behavior, "Notifies dispatch notifiers when a message is dispatched");
    }
}

The feature injects the notifiers configured by the user into a new pipeline behavior which sits in the Dispatch Context:

class DispatchNotificationBehavior(List<IDispatchNotifier> watches) :
    Behavior<IDispatchContext>
{
    public override async Task Invoke(IDispatchContext context, Func<Task> next)
    {
        await next();

        await Task.WhenAll(watches.Select(watch => watch.Notify(context.Operations)));
    }
}

The behavior notifies all of the notifiers after the transport operations have been dispatched. For all dispatch operations that failed the notifiers will not be called because the exception would bubble out of the await next() call.

Running the Code

  • Run the solution.
  • Press any key other than Escape to send a message
  • As the message is dispatched to the transport, the registered notifiers are invoked. One writes the details of the dispatch operations to the console.

Related Articles