By default, the persister does not attempt to atomically commit saga data and/or business data and uses the saga id as partition key to store sagas. Through the use of the Cosmos DB transactional batch API, saga data and/or business data can be atomically committed if everything is stored in the same partition within a container.
The Cosmos DB persistence provides several ways to specify the partition and Container used per message, using either message headers or the message contents, to allow the use of transactions.
Using message headers only has the advantage of being able to identify the partition or Container before the outbox logic is executed, which allows the outbox feature to work entirely as intended.
In the case where the partition or Container cannot be identified using only the message headers, the message contents can be used. This works because the Cosmos DB persistence introduces additional outbox logic to locate the outbox record and bypass the remaining message processing pipeline at a later stage of processing.
Specifying the PartitionKey to use for the transaction
All operations in an Azure Cosmos DB transaction must be executed with documents contained in the same Container partition, identified by the PartitionKey.
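For context, this constraint comes from the Cosmos DB SDK itself: a TransactionalBatch is created for exactly one partition key value. The following is a minimal sketch using Microsoft.Azure.Cosmos directly, outside of NServiceBus. The OrderDocument and ShippingDocument types, the "/customerId" partition key path, and the sample values are hypothetical and only serve the illustration.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

// Hypothetical document types; both carry the same partition key value (customerId).
public record OrderDocument(string id, string customerId, decimal total);
public record ShippingDocument(string id, string customerId, string address);

public static class BatchSketch
{
    // Assumes the container was created with the partition key path "/customerId".
    public static async Task StoreAtomically(Container container, string customerId)
    {
        // The batch is bound to a single logical partition of a single container.
        TransactionalBatch batch = container.CreateTransactionalBatch(new PartitionKey(customerId));

        batch.CreateItem(new OrderDocument(Guid.NewGuid().ToString(), customerId, 42.50m));
        batch.CreateItem(new ShippingDocument(Guid.NewGuid().ToString(), customerId, "Example Street 1"));

        // Either every operation in the batch is applied or none is.
        using TransactionalBatchResponse response = await batch.ExecuteAsync();
        if (!response.IsSuccessStatusCode)
        {
            throw new InvalidOperationException($"Batch failed with status {response.StatusCode}");
        }
    }
}
```

A document whose partition key value differs from the one the batch was created with cannot take part in the same batch, which is why the persister needs to know the PartitionKey before it can commit saga data and business data together.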
Using message header values
A single message header value can be used to specify the PartitionKey for the partition:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractPartitionKeyFromHeader("PartitionKeyHeader");
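For the header-based extraction to find a value, the sender has to put the header on the outgoing message. The following is a minimal sketch of the sending side, assuming the header name "PartitionKeyHeader" from the snippet above. The ShipOrder message type and the SendWithPartitionKey helper are hypothetical, and routing for the message is assumed to be configured elsewhere.

```csharp
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical message type used only for this sketch.
public class ShipOrder : ICommand
{
    public string OrderId { get; set; }
}

public static class SenderSketch
{
    public static Task SendWithPartitionKey(IMessageSession session, string orderId)
    {
        var options = new SendOptions();

        // The header name must match the one passed to ExtractPartitionKeyFromHeader
        // on the receiving endpoint.
        options.SetHeader("PartitionKeyHeader", orderId);

        return session.Send(new ShipOrder { OrderId = orderId }, options);
    }
}
```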
Multiple message header values can also be used. Additionally, overloads exist that accept a state object, which is passed to the extractor when it is called, to avoid unnecessary allocations:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractPartitionKeyFromHeaders(headers => new PartitionKey(headers["PartitionKeyHeader"]));
A custom class that implements IPartitionKeyFromHeadersExtractor can be used to specify the PartitionKey using message headers:
public class CustomPartitionKeyFromHeadersExtractor : IPartitionKeyFromHeadersExtractor
{
    public bool TryExtract(IReadOnlyDictionary<string, string> headers, out PartitionKey? partitionKey)
    {
        // Use the header value as the partition key when the header is present.
        if (headers.TryGetValue("PartitionKeyHeader", out var headerVal))
        {
            partitionKey = new PartitionKey(headerVal);
            return true;
        }

        // Header missing: report that no partition key could be extracted.
        partitionKey = null;
        return false;
    }
}
The IPartitionKeyFromHeadersExtractor implementation can be configured via the API:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractPartitionKeyFromHeaders(new CustomPartitionKeyFromHeadersExtractor());
or registered via dependency injection:
endpointConfiguration.RegisterComponents(s => s.AddSingleton<CustomPartitionKeyFromHeadersExtractor>());
Extractors registered via dependency injection containers are executed after extractors and extraction rules that are registered using the transaction API methods.
Besides the API methods shown here, additional overloads are available for extracting the PartitionKey.
Using the message contents
The message contents can be accessed to specify the PartitionKey of the partition for the transaction:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractPartitionKeyFromMessage<MyMessage>(message => new PartitionKey(message.ItemId));
A custom class that implements IPartitionKeyFromMessageExtractor can access both the message contents and the headers to specify the partition to use for the transaction:
class CustomPartitionKeyFromMessageExtractor : IPartitionKeyFromMessageExtractor
{
    public bool TryExtract(object message, IReadOnlyDictionary<string, string> headers, out PartitionKey? partitionKey)
    {
        // Only MyMessage is handled here; its ItemId becomes the partition key.
        if (message is MyMessage myMessage)
        {
            partitionKey = new PartitionKey(myMessage.ItemId);
            return true;
        }

        // Any other message type: no partition key could be extracted.
        partitionKey = null;
        return false;
    }
}
The IPartitionKeyFromMessageExtractor implementation can be configured using the API:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractPartitionKeyFromMessages(new CustomPartitionKeyFromMessageExtractor());
or registered via dependency injection:
endpointConfiguration.RegisterComponents(s => s.AddSingleton<CustomPartitionKeyFromMessageExtractor>());
Extractors registered via dependency injection containers are executed after extractors and extraction rules that are registered using the transaction API methods.
Additional overloads are available for extracting the PartitionKey.
Specifying the Container to use for the transaction
The Container to use can be specified by defining a default container using the following configuration API:
endpointConfiguration.UsePersistence<CosmosPersistence>()
    .CosmosClient(new CosmosClient("ConnectionString"))
    .DefaultContainer(
        containerName: "ContainerName",
        partitionKeyPath: "/partition/key/path");
The container that is used by default for all incoming messages is specified via DefaultContainer. When installers are enabled, this default container will be created if it doesn't exist.
To opt out of creating the default container, either disable the installers or use DisableContainerCreation:
endpointConfiguration.UsePersistence<CosmosPersistence>()
    .CosmosClient(new CosmosClient("ConnectionString"))
    .DefaultContainer(
        containerName: "ContainerName",
        partitionKeyPath: "/partition/key/path")
    .DisableContainerCreation();
Any other containers that are resolved by extracting partition information from incoming messages need to be manually created in Azure.
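One way to create such a container ahead of time is through the Cosmos DB SDK, for example in a deployment script. The following is a minimal sketch under that assumption. The ContainerSetupSketch class, the EnsureContainerExists helper, and its parameters are hypothetical. The partition key path passed here has to match the PartitionKeyPath used in the ContainerInformation for that container.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public static class ContainerSetupSketch
{
    // databaseName, containerName and partitionKeyPath are placeholders supplied by the caller,
    // for example "ContainerName" and "/partitionKey".
    public static async Task EnsureContainerExists(
        CosmosClient client,
        string databaseName,
        string containerName,
        string partitionKeyPath)
    {
        // The database is assumed to exist already.
        Database database = client.GetDatabase(databaseName);

        // Creates the container only when it is not present yet.
        await database.CreateContainerIfNotExistsAsync(
            new ContainerProperties(containerName, partitionKeyPath));
    }
}
```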
Optionally, the Container to use can be specified during message processing by providing the Container name and partition key path using the ContainerInformation object.
When the container name and partition key path are provided during message processing, they take precedence over any default container configured.
Using message header values
The presence of a header value can be used to specify the container:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromHeader("ContainerKey", new ContainerInformation("ContainerName", new PartitionKeyPath("/partitionKey")));
A single message header value can be used to specify the container:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromHeader("ContainerKey", headerValue => new ContainerInformation(headerValue, new PartitionKeyPath("/partitionKey")));
Multiple message header values can also be used. Additionally, overloads exist that accept a state object, which is passed to the extractor when it is called, to avoid unnecessary allocations:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromHeaders(headers => new ContainerInformation(headers["ContainerNameHeader"], new PartitionKeyPath("/partitionKeyPath")));
A custom class that implements IContainerInformationFromHeadersExtractor can be used to specify the Container using message headers:
public class CustomContainerInformationFromHeadersExtractor : IContainerInformationFromHeadersExtractor
{
    public bool TryExtract(IReadOnlyDictionary<string, string> headers, out ContainerInformation? containerInformation)
    {
        // Use the header value as the container name when the header is present.
        if (headers.TryGetValue("ContainerInformationHeader", out var headerVal))
        {
            containerInformation = new ContainerInformation(headerVal, new PartitionKeyPath("/partitionKey"));
            return true;
        }

        // Header missing: no container information could be extracted.
        containerInformation = null;
        return false;
    }
}
The IContainerInformationFromHeadersExtractor implementation can be configured using the API:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromHeaders(new CustomContainerInformationFromHeadersExtractor());
or registered via dependency injection:
endpointConfiguration.RegisterComponents(s => s.AddSingleton<CustomContainerInformationFromHeadersExtractor>());
Extractors registered via dependency injection containers are executed after extractors and extraction rules that are registered using the transaction API methods.
Besides the API methods shown here, additional overloads are available for extracting ContainerInformation from headers.
Using the message contents
A container can be specified on a per-message type basis:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromMessage<MyMessage>(new ContainerInformation("ContainerName", new PartitionKeyPath("/partitionKey")));
The message contents can be accessed to specify the container to use for the transaction:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromMessage<MyMessage>(message => new ContainerInformation(message.ItemId.ToString(), new PartitionKeyPath("/partitionKey")));
A custom class that implements IContainerInformationFromMessagesExtractor can use both the message and the headers to specify the container to use for the transaction:
class CustomContainerInformationFromMessagesExtractor : IContainerInformationFromMessagesExtractor
{
    public bool TryExtract(object message, IReadOnlyDictionary<string, string> headers, out ContainerInformation? containerInformation)
    {
        // Every MyMessage is routed to a fixed container; the message contents are not inspected.
        if (message is MyMessage)
        {
            containerInformation = new ContainerInformation("ContainerNameForMyMessage", new PartitionKeyPath("/partitionKeyPath"));
            return true;
        }

        // Any other message type: no container information could be extracted.
        containerInformation = null;
        return false;
    }
}
The IContainerInformationFromMessagesExtractor implementation can be configured using the API:
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromMessage(new CustomContainerInformationFromMessagesExtractor());
or registered via dependency injection:
endpointConfiguration.RegisterComponents(s => s.AddSingleton<CustomContainerInformationFromMessagesExtractor>());
Extractors registered via dependency injection containers are executed after extractors and extraction rules that are registered using the transaction API methods.
Additional overloads are available for extracting ContainerInformation from the message.
Sharing the transaction
It is possible to share a Cosmos DB TransactionalBatch between both the saga persistence and business data. The shared TransactionalBatch can then be used to persist document updates for both concerns atomically.
The shared TransactionalBatch will not perform any actions when ExecuteAsync() is called. This allows NServiceBus to safely manage the unit of work. ExecuteAsync does not need to be called within the handler. All stream resources passed to the shared transaction will be properly disposed when NServiceBus executes the batch.
Within a handler method using IMessageHandlerContext
To use the shared TransactionalBatch in a message handler:
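A minimal sketch of such a handler follows. It assumes that the NServiceBus.Persistence.CosmosDB package exposes a CosmosPersistenceSession() extension method on the synchronized storage session, returning the same ICosmosStorageSession used in the dependency injection examples on this page. MyMessage is reduced to a hypothetical single ItemId property, and the handler name is illustrative.

```csharp
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical message type, reduced to the one property used below.
public class MyMessage : ICommand
{
    public string ItemId { get; set; }
}

class MyHandlerUsingSharedBatch : IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        // Assumption: CosmosPersistenceSession() is the extension method provided by the persistence package.
        var session = context.SynchronizedStorageSession.CosmosPersistenceSession();

        // The operation is only queued on the shared batch.
        // NServiceBus executes the batch, so ExecuteAsync is not called here.
        session.Batch.DeleteItem(message.ItemId);

        return Task.CompletedTask;
    }
}
```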
Testing
The TestableCosmosSynchronizedStorageSession class in the NServiceBus. namespace has been provided to facilitate testing a handler that utilizes the shared transaction feature.
With dependency injection
For custom types that require access to the shared TransactionalBatch:
class MyCustomDependency
{
    private readonly TransactionalBatch transactionalBatch;

    public MyCustomDependency(ICosmosStorageSession storageSession)
    {
        transactionalBatch = storageSession.Batch;
    }

    public void DeleteItemInCosmos(string itemId)
    {
        transactionalBatch.DeleteItem(itemId);
    }
}

class MyHandlerWithCustomDependency : IHandleMessages<MyMessage>
{
    private readonly MyCustomDependency customDependency;

    public MyHandlerWithCustomDependency(MyCustomDependency customDependency)
    {
        this.customDependency = customDependency;
    }

    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        customDependency.DeleteItemInCosmos(message.ItemId);
        return Task.CompletedTask;
    }
}
As an alternative to the IMessageHandlerContext extension method, the ICosmosStorageSession can be injected directly into the handler:
class MyHandler : IHandleMessages<MyMessage>
{
    private readonly TransactionalBatch transactionalBatch;

    public MyHandler(ICosmosStorageSession storageSession)
    {
        transactionalBatch = storageSession.Batch;
    }

    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        transactionalBatch.DeleteItem(message.ItemId);
        return Task.CompletedTask;
    }
}