Unit of work management using the pipeline

Component: NServiceBus
NuGet NServiceBus (7-pre)
This page targets a pre-release version and is subject to change prior to the final release.

Introduction

This sample uses the pipeline to provide unit of work management for message handlers. Using the pipeline instead of the IManageUnitsOfWork abstraction is necessary when access to the incoming message and/or headers is required.

This sample simulates a multi-tenant solution where the session provided to handlers connects to an individual tenant database based on the value of a tenant header on the incoming message.

Code Walk Through

Creating the Behavior

The behavior will wrap handler invocations and:

  1. Open a session to the relevant tenant database.
  2. Make the session available to the message handlers.
  3. Commit/Rollback the session depending on the outcome of the handler.
  4. Dispose the session.

class MyUowBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    MySessionProvider sessionProvider;

    public MyUowBehavior(MySessionProvider sessionProvider)
    {
        this.sessionProvider = sessionProvider;
    }

    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        var tenant = context.MessageHeaders["tenant"];

        using (var session = await sessionProvider.Open(tenant).ConfigureAwait(false))
        {
            context.Extensions.Set<IMySession>(session);

            try
            {
                await next().ConfigureAwait(false);

                await session.Commit().ConfigureAwait(false);
            }
            catch (Exception)
            {
                await session.Rollback().ConfigureAwait(false);

                throw;
            }
        }
    }
}

Note that the injected session provider is responsible for creating the session and that the session is registered in the pipeline context using context.Extensions.Set<IMySession>(session);. This will be used later to provide the session to the handlers.
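The session types themselves are not shown on this page. As a rough illustration only, the following sketch assumes the shape that the behavior and the extension methods rely on: a disposable IMySession with Store, Commit and Rollback methods that return a Task, and a MySessionProvider whose Open method takes the tenant name. The InMemorySession class and its members are hypothetical stand-ins; a real session would wrap a connection or transaction against the tenant database.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Assumed contract, inferred from how MyUowBehavior and the handlers use the session.
interface IMySession : IDisposable
{
    Task Store<T>(T entity);
    Task Commit();
    Task Rollback();
}

// Hypothetical in-memory session used only for illustration.
class InMemorySession : IMySession
{
    readonly List<object> pending = new List<object>();

    public InMemorySession(string tenant)
    {
        Tenant = tenant;
    }

    public string Tenant { get; }
    public int PendingCount => pending.Count;
    public bool Committed { get; private set; }
    public bool Disposed { get; private set; }

    // Buffers the entity until the unit of work completes.
    public Task Store<T>(T entity)
    {
        pending.Add(entity);
        return Task.CompletedTask;
    }

    // Marks the buffered work as committed.
    public Task Commit()
    {
        Committed = true;
        return Task.CompletedTask;
    }

    // Discards everything stored during this unit of work.
    public Task Rollback()
    {
        pending.Clear();
        return Task.CompletedTask;
    }

    public void Dispose()
    {
        Disposed = true;
    }

    public override string ToString() => $"InMemorySession(tenant: {Tenant})";
}

// Assumed provider shape: opens one session per incoming message for the given tenant.
class MySessionProvider
{
    public Task<IMySession> Open(string tenant)
    {
        IMySession session = new InMemorySession(tenant);
        return Task.FromResult(session);
    }
}
```

Because Open returns a new instance on every call, each incoming message gets its own session, while all handlers for that message share the instance the behavior placed in the pipeline context.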

Registering the behavior

The following code is needed to register the behavior in the receive pipeline.

var sessionProvider = new MySessionProvider();

var pipeline = endpointConfiguration.Pipeline;
pipeline.Register(new MyUowBehavior(sessionProvider), "Manages the session");

Providing the session to handlers

While it is possible to call context.Extensions.Get<IMySession>() directly in the handler, it is better to provide extension methods on IMessageHandlerContext to allow for a more terse syntax. In this sample the methods .Store<T> and .GetSession are provided:

static class MySessionContextExtensions
{
    public static IMySession GetSession(this IMessageHandlerContext context)
    {
        return context.Extensions.Get<IMySession>();
    }

    public static Task Store<T>(this IMessageHandlerContext context, T entity)
    {
        return context.GetSession().Store(entity);
    }
}

Message handlers

One of the benefits of a unit of work is that multiple handlers for the same message will share the same session and commit/rollback together. This is how one of the handlers looks:

class MyMessageHandler1 :
    IHandleMessages<MyMessage>
{
    static ILog log = LogManager.GetLogger<MyMessageHandler1>();

    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        await context.Store(new MyEntity())
            .ConfigureAwait(false);

        log.Info($"{nameof(MyMessageHandler1)}({context.MessageId}) got UOW instance {context.GetSession()}");
    }
}

Running the sample

Run the sample. Once it is running, press any key to send messages. Note that for each message the two message handlers get the same session instance, and that the instance is connected to the tenant specified on the incoming message.
