Unit of work using custom pipeline behavior

Component: NServiceBus
NuGet Package: NServiceBus (7.x)

This sample uses a custom pipeline behavior to manage a unit of work. The unit of work in this sample is a custom session object. In a real-world application, this is often a database session: a low-level storage connection or transaction, or an ORM session.

Examples include:

  • a SQL Server or PostgreSQL connection and/or transaction object
  • an ORM session, like Entity Framework DbContext or an NHibernate session
  • a storage session provided by a storage SDK client, like MongoDB, RavenDB, CosmosDB, etc.

Any information from the incoming message headers and body can be used to create or initialize the custom unit of work. This is common in multi-tenant or partitioned environments, for example:

  • Use header/body information to utilize a specific connection string
  • Use header/body information to set a query filter on an ORM to only return data for a specific tenant
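
As a rough illustration of the first point, the following sketch resolves a connection string from a tenant header. The header name tenant_id, the TenantConnectionStrings class, and the tenant-to-connection-string map are all assumptions made for this example; none of them are part of the sample.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the sample: maps a tenant header to a connection string.
static class TenantConnectionStrings
{
    // Assumed header name; the sample does not define one.
    public const string TenantHeader = "tenant_id";

    public static string Resolve(
        IReadOnlyDictionary<string, string> headers,
        IReadOnlyDictionary<string, string> connectionStringsByTenant)
    {
        // Fail fast when the message does not identify a tenant.
        if (!headers.TryGetValue(TenantHeader, out var tenantId) || string.IsNullOrWhiteSpace(tenantId))
        {
            throw new InvalidOperationException($"Message is missing the '{TenantHeader}' header.");
        }

        // Fail fast when the tenant is unknown, rather than falling back to a shared database.
        if (!connectionStringsByTenant.TryGetValue(tenantId, out var connectionString))
        {
            throw new InvalidOperationException($"No connection string configured for tenant '{tenantId}'.");
        }

        return connectionString;
    }
}
```

Inside a behavior, the headers of the incoming message are available as context.Message.Headers, so a call such as TenantConnectionStrings.Resolve(context.Message.Headers, map) could run before the session is opened. Throwing on a missing or unknown tenant is a design choice for this sketch; it avoids silently writing one tenant's data to another tenant's store.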

Code walk-through

Creating the behavior

The behavior will wrap handler invocations and:

  1. Open a session to the database.
  2. Make the session available to the message handlers.
  3. Commit/rollback the session depending on the outcome of the handler.
  4. Dispose the session.

class MyUowBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    MySessionProvider sessionProvider;

    public MyUowBehavior(MySessionProvider sessionProvider)
    {
        this.sessionProvider = sessionProvider;
    }

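    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        // Open one session per incoming message; the using block disposes it.
        using (var session = await sessionProvider.Open())
        {
            // Make the session available to the rest of the pipeline, including the handlers.
            context.Extensions.Set<IMySession>(session);

            try
            {
                // Invoke the rest of the pipeline, which includes all handlers for this message.
                await next();

                // Every handler succeeded: commit their work together.
                await session.Commit();
            }
            catch (Exception)
            {
                // A handler or the commit failed: roll back, then rethrow so the failure is not swallowed.
                await session.Rollback();

                throw;
            }
        }
    }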
}

Note that the injected session provider is responsible for creating the session, and that the behavior registers the session in the pipeline context by calling context.Extensions.Set<IMySession>(session). This registration is used later to provide the session to the handlers.
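
The text above does not show IMySession or MySessionProvider. Judging only from how the behavior and the extension methods use them, they might look roughly like the following in-memory sketch. The interface shape is inferred from usage, and InMemorySession and CommittedCount are hypothetical names introduced for illustration; a real implementation would wrap a database connection, transaction, or ORM session instead.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Assumed shape, inferred from how the behavior and extension methods use the session.
interface IMySession : IDisposable
{
    Task Store<T>(T entity);
    Task Commit();
    Task Rollback();
}

// Hypothetical in-memory session: buffers entities until Commit is called.
class InMemorySession : IMySession
{
    readonly List<object> committed;
    readonly List<object> pending = new List<object>();

    public InMemorySession(List<object> committed)
    {
        this.committed = committed;
    }

    public Task Store<T>(T entity)
    {
        pending.Add(entity);
        return Task.CompletedTask;
    }

    public Task Commit()
    {
        // The committed list is shared between sessions, so guard it.
        lock (committed)
        {
            committed.AddRange(pending);
        }
        pending.Clear();
        return Task.CompletedTask;
    }

    public Task Rollback()
    {
        pending.Clear();
        return Task.CompletedTask;
    }

    public void Dispose()
    {
        // Anything stored but never committed is discarded.
        pending.Clear();
    }
}

// Creates one session per call; all sessions write to the same in-memory store.
class MySessionProvider
{
    readonly List<object> committed = new List<object>();

    public int CommittedCount
    {
        get { lock (committed) { return committed.Count; } }
    }

    public Task<IMySession> Open()
    {
        return Task.FromResult<IMySession>(new InMemorySession(committed));
    }
}
```

Buffering writes until Commit keeps the sketch close to how a transaction behaves: entities stored by any handler become visible only if the whole message succeeds.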

Registering the behavior

The following code is needed to register the behavior in the receive pipeline.

var sessionProvider = new MySessionProvider();

var pipeline = endpointConfiguration.Pipeline;
pipeline.Register(new MyUowBehavior(sessionProvider), "Manages the session");

Providing the session to handlers

While it is possible to call context.Extensions.Get<IMySession>() directly in a handler, it is better to provide extension methods on IMessageHandlerContext to allow for a terser syntax. In this sample, the methods .Store<T> and .GetSession are provided:

static class MySessionContextExtensions
{
    public static IMySession GetSession(this IMessageHandlerContext context)
    {
        return context.Extensions.Get<IMySession>();
    }

    public static Task Store<T>(this IMessageHandlerContext context, T entity)
    {
        return context.GetSession().Store(entity);
    }
}

Message handlers

One of the benefits of a unit of work is that multiple handlers for the same message share the same session and commit or roll back together. The sample contains two handlers for MyMessage; the first one looks like this:

class MyMessageHandler1 :
    IHandleMessages<MyMessage>
{
    static ILog log = LogManager.GetLogger<MyMessageHandler1>();

    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        await context.Store(new MyEntity());

        log.Info($"{context.MessageId} got UoW instance {context.GetSession().GetHashCode()}");
    }
}

Running the sample

Run the sample. Once running, press any key to send messages. Note that for each given message the two message handlers will get the same session instance.
