In most cases, handlers are meant to modify the internal state of an application based on the received message. In any system, it is critical to ensure the state change is persisted exactly once. In a messaging system, this can be a challenge as exactly-once delivery is not guaranteed by any single queuing technology. NServiceBus provides several strategies to mitigate the risk of an inconsistent application state.
Accessing the application state can be achieved in a number of ways and should not be enforced or restricted by NServiceBus. The scenarios below provide guidance on several ways of accessing the application state by using the same data context as NServiceBus uses internally.
TransactionScope
This section applies only to .NET Framework as .NET Core does not support distributed transactions.
In .NET, using the TransactionScope makes a block of code transactional, i.e. any code that stores data will automatically enlist in the transaction.
This is configured on the transport and automatically includes any message handler.
transport.Transactions(TransportTransactionMode.TransactionScope);
As long as the database resource also supports distributed transactions, the code in a handler will automatically enlist in the transaction without additional configuration. If a handler fails in processing a message, the data in the datastore will be rolled back automatically.
public async Task Handle(MyMessage message, IMessageHandlerContext context)
{
var connection = new SqlConnection(Configuration.ConnectionString);
var command = CreateStoreOrderCommand(message);
await command.ExecuteReaderAsync();
}
When using a transaction mode lower than TransactionScope
, NServiceBus also provides the ability to wrap handlers inside a TransactionScope to avoid partial updates.
Synchronized storage session
A synchronized storage session is an NServiceBus implementation of the unit of work pattern. It provides a data access context that is shared by all handlers that process a given message. The state change is committed after the execution of message handlers, provided that there are no exceptions during processing. The synchronized storage session is accessible via the IMessageHandlerContext
:
public Task Handle(MyMessage message,
IMessageHandlerContext context)
{
var session = context.SynchronizedStorageSession
.MyPersistenceSession();
//Business logic
return Task.CompletedTask;
}
The synchronized storage session feature is supported by most NServiceBus persistence packages, and additional guidance for accessing data can be found in the documentation for each persister:
Synchronized storage session by itself only guarantees that there will be no partial failures, i.e., cases where one of the handlers has modified its state while another has not. This guarantee extends to sagas as they are persisted using the synchronized storage session.
However, the synchronized storage session does not guarantee that each state change is persisted exactly once. To ensure exactly-once message processing, the synchronized storage session must support a de-duplication strategy.
Object/relational mappers
When using an object/relational mapper (ORM) like Entity Framework for data access, as seen in this sample configuration, there is the ability to either inject the data context object via dependency injection or create a data context on the fly and reuse the connection.
Creating a data context on the fly means that any other handler will work disconnected from that data context. This is a fairly simple approach, but it is not recommended when the same message is processed by multiple handlers.
An alternative option that works with multiple handlers processing a single message is to inject the data context of an ORM via dependency injection. More information can be found in the SQL persistence documentation about accessing data.
Message de-duplication strategies
NServiceBus supports multiple message de-duplication strategies that suit a wide range of message processing and data storage technologies.
Local transactions
The SQL Server and PostgreSQL transports allow using a single database transaction to modify application state and send/receive messages. The persistence detects if the message processing context contains an open transaction and the synchronized storage session joins that transaction. When the transaction is committed, the following are all committed atomically:
- State changes made by handlers
- Receipt of incoming messages
- Sending of outgoing messages
This guarantees that no duplicate messages are sent.
The SQL Server transport shares transaction context with SQL persistence in the ReceiveOnly
, SendsAtomicWithReceive
, and TransactionScope
transaction modes and with NHibernate persistence in the TransactionScope
transaction mode.
The PostgreSQL transport shares transaction context with SQL persistence in the ReceiveOnly
and SendsAtomicWithReceive
transaction modes.
Distributed transactions
Distributed transactions are atomic and durable transactions that span multiple transactional resources (like databases or queues). By enlisting both transport and persistence into the same distributed transaction, NServiceBus can guarantee exactly-once message processing by preventing duplicate messages from being created.
Distributed transactions are supported by the following transport and persistence components:
- SQL persistence
- NHibernate persistence
- SQL Server transport
- MSMQ transport through distributed transactions
To use this mode, the transport must be configured to use the TransactionScope
transport transaction mode. When using the SQL Server transport, the distributed transaction mode allows the use of separate SQL Server instances for message stores (queues) and for business data.
Outbox
The outbox is a pattern that provides an exactly-once message processing experience even when dealing with transports and databases that don't support distributed transactions, such as RabbitMQ and MongoDB. This is done by storing the incoming message ID and the outgoing messages in the same transaction as the business state change.
The outbox can be used with any transport and with any persistence component that supports synchronized storage sessions.
Instead of preventing the duplicates, the outbox detects them and ensures that the effects of processing duplicate messages are ignored and not persisted.
Manual de-duplication
In situations where neither of the built-in de-duplication strategies can be applied, the de-duplication of messages must be handled at the application level, in the message handler itself. In these cases, the synchronized storage session should not be used, and each handler should guarantee the idempotence of its behavior.
Idempotence caveats
Message-processing logic is idempotent if it can be applied multiple times, and the outcome is the same as if it were applied once. The outcome includes both the application state changes and the potential outgoing messages sent. Consider the following pseudocode that demonstrates how not to implement idempotent message handling:
public async Task Handle(MyMessage message,
IMessageHandlerContext context)
{
if (IsDuplicate(message))
{
return;
}
await context.Send(new MyOutgoingMessage());
await ModifyState();
}
and think about the behavior of the message processing:
- NServiceBus, by default, defers sending messages until the message handler has finished, so the behavior of the code above is as if the call to
Send
was after the call toModifyState
. - If outgoing messages are sent before the state change is committed (e.g., if the code above used immediate dispatch), there is a risk of creating ghost messages -- messages that carry the state change that has never been made durable.
- If outgoing messages are sent after the state change is committed, there is a risk of message loss if the send operation fails. To prevent this, the outgoing messages must be re-sent even if it appears to be a duplicate.
- If re-sending messages is implemented, multiple copies of the same message may be sent to the downstream endpoints.
- If message identity is used for de-duplication, message IDs must be generated deterministically.
- If outgoing messages depend on the application state, the code above is incorrect when messages can get re-ordered (e.g. by infrastructure failures, recoverability or competing consumers).