By default, the persister does not attempt to atomically commit saga data and/or business data and uses the saga id as partition key to store sagas. Through the use of the Cosmos DB transactional batch API, saga data and/or business data can be atomically committed if everything is stored in the same partition within a container.
The Cosmos DB persistence provides the several ways to specify the partition and Container used per message using message headers or the message contents to allow the use of transactions.
Using message headers only has the advantage being able to identify the partition or Container before the outbox logic is executed, which allows the outbox feature to work entirely as intended.
In the case where the partition or Container cannot be identified using only the message headers, the message contents can be used. This works because the Cosmos DB persistence introduces additional outbox logic to locate the outbox record and bypass the remaining message processing pipeline at a later stage of processing.
Specifying the PartitionKey to use for the transaction
All operations in a Azure Cosmos DB transaction must be executed with documents contained in the same Container partition, identified by the PartitionKey.
Using message header values
A single message header value can be used to specify the PartitionKey for the partition:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractPartitionKeyFromHeader("PartitionKeyHeader");
Multiple message header values can also be used. Additionally overloads exist that allow a state object to be provided and passed when the extractor is called to avoid unnecessary allocations:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractPartitionKeyFromHeaders(headers => new PartitionKey(headers["PartitionKeyHeader"]));
A custom class that implements IPartitionKeyFromHeadersExtractor can be implemented to specify the PartitionKey using message headers:
public class CustomPartitionKeyFromHeadersExtractor : IPartitionKeyFromHeadersExtractor
{
public bool TryExtract(IReadOnlyDictionary<string, string> headers, out PartitionKey? partitionKey)
{
if (headers.TryGetValue("PartitionKeyHeader", out var headerVal))
{
partitionKey = new PartitionKey(headerVal);
return true;
}
partitionKey = null;
return false;
}
}
The IPartitionKeyFromHeadersExtractor implementation can be configured via the API:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractPartitionKeyFromHeaders(new CustomPartitionKeyFromHeadersExtractor());
or registered via dependency injection:
endpointConfiguration.RegisterComponents(s => s.AddSingleton<CustomPartitionKeyFromHeadersExtractor>());
Extractors registered via dependency injection containers are executed after extractors and extraction rules that are registered using the transaction API methods.
Besides those API methods shown here, additional overloads are available for extracting PartitionKey.
Using the message contents
The message contents can be accessed to specify the PartitionKey of the partition for the transaction:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractPartitionKeyFromMessage<MyMessage>(message => new PartitionKey(message.ItemId));
A custom class that implements IPartitionKeyFromMessageExtractor can be implemented that can access the message contents and headers to specify the partition to use for the transaction:
class CustomPartitionKeyFromMessageExtractor : IPartitionKeyFromMessageExtractor
{
public bool TryExtract(object message, IReadOnlyDictionary<string, string> headers, out PartitionKey? partitionKey)
{
if (message is MyMessage myMessage)
{
partitionKey = new PartitionKey(myMessage.ItemId);
return true;
}
partitionKey = null;
return false;
}
}
The IPartitionKeyFromMessageExtractor implementation can be configured using the API:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractPartitionKeyFromMessages(new CustomPartitionKeyFromMessageExtractor());
or registered via dependency injection:
endpointConfiguration.RegisterComponents(s => s.AddSingleton<CustomPartitionKeyFromMessageExtractor>());
Extractors registered via dependency injection containers are executed after extractors and extraction rules that are registered using the transaction API methods.
Additional overloads are available for extracting PartitionKey.
Specifying the Container to use for the transaction
The container that is used by default for all incoming messages is specified via the DefaultContainer(. configuration API:
endpointConfiguration.UsePersistence<CosmosPersistence>()
.CosmosClient(new CosmosClient("ConnectionString"))
.DefaultContainer(
containerName: "ContainerName",
partitionKeyPath: "/partition/key/path");
Added in version 3.2.1: By default, message container extractors cannot override the configured default container. To allow extractors to override the default container, enable the EnableContainerFromMessageExtractor flag:
config.UsePersistence<CosmosPersistence>()
.EnableContainerFromMessageExtractor();
When this flag is enabled and multiple extractors are configured, the last extractor in the pipeline determines the final container. For example, if both a Header Extractor (physical stage) and a Message Extractor (logical stage) are configured, the Message Extractor takes precedence.
If an extractor fails to retrieve container information, the system falls back to the next available source in this order: Message Extractor → Header Extractor → configured default container. If no default container is configured and all extractors fail, an exception is thrown.
For users upgrading from version 3.1 or older: Enabling this flag may require you to migrate data from the default container to the container configured in the message extractor. This action prevents message duplication.
When installers are enabled, this (default) container will be created if it doesn't exist. To opt-out of creating the default container, either disable the installers or use DisableContainerCreation():
endpointConfiguration.UsePersistence<CosmosPersistence>()
.CosmosClient(new CosmosClient("ConnectionString"))
.DefaultContainer(
containerName: "ContainerName",
partitionKeyPath: "/partition/key/path")
.DisableContainerCreation();
Any other containers that are resolved by extracting partition information from incoming messages need to be manually created in Azure.
Optionally, the Container to use can specified during message processing by providing the Container name and partition key path using the ContainerInformation object.
When the container name and partition key path are provided during message processing it takes precedence over any default container configured.
Using message header values
The presence of a header value can be used to specify the container:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromHeader("ContainerKey", new ContainerInformation("ContainerName", new PartitionKeyPath("/partitionKey")));
A single message header value can be used to specify the container:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromHeader("ContainerKey", headerValue => new ContainerInformation(headerValue, new PartitionKeyPath("/partitionKey")));
Multiple message header values can also be used. Additionally overloads exist that allow a state object to be passed when the extractor is called to avoid unnecessary allocations:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromHeaders(headers => new ContainerInformation(headers["ContainerNameHeader"], new PartitionKeyPath("/partitionKeyPath")));
A custom class that implements IContainerInformationFromHeadersExtractor can be implemented to specify the Container using message headers:
public class CustomContainerInformationFromHeadersExtractor : IContainerInformationFromHeadersExtractor
{
public bool TryExtract(IReadOnlyDictionary<string, string> headers, out ContainerInformation? containerInformation)
{
if (headers.TryGetValue("ContainerInformationHeader", out var headerVal))
{
containerInformation = new ContainerInformation(headerVal, new PartitionKeyPath("/partitionKey"));
return true;
}
containerInformation = null;
return false;
}
}
The IContainerInformationFromHeadersExtractor implementation can be configured using the API:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromHeaders(new CustomContainerInformationFromHeadersExtractor());
or registered via dependency injection:
endpointConfiguration.RegisterComponents(s => s.AddSingleton<CustomContainerInformationFromHeadersExtractor>());
Extractors registered via dependency injection containers are executed after extractors and extraction rules that are registered using the transaction API methods.
Besides those API methods shown here, additional overloads are available for extracting ContainerInformation from headers.
Using the message contents
A container can be specified on a per-message type basis:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromMessage<MyMessage>(new ContainerInformation("ContainerName", new PartitionKeyPath("/partitionKey")));
The message contents can be accessed to specify the container to use for the transaction:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromMessage<MyMessage>(message => new ContainerInformation(message.ItemId.ToString(), new PartitionKeyPath("/partitionKey")));
A custom class that implements IContainerInformationFromMessagesExtractor can be implemented that makes use of the messages and headers to specify the container to use for the transaction:
class CustomContainerInformationFromMessagesExtractor : IContainerInformationFromMessagesExtractor
{
public bool TryExtract(object message, IReadOnlyDictionary<string, string> headers, out ContainerInformation? containerInformation)
{
if (message is MyMessage myMessage)
{
containerInformation = new ContainerInformation("ContainerNameForMyMessage", new PartitionKeyPath("/partitionKeyPath"));
return true;
}
containerInformation = null;
return false;
}
}
The IContainerInformationFromMessagesExtractor implementation can be configured using the API:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromMessage(new CustomContainerInformationFromMessagesExtractor());
or registered via dependency injection:
endpointConfiguration.RegisterComponents(s => s.AddSingleton<CustomContainerInformationFromMessagesExtractor>());
Extractors registered via dependency injection containers are executed after extractors and extraction rules that are registered using the transaction API methods.
Additional overloads are available for extracting ContainerInformation from the message.
Sharing the transaction
It is possible to share a Cosmos DB TransactionalBatch between both the Saga persistence and business data. The shared TransactionalBatch can then be used to persist document updates for both concerns atomically.
The shared TransactionalBatch will not perform any actions when ExecuteAsync() is called. This allows NServiceBus to safely manage the unit of work. ExecuteAsync does not need to be called within the handler. All stream resources passed to the shared transaction will be properly disposed when NServiceBus executes the batch.
Within a handler method using IMessageHandlerContext
To use the shared TransactionalBatch in a message handler:
Testing
The TestableCosmosSynchronizedStorageSession class in the NServiceBus. namespace has been provided to facilitate testing a handler that utilizes the shared transaction feature.
With dependency injection
For custom types that require access to the shared TransactionalBatch:
class MyCustomDependency
{
private readonly TransactionalBatch transactionalBatch;
public MyCustomDependency(ICosmosStorageSession storageSession)
{
transactionalBatch = storageSession.Batch;
}
public void DeleteItemInCosmos(string itemId)
{
transactionalBatch.DeleteItem(itemId);
}
}
class MyHandlerWithCustomDependency : IHandleMessages<MyMessage>
{
public MyHandlerWithCustomDependency(MyCustomDependency customDependency)
{
this.customDependency = customDependency;
}
public Task Handle(MyMessage message, IMessageHandlerContext context)
{
customDependency.DeleteItemInCosmos(message.ItemId);
return Task.CompletedTask;
}
private readonly MyCustomDependency customDependency;
}
And alternatively to using the the extension method IMessageHandlerContext.:
class MyHandler : IHandleMessages<MyMessage>
{
public MyHandler(ICosmosStorageSession storageSession)
{
transactionalBatch = storageSession.Batch;
}
public Task Handle(MyMessage message, IMessageHandlerContext context)
{
transactionalBatch.DeleteItem(message.ItemId);
return Task.CompletedTask;
}
private readonly TransactionalBatch transactionalBatch;
}