This sample shows how to extend the NServiceBus message processing pipeline with custom behaviors to add session filters to an endpoint. An endpoint will accept messages only from a sending endpoint if they share a session key.
This technique can be useful in testing scenarios where left over messages from previous test runs should be ignored.
Code walk-through
The solution contains two endpoints, Sender and Receiver, which exchange instances of SomeMessage
. Each endpoint contains an instance of a session key provider:
public interface ISessionKeyProvider
{
string SessionKey { get; }
}
In the sample, there is a simple implementation that provides a limited set of session keys:
public class RotatingSessionKeyProvider : ISessionKeyProvider
{
readonly string[] sessionKeys = {
"Particular",
"Messaging",
"NServiceBus",
"Sagas"
};
int currentKeyIndex;
public void NextKey()
{
currentKeyIndex = (currentKeyIndex + 1) % sessionKeys.Length;
}
public string SessionKey => sessionKeys[currentKeyIndex];
}
Each endpoint creates an instance of this session key provider and adds it to the endpoint configuration:
var sessionKeyProvider = new RotatingSessionKeyProvider();
endpointConfiguration.ApplySessionFilter(sessionKeyProvider);
var endpointInstance = await Endpoint.Start(endpointConfiguration);
The ApplySessionFilter
extension method adds two behaviors to the endpoint pipeline:
public static void ApplySessionFilter(this EndpointConfiguration endpointConfiguration, ISessionKeyProvider sessionKeyProvider)
{
var pipeline = endpointConfiguration.Pipeline;
pipeline.Register(new ApplySessionFilterHeader(sessionKeyProvider), "Adds session key to outgoing messages");
pipeline.Register(new FilterIncomingMessages(sessionKeyProvider), "Filters out messages that don't match the current session key");
}
The first behavior adds the session key as a header to all outgoing messages:
public class ApplySessionFilterHeader : Behavior<IRoutingContext>
{
readonly ISessionKeyProvider sessionKeyProvider;
public ApplySessionFilterHeader(ISessionKeyProvider sessionKeyProvider)
{
this.sessionKeyProvider = sessionKeyProvider;
}
public override Task Invoke(IRoutingContext context, Func<Task> next)
{
context.Message.Headers["NServiceBus.SessionKey"] = sessionKeyProvider.SessionKey;
return next();
}
}
The second behavior checks incoming messages for the session key header and only processes messages that have a matching session key:
public class FilterIncomingMessages : Behavior<ITransportReceiveContext>
{
readonly ISessionKeyProvider sessionKeyProvider;
public FilterIncomingMessages(ISessionKeyProvider sessionKeyProvider)
{
this.sessionKeyProvider = sessionKeyProvider;
}
public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
{
if (IsFromCurrentSession(context.Message))
{
await next();
}
else
{
Log.Debug($"Dropping message {context.Message.MessageId} as it does not match the current session");
}
}
bool IsFromCurrentSession(IncomingMessage message)
=> message.Headers.TryGetValue("NServiceBus.SessionKey", out string sessionKey)
&& sessionKey == sessionKeyProvider.SessionKey;
static ILog Log = LogManager.GetLogger<FilterIncomingMessages>();
}
Running the Code
- Run the solution.
- Verify that each endpoint is using the same session key
- Send some messages from the sender to the receiver
- Verify that the messages are sent and received correctly
- Change the session key for the receiver
- Send more messages from the sender to the receiver
- Note that the messages are dropped and not processed
- Change the session key for the sender to match the receiver
- Send a final batch of messages
- Verify that the new batch of messages are received