Getting Started
Architecture
NServiceBus
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

Dispatch notification pipeline extension

Component: NServiceBus
NuGet Package: NServiceBus (9.x)

Introduction

This sample shows how to extend the NServiceBus message processing pipeline with custom behavior to add notifications whenever a message is dispatched to the underlying transport.

Code walk-through

The solution contains a single endpoint with the dispatch notifications turned on. Dispatch notifications are handled by classes that implement a simple interface:

interface IWatchDispatches
{
    Task Notify(IEnumerable<TransportOperation> operations);
}

The sample endpoint contains a dispatch watcher that writes the details of dispatch operations to the console:

class SampleDispatchWatcher :
    IWatchDispatches
{
    public Task Notify(IEnumerable<TransportOperation> operations)
    {
        foreach (var operation in operations)
        {
            Console.WriteLine($"Dispatched {operation.Message.MessageId} to {Read(operation.AddressTag)}");
        }
        return Task.CompletedTask;
    }

    string Read(AddressTag addressTag)
    {
        if (addressTag is UnicastAddressTag u)
        {
            return $"Unicast: {u.Destination}";
        }
        if (addressTag is MulticastAddressTag m)
        {
            return $"Multicast: {m.MessageType}";
        }
        throw new ArgumentException(
            message: "addressTag is not a recognized address type",
            paramName: nameof(addressTag));
    }
}

An instance of this watcher is added to the endpoint:

var endpointConfiguration = new EndpointConfiguration("Samples.DispatchNotification");
endpointConfiguration.UseTransport(new LearningTransport());
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
endpointConfiguration.NotifyDispatch(new SampleDispatchWatcher());

This enables the underlying feature and adds the watcher to a list which is tracked in the config settings:

static class ConfigExtensions
{
    public static void NotifyDispatch(this EndpointConfiguration endpointConfiguration, IWatchDispatches watch)
    {
        var settings = endpointConfiguration.GetSettings();
        settings.EnableFeatureByDefault<DispatchNotificationFeature>();
        settings.GetOrCreate<List<IWatchDispatches>>().Add(watch);
    }
}
Using EnableByDefault means that the feature can still be explicitly disabled in code.

The feature (if enabled) is called during the endpoint startup:

class DispatchNotificationFeature :
    Feature
{
    protected override void Setup(FeatureConfigurationContext context)
    {
        var watches = context.Settings.Get<List<IWatchDispatches>>();
        var behavior = new DispatchNotificationBehavior(watches);
        context.Pipeline.Register(behavior, "Notifies watches after a dispatch operation");
    }
}

The feature injects the watches configured by the user into a new pipeline behavior which sits in the Dispatch Context:

class DispatchNotificationBehavior :
    Behavior<IDispatchContext>
{
    List<IWatchDispatches> watches;

    public DispatchNotificationBehavior(List<IWatchDispatches> watches)
    {
        this.watches = watches;
    }

    public override async Task Invoke(IDispatchContext context, Func<Task> next)
    {
        await next();
        var tasks = watches.Select(watch => watch.Notify(context.Operations));
        await Task.WhenAll(tasks);
    }
}

The behavior notifies all of the watches after the transport operations have been dispatched.

Running the Code

  • Run the solution.
  • Press any key other than Escape to send a message
  • As the message is dispatched to the transport, the registered watches are invoked. One writes the details of the dispatch operations to the console.

Related Articles