Azure Cosmos DB Persistence

Source
NuGet Package NServiceBus.Persistence.CosmosDB (0.x) | License
This is a Preview project.
Target NServiceBus Version: 7.x

Uses the Azure Cosmos DB NoSQL database service for storage.

It is important to read and understand partitioning in Azure Cosmos DB before using NServiceBus.Persistence.CosmosDB.

Persistence at a glance

For a description of each feature, see the persistence at a glance legend.

Feature
Supported storage types    Sagas, Outbox
Transactions               Using TransactionalBatch, with caveats
Concurrency control        Optimistic concurrency
Scripted deployment        Not supported
Installers                 Container is created by installers.

The Outbox feature requires partition planning.

Usage

Add a NuGet package reference to NServiceBus.Persistence.CosmosDB. Configure the endpoint to use the persistence through the following configuration API:

endpointConfiguration.UsePersistence<CosmosPersistence>()
    .CosmosClient(new CosmosClient("ConnectionString"));

Customizing the database used

By default, the persister stores records in a database named NServiceBus and uses one container per endpoint, with the endpoint name as the container name.

Customize the database name using the following configuration API:

endpointConfiguration.UsePersistence<CosmosPersistence>()
    .CosmosClient(new CosmosClient("ConnectionString"))
    .DatabaseName("DatabaseName");

Customizing the container used

Set the default container using the following configuration API:

endpointConfiguration.UsePersistence<CosmosPersistence>()
    .CosmosClient(new CosmosClient("ConnectionString"))
    .DefaultContainer(
        containerName: "ContainerName",
        partitionKeyPath: "/partition/key/path");

When installers are enabled, the default container is created if it doesn't exist. To opt out of creating the default container, either disable the installers or use:

endpointConfiguration.UsePersistence<CosmosPersistence>()
    .CosmosClient(new CosmosClient("ConnectionString"))
    .DefaultContainer(
        containerName: "ContainerName",
        partitionKeyPath: "/partition/key/path")
    .DisableContainerCreation();

Advanced container customization

When the container name and partition key path are provided at runtime, they take precedence over any default container configured through the configuration API. The container specified at runtime must already exist and be configured correctly in order to work.

The container name and partition key path can be provided using a custom behavior at the physical stage:

class ContainerInfoTransportReceiveContextBehavior
    : Behavior<ITransportReceiveContext>
{
    public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        context.Extensions.Set(
            new ContainerInformation(
                containerName: "containerName",
                partitionKeyPath: new PartitionKeyPath("partitionKeyPath")));

        await next().ConfigureAwait(false);
    }
}

or at the logical stage:

class ContainerInfoLogicalReceiveContextBehavior
    : Behavior<IIncomingLogicalMessageContext>
{
    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        context.Extensions.Set(
            new ContainerInformation(
                containerName: "containerName",
                partitionKeyPath: new PartitionKeyPath("partitionKeyPath")));

        await next().ConfigureAwait(false);
    }
}
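Either behavior has to be registered in the endpoint's pipeline before it takes effect. The following is a minimal sketch, assuming the standard NServiceBus 7 pipeline registration API and the logical-stage behavior shown above. The ContainerInfoPipelineRegistration class name and the description string are illustrative and not part of the package.

```csharp
using NServiceBus;

// Hypothetical helper class, used here only to group the registration call.
static class ContainerInfoPipelineRegistration
{
    public static void Register(EndpointConfiguration endpointConfiguration)
    {
        // Adds the behavior instance to the incoming pipeline.
        // The second argument is a free-form description of the behavior.
        endpointConfiguration.Pipeline.Register(
            new ContainerInfoLogicalReceiveContextBehavior(),
            "Sets the Cosmos DB container information for the incoming message");
    }
}
```

Register only one of the two behaviors unless different stages genuinely need to supply different container information.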

Customizing the CosmosClient provider

When the CosmosClient is configured and used via dependency injection, a custom provider can be implemented:

class CustomCosmosClientProvider
    : IProvideCosmosClient
{
    // get fully configured via DI
    public CustomCosmosClientProvider(CosmosClient cosmosClient)
    {
        Client = cosmosClient;
    }
    public CosmosClient Client { get; }
}

and registered on the container:

endpointConfiguration.RegisterComponents(c => c.ConfigureComponent<CustomCosmosClientProvider>(DependencyLifecycle.SingleInstance));
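For the provider's constructor to be satisfied, the CosmosClient itself also has to be resolvable from the container. The following sketch assumes the NServiceBus 7 built-in container API is used for that as well. In an application with an external container the client would more likely be registered there instead. The CosmosClientRegistration class name is hypothetical.

```csharp
using Microsoft.Azure.Cosmos;
using NServiceBus;

// Hypothetical helper class, used here only to group the registration calls.
static class CosmosClientRegistration
{
    public static void Register(EndpointConfiguration endpointConfiguration, string connectionString)
    {
        endpointConfiguration.RegisterComponents(c =>
        {
            // A single CosmosClient instance is shared for the lifetime of the endpoint.
            c.RegisterSingleton(new CosmosClient(connectionString));

            // The custom provider receives the client through constructor injection.
            c.ConfigureComponent<CustomCosmosClientProvider>(DependencyLifecycle.SingleInstance);
        });
    }
}
```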

Transactions

The Cosmos DB persister supports using the Cosmos DB transactional batch API. However, Cosmos DB only allows operations to be batched if all of them target the same logical partition, that is, the same partition key value. This is due to the distributed nature of the Cosmos DB service, which does not support distributed transactions.
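To illustrate the single-partition constraint, here is a minimal sketch that uses the Azure Cosmos DB .NET SDK directly. It is not the persister's internal implementation. The OrderDocument type, the /OrderId partition key path, and the document ids are assumptions made up for this example.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

// Hypothetical document type, used only for this illustration.
class OrderDocument
{
    public string id { get; set; }        // Cosmos DB item id
    public string OrderId { get; set; }   // assumed partition key property (/OrderId)
    public string Status { get; set; }
}

static class TransactionalBatchSketch
{
    public static async Task<bool> StoreTogether(Container container, string orderId)
    {
        // The batch is bound to one partition key value.
        // Every operation added to it must target that same logical partition.
        TransactionalBatch batch = container
            .CreateTransactionalBatch(new PartitionKey(orderId))
            .CreateItem(new OrderDocument { id = "saga-" + orderId, OrderId = orderId, Status = "Started" })
            .UpsertItem(new OrderDocument { id = "outbox-" + orderId, OrderId = orderId, Status = "Pending" });

        using (TransactionalBatchResponse response = await batch.ExecuteAsync().ConfigureAwait(false))
        {
            // Either all operations were applied or none were.
            return response.IsSuccessStatusCode;
        }
    }
}
```

An item that belongs to a different partition key value cannot be added to this batch, which is why the incoming message has to be resolved to a single partition key.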

The transactions documentation provides additional details on how to configure NServiceBus to resolve the incoming message to a specific partition key to take advantage of this Cosmos DB feature.
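As a rough sketch of what resolving a message to a partition key can look like, the behavior below reads a value from a message header and places a PartitionKey in the context extensions, mirroring the ContainerInformation behaviors shown earlier. It assumes the persister picks up a PartitionKey set this way; the transactions documentation remains the authoritative description. The behavior name and the Sample.OrderId header are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using NServiceBus.Pipeline;

class PartitionKeyFromHeaderBehavior
    : Behavior<ITransportReceiveContext>
{
    // Hypothetical header that the sender is assumed to set on every message.
    const string OrderIdHeader = "Sample.OrderId";

    public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        // Only set the partition key when the header is present.
        if (context.Message.Headers.TryGetValue(OrderIdHeader, out var orderId))
        {
            context.Extensions.Set(new PartitionKey(orderId));
        }

        await next().ConfigureAwait(false);
    }
}
```

Using a header keeps the lookup at the physical stage, before the message body is deserialized. A logical-stage behavior could derive the value from the message instance instead.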

Outbox cleanup

When the outbox is enabled, the deduplication data is kept for seven days by default. To customize this time frame, use the following API:

var outbox = endpointConfiguration.EnableOutbox();
outbox.TimeToKeepOutboxDeduplicationData(TimeSpan.FromDays(7));

Saga concurrency

Because Cosmos DB is a distributed service, optimistic concurrency control is always used when updating or deleting saga data. When messages are handled simultaneously, conflicts may occur. See below for examples of the exceptions that are thrown. Saga concurrency explains how these conflicts are handled and contains guidance for high-load scenarios.

With optimistic concurrency, the relevant Handle method on the saga is invoked even though the message might later be rolled back. It is therefore important not to perform any work in saga handlers that cannot be rolled back together with the message. It also means that under high levels of concurrency there will be N-1 rollbacks, where N is the number of concurrent messages. This can cause throughput issues and might require design changes.
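Two endpoint-level settings are commonly adjusted when concurrency conflicts become frequent: the number of messages processed in parallel and the number of immediate retries. The sketch below uses the standard NServiceBus 7 configuration API. The SagaConcurrencyTuning class name and the values 4 and 5 are placeholders, not recommendations.

```csharp
using NServiceBus;

// Hypothetical helper class, used here only to group the settings.
static class SagaConcurrencyTuning
{
    public static void Apply(EndpointConfiguration endpointConfiguration)
    {
        // Fewer messages in flight means fewer simultaneous updates of the same saga.
        endpointConfiguration.LimitMessageProcessingConcurrencyTo(4);

        // A message that lost the optimistic concurrency race is retried immediately
        // and then sees the saga data written by the winning message.
        endpointConfiguration.Recoverability()
            .Immediate(immediate => immediate.NumberOfRetries(5));
    }
}
```

Lowering concurrency trades throughput for fewer rollbacks, so both values should be measured against the actual workload.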

Starting a saga

Example exception:

The 'OrderSagaData' saga with id '7ac4d199-6560-4d1a-b83a-b3dad94b0802' could not be created possibly due to a concurrency conflict.

Updating or deleting saga data

Example exception:

The 'OrderSagaData' saga with id '7ac4d199-6560-4d1a-b83a-b3dad94b0802' was updated by another process or no longer exists.
