Dispatch notification pipeline extension

Component: NServiceBus
NuGet Package NServiceBus (8-pre)
This page targets a pre-release version. Pre-releases are subject to change and samples are not guaranteed to be fully functional.

Introduction

This sample shows how to extend the NServiceBus message processing pipeline with custom behavior to add notifications whenever a message is dispatched to the underlying transport.

Code walk-through

The solution contains a single endpoint with the dispatch notifications turned on. Dispatch notifications are handled by classes that implement a simple interface:

interface IWatchDispatches
{
    Task Notify(IEnumerable<TransportOperation> operations);
}

The sample endpoint contains a dispatch watcher that writes the details of dispatch operations to the console:

class SampleDispatchWatcher :
    IWatchDispatches
{
    public Task Notify(IEnumerable<TransportOperation> operations)
    {
        foreach (var operation in operations)
        {
            Console.WriteLine($"Dispatched {operation.Message.MessageId} to {Read(operation.AddressTag)}");
        }
        return Task.CompletedTask;
    }

    string Read(AddressTag addressTag)
    {
        if (addressTag is UnicastAddressTag u)
        {
            return $"Unicast: {u.Destination}";
        }
        if (addressTag is MulticastAddressTag m)
        {
            return $"Multicast: {m.MessageType}";
        }
        throw new ArgumentException(
            message: "addressTag is not a recognized address type",
            paramName: nameof(addressTag));
    }
}

An instance of this watcher is added to the endpoint:

var endpointConfiguration = new EndpointConfiguration("Samples.DispatchNotification");
endpointConfiguration.UseTransport(new LearningTransport());
endpointConfiguration.NotifyDispatch(new SampleDispatchWatcher());

This enables the underlying feature and adds the watcher to a list which is tracked in the config settings:

static class ConfigExtensions
{
    public static void NotifyDispatch(this EndpointConfiguration endpointConfiguration, IWatchDispatches watch)
    {
        var settings = endpointConfiguration.GetSettings();
        settings.EnableFeatureByDefault<DispatchNotificationFeature>();
        settings.GetOrCreate<List<IWatchDispatches>>().Add(watch);
    }
}
Using EnableByDefault means that the feature can still be explicitly disabled in code.

The feature (if enabled) is called during the endpoint startup:

class DispatchNotificationFeature :
    Feature
{
    protected override void Setup(FeatureConfigurationContext context)
    {
        var watches = context.Settings.Get<List<IWatchDispatches>>();
        var behavior = new DispatchNotificationBehavior(watches);
        context.Pipeline.Register(behavior, "Notifies watches after a dispatch operation");
    }
}

The feature injects the watches configured by the user into a new pipeline behavior which sits in the Dispatch Context:

class DispatchNotificationBehavior :
    Behavior<IDispatchContext>
{
    List<IWatchDispatches> watches;

    public DispatchNotificationBehavior(List<IWatchDispatches> watches)
    {
        this.watches = watches;
    }

    public override async Task Invoke(IDispatchContext context, Func<Task> next)
    {
        await next()
            .ConfigureAwait(false);
        var tasks = watches.Select(watch => watch.Notify(context.Operations));
        await Task.WhenAll(tasks)
            .ConfigureAwait(false);
    }
}

The behavior notifies all of the watches after the transport operations have been dispatched.

Running the Code

  • Run the solution.
  • Press any key other than Escape to send a message
  • As the message is dispatched to the transport, the registered watches are invoked. One writes the details of the dispatch operations to the console.

Related Articles


Last modified