Unit of work using the pipeline

Component: NServiceBus
NuGet Package: NServiceBus (9.x)

Introduction

This sample uses the NServiceBus pipeline to manage a unit of work for message handlers. Using a pipeline behavior is necessary when the unit of work needs access to the incoming message and/or its headers.

Code walk-through

Creating the Behavior

The behavior will wrap handler invocations and:

  1. Open a session to the database.
  2. Make the session available to the message handlers.
  3. Commit or roll back the session, depending on the outcome of the handlers.
  4. Dispose the session.

class MyUowBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    MySessionProvider sessionProvider;

    public MyUowBehavior(MySessionProvider sessionProvider)
    {
        this.sessionProvider = sessionProvider;
    }

    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        using (var session = await sessionProvider.Open())
        {
            context.Extensions.Set<IMySession>(session);

            try
            {
                await next();

                await session.Commit();
            }
            catch (Exception)
            {
                await session.Rollback();

                throw;
            }
        }
    }
}

Note that the injected session provider is responsible for creating the session, and that the behavior registers the session in the pipeline context using context.Extensions.Set<IMySession>(session). This is used later to provide the session to the handlers.
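
The session types themselves are not shown on this page. As a rough illustration only, the following is a minimal in-memory sketch of what IMySession and MySessionProvider could look like. The member names mirror the calls made by the behavior and the extension methods; InMemorySession and the Committed list are assumptions introduced here and are not part of the sample.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical session contract; members mirror the calls used in this sample.
interface IMySession : IDisposable
{
    Task Store<T>(T entity);
    Task Commit();
    Task Rollback();
}

// Assumed in-memory stand-in for a real database session.
class InMemorySession : IMySession
{
    readonly List<object> pending = new List<object>();
    readonly List<object> committed;

    public InMemorySession(List<object> committed)
    {
        this.committed = committed;
    }

    public Task Store<T>(T entity)
    {
        // Stored entities stay pending until the unit of work commits.
        pending.Add(entity);
        return Task.CompletedTask;
    }

    public Task Commit()
    {
        committed.AddRange(pending);
        pending.Clear();
        return Task.CompletedTask;
    }

    public Task Rollback()
    {
        // Discard everything stored since the session was opened.
        pending.Clear();
        return Task.CompletedTask;
    }

    public void Dispose()
    {
        pending.Clear();
    }
}

class MySessionProvider
{
    // Exposed only so the effect of Commit and Rollback can be observed.
    public List<object> Committed { get; } = new List<object>();

    public Task<IMySession> Open()
    {
        return Task.FromResult<IMySession>(new InMemorySession(Committed));
    }
}
```

With this shape, entities stored by any handler only reach Committed when the behavior calls Commit after all handlers have completed successfully.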

Registering the behavior

The following code is needed to register the behavior in the receive pipeline.

var sessionProvider = new MySessionProvider();

var pipeline = endpointConfiguration.Pipeline;
pipeline.Register(new MyUowBehavior(sessionProvider), "Manages the session");
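
For context, the registration above normally sits inside the endpoint startup code. The following is a minimal sketch, assuming the NServiceBus 9 package, the Learning Transport, the System.Text.Json serializer, and a hypothetical endpoint name. The sample's actual endpoint name, transport, and serializer are not shown on this page and may differ.

```csharp
using System.Threading.Tasks;
using NServiceBus;

static class Program
{
    static async Task Main()
    {
        // Hypothetical endpoint name, chosen for illustration only.
        var endpointConfiguration = new EndpointConfiguration("Samples.UnitOfWork.Pipeline");

        // Assumed transport and serializer; the sample may use others.
        endpointConfiguration.UseTransport(new LearningTransport());
        endpointConfiguration.UseSerialization<SystemJsonSerializer>();

        // Register the unit of work behavior in the receive pipeline.
        var sessionProvider = new MySessionProvider();
        var pipeline = endpointConfiguration.Pipeline;
        pipeline.Register(new MyUowBehavior(sessionProvider), "Manages the session");

        var endpointInstance = await Endpoint.Start(endpointConfiguration);

        // ... send messages here ...

        await endpointInstance.Stop();
    }
}
```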

Providing the session to handlers

While it is possible to call context.Extensions.Get<IMySession>() directly in a handler, it is better to provide extension methods on IMessageHandlerContext to allow for a terser syntax. In this sample the methods .Store<T> and .GetSession are provided:

static class MySessionContextExtensions
{
    public static IMySession GetSession(this IMessageHandlerContext context)
    {
        return context.Extensions.Get<IMySession>();
    }

    public static Task Store<T>(this IMessageHandlerContext context, T entity)
    {
        return context.GetSession().Store(entity);
    }
}

Message handlers

One of the benefits of a unit of work is that multiple handlers for the same message will share the same session and commit/rollback together. This is how the handlers look:

class MyMessageHandler1 :
    IHandleMessages<MyMessage>
{
    static ILog log = LogManager.GetLogger<MyMessageHandler1>();

    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        await context.Store(new MyEntity());

        log.Info($"{context.MessageId} got UoW instance {context.GetSession().GetHashCode()}");
    }
}
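
Only one handler is shown above. A second handler for the same message type might look like the following sketch, assuming it mirrors MyMessageHandler1 and relies on the sample's MyMessage and MyEntity types and on the extension methods defined earlier. The class name MyMessageHandler2 is an assumption.

```csharp
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Logging;

// Assumed second handler for the same message type. It receives the session
// that MyUowBehavior stored in the pipeline context for this message.
class MyMessageHandler2 :
    IHandleMessages<MyMessage>
{
    static ILog log = LogManager.GetLogger<MyMessageHandler2>();

    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        await context.Store(new MyEntity());

        log.Info($"{context.MessageId} got UoW instance {context.GetSession().GetHashCode()}");
    }
}
```

Because both handlers resolve the session from the same pipeline context, the logged hash codes should match for a given message, and an exception thrown by either handler causes the behavior to roll back the work of both.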

Running the sample

Run the sample. Once running, press any key to send messages. Note that for each given message the two message handlers will get the same session instance.
