By default, the persistence does not attempt to atomically commit saga data and/or business data and uses the saga id as partition key to store sagas. Through the use of the TableTransactionAction API, saga data and/or business data can be atomically committed if everything is stored in the same partition within the same table.
A custom behavior must be introduced to identify and insert the TableEntityPartitionKey
value into the pipeline context for use by storage operations that occur during the processing of a given message.
Do not use a message mutator to identify the partition key. Message mutators do not offer the necessary control or timing to reliably interact with this persistence.
The custom behavior can be introduced in one of the two stages:
ITransportReceiveContext
stage
This is the earliest stage in the message processing pipeline. At this stage only the message ID, the headers and a byte array representation of the message body are available.
class PartitionKeyTransportReceiveContextBehavior
: Behavior<ITransportReceiveContext>
{
public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
{
context.Extensions.Set(new TableEntityPartitionKey("PartitionKeyValue"));
await next();
}
}
Interaction with outbox
This stage occurs before the outbox logic is executed. Identifying the TableEntityPartitionKey
during this stage allows the outbox feature to work entirely as intended. In the case where the TableEntityPartitionKey
cannot be identified using only the message headers, a behavior in the IIncomingLogicalMessageContext
stage can be introduced instead.
IIncomingLogicalMessageContext
stage
This is the first stage in the pipeline that allows access to the deserialized message body. At this stage both the message headers and deserialized message object are available.
class PartitionKeyIncomingLogicalMessageContextBehavior
: Behavior<IIncomingLogicalMessageContext>
{
public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
{
context.Extensions.Set(new TableEntityPartitionKey("PartitionKeyValue"));
await next();
}
}
Interaction with outbox
Outbox message guarantees work by bypassing the remaining message processing pipeline when an outbox record is located. Since this stage occurs after the normal bypass logic is executed, no TableEntityPartitionKey
is available to locate an existing outbox record.
Azure Table persistence introduces a new LogicalOutboxBehavior
to locate the outbox record and bypass the remaining message processing pipeline in the same IIncomingLogicalMessageContext
stage as the custom TableEntityPartitionKey
behavior. As a result, the custom behavior must be inserted into the pipeline before the LogicalOutboxBehavior
.
To specify the ordering for the custom TableEntityPartitionKey
behavior:
public class RegisterMyBehavior : RegisterStep
{
public RegisterMyBehavior() :
base(stepId: nameof(PartitionKeyIncomingLogicalMessageContextBehavior),
behavior: typeof(PartitionKeyIncomingLogicalMessageContextBehavior),
description: "Determines the PartitionKey from the logical message",
factoryMethod: b => new PartitionKeyIncomingLogicalMessageContextBehavior())
{
InsertBeforeIfExists(nameof(LogicalOutboxBehavior));
}
}
To register the custom TableEntityPartitionKey
behavior:
endpointConfiguration.Pipeline.Register(new RegisterMyBehavior());
Caution must be used when custom behaviors have been introduced in the pipeline that dispatch messages immediately. If these behaviors execute before the LogicalOutboxBehavior
the outbox message guarantees may be broken.
Sharing the transaction
Once a behavior is introduced to identify the partition key for a given message, it is possible to share a batch, represented in Azure Table by List
between both the Saga persistence and business data. This list can then be used to persist document updates for both concerns atomically.
Within a handler method using IMessageHandlerContext
To use the shared List
in a message handler:
Testing
The TestableAzureTableStorageSession
class in the NServiceBus.
namespace has been provided to facilitate testing a handler that utilizes the shared transaction feature.
With dependency injection
For custom types that require access to the shared List
:
class MyCustomDependency
{
public MyCustomDependency(IAzureTableStorageSession storageSession)
{
this.storageSession = storageSession;
}
public void DeleteInAzureTable(string itemId)
{
var entity = new ToDoActivity
{
PartitionKey = storageSession.PartitionKey,
RowKey = itemId
};
storageSession.Batch.Add(new TableTransactionAction(TableTransactionActionType.Add, entity));
}
private readonly IAzureTableStorageSession storageSession;
}
class MyHandlerWithCustomDependency : IHandleMessages<MyMessage>
{
public MyHandlerWithCustomDependency(MyCustomDependency customDependency)
{
this.customDependency = customDependency;
}
public Task Handle(MyMessage message, IMessageHandlerContext context)
{
customDependency.DeleteInAzureTable(message.ItemId);
return Task.CompletedTask;
}
private readonly MyCustomDependency customDependency;
}
And alternatively to using the the extension method IMessageHandlerContext.
:
public class ToDoActivity : ITableEntity
{
public string Description { get; set; }
public string PartitionKey { get; set; }
public string RowKey { get; set; }
public DateTimeOffset? Timestamp { get; set; }
public ETag ETag { get; set; }
}
class MyHandler : IHandleMessages<MyMessage>
{
public MyHandler(IAzureTableStorageSession storageSession)
{
this.storageSession = storageSession;
}
public Task Handle(MyMessage message, IMessageHandlerContext context)
{
var entity = new ToDoActivity
{
PartitionKey = storageSession.PartitionKey,
RowKey = "RowKey"
};
storageSession.Batch.Add(new TableTransactionAction(TableTransactionActionType.Add, entity));
return Task.CompletedTask;
}
private readonly IAzureTableStorageSession storageSession;
}