Getting Started
Architecture
NServiceBus
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

Transactions in Azure Table persistence

Target Version: NServiceBus 9.x

By default, the persistence does not attempt to atomically commit saga data and/or business data and uses the saga id as partition key to store sagas. Through the use of the TableTransactionAction API, saga data and/or business data can be atomically committed if everything is stored in the same partition within the same table.

A custom behavior must be introduced to identify and insert the TableEntityPartitionKey value into the pipeline context for use by storage operations that occur during the processing of a given message.

The custom behavior can be introduced in one of the two stages:

ITransportReceiveContext stage

This is the earliest stage in the message processing pipeline. At this stage only the message ID, the headers and a byte array representation of the message body are available.

class PartitionKeyTransportReceiveContextBehavior
    : Behavior<ITransportReceiveContext>
{
    public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        context.Extensions.Set(new TableEntityPartitionKey("PartitionKeyValue"));

        await next();
    }
}

Interaction with outbox

This stage occurs before the outbox logic is executed. Identifying the TableEntityPartitionKey during this stage allows the outbox feature to work entirely as intended. In the case where the TableEntityPartitionKey cannot be identified using only the message headers, a behavior in the IIncomingLogicalMessageContext stage can be introduced instead.

IIncomingLogicalMessageContext stage

This is the first stage in the pipeline that allows access to the deserialized message body. At this stage both the message headers and deserialized message object are available.

class PartitionKeyIncomingLogicalMessageContextBehavior
    : Behavior<IIncomingLogicalMessageContext>
{
    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        context.Extensions.Set(new TableEntityPartitionKey("PartitionKeyValue"));

        await next();
    }
}

Interaction with outbox

Outbox message guarantees work by bypassing the remaining message processing pipeline when an outbox record is located. Since this stage occurs after the normal bypass logic is executed, no TableEntityPartitionKey is available to locate an existing outbox record.

Azure Table persistence introduces a new LogicalOutboxBehavior to locate the outbox record and bypass the remaining message processing pipeline in the same IIncomingLogicalMessageContext stage as the custom TableEntityPartitionKey behavior. As a result, the custom behavior must be inserted into the pipeline before the LogicalOutboxBehavior.

To specify the ordering for the custom TableEntityPartitionKey behavior:

public class RegisterMyBehavior : RegisterStep
{
    public RegisterMyBehavior() :
        base(stepId: nameof(PartitionKeyIncomingLogicalMessageContextBehavior),
        behavior: typeof(PartitionKeyIncomingLogicalMessageContextBehavior),
        description: "Determines the PartitionKey from the logical message",
        factoryMethod: b => new PartitionKeyIncomingLogicalMessageContextBehavior())
    {
        InsertBeforeIfExists(nameof(LogicalOutboxBehavior));
    }
}

To register the custom TableEntityPartitionKey behavior:

endpointConfiguration.Pipeline.Register(new RegisterMyBehavior());

Sharing the transaction

Once a behavior is introduced to identify the partition key for a given message, it is possible to share a batch, represented in Azure Table by List<TableTransactionAction> between both the Saga persistence and business data. This list can then be used to persist document updates for both concerns atomically.

Within a handler method using IMessageHandlerContext

To use the shared List<TableTransactionAction> in a message handler:

public Task Handle(MyMessage message, IMessageHandlerContext context)
{
    var session = context.SynchronizedStorageSession.AzureTablePersistenceSession();

    var test1 = new ToDoActivity { PartitionKey = session.PartitionKey, RowKey = Guid.NewGuid().ToString() };
    var test2 = new ToDoActivity { PartitionKey = session.PartitionKey, RowKey = Guid.NewGuid().ToString() };
    var test3 = new ToDoActivity { PartitionKey = session.PartitionKey, RowKey = Guid.NewGuid().ToString() };
    var test4 = new ToDoActivity { PartitionKey = session.PartitionKey, RowKey = Guid.NewGuid().ToString() };

    session.Batch.Add(new TableTransactionAction(TableTransactionActionType.Add, test1));
    session.Batch.Add(new TableTransactionAction(TableTransactionActionType.UpdateReplace, test2));
    session.Batch.Add(new TableTransactionAction(TableTransactionActionType.UpsertReplace, test3));
    session.Batch.Add(new TableTransactionAction(TableTransactionActionType.Delete, test4));

    return Task.CompletedTask;
}

Testing

The TestableAzureTableStorageSession class in the NServiceBus.Testing namespace has been provided to facilitate testing a handler that utilizes the shared transaction feature.

With dependency injection

For custom types that require access to the shared List<TableTransactionAction>:

class MyCustomDependency
{
    public MyCustomDependency(IAzureTableStorageSession storageSession)
    {
        this.storageSession = storageSession;
    }

    public void DeleteInAzureTable(string itemId)
    {
        var entity = new ToDoActivity
        {
            PartitionKey = storageSession.PartitionKey,
            RowKey = itemId
        };

        storageSession.Batch.Add(new TableTransactionAction(TableTransactionActionType.Add, entity));
    }

    private readonly IAzureTableStorageSession storageSession;
}

class MyHandlerWithCustomDependency : IHandleMessages<MyMessage>
{
    public MyHandlerWithCustomDependency(MyCustomDependency customDependency)
    {
        this.customDependency = customDependency;
    }

    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        customDependency.DeleteInAzureTable(message.ItemId);

        return Task.CompletedTask;
    }

    private readonly MyCustomDependency customDependency;
}

And alternatively to using the the extension method IMessageHandlerContext.SynchronizedStorageSession.AzureTablePersistenceSession():

public class ToDoActivity : ITableEntity
{
    public string Description { get; set; }
    public string PartitionKey { get; set; }
    public string RowKey { get; set; }
    public DateTimeOffset? Timestamp { get; set; }
    public ETag ETag { get; set; }
}

class MyHandler : IHandleMessages<MyMessage>
{
    public MyHandler(IAzureTableStorageSession storageSession)
    {
        this.storageSession = storageSession;
    }

    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var entity = new ToDoActivity
        {
            PartitionKey = storageSession.PartitionKey,
            RowKey = "RowKey"
        };

        storageSession.Batch.Add(new TableTransactionAction(TableTransactionActionType.Add, entity));

        return Task.CompletedTask;
    }

    private readonly IAzureTableStorageSession storageSession;
}

Related Articles

  • Outbox
    Ensure message consistency with the NServiceBus Outbox, handling message deduplication and transactional integrity in distributed systems.