Transactions in Azure Cosmos DB persistence

NuGet Package: NServiceBus.Persistence.CosmosDB (1.x)
Target Version: NServiceBus 7.x

By default, the persister does not attempt to atomically commit saga data and/or business data and uses the saga id as partition key to store sagas. Through the use of the Cosmos DB transactional batch API, saga data and/or business data can be atomically committed if everything is stored in the same partition within a container.

The Cosmos DB persistence provides the several ways to specify the partition and Container used per message using message headers or the message contents to allow the use of transactions.

Using message headers only has the advantage being able to identify the partition or Container before the outbox logic is executed, which allows the outbox feature to work entirely as intended.

In the case where the partition or Container cannot be identified using only the message headers, the message contents can be used. This works because the Cosmos DB persistence introduces additional outbox logic to locate the outbox record and bypass the remaining message processing pipeline at a later stage of processing.

Specifying the PartitionKey to use for the transaction

All operations in a Azure Cosmos DB transaction must be executed with documents contained in the same Container partition, identified by the PartitionKey.

Using message header values

A single message header value can be used to specify the PartitionKey for the partition:

var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractPartitionKeyFromHeader("PartitionKeyHeader");

Multiple message header values can also be used. Additionally overloads exist that allow a state object to be provided and passed when the extractor is called to avoid unnecessary allocations:

var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractPartitionKeyFromHeaders(headers => new PartitionKey(headers["PartitionKeyHeader"]));

A custom class that implements IPartitionKeyFromHeadersExtractor can be implemented to specify the PartitionKey using message headers:

public class CustomPartitionKeyFromHeadersExtractor : IPartitionKeyFromHeadersExtractor
{
    public bool TryExtract(IReadOnlyDictionary<string, string> headers, out PartitionKey? partitionKey)
    {
        if (headers.TryGetValue("PartitionKeyHeader", out var headerVal))
        {
            partitionKey = new PartitionKey(headerVal);
            return true;
        }

        partitionKey = null;
        return false;
    }
}

The IPartitionKeyFromHeadersExtractor implementation can be configured via the API:

var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractPartitionKeyFromHeaders(new CustomPartitionKeyFromHeadersExtractor());

or registered via dependency injection:

endpointConfiguration.RegisterComponents(c => c.ConfigureComponent<CustomPartitionKeyFromHeadersExtractor>(DependencyLifecycle.SingleInstance));
Extractors registered via dependency injection containers are executed after extractors and extraction rules that are registered using the transaction API methods.

Besides those API methods shown here, additional overloads are available for extracting PartitionKey.

Using the message contents

The message contents can be accessed to specify the PartitionKey of the partition for the transaction:

var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractPartitionKeyFromMessage<MyMessage>(message => new PartitionKey(message.ItemId));

A custom class that implements IPartitionKeyFromMessageExtractor can be implemented that can access the message contents and headers to specify the partition to use for the transaction:

class CustomPartitionKeyFromMessageExtractor : IPartitionKeyFromMessageExtractor
{
    public bool TryExtract(object message, IReadOnlyDictionary<string, string> headers, out PartitionKey? partitionKey)
    {
        if (message is MyMessage myMessage)
        {
            partitionKey = new PartitionKey(myMessage.ItemId);
            return true;
        }

        partitionKey = null;
        return false;
    }
}

The IPartitionKeyFromMessageExtractor implementation can be configured using the API:

var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractPartitionKeyFromMessages(new CustomPartitionKeyFromMessageExtractor());

or registered via dependency injection:

endpointConfiguration.RegisterComponents(c => c.ConfigureComponent<CustomPartitionKeyFromMessageExtractor>(DependencyLifecycle.SingleInstance));
Extractors registered via dependency injection containers are executed after extractors and extraction rules that are registered using the transaction API methods.

Additional overloads are available for extracting PartitionKey.

Specifying the Container to use for the transaction

The Container to use can be specified by defining a default container:

Set the default container using the following configuration API:

endpointConfiguration.UsePersistence<CosmosPersistence>()
    .CosmosClient(new CosmosClient("ConnectionString"))
    .DefaultContainer(
        containerName: "ContainerName",
        partitionKeyPath: "/partition/key/path");

When installers are enabled the default container will be created if it doesn't exist. To opt-out from creating the default container either disable the installers or use

endpointConfiguration.UsePersistence<CosmosPersistence>()
    .CosmosClient(new CosmosClient("ConnectionString"))
    .DefaultContainer(
        containerName: "ContainerName",
        partitionKeyPath: "/partition/key/path")
    .DisableContainerCreation();

Optionally, the Container to use can specified during message processing by providing the Container name and partition key path using the ContainerInformation object.

When the container name and partition key path are provided during message processing it takes precedence over any default container configured.

Using message header values

The presence of a header value can be used to specify the container:

var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromHeader("ContainerKey", new ContainerInformation("ContainerName", new PartitionKeyPath("/partitionKey")));

A single message header value can be used to specify the container:

var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromHeader("ContainerKey", headerValue => new ContainerInformation(headerValue, new PartitionKeyPath("/partitionKey")));

Multiple message header values can also be used. Additionally overloads exist that allow a state object to be passed when the extractor is called to avoid unnecessary allocations:

var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromHeaders(headers => new ContainerInformation(headers["ContainerNameHeader"], new PartitionKeyPath("/partitionKeyPath")));

A custom class that implements IContainerInformationFromHeadersExtractor can be implemented to specify the Container using message headers:

public class CustomContainerInformationFromHeadersExtractor : IContainerInformationFromHeadersExtractor
{
    public bool TryExtract(IReadOnlyDictionary<string, string> headers, out ContainerInformation? containerInformation)
    {
        if (headers.TryGetValue("ContainerInformationHeader", out var headerVal))
        {
            containerInformation = new ContainerInformation(headerVal, new PartitionKeyPath("/partitionKey"));
            return true;
        }

        containerInformation = null;
        return false;
    }
}

The IContainerInformationFromHeadersExtractor implementation can be configured using the API:

var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromHeaders(new CustomContainerInformationFromHeadersExtractor());

or registered via dependency injection:

endpointConfiguration.RegisterComponents(c => c.ConfigureComponent<CustomContainerInformationFromHeadersExtractor>(DependencyLifecycle.SingleInstance));
Extractors registered via dependency injection containers are executed after extractors and extraction rules that are registered using the transaction API methods.

Besides those API methods shown here, additional overloads are available for extracting ContainerInformation from headers.

Using the message contents

A container can be specified on a per-message type basis:

var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromMessage<MyMessage>(new ContainerInformation("ContainerName", new PartitionKeyPath("/partitionKey")));

The message contents can be accessed to specify the container to use for the transaction:

var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromMessage<MyMessage>(message => new ContainerInformation(message.ItemId.ToString(), new PartitionKeyPath("/partitionKey")));

A custom class that implements IContainerInformationFromMessagesExtractor can be implemented that makes use of the messages and headers to specify the container to use for the transaction:

class CustomContainerInformationFromMessagesExtractor : IContainerInformationFromMessagesExtractor
{
    public bool TryExtract(object message, IReadOnlyDictionary<string, string> headers, out ContainerInformation? containerInformation)
    {
        if (message is MyMessage myMessage)
        {
            containerInformation = new ContainerInformation("ContainerNameForMyMessage", new PartitionKeyPath("/partitionKeyPath"));
            return true;
        }

        containerInformation = null;
        return false;
    }
}

The IContainerInformationFromMessagesExtractor implementation can be configured using the API:

var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromMessage(new CustomContainerInformationFromMessagesExtractor());

or registered via dependency injection:

endpointConfiguration.RegisterComponents(c => c.ConfigureComponent<CustomContainerInformationFromMessagesExtractor>(DependencyLifecycle.SingleInstance));
Extractors registered via dependency injection containers are executed after extractors and extraction rules that are registered using the transaction API methods.

Additional overloads are available for extracting ContainerInformation from the message.

Sharing the transaction

It is possible to share a Cosmos DB TransactionalBatch between both the Saga persistence and business data. The shared TransactionalBatch can then be used to persist document updates for both concerns atomically.

The shared TransactionalBatch will not perform any actions when ExecuteAsync() is called. This allows NServiceBus to safely manage the unit of work. ExecuteAsync does not need to be called within the handler. All stream resources passed to the shared transaction will be properly disposed when NServiceBus executes the batch.

Within a handler method using IMessageHandlerContext

To use the shared TransactionalBatch in a message handler:

public Task Handle(MyMessage message, IMessageHandlerContext context)
{
    //setup the items for the batch...

    var session = context.SynchronizedStorageSession.CosmosPersistenceSession();

    session.Batch
            .CreateItem(test1)
            .ReplaceItem(test2.id, test2)
            .UpsertItem(test3)
            .DeleteItem("/item/id");

    return Task.CompletedTask;
}

Testing

The TestableCosmosSynchronizedStorageSession class in the NServiceBus.Testing namespace has been provided to facilitate testing a handler that utilizes the shared transaction feature.

With dependency injection

For custom types that require access to the shared TransactionalBatch:

class MyCustomDependency
{
    private readonly TransactionalBatch transactionalBatch;

    public MyCustomDependency(ICosmosStorageSession storageSession)
    {
        transactionalBatch = storageSession.Batch;
    }

    public void DeleteItemInCosmos(string itemId)
    {
        transactionalBatch.DeleteItem(itemId);
    }
}

class MyHandlerWithCustomDependency : IHandleMessages<MyMessage>
{
    public MyHandlerWithCustomDependency(MyCustomDependency customDependency)
    {
        this.customDependency = customDependency;
    }

    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        customDependency.DeleteItemInCosmos(message.ItemId);

        return Task.CompletedTask;
    }

    private readonly MyCustomDependency customDependency;
}

And alternatively to using the the extension method IMessageHandlerContext.SynchronizedStorageSession.GetSharedTransactionalBatch():

class MyHandler : IHandleMessages<MyMessage>
{
    public MyHandler(ICosmosStorageSession storageSession)
    {
        transactionalBatch = storageSession.Batch;
    }

    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        transactionalBatch.DeleteItem(message.ItemId);

        return Task.CompletedTask;
    }

    private readonly TransactionalBatch transactionalBatch;
}

Related Articles

  • Outbox
    Reliable messaging without distributed transactions.

Last modified