Introduction
This sample shows how to extend the NServiceBus message processing pipeline with custom behavior to add notifications whenever a message is dispatched to the underlying transport.
Code walk-through
The solution contains a single endpoint with the dispatch notifications turned on. Dispatch notifications are handled by classes that implement the following interface:
interface IDispatchNotifier
{
Task Notify(IEnumerable<TransportOperation> operations);
}
The sample endpoint contains a dispatch notifier that writes the details of dispatch operations to the console:
class SampleDispatchNotifier :
IDispatchNotifier
{
public async Task Notify(IEnumerable<TransportOperation> operations)
{
foreach (var operation in operations)
{
await Console.Out.WriteLineAsync($"Dispatched {operation.Message.MessageId} to {Read(operation.AddressTag)}");
}
}
string Read(AddressTag addressTag)
{
return addressTag switch
{
UnicastAddressTag u => $"Unicast: {u.Destination}",
MulticastAddressTag m => $"Multicast: {m.MessageType}",
_ => throw new ArgumentException(message: "addressTag is not a recognized address type", paramName: nameof(addressTag))
};
}
}
An instance of this notifier is added to the endpoint:
var endpointConfiguration = new EndpointConfiguration("Samples.DispatchNotification");
endpointConfiguration.UseTransport<LearningTransport>();
endpointConfiguration.NotifyDispatch(new SampleDispatchNotifier());
This enables the underlying feature and adds the notifier to a list which is tracked in the config settings:
static class ConfigExtensions
{
public static void NotifyDispatch(this EndpointConfiguration endpointConfiguration, IDispatchNotifier watch)
{
var settings = endpointConfiguration.GetSettings();
settings.EnableFeatureByDefault<DispatchNotificationFeature>();
settings.GetOrCreate<List<IDispatchNotifier>>().Add(watch);
}
}
Using EnableByDefault
means that the feature can still be explicitly disabled in code.
The feature (if enabled) is called during the endpoint startup:
class DispatchNotificationFeature :
Feature
{
protected override void Setup(FeatureConfigurationContext context)
{
var watches = context.Settings.Get<List<IDispatchNotifier>>();
var behavior = new DispatchNotificationBehavior(watches);
context.Pipeline.Register(behavior, "Notifies dispatch notifiers when a message is dispatched");
}
}
The feature injects the notifiers configured by the user into a new pipeline behavior which sits in the Dispatch Context:
class DispatchNotificationBehavior :
Behavior<IDispatchContext>
{
List<IDispatchNotifier> watches;
public DispatchNotificationBehavior(List<IDispatchNotifier> watches)
{
this.watches = watches;
}
public override async Task Invoke(IDispatchContext context, Func<Task> next)
{
await next();
await Task.WhenAll(watches.Select(watch => watch.Notify(context.Operations)));
}
}
The behavior notifies all of the notifiers after the transport operations have been dispatched. For all dispatch operations that failed the notifiers will not be called because the exception would bubble out of the await next()
call.
Running the Code
- Run the solution.
- Press any key other than Escape to send a message
- As the message is dispatched to the transport, the registered notifiers are invoked. One writes the details of the dispatch operations to the console.