Session filter pipeline extension

Component: NServiceBus
NuGet Package: NServiceBus (7.x)

This sample shows how to extend the NServiceBus message processing pipeline with custom behaviors that add session filters to an endpoint. An endpoint accepts a message from a sending endpoint only if both share the same session key.

This technique can be useful in testing scenarios where leftover messages from previous test runs should be ignored.

Code walk-through

The solution contains two endpoints, Sender and Receiver, which exchange instances of SomeMessage. Each endpoint contains an instance of a session key provider:

public interface ISessionKeyProvider
{
    string SessionKey { get; }
}

The sample includes a simple implementation that rotates through a fixed set of session keys:

public class RotatingSessionKeyProvider : ISessionKeyProvider
{
    readonly string[] sessionKeys = {
        "Particular",
        "Messaging",
        "NServiceBus",
        "Sagas"
    };

    int currentKeyIndex;

    public void NextKey()
    {
        currentKeyIndex = (currentKeyIndex + 1) % sessionKeys.Length;
    }

    public string SessionKey => sessionKeys[currentKeyIndex];
}
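
For the testing scenario mentioned earlier, a provider that generates a fresh key for each run may be more convenient than rotating through a fixed list. The following is a minimal sketch and not part of the sample: PerRunSessionKeyProvider is a hypothetical name, and the interface is repeated only so that the snippet compiles on its own.

```csharp
using System;

// Repeated from the sample so the snippet is self-contained.
public interface ISessionKeyProvider
{
    string SessionKey { get; }
}

// Hypothetical provider (not part of the sample): the key is generated once,
// when the provider is constructed, and stays the same for its lifetime.
public class PerRunSessionKeyProvider : ISessionKeyProvider
{
    public string SessionKey { get; } = Guid.NewGuid().ToString("N");
}
```

Sender and receiver must still agree on the key, so this sketch only fits tests that pass the same provider instance to both endpoints, for example when both are hosted in one test process.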

Each endpoint creates an instance of this session key provider and adds it to the endpoint configuration:

var sessionKeyProvider = new RotatingSessionKeyProvider();

endpointConfiguration.ApplySessionFilter(sessionKeyProvider);

var endpointInstance = await Endpoint.Start(endpointConfiguration)
    .ConfigureAwait(false);

The ApplySessionFilter extension method adds two behaviors to the endpoint pipeline:

public static void ApplySessionFilter(this EndpointConfiguration endpointConfiguration, ISessionKeyProvider sessionKeyProvider)
{
    var pipeline = endpointConfiguration.Pipeline;
    pipeline.Register(new ApplySessionFilterHeader(sessionKeyProvider), "Adds session key to outgoing messages");
    pipeline.Register(new FilterIncomingMessages(sessionKeyProvider), "Filters out messages that don't match the current session key");
}

The first behavior adds the session key as a header to all outgoing messages:

using System;
using System.Threading.Tasks;
using NServiceBus.Pipeline;

public class ApplySessionFilterHeader : Behavior<IRoutingContext>
{
    readonly ISessionKeyProvider sessionKeyProvider;

    public ApplySessionFilterHeader(ISessionKeyProvider sessionKeyProvider)
    {
        this.sessionKeyProvider = sessionKeyProvider;
    }

    public override Task Invoke(IRoutingContext context, Func<Task> next)
    {
        context.Message.Headers["NServiceBus.SessionKey"] = sessionKeyProvider.SessionKey;
        return next();
    }
}

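The second behavior checks incoming messages for the session key header and processes only those whose session key matches the current one. Messages with a different or missing session key are logged and dropped. Because the behavior returns without calling next(), such messages are consumed from the queue rather than retried: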

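using System;
using System.Threading.Tasks;
using NServiceBus.Logging;
using NServiceBus.Pipeline;
using NServiceBus.Transport;

public class FilterIncomingMessages : Behavior<ITransportReceiveContext>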
{
    readonly ISessionKeyProvider sessionKeyProvider;

    public FilterIncomingMessages(ISessionKeyProvider sessionKeyProvider)
    {
        this.sessionKeyProvider = sessionKeyProvider;
    }

    public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        if (IsFromCurrentSession(context.Message))
        {
            await next().ConfigureAwait(false);
        }
        else
        {
            Log.Debug($"Dropping message {context.Message.MessageId} as it does not match the current session");
        }
    }

    bool IsFromCurrentSession(IncomingMessage message)
        => message.Headers.TryGetValue("NServiceBus.SessionKey", out string sessionKey)
           && sessionKey == sessionKeyProvider.SessionKey;

    static readonly ILog Log = LogManager.GetLogger<FilterIncomingMessages>();
}
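
To make the filter's three possible outcomes explicit, the check can be reproduced against plain header dictionaries. This is an illustrative sketch that runs without an endpoint: SessionFilterCheck and PrintExamples are hypothetical names, and only the header name and the comparison are taken from the behavior above.

```csharp
using System;
using System.Collections.Generic;

public static class SessionFilterCheck
{
    // Same header name the behaviors in this sample use.
    const string SessionKeyHeader = "NServiceBus.SessionKey";

    // Mirrors IsFromCurrentSession, but takes plain headers instead of an IncomingMessage.
    public static bool IsFromCurrentSession(IReadOnlyDictionary<string, string> headers, string currentKey)
        => headers.TryGetValue(SessionKeyHeader, out var sessionKey)
           && sessionKey == currentKey;

    public static void PrintExamples()
    {
        var matching = new Dictionary<string, string> { [SessionKeyHeader] = "Particular" };
        var stale = new Dictionary<string, string> { [SessionKeyHeader] = "Sagas" };
        var unstamped = new Dictionary<string, string>();

        Console.WriteLine(IsFromCurrentSession(matching, "Particular"));  // True: same session
        Console.WriteLine(IsFromCurrentSession(stale, "Particular"));     // False: other session
        Console.WriteLine(IsFromCurrentSession(unstamped, "Particular")); // False: no header at all
    }
}
```

The last case matters in practice: a message sent by an endpoint that does not apply the session filter carries no session key header and is therefore dropped as well.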

Running the Code

  • Run the solution.
  • Verify that both endpoints are using the same session key.
  • Send some messages from the sender to the receiver.
  • Verify that the messages are sent and received correctly.
  • Change the session key for the receiver.
  • Send more messages from the sender to the receiver.
  • Note that the messages are dropped and not processed.
  • Change the session key for the sender to match the receiver.
  • Send a final batch of messages.
  • Verify that the new batch of messages is received.
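
The expected outcome of the steps above can be sketched without any endpoints by comparing the keys of two providers. This is a simplified model, not the sample's code: RunThrough is a hypothetical helper, the provider is copied from the walk-through, and the sketch assumes each message is filtered while the sender's key is unchanged. How the running sample triggers a key change is not covered here.

```csharp
using System;

// Copy of the sample's provider so the snippet is self-contained.
public class RotatingSessionKeyProvider
{
    readonly string[] sessionKeys = { "Particular", "Messaging", "NServiceBus", "Sagas" };
    int currentKeyIndex;

    public void NextKey() => currentKeyIndex = (currentKeyIndex + 1) % sessionKeys.Length;

    public string SessionKey => sessionKeys[currentKeyIndex];
}

// Hypothetical helper that models the three batches from the steps above.
public static class RunThrough
{
    // True when a message stamped with the sender's key would pass the receiver's filter.
    static bool Accepts(RotatingSessionKeyProvider receiver, RotatingSessionKeyProvider sender)
        => receiver.SessionKey == sender.SessionKey;

    public static bool[] Simulate()
    {
        var sender = new RotatingSessionKeyProvider();
        var receiver = new RotatingSessionKeyProvider();

        var firstBatch = Accepts(receiver, sender);   // both start on "Particular": received
        receiver.NextKey();                           // receiver moves to "Messaging"
        var secondBatch = Accepts(receiver, sender);  // keys differ: dropped
        sender.NextKey();                             // sender catches up to "Messaging"
        var finalBatch = Accepts(receiver, sender);   // keys match again: received

        return new[] { firstBatch, secondBatch, finalBatch };
    }
}
```

Note that the header is stamped when a message is sent, while the filter compares against the receiver's key when the message is processed, so a message still in the queue when the receiver changes its key is dropped too.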
