Getting Started
Architecture
NServiceBus
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

Transactions in Azure Table persistence

Target Version: NServiceBus 7.x

By default, the persistence does not attempt to atomically commit saga data and/or business data and uses the saga id as partition key to store sagas. Through the use of the TableBatchOperation API, saga data and/or business data can be atomically committed if everything is stored in the same partition within the same table.

A custom behavior must be introduced to identify and insert the TableEntityPartitionKey value into the pipeline context for use by storage operations that occur during the processing of a given message.

The custom behavior can be introduced in one of the two stages:

ITransportReceiveContext stage

This is the earliest stage in the message processing pipeline. At this stage only the message ID, the headers and a byte array representation of the message body are available.

class PartitionKeyTransportReceiveContextBehavior
    : Behavior<ITransportReceiveContext>
{
    public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        context.Extensions.Set(new TableEntityPartitionKey("PartitionKeyValue"));

        await next();
    }
}

Interaction with outbox

This stage occurs before the outbox logic is executed. Identifying the TableEntityPartitionKey during this stage allows the outbox feature to work entirely as intended. In the case where the TableEntityPartitionKey cannot be identified using only the message headers, a behavior in the IIncomingLogicalMessageContext stage can be introduced instead.

IIncomingLogicalMessageContext stage

This is the first stage in the pipeline that allows access to the deserialized message body. At this stage both the message headers and deserialized message object are available.

class PartitionKeyIncomingLogicalMessageContextBehavior
    : Behavior<IIncomingLogicalMessageContext>
{
    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        context.Extensions.Set(new TableEntityPartitionKey("PartitionKeyValue"));

        await next();
    }
}

Interaction with outbox

Outbox message guarantees work by bypassing the remaining message processing pipeline when an outbox record is located. Since this stage occurs after the normal bypass logic is executed, no TableEntityPartitionKey is available to locate an existing outbox record.

Azure Table persistence introduces a new LogicalOutboxBehavior to locate the outbox record and bypass the remaining message processing pipeline in the same IIncomingLogicalMessageContext stage as the custom TableEntityPartitionKey behavior. As a result, the custom behavior must be inserted into the pipeline before the LogicalOutboxBehavior.

To specify the ordering for the custom TableEntityPartitionKey behavior:

public class RegisterMyBehavior : RegisterStep
{
    public RegisterMyBehavior() :
        base(stepId: nameof(PartitionKeyIncomingLogicalMessageContextBehavior),
        behavior: typeof(PartitionKeyIncomingLogicalMessageContextBehavior),
        description: "Determines the PartitionKey from the logical message",
        factoryMethod: b => new PartitionKeyIncomingLogicalMessageContextBehavior())
    {
        InsertBeforeIfExists(nameof(LogicalOutboxBehavior));
    }
}

To register the custom TableEntityPartitionKey behavior:

endpointConfiguration.Pipeline.Register(new RegisterMyBehavior());

Sharing the transaction

Once a behavior is introduced to identify the partition key for a given message, it is possible to share a Azure Table TableBatchOperation between both the Saga persistence and business data. The shared TableBatchOperation can then be used to persist document updates for both concerns atomically.

Within a handler method using IMessageHandlerContext

To use the shared TableBatchOperation in a message handler:

public Task Handle(MyMessage message, IMessageHandlerContext context)
{
    var session = context.SynchronizedStorageSession.AzureTablePersistenceSession();

    var test1 = new ToDoActivity { PartitionKey = session.PartitionKey, RowKey = Guid.NewGuid().ToString() };
    var test2 = new ToDoActivity { PartitionKey = session.PartitionKey, RowKey = Guid.NewGuid().ToString() };
    var test3 = new ToDoActivity { PartitionKey = session.PartitionKey, RowKey = Guid.NewGuid().ToString() };
    var test4 = new ToDoActivity { PartitionKey = session.PartitionKey, RowKey = Guid.NewGuid().ToString() };

    session.Batch.Add(TableOperation.Insert(test1));
    session.Batch.Add(TableOperation.Replace(test2));
    session.Batch.Add(TableOperation.InsertOrReplace(test3));
    session.Batch.Add(TableOperation.Delete(test4));

    return Task.CompletedTask;
}

Testing

The TestableAzureTableStorageSession class in the NServiceBus.Testing namespace has been provided to facilitate testing a handler that utilizes the shared transaction feature.

With dependency injection

For custom types that require access to the shared TableBatchOperation:

class MyCustomDependency
{
    public MyCustomDependency(IAzureTableStorageSession storageSession)
    {
        this.storageSession = storageSession;
    }

    public void DeleteInAzureTable(string itemId)
    {
        var entity = new ToDoActivity
        {
            PartitionKey = storageSession.PartitionKey,
            RowKey = itemId
        };

        storageSession.Batch.Add(TableOperation.Insert(entity));
    }

    private readonly IAzureTableStorageSession storageSession;
}

class MyHandlerWithCustomDependency : IHandleMessages<MyMessage>
{
    public MyHandlerWithCustomDependency(MyCustomDependency customDependency)
    {
        this.customDependency = customDependency;
    }

    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        customDependency.DeleteInAzureTable(message.ItemId);

        return Task.CompletedTask;
    }

    private readonly MyCustomDependency customDependency;
}

And alternatively to using the the extension method IMessageHandlerContext.SynchronizedStorageSession.AzureTablePersistenceSession():

public class ToDoActivity : TableEntity
{
    public string Description { get; set; }
}

class MyHandler : IHandleMessages<MyMessage>
{
    public MyHandler(IAzureTableStorageSession storageSession)
    {
        this.storageSession = storageSession;
    }

    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var entity = new ToDoActivity
        {
            PartitionKey = storageSession.PartitionKey,
            RowKey = "RowKey"
        };

        storageSession.Batch.Add(TableOperation.Insert(entity));

        return Task.CompletedTask;
    }

    private readonly IAzureTableStorageSession storageSession;
}

Related Articles

  • Outbox
    Ensure message consistency with the NServiceBus Outbox, handling message deduplication and transactional integrity in distributed systems.