Session filter pipeline extension

Component: NServiceBus
NuGet Package: NServiceBus (7.x)

This sample shows how to extend the NServiceBus message processing pipeline with custom behaviors that add session filters to an endpoint. An endpoint accepts a message only if the sending endpoint shares its session key.

This technique can be useful in testing scenarios where leftover messages from previous test runs should be ignored.

Code walk-through

The solution contains two endpoints, Sender and Receiver, which exchange instances of SomeMessage. Each endpoint contains an instance of a session key provider:

public interface ISessionKeyProvider
{
    string SessionKey { get; }
}

In the sample, there is a simple implementation that provides a limited set of session keys:

public class RotatingSessionKeyProvider : ISessionKeyProvider
{
    readonly string[] sessionKeys = {
        "Particular",
        "Messaging",
        "NServiceBus",
        "Sagas"
    };

    int currentKeyIndex;

    public void NextKey()
    {
        currentKeyIndex = (currentKeyIndex + 1) % sessionKeys.Length;
    }

    public string SessionKey => sessionKeys[currentKeyIndex];
}
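
The rotation can be seen in isolation with a small console sketch. The provider types are copied from the sample in compact form so the snippet compiles on its own; `RotationDemo` and `Observe` are hypothetical names introduced only for this illustration and are not part of the sample.

```csharp
using System;

public interface ISessionKeyProvider
{
    string SessionKey { get; }
}

// Copied from the sample (compact form) so this snippet is self-contained.
public class RotatingSessionKeyProvider : ISessionKeyProvider
{
    readonly string[] sessionKeys = { "Particular", "Messaging", "NServiceBus", "Sagas" };
    int currentKeyIndex;

    public void NextKey() => currentKeyIndex = (currentKeyIndex + 1) % sessionKeys.Length;

    public string SessionKey => sessionKeys[currentKeyIndex];
}

// Hypothetical helper, not part of the sample.
public static class RotationDemo
{
    // Reads the current key, then rotates; repeated `steps` times.
    public static string[] Observe(int steps)
    {
        var provider = new RotatingSessionKeyProvider();
        var seen = new string[steps];
        for (var i = 0; i < steps; i++)
        {
            seen[i] = provider.SessionKey;
            provider.NextKey();
        }
        return seen;
    }

    public static void Main()
    {
        // Five reads over four keys: the fifth read wraps back to the first key.
        Console.WriteLine(string.Join(", ", Observe(5)));
        // prints "Particular, Messaging, NServiceBus, Sagas, Particular"
    }
}
```

Because the key set is small and cyclic, two endpoints that each start a fresh provider begin on the same key and fall out of step only when one of them calls `NextKey`.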

Each endpoint creates an instance of this session key provider and adds it to the endpoint configuration:

var sessionKeyProvider = new RotatingSessionKeyProvider();

endpointConfiguration.ApplySessionFilter(sessionKeyProvider);

var endpointInstance = await Endpoint.Start(endpointConfiguration)
    .ConfigureAwait(false);

The ApplySessionFilter extension method adds two behaviors to the endpoint pipeline:

public static void ApplySessionFilter(this EndpointConfiguration endpointConfiguration, ISessionKeyProvider sessionKeyProvider)
{
    var pipeline = endpointConfiguration.Pipeline;
    pipeline.Register(new ApplySessionFilterHeader(sessionKeyProvider), "Adds session key to outgoing messages");
    pipeline.Register(new FilterIncomingMessages(sessionKeyProvider), "Filters out messages that don't match the current session key");
}

The first behavior adds the session key as a header to all outgoing messages:

using System;
using System.Threading.Tasks;
using NServiceBus.Pipeline;

public class ApplySessionFilterHeader : Behavior<IRoutingContext>
{
    readonly ISessionKeyProvider sessionKeyProvider;

    public ApplySessionFilterHeader(ISessionKeyProvider sessionKeyProvider)
    {
        this.sessionKeyProvider = sessionKeyProvider;
    }

    public override Task Invoke(IRoutingContext context, Func<Task> next)
    {
        context.Message.Headers["NServiceBus.SessionKey"] = sessionKeyProvider.SessionKey;
        return next();
    }
}

The second behavior checks incoming messages for the session key header and only processes messages that have a matching session key:

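using System;
using System.Threading.Tasks;
using NServiceBus.Logging;
using NServiceBus.Pipeline;
using NServiceBus.Transport;

public class FilterIncomingMessages : Behavior<ITransportReceiveContext>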
{
    readonly ISessionKeyProvider sessionKeyProvider;

    public FilterIncomingMessages(ISessionKeyProvider sessionKeyProvider)
    {
        this.sessionKeyProvider = sessionKeyProvider;
    }

    public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        if (IsFromCurrentSession(context.Message))
        {
            await next().ConfigureAwait(false);
        }
        else
        {
            Log.Debug($"Dropping message {context.Message.MessageId} as it does not match the current session");
        }
    }

    bool IsFromCurrentSession(IncomingMessage message)
        => message.Headers.TryGetValue("NServiceBus.SessionKey", out string sessionKey)
           && sessionKey == sessionKeyProvider.SessionKey;

    static readonly ILog Log = LogManager.GetLogger<FilterIncomingMessages>();
}
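
The filter decision itself does not depend on the pipeline, so it can be exercised against plain header dictionaries. The following is a minimal sketch, not part of the sample: `SessionFilterSketch` is a hypothetical class that repeats the same check as `IsFromCurrentSession`, and the three dictionaries stand in for the headers of incoming messages.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-alone illustration, no NServiceBus types involved.
public static class SessionFilterSketch
{
    // Header name used by both behaviors in the sample.
    const string SessionKeyHeader = "NServiceBus.SessionKey";

    // Same logic as FilterIncomingMessages.IsFromCurrentSession,
    // applied to a plain dictionary instead of an IncomingMessage.
    public static bool IsFromCurrentSession(Dictionary<string, string> headers, string currentKey)
        => headers.TryGetValue(SessionKeyHeader, out var sessionKey)
           && sessionKey == currentKey;

    public static void Main()
    {
        var matching = new Dictionary<string, string> { [SessionKeyHeader] = "Particular" };
        var stale = new Dictionary<string, string> { [SessionKeyHeader] = "Sagas" };
        var unstamped = new Dictionary<string, string>();

        Console.WriteLine(IsFromCurrentSession(matching, "Particular"));  // True: keys match
        Console.WriteLine(IsFromCurrentSession(stale, "Particular"));     // False: different session
        Console.WriteLine(IsFromCurrentSession(unstamped, "Particular")); // False: header missing
    }
}
```

Note the third case: a message without the header is also dropped, so messages from endpoints that do not apply the session filter are never processed by a filtered endpoint.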

Running the Code

  • Run the solution
  • Verify that both endpoints are using the same session key
  • Send some messages from the sender to the receiver
  • Verify that the messages are sent and received correctly
  • Change the session key for the receiver
  • Send more messages from the sender to the receiver
  • Note that the messages are dropped and not processed
  • Change the session key for the sender to match the receiver
  • Send a final batch of messages
  • Verify that the new batch of messages is received
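
The expected outcome of the steps above can be replayed in memory. This is a rough sketch under simplifying assumptions: no endpoints or transport are involved, each endpoint's provider is modeled as an index into the same four-key array, and `RunThroughSketch` and `Replay` are hypothetical names that do not appear in the sample.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory replay of the manual steps; not part of the sample.
public static class RunThroughSketch
{
    static readonly string[] Keys = { "Particular", "Messaging", "NServiceBus", "Sagas" };

    // Returns, for each batch, whether the receiver would process it.
    public static List<bool> Replay()
    {
        var senderIndex = 0;   // both endpoints start on the first key
        var receiverIndex = 0;
        var processed = new List<bool>();

        // Batch 1: header stamped by the sender matches the receiver's key.
        var header = Keys[senderIndex];
        processed.Add(header == Keys[receiverIndex]);

        // The receiver changes its session key; the sender does not.
        receiverIndex = (receiverIndex + 1) % Keys.Length;

        // Batch 2: header still carries the old key, so the batch is dropped.
        header = Keys[senderIndex];
        processed.Add(header == Keys[receiverIndex]);

        // The sender changes its session key to match the receiver.
        senderIndex = (senderIndex + 1) % Keys.Length;

        // Batch 3: keys match again, so the batch is processed.
        header = Keys[senderIndex];
        processed.Add(header == Keys[receiverIndex]);

        return processed;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Replay()));
        // prints "True, False, True"
    }
}
```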
