Introduction
This sample uses the pipeline to provide unit of work management for message handlers. Using the pipeline abstraction is necessary when access to the incoming message and/or headers is required.
Code walk-through
Creating the behavior
The behavior will wrap handler invocations and:
- Open a session to the database.
- Make the session available to the message handlers.
- Commit or roll back the session, depending on whether the handlers succeed.
- Dispose of the session.
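class MyUowBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    MySessionProvider sessionProvider;

    public MyUowBehavior(MySessionProvider sessionProvider)
    {
        this.sessionProvider = sessionProvider;
    }

    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        // The using block disposes the session once the message has been processed.
        using (var session = await sessionProvider.Open().ConfigureAwait(false))
        {
            // Make the session available to the rest of the pipeline, including the handlers.
            context.Extensions.Set<IMySession>(session);
            try
            {
                // Invoke the remaining pipeline steps and commit only if they all succeed.
                await next().ConfigureAwait(false);
                await session.Commit().ConfigureAwait(false);
            }
            catch (Exception)
            {
                // Any failure rolls back the session; the exception is rethrown unchanged.
                await session.Rollback().ConfigureAwait(false);
                throw;
            }
        }
    }
}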
Note that the injected session provider is responsible for creating the session and that the session is registered in the pipeline context using context.Extensions.Set<IMySession>(session). This will be used later to provide the session to the handlers.
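The behavior above relies on MySessionProvider and IMySession, which are not shown here. The following is a minimal sketch of what those types could look like, inferred only from the calls the sample makes (Open, Store, Commit, Rollback and disposal). The in-memory storage, the InMemorySession class and the Committed property are assumptions made for illustration; a real implementation would wrap an actual database session.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical session contract, inferred from how the sample uses the session.
public interface IMySession : IDisposable
{
    Task Store<T>(T entity);
    Task Commit();
    Task Rollback();
}

// Hypothetical in-memory session: entities are buffered until Commit is called.
public class InMemorySession : IMySession
{
    readonly List<object> pending = new List<object>();
    readonly List<object> committed;

    public InMemorySession(List<object> committed)
    {
        this.committed = committed;
    }

    public Task Store<T>(T entity)
    {
        pending.Add(entity);
        return Task.CompletedTask;
    }

    public Task Commit()
    {
        // Move everything stored so far into the committed list.
        committed.AddRange(pending);
        pending.Clear();
        return Task.CompletedTask;
    }

    public Task Rollback()
    {
        // Discard everything stored since the last commit.
        pending.Clear();
        return Task.CompletedTask;
    }

    public void Dispose()
    {
        // Anything not committed is dropped when the session is disposed.
        pending.Clear();
    }
}

// Hypothetical provider: opens a new session per call, all writing to one list.
public class MySessionProvider
{
    public List<object> Committed { get; } = new List<object>();

    public Task<IMySession> Open()
    {
        return Task.FromResult<IMySession>(new InMemorySession(Committed));
    }
}
```

Because Open returns a new session on every call, each incoming message gets its own session, while every handler invoked for that message sees the single instance the behavior placed in the pipeline context.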
Registering the behavior
The following code is needed to register the behavior in the receive pipeline.
var sessionProvider = new MySessionProvider();
var pipeline = endpointConfiguration.Pipeline;
pipeline.Register(new MyUowBehavior(sessionProvider), "Manages the session");
Providing the session to handlers
While it is possible to call context.Extensions.Get<IMySession>() directly in the handler, it is better to provide extension methods on IMessageHandlerContext to allow for a terser syntax. In this sample the methods GetSession() and Store<T>() are provided:
static class MySessionContextExtensions
{
    public static IMySession GetSession(this IMessageHandlerContext context)
    {
        return context.Extensions.Get<IMySession>();
    }

    public static Task Store<T>(this IMessageHandlerContext context, T entity)
    {
        return context.GetSession().Store(entity);
    }
}
Message handlers
One of the benefits of a unit of work is that multiple handlers for the same message will share the same session and commit/rollback together. This is how the handlers look:
class MyMessageHandler1 :
    IHandleMessages<MyMessage>
{
    static ILog log = LogManager.GetLogger<MyMessageHandler1>();

    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        await context.Store(new MyEntity())
            .ConfigureAwait(false);
        log.Info($"{context.MessageId} got UoW instance {context.GetSession().GetHashCode()}");
    }
}
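To illustrate why sharing one session means the handlers commit or roll back together, here is a small stand-alone sketch that does not use NServiceBus at all. FakeSession and UnitOfWorkDemo are hypothetical names introduced only for this illustration. The Run method mirrors the try/commit/catch/rollback shape of the behavior, except that it returns false instead of rethrowing so the outcome is easy to inspect.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the session; not part of the sample.
class FakeSession
{
    public List<string> Pending { get; } = new List<string>();
    public List<string> Committed { get; } = new List<string>();

    public void Store(string entity) => Pending.Add(entity);

    public void Commit()
    {
        Committed.AddRange(Pending);
        Pending.Clear();
    }

    public void Rollback() => Pending.Clear();
}

static class UnitOfWorkDemo
{
    // Runs every handler against the same session, then commits once.
    // If any handler throws, the work of all handlers is rolled back.
    public static async Task<bool> Run(FakeSession session, params Func<FakeSession, Task>[] handlers)
    {
        try
        {
            foreach (var handler in handlers)
            {
                await handler(session);
            }
            session.Commit();
            return true;
        }
        catch (Exception)
        {
            // Unlike the behavior in the sample, this sketch swallows the exception.
            session.Rollback();
            return false;
        }
    }
}
```

When two handlers each store an entity and both succeed, both entities end up in Committed. When the second handler throws, the entity stored by the first handler is discarded as well, because the rollback applies to the shared session rather than to an individual handler.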
Running the sample
Run the sample. Once it is running, press any key to send messages. Note that for each message, the two message handlers get the same session instance.