Session filter pipeline extension

Component: NServiceBus
NuGet Package: NServiceBus (7-pre)
This page targets a pre-release version and is subject to change prior to the final release.

Introduction

This sample shows how to extend the NServiceBus message processing pipeline with custom behaviors to add session filters to an endpoint. An endpoint will only accept messages from a sending endpoint if they share a session key.

This technique is useful in testing scenarios where leftover messages from previous test runs should be ignored.

Code Walk Through

The solution contains two endpoints, Sender and Receiver, which exchange SomeMessage instances. Each endpoint holds an instance of a session key provider:

public interface ISessionKeyProvider
{
    string SessionKey { get; }
}

The sample uses a simple implementation that rotates through a fixed set of four session keys, wrapping back to the first key after the last:

public class RotatingSessionKeyProvider : ISessionKeyProvider
{
    readonly string[] sessionKeys = {
        "Particular",
        "Messaging",
        "NServiceBus",
        "Sagas"
    };

    int currentKeyIndex;

    public void NextKey()
    {
        currentKeyIndex = (currentKeyIndex + 1) % sessionKeys.Length;
    }

    public string SessionKey => sessionKeys[currentKeyIndex];
}
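
The wrap-around means repeated calls to NextKey cycle through the four keys and then return to the first. A minimal console sketch of that behavior follows. The RotationDemo class is hypothetical and not part of the sample; the interface and provider are repeated only so the snippet compiles on its own.

```csharp
using System;

public interface ISessionKeyProvider
{
    string SessionKey { get; }
}

// Copy of the sample's provider so this snippet compiles by itself.
public class RotatingSessionKeyProvider : ISessionKeyProvider
{
    readonly string[] sessionKeys = { "Particular", "Messaging", "NServiceBus", "Sagas" };

    int currentKeyIndex;

    public void NextKey()
    {
        // The modulo wraps the index back to 0 after the last key.
        currentKeyIndex = (currentKeyIndex + 1) % sessionKeys.Length;
    }

    public string SessionKey => sessionKeys[currentKeyIndex];
}

// Hypothetical demo class, not part of the sample.
public static class RotationDemo
{
    public static void Main()
    {
        var provider = new RotatingSessionKeyProvider();

        // Five reads: the four keys in order, then the first key again.
        for (var i = 0; i < 5; i++)
        {
            Console.WriteLine(provider.SessionKey);
            provider.NextKey();
        }
    }
}
```

This prints Particular, Messaging, NServiceBus, Sagas, and then Particular again, one per line.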

Each endpoint creates an instance of this session key provider and adds it to the endpoint configuration:

var sessionKeyProvider = new RotatingSessionKeyProvider();

endpointConfiguration.ApplySessionFilter(sessionKeyProvider);

var endpointInstance = await Endpoint.Start(endpointConfiguration)
    .ConfigureAwait(false);

The ApplySessionFilter extension method adds two behaviors to the endpoint pipeline:

public static void ApplySessionFilter(this EndpointConfiguration endpointConfiguration, ISessionKeyProvider sessionKeyProvider)
{
    var pipeline = endpointConfiguration.Pipeline;
    pipeline.Register(new ApplySessionFilterHeader(sessionKeyProvider), "Adds session key to outgoing messages");
    pipeline.Register(new FilterIncomingMessages(sessionKeyProvider), "Filters out messages that don't match the current session key");
}

The first behavior adds the session key as a header to all outgoing messages:

public class ApplySessionFilterHeader : Behavior<IRoutingContext>
{
    readonly ISessionKeyProvider sessionKeyProvider;

    public ApplySessionFilterHeader(ISessionKeyProvider sessionKeyProvider)
    {
        this.sessionKeyProvider = sessionKeyProvider;
    }

    public override Task Invoke(IRoutingContext context, Func<Task> next)
    {
        context.Message.Headers["NServiceBus.SessionKey"] = sessionKeyProvider.SessionKey;
        return next();
    }
}

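The second behavior checks incoming messages for the session key header and only processes messages with a matching session key. A message whose header is missing or carries a different key is logged at debug level and dropped: the behavior returns without calling next(), so the rest of the pipeline never runs for that message.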

public class FilterIncomingMessages : Behavior<ITransportReceiveContext>
{
    readonly ISessionKeyProvider sessionKeyProvider;

    public FilterIncomingMessages(ISessionKeyProvider sessionKeyProvider)
    {
        this.sessionKeyProvider = sessionKeyProvider;
    }

    public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        if (IsFromCurrentSession(context.Message))
        {
            await next().ConfigureAwait(false);
        }
        else
        {
            Log.Debug($"Dropping message {context.Message.MessageId} as it does not match the current session");
        }
    }

    bool IsFromCurrentSession(IncomingMessage message)
        => message.Headers.TryGetValue("NServiceBus.SessionKey", out string sessionKey)
           && sessionKey == sessionKeyProvider.SessionKey;

    static readonly ILog Log = LogManager.GetLogger<FilterIncomingMessages>();
}
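
The filter decision has three cases: a matching key, a different key, and no session key header at all. The stand-alone sketch below mirrors the IsFromCurrentSession check against a plain dictionary so the cases can be tried without an endpoint. The SessionFilterSketch class and its method signature are hypothetical and not part of the sample; only the header name is taken from the behaviors above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-alone helper that mirrors the behavior's check.
public static class SessionFilterSketch
{
    const string SessionKeyHeader = "NServiceBus.SessionKey";

    // True only when the header exists and equals the current session key.
    public static bool IsFromCurrentSession(
        IReadOnlyDictionary<string, string> headers,
        string currentSessionKey)
        => headers.TryGetValue(SessionKeyHeader, out var sessionKey)
           && sessionKey == currentSessionKey;

    public static void Main()
    {
        var matching = new Dictionary<string, string> { [SessionKeyHeader] = "Particular" };
        var stale = new Dictionary<string, string> { [SessionKeyHeader] = "Sagas" };
        var unstamped = new Dictionary<string, string>();

        Console.WriteLine(IsFromCurrentSession(matching, "Particular"));  // True
        Console.WriteLine(IsFromCurrentSession(stale, "Particular"));     // False
        Console.WriteLine(IsFromCurrentSession(unstamped, "Particular")); // False
    }
}
```

The third case matters in practice: messages sent by an endpoint that does not apply the session filter carry no header and are dropped as well.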

Running the Code

  • Run the solution
  • Verify that both endpoints are using the same session key
  • Send some messages from the sender to the receiver
  • Verify that the messages are sent and received correctly
  • Change the session key for the receiver
  • Send some more messages from the sender to the receiver
  • Note that the receiver drops these messages without processing them
  • Change the session key for the sender to match the receiver
  • Send a final batch of messages
  • Verify that the new batch of messages is received
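
The steps above need a way to send messages and rotate the session key while the endpoints are running. One way to drive them from an endpoint's console is sketched below. The SessionFilterConsole helper, the key bindings, and the receiverEndpointName parameter are assumptions made for illustration and may differ from the sample's actual startup code. SomeMessage and RotatingSessionKeyProvider are the sample types described above, so the snippet compiles only inside the sample project.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical helper, not part of the sample as shown above.
public static class SessionFilterConsole
{
    public static async Task Run(
        IEndpointInstance endpointInstance,
        RotatingSessionKeyProvider sessionKeyProvider,
        string receiverEndpointName) // assumed: the receiver's endpoint name
    {
        Console.WriteLine("Press S to send a message, C to change the session key, any other key to exit");

        while (true)
        {
            Console.WriteLine($"Current session key: {sessionKeyProvider.SessionKey}");

            var key = Console.ReadKey(intercept: true).Key;
            if (key == ConsoleKey.S)
            {
                // The outgoing behavior stamps the current session key on this message.
                await endpointInstance.Send(receiverEndpointName, new SomeMessage())
                    .ConfigureAwait(false);
            }
            else if (key == ConsoleKey.C)
            {
                // Messages stamped with the previous key will no longer match.
                sessionKeyProvider.NextKey();
            }
            else
            {
                return;
            }
        }
    }
}
```

On the receiver only the C binding is needed, since that endpoint does not send messages in this walk-through.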
