Session filter pipeline extension

Component: NServiceBus
NuGet Package: NServiceBus (9.x)

This sample shows how to extend the NServiceBus message processing pipeline with custom behaviors that add a session filter to an endpoint. An endpoint accepts a message only if the sending endpoint shares its session key.

Code walk-through

The solution contains two endpoints, Sender and Receiver, which exchange instances of SomeMessage. Each endpoint contains an instance of a session key provider:

public interface ISessionKeyProvider
{
    string SessionKey { get; }
}

The sample includes a simple implementation that rotates through a fixed set of session keys:

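public class RotatingSessionKeyProvider : ISessionKeyProvider
{
    readonly string[] sessionKeys = {
        // session key values are not shown in this listing
    };

    int currentKeyIndex;

    // Advances to the next key, wrapping around to the first key after the last one.
    public void NextKey()
    {
        currentKeyIndex = (currentKeyIndex + 1) % sessionKeys.Length;
    }

    public string SessionKey => sessionKeys[currentKeyIndex];
}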
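The rotation can be tried in isolation. The following is a minimal sketch, not the sample's code: the class name DemoRotatingSessionKeyProvider and the three key values are hypothetical placeholders chosen for illustration.

```csharp
using System;

public interface ISessionKeyProvider
{
    string SessionKey { get; }
}

// Hypothetical stand-in for the sample's provider.
public class DemoRotatingSessionKeyProvider : ISessionKeyProvider
{
    // Placeholder keys; these are assumptions, not the sample's actual values.
    readonly string[] sessionKeys = { "key-a", "key-b", "key-c" };

    int currentKeyIndex;

    public void NextKey()
    {
        // The modulo wraps the index back to 0 after the last key.
        currentKeyIndex = (currentKeyIndex + 1) % sessionKeys.Length;
    }

    public string SessionKey => sessionKeys[currentKeyIndex];
}

public static class RotationDemo
{
    public static void Main()
    {
        var provider = new DemoRotatingSessionKeyProvider();

        // Read the current key, then rotate, four times in a row.
        for (var i = 0; i < 4; i++)
        {
            Console.WriteLine(provider.SessionKey);
            provider.NextKey();
        }
    }
}
```

With three keys, the fourth read returns the first key again, so two endpoints that rotate independently can always be brought back to the same key.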

Each endpoint creates an instance of this session key provider and adds it to the endpoint configuration:

var sessionKeyProvider = new RotatingSessionKeyProvider();
var logger = new LoggerFactory().CreateLogger<FilterIncomingMessages>();

endpointConfiguration.ApplySessionFilter(sessionKeyProvider, logger);

The ApplySessionFilter extension method adds two behaviors to the endpoint pipeline:

public static void ApplySessionFilter(this EndpointConfiguration endpointConfiguration, ISessionKeyProvider sessionKeyProvider, ILogger<FilterIncomingMessages> logger)
{
    var pipeline = endpointConfiguration.Pipeline;
    pipeline.Register(new ApplySessionFilterHeader(sessionKeyProvider), "Adds session key to outgoing messages");
    pipeline.Register(new FilterIncomingMessages(sessionKeyProvider, logger), "Filters out messages that don't match the current session key");
}

The first behavior adds the session key as a header to all outgoing messages:

public class ApplySessionFilterHeader : Behavior<IRoutingContext>
{
    readonly ISessionKeyProvider sessionKeyProvider;

    public ApplySessionFilterHeader(ISessionKeyProvider sessionKeyProvider)
    {
        this.sessionKeyProvider = sessionKeyProvider;
    }

    public override Task Invoke(IRoutingContext context, Func<Task> next)
    {
        // Stamp every outgoing message with the sender's current session key.
        context.Message.Headers["NServiceBus.SessionKey"] = sessionKeyProvider.SessionKey;
        return next();
    }
}

The second behavior checks incoming messages for the session key header and only processes messages that have a matching session key:

public class FilterIncomingMessages : Behavior<ITransportReceiveContext>
{
    readonly ISessionKeyProvider sessionKeyProvider;
    readonly ILogger<FilterIncomingMessages> logger;

    public FilterIncomingMessages(ISessionKeyProvider sessionKeyProvider, ILogger<FilterIncomingMessages> logger)
    {
        this.sessionKeyProvider = sessionKeyProvider;
        this.logger = logger;
    }

    public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        if (IsFromCurrentSession(context.Message))
        {
            // Matching session key: continue down the pipeline.
            await next();
        }
        else
        {
            // Not calling next() ends processing, so the message is dropped.
            logger.LogDebug($"Dropping message {context.Message.MessageId} as it does not match the current session");
        }
    }

    bool IsFromCurrentSession(IncomingMessage message)
        => message.Headers.TryGetValue("NServiceBus.SessionKey", out string sessionKey)
           && sessionKey == sessionKeyProvider.SessionKey;
}
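The header check can be exercised without a running endpoint. The sketch below applies the same comparison to a plain dictionary of headers; the SessionFilterSketch class, its dictionary-based signature, and the key values are assumptions made for illustration and are not part of the sample or of NServiceBus.

```csharp
using System;
using System.Collections.Generic;

public static class SessionFilterSketch
{
    // Header name used by both behaviors in the sample.
    const string SessionKeyHeader = "NServiceBus.SessionKey";

    // Hypothetical helper: the same test as IsFromCurrentSession,
    // but taking a header dictionary instead of an IncomingMessage.
    public static bool IsFromCurrentSession(
        IReadOnlyDictionary<string, string> headers,
        string currentSessionKey)
        => headers.TryGetValue(SessionKeyHeader, out var sessionKey)
           && sessionKey == currentSessionKey;

    public static void Main()
    {
        // Placeholder key values, chosen only for this demonstration.
        var matching = new Dictionary<string, string> { [SessionKeyHeader] = "key-a" };
        var stale = new Dictionary<string, string> { [SessionKeyHeader] = "key-b" };
        var missing = new Dictionary<string, string>();

        Console.WriteLine(IsFromCurrentSession(matching, "key-a")); // True
        Console.WriteLine(IsFromCurrentSession(stale, "key-a"));    // False
        Console.WriteLine(IsFromCurrentSession(missing, "key-a"));  // False
    }
}
```

The last case is worth noting: a message that carries no session key header at all fails the check, so messages from senders that do not apply the filter are dropped as well.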


Running the code

  • Run the solution.
  • Verify that both endpoints are using the same session key.
  • Send some messages from the sender to the receiver.
  • Verify that the messages are sent and received correctly.
  • Change the session key for the receiver.
  • Send more messages from the sender to the receiver.
  • Note that the receiver drops these messages without processing them.
  • Change the session key for the sender to match the receiver.
  • Send a final batch of messages.
  • Verify that the new batch of messages is received.
