Cosmos DB Persistence Usage with transactions

Component: NServiceBus.Persistence.CosmosDB
NuGet Package: NServiceBus.Persistence.CosmosDB (1.x)
Target NServiceBus Version: 7.x

This sample shows a client/server scenario that uses saga and outbox persistence to store records atomically by leveraging Cosmos DB transactions.



Shared messages

The shared message contracts used by all endpoints.
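
The contracts themselves are not listed in this sample. A minimal sketch, inferred only from the property names used in the snippets further down, might look like the following. Which messages implement IProvideOrderId, the Guid type of OrderId, and the choice of ICommand, IEvent, and IMessage marker interfaces are assumptions, not the sample's confirmed code.

```csharp
using System;
using NServiceBus;

// Hypothetical shape of the shared contracts. Only the type and property
// names are taken from the sample; everything else is an assumption.
public interface IProvideOrderId
{
    Guid OrderId { get; }
}

public class StartOrder : ICommand, IProvideOrderId
{
    public Guid OrderId { get; set; }
}

public class ShipOrder : ICommand, IProvideOrderId
{
    public Guid OrderId { get; set; }
}

// State passed along with the saga timeout.
public class CompleteOrder : IMessage, IProvideOrderId
{
    public Guid OrderId { get; set; }
    public string OrderDescription { get; set; }
}

public class OrderCompleted : IEvent, IProvideOrderId
{
    public Guid OrderId { get; set; }
}

// Does not implement IProvideOrderId: the order id for this event
// is transported in a custom header instead.
public class OrderShipped : IEvent
{
    public Guid OrderId { get; set; }
    public DateTimeOffset ShippingDate { get; set; }
}
```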


Client

  • Sends the StartOrder message to Server.
  • Receives and handles the OrderCompleted event.
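
The client code is not listed in this sample. The two responsibilities above could be sketched roughly as follows, assuming the StartOrder and OrderCompleted contracts from the shared project. The names OrderCompletedHandler and OrderStarter are hypothetical; the destination is the Server endpoint name used in the persistence configuration.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Logging;

// Hypothetical client-side handler for the event published by the saga.
public class OrderCompletedHandler : IHandleMessages<OrderCompleted>
{
    static readonly ILog Log = LogManager.GetLogger<OrderCompletedHandler>();

    public Task Handle(OrderCompleted message, IMessageHandlerContext context)
    {
        Log.Info($"Received OrderCompleted for OrderId {message.OrderId}");
        return Task.CompletedTask;
    }
}

// Hypothetical helper that starts a new order from the client.
public static class OrderStarter
{
    public static Task StartNewOrder(IMessageSession session)
    {
        // A fresh order id; on the server it becomes the partition key value.
        var startOrder = new StartOrder { OrderId = Guid.NewGuid() };
        return session.Send("Samples.CosmosDB.Transactions.Server", startOrder);
    }
}
```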

Server projects

  • Receives the StartOrder message and starts an OrderSaga.
  • OrderSaga requests a timeout with a CompleteOrder instance that carries the saga data.
  • Receives the OrderShipped message, which carries the order ID in a custom header.
  • OrderSaga publishes an OrderCompleted event when the CompleteOrder timeout fires.

Persistence config

Configure the endpoint to use Cosmos DB Persistence.

var endpointConfiguration = new EndpointConfiguration("Samples.CosmosDB.Transactions.Server");

var persistence = endpointConfiguration.UsePersistence<CosmosPersistence>();
var connection = @"AccountEndpoint = https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";
persistence.CosmosClient(new CosmosClient(connection));
persistence.DefaultContainer("Server", "/OrderId");

Because the order ID is used as the partition key, the container's partition key path must point to the same property (/OrderId).


Most messages implement IProvideOrderId, so a behavior in the logical message stage of the pipeline can use the provided order ID as the partition key.

class OrderIdAsPartitionKeyBehavior : Behavior<IIncomingLogicalMessageContext>
{
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // Use the order id carried by the message as the Cosmos DB partition key.
        if (context.Message.Instance is IProvideOrderId provideOrderId)
        {
            var partitionKeyValue = provideOrderId.OrderId.ToString();

            Log.Info($"Found partition key '{partitionKeyValue}' from '{nameof(IProvideOrderId)}'");

            context.Extensions.Set(new PartitionKey(partitionKeyValue));
        }

        return next();
    }

    public class Registration : RegisterStep
    {
        public Registration() :
            base(nameof(OrderIdAsPartitionKeyBehavior),
                typeof(OrderIdAsPartitionKeyBehavior),
                "Determines the PartitionKey from the logical message",
                b => new OrderIdAsPartitionKeyBehavior())
        {
            // Assumption: the partition key must be set before the persister's outbox step runs.
            InsertBeforeIfExists(nameof(LogicalOutboxBehavior));
        }
    }

    static readonly ILog Log = LogManager.GetLogger<OrderIdAsPartitionKeyBehavior>();
}

One handler publishes an event that doesn't implement IProvideOrderId; instead, the handler adds a custom header that carries the order ID. The handler also stores an OrderShippingInformation document by participating in the transactional batch provided by NServiceBus.

public class ShipOrderHandler :
    IHandleMessages<ShipOrder>
{
    public Task Handle(ShipOrder message, IMessageHandlerContext context)
    {
        var orderShippingInformation = new OrderShippingInformation
        {
            Id = Guid.NewGuid(),
            OrderId = message.OrderId,
            ShippedAt = DateTimeOffset.UtcNow,
        };

        Store(orderShippingInformation, context);

        Log.Info($"Order Shipped. OrderId {message.OrderId}");

        // OrderShipped does not implement IProvideOrderId, so the order id travels in a header.
        var options = new PublishOptions();
        options.SetHeader("Sample.CosmosDB.Transaction.OrderId", message.OrderId.ToString());

        return context.Publish(new OrderShipped { OrderId = orderShippingInformation.OrderId, ShippingDate = orderShippingInformation.ShippedAt }, options);
    }

    private static void Store(OrderShippingInformation orderShippingInformation, IMessageHandlerContext context)
    {
        // Add the document to the transactional batch managed by NServiceBus.
        var session = context.SynchronizedStorageSession.CosmosPersistenceSession();
        var requestOptions = new TransactionalBatchItemRequestOptions
        {
            EnableContentResponseOnWrite = false,
        };

        session.Batch.CreateItem(orderShippingInformation, requestOptions);
    }

    static readonly ILog Log = LogManager.GetLogger<ShipOrderHandler>();
}
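
The OrderShippingInformation type stored by the handler is not listed here. A minimal sketch, assuming the Cosmos client uses its default Newtonsoft.Json-based serializer, could look like this. The property names come from the handler above; the attribute and the property types are assumptions.

```csharp
using System;
using Newtonsoft.Json;

// Hypothetical document type written through the transactional batch.
public class OrderShippingInformation
{
    // Cosmos DB items need a lowercase "id" property, so the name is mapped explicitly.
    [JsonProperty("id")]
    public Guid Id { get; set; }

    // Matches the "/OrderId" partition key path configured for the container.
    public Guid OrderId { get; set; }

    public DateTimeOffset ShippedAt { get; set; }
}
```

Because every operation in a transactional batch targets a single partition key value, the OrderId of the stored document has to equal the partition key that the behaviors put into the pipeline context for the incoming message.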

The header can then be used to determine the partition key in the transport receive context.

class OrderIdHeaderAsPartitionKeyBehavior : Behavior<ITransportReceiveContext>
{
    public override Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        if (context.Message.Headers.TryGetValue("Sample.CosmosDB.Transaction.OrderId", out var orderId))
        {
            Log.Info($"Found partition key '{orderId}' from header 'Sample.CosmosDB.Transaction.OrderId'");

            context.Extensions.Set(new PartitionKey(orderId));
        }

        return next();
    }

    static readonly ILog Log = LogManager.GetLogger<OrderIdHeaderAsPartitionKeyBehavior>();
}

Finally, the above behaviors are registered in the pipeline.

endpointConfiguration.Pipeline.Register(new OrderIdHeaderAsPartitionKeyBehavior(), "Extracts a partition key from a header");
endpointConfiguration.Pipeline.Register(new OrderIdAsPartitionKeyBehavior.Registration());

Order saga data

public class OrderSagaData :
    ContainSagaData
{
    public Guid OrderId { get; set; }
    public string OrderDescription { get; set; }
}

Order saga

public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleMessages<OrderShipped>,
    IHandleTimeouts<CompleteOrder>
{
    static readonly ILog Log = LogManager.GetLogger<OrderSaga>();

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.OrderId)
            .ToMessage<StartOrder>(msg => msg.OrderId)
            .ToMessage<OrderShipped>(msg => msg.OrderId);
    }

    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        var orderDescription = $"The saga for order {message.OrderId}";
        Data.OrderDescription = orderDescription;
        Log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");

        var shipOrder = new ShipOrder
        {
            OrderId = message.OrderId
        };

        Log.Info("Order will complete in 5 seconds");
        var timeoutData = new CompleteOrder
        {
            OrderDescription = orderDescription,
            OrderId = Data.OrderId,
        };

        // Assumption: ShipOrder is handled by ShipOrderHandler in this same endpoint.
        return Task.WhenAll(
            context.SendLocal(shipOrder),
            RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData)
        );
    }

    public Task Handle(OrderShipped message, IMessageHandlerContext context)
    {
        Log.Info($"Order with OrderId {Data.OrderId} shipped on {message.ShippingDate}");
        return Task.CompletedTask;
    }

    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        Log.Info($"Saga with OrderId {Data.OrderId} completed");
        // Assumption: the saga ends once the timeout fires.
        MarkAsComplete();
        var orderCompleted = new OrderCompleted
        {
            OrderId = Data.OrderId
        };
        return context.Publish(orderCompleted);
    }
}

Related Articles

  • Sagas
    NServiceBus uses event-driven architecture to include fault-tolerance and scalability in long-term business processes.
