Transactions in Azure Cosmos DB persistence

By default, the persister does not attempt to atomically commit saga data and/or business data and uses the saga id as partition key to store sagas. Through the use of the Cosmos DB transactional batch API, saga data and/or business data can be atomically committed if everything is stored in the same partition within a container.

A custom behavior must be introduced to identify and insert the PartitionKey value into the pipeline context for use by storage operations that occur during the processing of a given message.

Do not use a message mutator to identify the partition key. Message mutators do not offer the necessary control or timing to reliably interact with this persistence.

The custom behavior can be introduced in one of the two stages:

ITransportReceiveContext stage

This is the earliest stage in the message processing pipeline. At this stage only the message ID, the headers and a byte array representation of the message body are available.

4-pre NServiceBus.Persistence.AzureTable
class PartitionKeyTransportReceiveContextBehavior
    : Behavior<ITransportReceiveContext>
{
    public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        context.Extensions.Set(new TableEntityPartitionKey("PartitionKeyValue"));

        await next().ConfigureAwait(false);
    }
}
3.x NServiceBus.Persistence.AzureTable
class PartitionKeyTransportReceiveContextBehavior
    : Behavior<ITransportReceiveContext>
{
    public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        context.Extensions.Set(new TableEntityPartitionKey("PartitionKeyValue"));

        await next().ConfigureAwait(false);
    }
}
0.x NServiceBus.Persistence.CosmosDB
class PartitionKeyTransportReceiveContextBehavior
    : Behavior<ITransportReceiveContext>
{
    public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        context.Extensions.Set(new PartitionKey("PartitionKeyValue"));

        await next().ConfigureAwait(false);
    }
}

Interaction with outbox

This stage occurs before the outbox logic is executed. Identifying the PartitionKey during this stage allows the outbox feature to work entirely as intended. In the case where the PartitionKey cannot be identified using only the message headers, a behavior in the IIncomingLogicalMessageContext stage can be introduced instead.

IIncomingLogicalMessageContext stage

This is the first stage in the pipeline that allows access to the deserialized message body. At this stage both the message headers and deserialized message object are available.

4-pre NServiceBus.Persistence.AzureTable
class PartitionKeyIncomingLogicalMessageContextBehavior
    : Behavior<IIncomingLogicalMessageContext>
{
    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        context.Extensions.Set(new TableEntityPartitionKey("PartitionKeyValue"));

        await next().ConfigureAwait(false);
    }
}
3.x NServiceBus.Persistence.AzureTable
class PartitionKeyIncomingLogicalMessageContextBehavior
    : Behavior<IIncomingLogicalMessageContext>
{
    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        context.Extensions.Set(new TableEntityPartitionKey("PartitionKeyValue"));

        await next().ConfigureAwait(false);
    }
}
0.x NServiceBus.Persistence.CosmosDB
class PartitionKeyIncomingLogicalMessageContextBehavior
    : Behavior<IIncomingLogicalMessageContext>
{
    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        context.Extensions.Set(new PartitionKey("PartitionKeyValue"));

        await next().ConfigureAwait(false);
    }
}

Interaction with outbox

Outbox message guarantees work by bypassing the remaining message processing pipeline when an outbox record is located. Since this stage occurs after the normal bypass logic is executed, no PartitionKey is available to locate an existing outbox record.

Cosmos DB persistence introduces a new LogicalOutboxBehavior to locate the outbox record and bypass the remaining message processing pipeline in the same IIncomingLogicalMessageContext stage as the custom PartitionKey behavior. As a result, the custom behavior must be inserted into the pipeline before the LogicalOutboxBehavior.

To specify the ordering for the custom PartitionKey behavior:

4-pre NServiceBus.Persistence.AzureTable
public class RegisterMyBehavior : RegisterStep
{
    public RegisterMyBehavior() :
        base(stepId: nameof(PartitionKeyIncomingLogicalMessageContextBehavior),
        behavior: typeof(PartitionKeyIncomingLogicalMessageContextBehavior),
        description: "Determines the PartitionKey from the logical message",
        factoryMethod: b => new PartitionKeyIncomingLogicalMessageContextBehavior())
    {
        InsertBeforeIfExists(nameof(LogicalOutboxBehavior));
    }
}
3.x NServiceBus.Persistence.AzureTable
public class RegisterMyBehavior : RegisterStep
{
    public RegisterMyBehavior() :
        base(stepId: nameof(PartitionKeyIncomingLogicalMessageContextBehavior),
        behavior: typeof(PartitionKeyIncomingLogicalMessageContextBehavior),
        description: "Determines the PartitionKey from the logical message",
        factoryMethod: b => new PartitionKeyIncomingLogicalMessageContextBehavior())
    {
        InsertBeforeIfExists(nameof(LogicalOutboxBehavior));
    }
}
0.x NServiceBus.Persistence.CosmosDB
public class RegisterMyBehavior : RegisterStep
{
    public RegisterMyBehavior() :
        base(stepId: nameof(PartitionKeyIncomingLogicalMessageContextBehavior),
        behavior: typeof(PartitionKeyIncomingLogicalMessageContextBehavior),
        description: "Determines the PartitionKey from the logical message",
        factoryMethod: b => new PartitionKeyIncomingLogicalMessageContextBehavior())
    {
        InsertBeforeIfExists(nameof(LogicalOutboxBehavior));
    }
}

To register the custom PartitionKey behavior:

endpointConfiguration.Pipeline.Register(new RegisterMyBehavior());
Caution must be used when custom behaviors have been introduced in the pipeline that dispatch messages immediately. If these behaviors execute before the LogicalOutboxBehavior the outbox message guarantees may be broken.

Sharing the transaction

Once a behavior is introduced to identify the partition key for a given message, it is possible to share a Cosmos DB TransactionalBatch between both the Saga persistence and business data. The shared TransactionalBatch can then be used to persist document updates for both concerns atomically.

The shared TransactionalBatch will not perform any actions when ExecuteAsync() is called. This allows NServiceBus to safely manage the unit of work. ExecuteAsync does not need to be called within the handler. All stream resources passed to the shared transaction will be properly disposed when NServiceBus executes the batch.

Within a handler method using IMessageHandlerContext

To use the shared TransactionalBatch in a message handler:

public Task Handle(MyMessage message, IMessageHandlerContext context)
{
    //setup the items for the batch...

    var session = context.SynchronizedStorageSession.CosmosPersistenceSession();

    session.Batch
            .CreateItem(test1)
            .ReplaceItem(test2.id, test2)
            .UpsertItem(test3)
            .DeleteItem("/item/id");

    return Task.CompletedTask;
}

Testing

The TestableCosmosSynchronizedStorageSession class in the NServiceBus.Testing namespace has been provided to facilitate testing a handler that utilizes the shared transaction feature.

With dependency injection

For custom types that require access to the shared TransactionalBatch:

4-pre NServiceBus.Persistence.AzureTable
class MyCustomDependency
{
    public MyCustomDependency(IAzureTableStorageSession storageSession)
    {
        this.storageSession = storageSession;
    }

    public void DeleteInAzureTable(string itemId)
    {
        var entity = new ToDoActivity
        {
            PartitionKey = storageSession.PartitionKey,
            RowKey = itemId
        };

        storageSession.Batch.Add(TableOperation.Insert(entity));
    }

    private readonly IAzureTableStorageSession storageSession;
}

class MyHandlerWithCustomDependency : IHandleMessages<MyMessage>
{
    public MyHandlerWithCustomDependency(MyCustomDependency customDependency)
    {
        this.customDependency = customDependency;
    }

    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        customDependency.DeleteInAzureTable(message.ItemId);

        return Task.CompletedTask;
    }

    private readonly MyCustomDependency customDependency;
}
3.x NServiceBus.Persistence.AzureTable
class MyCustomDependency
{
    public MyCustomDependency(IAzureTableStorageSession storageSession)
    {
        this.storageSession = storageSession;
    }

    public void DeleteInAzureTable(string itemId)
    {
        var entity = new ToDoActivity
        {
            PartitionKey = storageSession.PartitionKey,
            RowKey = itemId
        };

        storageSession.Batch.Add(TableOperation.Insert(entity));
    }

    private readonly IAzureTableStorageSession storageSession;
}

class MyHandlerWithCustomDependency : IHandleMessages<MyMessage>
{
    public MyHandlerWithCustomDependency(MyCustomDependency customDependency)
    {
        this.customDependency = customDependency;
    }

    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        customDependency.DeleteInAzureTable(message.ItemId);

        return Task.CompletedTask;
    }

    private readonly MyCustomDependency customDependency;
}
0.x NServiceBus.Persistence.CosmosDB
class MyCustomDependency
{
    private readonly TransactionalBatch transactionalBatch;

    public MyCustomDependency(ICosmosStorageSession storageSession)
    {
        transactionalBatch = storageSession.Batch;
    }

    public void DeleteItemInCosmos(string itemId)
    {
        transactionalBatch.DeleteItem(itemId);
    }
}

class MyHandlerWithCustomDependency : IHandleMessages<MyMessage>
{
    public MyHandlerWithCustomDependency(MyCustomDependency customDependency)
    {
        this.customDependency = customDependency;
    }

    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        customDependency.DeleteItemInCosmos(message.ItemId);

        return Task.CompletedTask;
    }

    private readonly MyCustomDependency customDependency;
}

And alternatively to using the the extension method IMessageHandlerContext.SynchronizedStorageSession.GetSharedTransactionalBatch():

4-pre NServiceBus.Persistence.AzureTable
public class ToDoActivity : TableEntity
{
    public string Description { get; set; }
}

class MyHandler : IHandleMessages<MyMessage>
{
    public MyHandler(IAzureTableStorageSession storageSession)
    {
        this.storageSession = storageSession;
    }

    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var entity = new ToDoActivity
        {
            PartitionKey = storageSession.PartitionKey,
            RowKey = "RowKey"
        };

        storageSession.Batch.Add(TableOperation.Insert(entity));

        return Task.CompletedTask;
    }

    private readonly IAzureTableStorageSession storageSession;
}
3.x NServiceBus.Persistence.AzureTable
public class ToDoActivity : TableEntity
{
    public string Description { get; set; }
}

class MyHandler : IHandleMessages<MyMessage>
{
    public MyHandler(IAzureTableStorageSession storageSession)
    {
        this.storageSession = storageSession;
    }

    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var entity = new ToDoActivity
        {
            PartitionKey = storageSession.PartitionKey,
            RowKey = "RowKey"
        };

        storageSession.Batch.Add(TableOperation.Insert(entity));

        return Task.CompletedTask;
    }

    private readonly IAzureTableStorageSession storageSession;
}
0.x NServiceBus.Persistence.CosmosDB
class MyHandler : IHandleMessages<MyMessage>
{
    public MyHandler(ICosmosStorageSession storageSession)
    {
        transactionalBatch = storageSession.Batch;
    }

    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        transactionalBatch.DeleteItem(message.ItemId);

        return Task.CompletedTask;
    }

    private readonly TransactionalBatch transactionalBatch;
}

Related Articles

  • Outbox
    Reliable messaging without distributed transactions.

Last modified