This article describes how to achieve consistency when modifying business data and sending messages, similar to the outbox, outside the context of an NServiceBus message handler.
The consistency problem
Consider an ASP.NET Core controller that creates a user in the business database and publishes a UserCreated event. If a failure occurs during the execution of the request, two scenarios may occur, depending on the order of operations.
- Zombie record: The controller creates the User in the database first, then publishes the UserCreated event. If a failure occurs between these two operations:
  - The user is created in the database, but the UserCreated event is not published.
  - This results in a user in the database, known as a zombie record, which is never announced to the rest of the system.
- Ghost message: The controller publishes the UserCreated event first, then creates the user in the database. If a failure occurs between these two operations:
  - The UserCreated event is published, but the user is not created in the database.
  - The rest of the system is notified about the creation of the user, but the user record is never created. This inconsistency causes errors, as parts of the system expect the record to exist in the database.
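The two orderings can be illustrated with a minimal, self-contained sketch. It uses hypothetical in-memory stand-ins (FakeSystem, StoreThenPublish, PublishThenStore) rather than a real database or NServiceBus APIs, and simulates the failure with an exception thrown between the two operations.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-ins; not a real database or transport.
public sealed class FakeSystem
{
    public List<string> Users { get; } = new();           // the "business database"
    public List<string> PublishedEvents { get; } = new(); // the "transport"
}

public static class ConsistencyDemo
{
    // Stores the user first, then publishes the event.
    // A failure in between leaves a zombie record.
    public static FakeSystem StoreThenPublish(string user, bool failBetween)
    {
        var system = new FakeSystem();
        try
        {
            system.Users.Add(user);
            if (failBetween) throw new InvalidOperationException("simulated failure");
            system.PublishedEvents.Add($"UserCreated:{user}");
        }
        catch (InvalidOperationException)
        {
            // The request fails, but the first operation has already taken effect.
        }
        return system;
    }

    // Publishes the event first, then stores the user.
    // A failure in between leaves a ghost message.
    public static FakeSystem PublishThenStore(string user, bool failBetween)
    {
        var system = new FakeSystem();
        try
        {
            system.PublishedEvents.Add($"UserCreated:{user}");
            if (failBetween) throw new InvalidOperationException("simulated failure");
            system.Users.Add(user);
        }
        catch (InvalidOperationException)
        {
            // The event is already out, but the user was never stored.
        }
        return system;
    }

    public static void Main()
    {
        var zombie = StoreThenPublish("alice", failBetween: true);
        Console.WriteLine($"Zombie record: users={zombie.Users.Count}, events={zombie.PublishedEvents.Count}");

        var ghost = PublishThenStore("alice", failBetween: true);
        Console.WriteLine($"Ghost message: users={ghost.Users.Count}, events={ghost.PublishedEvents.Count}");
    }
}
```

In both cases, only one of the two operations takes effect, which is the inconsistency the rest of this article addresses.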
In the context of a message handler, the NServiceBus Outbox feature can mitigate this problem. However, the outbox only applies when the operations occur within a message handler.
A common technique to address this problem on the client side is to defer all operations to a message handler. This entails sending a message to create the user and publishing the UserCreated event from within a message handler. However, this approach is not always feasible. Here are two examples:
- Existing applications often have non-trivial processing logic in controllers. Moving all of it into dedicated message handlers could entail significant effort.
- Processing logic in the controller may assume specific side effects to occur within the scope of the request, e.g., validation, notifications, or error handling. The logic may, therefore, not be ready for the asynchronous, fire-and-forget approach required when offloading work into message handlers.
The TransactionalSession, when combined with the outbox, solves this problem for messages sent and/or published outside the context of a message handler.
Usage
To use the transactional session, first install the transactional session package for a supported persister in the project.
Next, enable the session integration on the endpoint as follows:
//Each persistence has a specific Configure method
var persistence = config.UsePersistence<MyPersistence>();
persistence.EnableTransactionalSession();
To ensure atomic consistency across database and message operations, enable the outbox:
config.EnableOutbox();
The transactional session can be resolved from the container, and must be opened:
var session = scope.ServiceProvider.GetRequiredService<ITransactionalSession>();
await session.Open(new MyPersistenceOpenSessionOptions(),
cancellationToken: cancellationToken);
Sending messages in an atomic manner is done through the ITransactionalSession instance:
await session.SendLocal(new MyMessage(), cancellationToken);
The persistence-specific database session is accessible via the transactionalSession.
property or via dependency injection. See the persistence-specific documentation for more details.
Once all the operations that are part of the atomic request have been executed, the session should be committed:
await session.Commit(cancellationToken);
Disposing the transactional session without committing will roll back any changes that were made.
The Commit operation may fail and throw an exception for reasons outlined in the failure scenarios section.
Advanced configuration
Maximum commit duration
The maximum commit duration limits the amount of time it can take for a transaction to commit the changes before the operation times out. The value can be configured when opening the session.
The default value for the maximum commit duration is TimeSpan.
.
await session.Open(new MyPersistenceOpenSessionOptions
    {
        MaximumCommitDuration = TimeSpan.FromSeconds(15)
    },
    cancellationToken: cancellationToken);
The maximum commit duration does not represent the total transaction time, but rather the time it takes to complete the commit operation (as observed from the perspective of the control message). In practice, the observed total commit time might be longer due to delays in the transport caused by latency, delayed delivery, load on the input queue, endpoint concurrency limits, and more.
When the control message is consumed, but the outbox record is not yet available in storage, the following formula is applied to delay the message (see Phase 2):
CommitDelayIncrement = 2 * CommitDelayIncrement;
RemainingCommitDuration = RemainingCommitDuration -
(CommitDelayIncrement > RemainingCommitDuration ? RemainingCommitDuration : CommitDelayIncrement)
The default commit delay increment is set to Timespan.
and cannot be overridden.
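As a worked illustration, the sketch below applies the formula literally, once per delayed control message, and records how much of the remaining commit duration each step consumes. The class and method names are hypothetical, and the input values (15 seconds and 1 second) are taken from the configuration snippets in this article as examples only; they are not asserted to be the defaults. Whether the transport delay actually applied equals the subtracted amount is also an assumption.

```csharp
using System;
using System.Collections.Generic;

public static class CommitDelaySchedule
{
    // Returns the amount subtracted from the remaining commit duration
    // on each application of the formula, until nothing remains.
    public static List<TimeSpan> Compute(TimeSpan maximumCommitDuration, TimeSpan commitDelayIncrement)
    {
        if (commitDelayIncrement <= TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(commitDelayIncrement));
        }

        var steps = new List<TimeSpan>();
        var remaining = maximumCommitDuration;
        var increment = commitDelayIncrement;

        while (remaining > TimeSpan.Zero)
        {
            // CommitDelayIncrement = 2 * CommitDelayIncrement
            increment = TimeSpan.FromTicks(increment.Ticks * 2);

            // Subtract the increment, capped at whatever is left.
            var step = increment > remaining ? remaining : increment;
            remaining -= step;
            steps.Add(step);
        }

        return steps;
    }

    public static void Main()
    {
        var steps = Compute(TimeSpan.FromSeconds(15), TimeSpan.FromSeconds(1));
        Console.WriteLine(string.Join(", ", steps));
    }
}
```

With these example inputs the steps are 2, 4, 8, and 1 seconds, which together use up the full 15 seconds. The doubling means that a record that appears quickly is found after a short delay, while the number of retries stays small.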
CommitDelayIncrement
The time increment used to delay the commit of the transactional session when the outbox record is not yet in the storage (for more details, see Phase 2).
The default value for the commit delay increment is TimeSpan.
.
await session.Open(new MyPersistenceOpenSessionOptions
    {
        CommitDelayIncrement = TimeSpan.FromSeconds(1)
    },
    cancellationToken: cancellationToken);
Metadata
It is possible to add metadata (e.g. tenant information) to the transactional session control message via custom headers. These headers can be accessed by a custom behavior when the control message is received in the TransportReceive part of the pipeline.
await session.Open(new MyPersistenceOpenSessionOptions
    {
        Metadata =
        {
            { "SomeKey", "SomeValue" }
        }
    },
    cancellationToken: cancellationToken);
Requirements
The transactional session feature requires a supported persistence package to store outgoing messages. This feature is currently supported for the following persistence packages:
Transaction consistency
To guarantee atomic consistency across database and message operations, the transactional session requires the outbox to be enabled. This combination of features provides the strongest consistency guarantees and is, therefore, the recommended, safe-by-default configuration.
The outbox must be enabled explicitly on the endpoint configuration.
With the outbox disabled, database and message operations are not applied until the session is committed. All database operations share the same database transaction and are committed first. When the database operations complete successfully, the message operations are batch-dispatched by the transport. The message operations and the database changes are not guaranteed to be atomic. This might lead to zombie records or ghost messages in case of a failure during the commit phase.
How it works
The transactional session feature guarantees that all outgoing message operations are eventually consistent with the data operations.
Returning to the earlier example of a controller that creates a User and then publishes a UserCreated event, the following process occurs. Details are described following the diagram.
Internally, the transactional session doesn't use a single transaction that spans all the operations. The transactional session acknowledgement occurs in two separate phases:
Phase 1
1. The user opens a transactional session.
2. A set of PendingOperations is initialized and collects the message operations.
3. A transaction is started on the storage seam.
4. The user can execute any required message operations using the transactional session.
5. The user can store any data using the persistence-specific session, which is accessible through the transactional session.
6. When all operations are registered, the user calls Commit on the transactional session.
7. A control message to complete the transaction is dispatched to the local queue. The control message is independent of the message operations and is not stored in the outbox record.
8. The message operations are converted and stored into an outbox record.
9. The transaction is committed, and the outbox record and business data modifications are stored atomically.
When no message operations are executed in the transactional session, steps 7 and 8 (and, as a consequence, Phase 2) are skipped to avoid wasting queue and database resources.
Phase 2
The endpoint receives the control message and processes it as follows:
- Find the outbox record.
  - If it exists and has not been marked as dispatched, any pending operations are dispatched and the outbox record is marked as dispatched.
  - If it doesn't exist yet, delay the processing of the control message.
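The interplay between the two phases can be sketched with a small in-memory model. Everything in it (TwoPhaseModel, ControlMessageResult, and the member names) is hypothetical and chosen for illustration; it does not reflect the internal NServiceBus types. The control message dispatch and the atomic store are exposed as separate calls so that the control message can be processed before the outbox record exists.

```csharp
using System;
using System.Collections.Generic;

public enum ControlMessageResult { Dispatched, AlreadyDispatched, Delayed }

// Hypothetical in-memory model; not the NServiceBus implementation.
public sealed class TwoPhaseModel
{
    readonly Dictionary<string, List<string>> outbox = new(); // session ID -> pending operations
    readonly HashSet<string> dispatched = new();              // outbox records marked as dispatched

    public Queue<string> LocalQueue { get; } = new();         // control messages (session IDs)
    public List<string> BusinessData { get; } = new();
    public List<string> SentMessages { get; } = new();

    // Phase 1: the control message is dispatched to the local queue first.
    public void DispatchControlMessage(string sessionId) => LocalQueue.Enqueue(sessionId);

    // Phase 1: the outbox record and the business data are stored together.
    public void StoreAtomically(string sessionId, string data, IEnumerable<string> operations)
    {
        outbox[sessionId] = new List<string>(operations);
        BusinessData.Add(data);
    }

    // Phase 2: process the next control message from the local queue.
    public ControlMessageResult ProcessControlMessage()
    {
        var sessionId = LocalQueue.Dequeue();

        if (!outbox.TryGetValue(sessionId, out var operations))
        {
            // Outbox record not stored yet: put the control message back for later.
            LocalQueue.Enqueue(sessionId);
            return ControlMessageResult.Delayed;
        }

        if (!dispatched.Add(sessionId))
        {
            return ControlMessageResult.AlreadyDispatched;
        }

        SentMessages.AddRange(operations);
        return ControlMessageResult.Dispatched;
    }

    public static void Main()
    {
        var model = new TwoPhaseModel();
        model.DispatchControlMessage("session-1");
        Console.WriteLine(model.ProcessControlMessage()); // record not stored yet
        model.StoreAtomically("session-1", "User:alice", new[] { "UserCreated:alice" });
        Console.WriteLine(model.ProcessControlMessage()); // record found
    }
}
```

In this model the first processing attempt is delayed because the outbox record is missing, and the second attempt dispatches the pending UserCreated operation.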
Failure scenarios
The transactional session provides atomic store-and-send guarantees, similar to the outbox feature (except for incoming message de-duplication). That said, it cannot rely on the recoverability mechanism used by the outbox, which uses retries to ensure outgoing messages are dispatched when failures occur. Instead, the control message is used to ensure that exactly one of the following outcomes occurs:
- Transaction finishes with data being stored, and outgoing messages eventually sent - when the Commit path successfully stores the OutboxRecord
- Transaction finishes with no visible side effects - when the control message stores the OutboxRecord
Sending the control message first ensures that eventually, the transaction will have an atomic outcome. If the Commit of the OutboxRecord succeeds, the control message will ensure the outgoing operations are sent. If the Commit fails, the control message will (after the maximum commit duration elapses) eventually be consumed, leaving no side effects.
If dispatching the control message fails, the transactional session changes will roll back, and an error will be raised to the user committing the session.
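The "exactly one outcome" guarantee can be pictured as a first-writer-wins rule on the outbox record. The sketch below is a simplified model under that assumption: OutboxRecordRace, RecordOwner, and the method names are hypothetical, and how the real persisters store and check the record is not shown here.

```csharp
using System;
using System.Collections.Concurrent;

public enum RecordOwner { CommitPath, ControlMessage }

// Hypothetical model: whoever stores the outbox record first decides the outcome.
public sealed class OutboxRecordRace
{
    readonly ConcurrentDictionary<string, RecordOwner> records = new();

    // Commit path: succeeds only when no record exists yet for the session.
    public bool TryCommit(string sessionId) =>
        records.TryAdd(sessionId, RecordOwner.CommitPath);

    // Control message, once the maximum commit duration has elapsed:
    // stores its own record unless the commit path already stored one.
    public RecordOwner ResolveAfterTimeout(string sessionId) =>
        records.GetOrAdd(sessionId, RecordOwner.ControlMessage);

    public static void Main()
    {
        var race = new OutboxRecordRace();

        // Commit wins: data is stored and messages are eventually sent.
        Console.WriteLine(race.TryCommit("session-1"));
        Console.WriteLine(race.ResolveAfterTimeout("session-1"));

        // Control message wins: a late commit is rejected, no side effects.
        Console.WriteLine(race.ResolveAfterTimeout("session-2"));
        Console.WriteLine(race.TryCommit("session-2"));
    }
}
```

Because only one writer can create the record for a given session, a commit that arrives after the control message has resolved the session is rejected instead of producing partial results.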
Limitations
- The transactional session cannot be used in send-only endpoints. A full endpoint is required to send a control message to the local queue.
- The transport must have the same or higher availability guarantees as the database.