Dispatch notification pipeline extension

Component: NServiceBus
NuGet Package: NServiceBus (8.x)


This sample shows how to extend the NServiceBus message processing pipeline with a custom behavior that raises a notification whenever a message is dispatched to the underlying transport.

Code walk-through

The solution contains a single endpoint with dispatch notifications turned on. Dispatch notifications are handled by classes that implement a simple interface:

interface IWatchDispatches
{
    Task Notify(IEnumerable<TransportOperation> operations);
}

The sample endpoint contains a dispatch watcher that writes the details of dispatch operations to the console:

class SampleDispatchWatcher :
    IWatchDispatches
{
    public Task Notify(IEnumerable<TransportOperation> operations)
    {
        // Write one line per dispatched transport operation
        foreach (var operation in operations)
        {
            Console.WriteLine($"Dispatched {operation.Message.MessageId} to {Read(operation.AddressTag)}");
        }
        return Task.CompletedTask;
    }

    string Read(AddressTag addressTag)
    {
        if (addressTag is UnicastAddressTag u)
        {
            return $"Unicast: {u.Destination}";
        }
        if (addressTag is MulticastAddressTag m)
        {
            return $"Multicast: {m.MessageType}";
        }
        throw new ArgumentException(
            message: "addressTag is not a recognized address type",
            paramName: nameof(addressTag));
    }
}

An instance of this watcher is added to the endpoint:

var endpointConfiguration = new EndpointConfiguration("Samples.DispatchNotification");
endpointConfiguration.UseTransport(new LearningTransport());
endpointConfiguration.NotifyDispatch(new SampleDispatchWatcher());

The NotifyDispatch extension method enables the underlying feature and adds the watcher to a list that is tracked in the configuration settings:

static class ConfigExtensions
{
    public static void NotifyDispatch(this EndpointConfiguration endpointConfiguration, IWatchDispatches watch)
    {
        var settings = endpointConfiguration.GetSettings();
        settings.EnableFeatureByDefault<DispatchNotificationFeature>();
        settings.GetOrCreate<List<IWatchDispatches>>().Add(watch);
    }
}

Using EnableByDefault means that the feature can still be explicitly disabled in code.
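
Because the feature is only enabled by default, an endpoint can still opt out after a watcher has been registered. The following is a minimal sketch rather than part of the sample: it reuses the sample's SampleDispatchWatcher, NotifyDispatch and DispatchNotificationFeature types, and assumes the standard DisableFeature extension method on EndpointConfiguration.

```csharp
using NServiceBus;

var endpointConfiguration = new EndpointConfiguration("Samples.DispatchNotification");
endpointConfiguration.UseTransport(new LearningTransport());

// Registers the watcher and enables the feature by default
endpointConfiguration.NotifyDispatch(new SampleDispatchWatcher());

// An explicit disable takes precedence over enabling by default,
// so the notification behavior is never added to the pipeline
endpointConfiguration.DisableFeature<DispatchNotificationFeature>();
```

With the feature disabled, its Setup method is not called and the registered watcher is never invoked.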

If enabled, the feature is set up during endpoint startup:

class DispatchNotificationFeature :
    Feature
{
    protected override void Setup(FeatureConfigurationContext context)
    {
        var watches = context.Settings.Get<List<IWatchDispatches>>();
        var behavior = new DispatchNotificationBehavior(watches);
        context.Pipeline.Register(behavior, "Notifies watches after a dispatch operation");
    }
}

The feature injects the watches configured by the user into a new pipeline behavior that runs in the dispatch stage of the pipeline (IDispatchContext):

class DispatchNotificationBehavior :
    Behavior<IDispatchContext>
{
    List<IWatchDispatches> watches;

    public DispatchNotificationBehavior(List<IWatchDispatches> watches)
    {
        this.watches = watches;
    }

    public override async Task Invoke(IDispatchContext context, Func<Task> next)
    {
        // Let the rest of the pipeline dispatch the operations first
        await next();
        var tasks = watches.Select(watch => watch.Notify(context.Operations));
        await Task.WhenAll(tasks);
    }
}

The behavior notifies all of the watches after the transport operations have been dispatched.
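
The ordering matters: because the behavior awaits the rest of the pipeline before notifying, a dispatch that throws never reaches the watches. The sketch below illustrates this without NServiceBus. IWatch, RecordingWatch and DispatchOrderSketch are hypothetical stand-ins invented for the illustration, and transport operations are reduced to plain strings.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for IWatchDispatches; operations are reduced to strings.
interface IWatch
{
    Task Notify(IEnumerable<string> operations);
}

// Records each notification in a shared log so the ordering can be inspected.
class RecordingWatch : IWatch
{
    readonly List<string> log;

    public RecordingWatch(List<string> log)
    {
        this.log = log;
    }

    public Task Notify(IEnumerable<string> operations)
    {
        foreach (var operation in operations)
        {
            log.Add($"notified:{operation}");
        }
        return Task.CompletedTask;
    }
}

static class DispatchOrderSketch
{
    // Same shape as DispatchNotificationBehavior.Invoke: run the rest of the
    // pipeline first, then notify every watch.
    static async Task Invoke(string[] operations, List<IWatch> watches, Func<Task> next)
    {
        await next();
        await Task.WhenAll(watches.Select(watch => watch.Notify(operations)));
    }

    public static async Task<List<string>> Run()
    {
        var log = new List<string>();
        var watches = new List<IWatch> { new RecordingWatch(log) };

        // A successful dispatch is followed by a notification.
        await Invoke(new[] { "msg-1" }, watches, () =>
        {
            log.Add("dispatched:msg-1");
            return Task.CompletedTask;
        });

        // A failed dispatch propagates its exception, so no watch is notified.
        try
        {
            await Invoke(new[] { "msg-2" }, watches,
                () => Task.FromException(new InvalidOperationException("transport unavailable")));
        }
        catch (InvalidOperationException)
        {
            log.Add("failed:msg-2");
        }

        return log;
    }

    static async Task Main()
    {
        // Prints: dispatched:msg-1, notified:msg-1, failed:msg-2
        Console.WriteLine(string.Join(", ", await Run()));
    }
}
```

A watcher that needs to observe failed dispatches as well would require a behavior that wraps the call to next in a try/catch, which the sample does not do.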

Running the Code

  • Run the solution.
  • Press any key other than Escape to send a message.
  • As the message is dispatched to the transport, the registered watches are invoked. One writes the details of the dispatch operations to the console.
