Introduction
This sample shows how to extend the NServiceBus message processing pipeline with a custom behavior that raises a notification whenever a message is dispatched to the underlying transport.
Code walk-through
The solution contains a single endpoint with the dispatch notifications turned on. Dispatch notifications are handled by classes that implement a simple interface:
interface IWatchDispatches
{
    Task Notify(IEnumerable<TransportOperation> operations);
}
The sample endpoint contains a dispatch watcher that writes the details of dispatch operations to the console:
class SampleDispatchWatcher :
    IWatchDispatches
{
    public Task Notify(IEnumerable<TransportOperation> operations)
    {
        foreach (var operation in operations)
        {
            Console.WriteLine($"Dispatched {operation.Message.MessageId} to {Read(operation.AddressTag)}");
        }
        return Task.CompletedTask;
    }

    string Read(AddressTag addressTag)
    {
        if (addressTag is UnicastAddressTag u)
        {
            return $"Unicast: {u.Destination}";
        }
        if (addressTag is MulticastAddressTag m)
        {
            return $"Multicast: {m.MessageType}";
        }
        throw new ArgumentException(
            message: "addressTag is not a recognized address type",
            paramName: nameof(addressTag));
    }
}
An instance of this watcher is added to the endpoint:
var endpointConfiguration = new EndpointConfiguration("Samples.DispatchNotification");
endpointConfiguration.UseTransport(new LearningTransport());
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
endpointConfiguration.NotifyDispatch(new SampleDispatchWatcher());
This enables the underlying feature and adds the watcher to a list which is tracked in the config settings:
static class ConfigExtensions
{
    public static void NotifyDispatch(this EndpointConfiguration endpointConfiguration, IWatchDispatches watch)
    {
        var settings = endpointConfiguration.GetSettings();
        settings.EnableFeatureByDefault<DispatchNotificationFeature>();
        settings.GetOrCreate<List<IWatchDispatches>>().Add(watch);
    }
}
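Because NotifyDispatch appends to a list, more than one watcher can be registered on the same endpoint. As a minimal sketch, assuming the sample's IWatchDispatches interface is in scope and the NServiceBus package is referenced, a second watcher might tally operations by address tag type. CountingDispatchWatcher and its Counts property are hypothetical names and are not part of the sample:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using NServiceBus.Transport;

// Hypothetical watcher (not part of the sample): counts dispatched
// operations per address tag type, e.g. "UnicastAddressTag".
class CountingDispatchWatcher :
    IWatchDispatches
{
    // Notify may be called from concurrent dispatches, so use a thread-safe map.
    readonly ConcurrentDictionary<string, int> counts = new ConcurrentDictionary<string, int>();

    // Read-only view of the tallies collected so far.
    public IReadOnlyDictionary<string, int> Counts => counts;

    public Task Notify(IEnumerable<TransportOperation> operations)
    {
        foreach (var operation in operations)
        {
            var kind = operation.AddressTag.GetType().Name;
            counts.AddOrUpdate(kind, 1, (_, current) => current + 1);
        }
        return Task.CompletedTask;
    }
}
```

Registering it alongside the console watcher would take one more call, endpointConfiguration.NotifyDispatch(new CountingDispatchWatcher()), since each call adds to the same list.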
EnableFeatureByDefault means that the feature can still be explicitly disabled in code. The feature, if enabled, is set up during endpoint startup:
class DispatchNotificationFeature :
    Feature
{
    protected override void Setup(FeatureConfigurationContext context)
    {
        var watches = context.Settings.Get<List<IWatchDispatches>>();
        var behavior = new DispatchNotificationBehavior(watches);
        context.Pipeline.Register(behavior, "Notifies watches after a dispatch operation");
    }
}
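Since the feature is only enabled by default, an explicit opt-out should win. The fragment below is a sketch of how that might look, assuming the sample's NotifyDispatch extension, SampleDispatchWatcher and DispatchNotificationFeature are in scope. With the feature disabled, Setup does not run and the behavior is not registered:

```csharp
using NServiceBus;

var endpointConfiguration = new EndpointConfiguration("Samples.DispatchNotification");

// Registers the watcher and enables the feature by default.
endpointConfiguration.NotifyDispatch(new SampleDispatchWatcher());

// An explicit DisableFeature call overrides the default, so no
// dispatch notifications are raised for this endpoint.
endpointConfiguration.DisableFeature<DispatchNotificationFeature>();
```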
The feature injects the watches configured by the user into a new pipeline behavior that runs in the dispatch stage of the pipeline (IDispatchContext):
class DispatchNotificationBehavior :
    Behavior<IDispatchContext>
{
    List<IWatchDispatches> watches;

    public DispatchNotificationBehavior(List<IWatchDispatches> watches)
    {
        this.watches = watches;
    }

    public override async Task Invoke(IDispatchContext context, Func<Task> next)
    {
        await next()
            .ConfigureAwait(false);
        var tasks = watches.Select(watch => watch.Notify(context.Operations));
        await Task.WhenAll(tasks)
            .ConfigureAwait(false);
    }
}
The behavior notifies all of the watches after the transport operations have been dispatched.
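The ordering comes from where the notification code sits relative to the await on next(). The following is a standalone sketch of that control flow using plain delegates in place of the pipeline. PipelineOrderSketch and its members are hypothetical stand-ins, not NServiceBus types. It also illustrates a consequence of this shape: if the rest of the pipeline throws, the code after next() never runs, so nothing is notified.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in for the behavior's Invoke method; not an NServiceBus type.
static class PipelineOrderSketch
{
    // Same shape as Invoke: run the rest of the pipeline first, then notify.
    public static async Task InvokeThenNotify(Func<Task> next, Func<Task> notify)
    {
        await next().ConfigureAwait(false);
        await notify().ConfigureAwait(false);
    }

    // Records the order in which the two steps run.
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        await InvokeThenNotify(
            next: () => { log.Add("dispatch"); return Task.CompletedTask; },
            notify: () => { log.Add("notify"); return Task.CompletedTask; })
            .ConfigureAwait(false);
        return log;
    }

    // Reports whether notify ran when the simulated dispatch failed.
    public static async Task<bool> NotifiedWhenDispatchFailsAsync()
    {
        var notified = false;
        try
        {
            await InvokeThenNotify(
                next: () => Task.FromException(new InvalidOperationException("dispatch failed")),
                notify: () => { notified = true; return Task.CompletedTask; })
                .ConfigureAwait(false);
        }
        catch (InvalidOperationException)
        {
            // Expected: the simulated dispatch failure surfaces to the caller.
        }
        return notified;
    }
}
```

In this sketch RunAsync records "dispatch" before "notify", and NotifiedWhenDispatchFailsAsync returns false because the exception propagates before the notification step is reached.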
Running the code
- Run the solution.
- Press any key other than Escape to send a message.
- As the message is dispatched to the transport, the registered watches are invoked. The sample watcher writes the details of the dispatch operations to the console.