Accessing and modifying data

Component: NServiceBus | Nuget: NServiceBus (Version: 6.x)

In most cases handlers are meant to modify the internal state of an application based on the received message. The scenarios below discuss in detail how NServiceBus transaction and persistence settings affect the way business data is stored.

Without using NServiceBus persistence

The simplest way to modify application data from code running inside NServiceBus (i.e. a message handler) is by using a user-managed connection and transaction. Transport transaction mode has to be taken into account when designing such code.

Transport in native transaction mode

When the selected transport is configured to use native transactions, either in ReceiveOnly or SendsAtomicWithReceive mode, the data access code in the handler can be executed multiple times for a single message. This can lead to data corruption if that code is not idempotent (ensuring side effects of message processing are the same, no matter how many times the code is invoked). Here's an example of data access code that is not idempotent:

Edit
public class NonIdempotentHandler :
    IHandleMessages<AddOrderLine>
{
    IMyOrm orm;

    public NonIdempotentHandler(IMyOrm orm)
    {
        this.orm = orm;
    }

    public async Task Handle(AddOrderLine message, IMessageHandlerContext context)
    {
        using (var session = orm.OpenSession())
        {
            var order = session.Get(message.OrderId);
            order.AddLine(message.Product, message.Quantity);
            session.Commit();
        }
    }
}

If the handler with the above code is invoked more than once, for example due to a transient problem with the transport while committing the receive transaction, then the data will get corrupted which might result in duplicate rows to be inserted. The following code shows how to mitigate the problem:

Edit
public class IdempotentHandler :
    IHandleMessages<AddOrderLine>
{
    IMyOrm orm;

    public IdempotentHandler(IMyOrm orm)
    {
        this.orm = orm;
    }

    public async Task Handle(AddOrderLine message, IMessageHandlerContext context)
    {
        using (var session = orm.OpenSession())
        {
            var order = session.Get(message.OrderId);
            if (order.HasLine(message.LineId))
            {
                return;
            }

            order.AddLine(message.Product, message.Quantity);
            session.Commit();
        }
    }
}

The downside of this approach is the fact that the code gets more complex. Sometimes it is hard to get enough information from the incoming message to create correct idempotent handling logic. This is especially true for commands which don't have natural identity in the same way events have.

Transport in distributed transaction mode

When the selected transport is configured to use distributed transactions via Distributed Transaction Coordinator (DTC) service, the handler is executed within an ambient transaction scope. If the data store supports enlisting in a distributed transaction (e.g. SQL Server, Oracle), the data modifications are guaranteed to be applied in a single atomic operation together with message receive operation.

Using NServiceBus persistence

Instead of managing of connections and transactions themselves, users can delegate the management of data store to NServiceBus persistence. This approach has a number of advantages:

  • NServiceBus guarantees best practices are followed when it comes to managing data store's connection.
  • Data access context is automatically shared between all handlers executed for a given message, making it easier to guarantee idempotency (no partial successes where one handler managed to commit the changes while other didn't).
  • Data access context is also shared with the Saga that might participate in handling a given message.

The downside to this approach is that, in order to share the same data access context across business data transactions and NServiceBus internal database actions, the database technology used must be one of the NServiceBus supported persistence options. NServiceBus supports SQL Server and Oracle via NServiceBus.NHibernate persistence and RavenDB via NServiceBus.RavenDB persistence.

There is support for accessing business data via NServiceBus Azure Storage persistence because Azure data stores support only single-entity operations.

Users can take advantage of this NServiceBus-managed data access context via SynchronizedStorageSession object returned by SynchronizedStorageSession() extension method on the message handling context. This object is a hook for persistence-specific extension methods that allow to obtain the connection to underlying data store.

The documentation below provides more detail on how to share the same data access context for business data and NServiceBus, when using:

Transport in native transaction mode

In this mode the NServiceBus-managed data store context can be committed multiple times for a single message and it is up to the user to guarantee idempotency. The difference between user-managed connections, though, is the fact that data store context is shared between the handlers so there might be one (possibly generic) handler that takes care of the idempotency, allowing others to focus on pure business problem.

Edit
public class IdempotencyEnforcer :
    IHandleMessages<OrderMessage>
{
    public Task Handle(OrderMessage message, IMessageHandlerContext context)
    {
        var session = context.SynchronizedStorageSession.MyOrmSession();
        var order = session.Get(message.OrderId);
        if (this.MessageHasAlreadyBeenProcessed(context.MessageId, order))
        {
            // Subsequent handlers are not invoked as the message has already been processed.
            context.DoNotContinueDispatchingCurrentMessageToHandlers();
        }
        else
        {
            this.MarkAsProcessed(context.MessageId, order);
        }
        return Task.CompletedTask;
    }
}

public class NonIdempotentHandler :
    IHandleMessages<AddOrderLine>
{
    public async Task Handle(AddOrderLine message, IMessageHandlerContext context)
    {
        var session = context.SynchronizedStorageSession.MyOrmSession();
        var order = session.Get(message.OrderId);
        order.AddLine(message.Product, message.Quantity);
    }
}

public void ConfigureEndpoint(EndpointConfiguration endpointConfiguration)
{
    endpointConfiguration.ExecuteTheseHandlersFirst(typeof(IdempotencyEnforcer));
    endpointConfiguration.UsePersistence<MyPersistence>();
}

Outbox

When using the Outbox NServiceBus itself guarantees the idempotency of data access operations executed via SynchronizedStorageSession. The Outbox is a generic implementation of the IdempotencyEnforcer handler from the previous section.

Related Articles


Last modified