Getting Started
Architecture
NServiceBus
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Modernization
Samples

Session filter pipeline extension

Component: NServiceBus
NuGet Package: NServiceBus (10-pre)
This page targets a pre-release version. Pre-releases are subject to change and samples are not guaranteed to be fully functional.

This sample shows how to extend the NServiceBus message processing pipeline with custom behaviors to add session filters to an endpoint. An endpoint will accept messages only from a sending endpoint if they share a session key.

Code walk-through

The solution contains two endpoints, Sender and Receiver, which exchange instances of SomeMessage. Each endpoint contains an instance of a session key provider:

public interface ISessionKeyProvider
{
    void NextKey();
    string SessionKey { get; }
}

In the sample, there is a simple implementation that provides a limited set of session keys:

public class RotatingSessionKeyProvider : ISessionKeyProvider
{
    readonly string[] sessionKeys = {
        "Particular",
        "Messaging",
        "NServiceBus",
        "Sagas"
    };

    int currentKeyIndex;

    public void NextKey()
    {
        currentKeyIndex = (currentKeyIndex + 1) % sessionKeys.Length;
    }

    public string SessionKey => sessionKeys[currentKeyIndex];
}

Each endpoint registers the session key provider:

builder.Services.AddSingleton<ISessionKeyProvider, RotatingSessionKeyProvider>();

This is used by the pipeline behaviors that are added by the ApplySessionFilter extension method:

public static void ApplySessionFilter(this EndpointConfiguration endpointConfiguration)
{
    var pipeline = endpointConfiguration.Pipeline;
    pipeline.Register(typeof(ApplySessionFilterHeader), "Adds session key to outgoing messages");
    pipeline.Register(typeof(FilterIncomingMessages), "Filters out messages that don't match the current session key");
}

The first behavior adds the session key as a header to all outgoing messages:

public class ApplySessionFilterHeader : Behavior<IRoutingContext>
{
    readonly ISessionKeyProvider sessionKeyProvider;

    public ApplySessionFilterHeader(ISessionKeyProvider sessionKeyProvider)
    {
        this.sessionKeyProvider = sessionKeyProvider;
    }

    public override Task Invoke(IRoutingContext context, Func<Task> next)
    {
        context.Message.Headers["NServiceBus.SessionKey"] = sessionKeyProvider.SessionKey;
        return next();
    }
}

The second behavior checks incoming messages for the session key header and only processes messages that have a matching session key:

public class FilterIncomingMessages: Behavior<ITransportReceiveContext>
{
    readonly ISessionKeyProvider sessionKeyProvider;
    private readonly ILogger<FilterIncomingMessages> logger;

    public FilterIncomingMessages(ISessionKeyProvider sessionKeyProvider, ILogger<FilterIncomingMessages> logger)
    {
        this.sessionKeyProvider = sessionKeyProvider;
        this.logger = logger;
    }

    public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        if (IsFromCurrentSession(context.Message))
        {
            await next();
        }
        else
        {
            logger.LogInformation("Dropping message {MessageId} as it does not match the current session", context.Message.MessageId);
        }
    }

    bool IsFromCurrentSession(IncomingMessage message)
        => message.Headers.TryGetValue("NServiceBus.SessionKey", out string sessionKey)
           && sessionKey == sessionKeyProvider.SessionKey;

}

Running the Code

  • Run the solution.
  • Verify that each endpoint is using the same session key
  • Send some messages from the sender to the receiver
  • Verify that the messages are sent and received correctly
  • Change the session key for the receiver
  • Send more messages from the sender to the receiver
  • Note that the messages are dropped and not processed
  • Change the session key for the sender to match the receiver
  • Send a final batch of messages
  • Verify that the new batch of messages are received

Related Articles