Cosmos DB Persistence Usage with transactions

NuGet Package: NServiceBus.Persistence.CosmosDB (2-pre)
Target Version: NServiceBus 8.x
This page targets a pre-release version. Pre-releases are subject to change and samples are not guaranteed to be fully functional.

This sample shows a client/server scenario in which saga and outbox persistence store their records atomically by using Cosmos DB transactions.

Projects

SharedMessages

The shared message contracts used by all endpoints.
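The contracts themselves are not listed on this page. Based on how the saga and the partition key extraction use them, they might look roughly like the following sketch. The exact shape is an assumption: the marker interfaces chosen for each class, which classes implement IProvideOrderId, and the type of ShippingDate are guesses, not the sample's actual source.

```csharp
using System;
using NServiceBus;

// Assumed shape: the page only shows that the interface exposes an OrderId.
public interface IProvideOrderId
{
    Guid OrderId { get; }
}

// Command sent by Client to start the saga.
public class StartOrder : ICommand, IProvideOrderId
{
    public Guid OrderId { get; set; }
}

// Command the saga sends locally; implementing IProvideOrderId is an assumption.
public class ShipOrder : ICommand, IProvideOrderId
{
    public Guid OrderId { get; set; }
}

// Timeout state requested by the saga.
public class CompleteOrder : IProvideOrderId
{
    public Guid OrderId { get; set; }
    public string OrderDescription { get; set; }
}

// Event published when the timeout fires.
public class OrderCompleted : IEvent, IProvideOrderId
{
    public Guid OrderId { get; set; }
}

// Event that does not implement IProvideOrderId; the order ID travels in a header.
// DateTimeOffset is an assumed type for ShippingDate.
public class OrderShipped : IEvent
{
    public DateTimeOffset ShippingDate { get; set; }
}
```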

Client

  • Sends the StartOrder message to Server.
  • Receives and handles the OrderCompleted event.
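As a rough illustration of the two client responsibilities above, the sketch below sends StartOrder to the Server endpoint and handles OrderCompleted. It assumes StartOrder and OrderCompleted are the SharedMessages contracts with a Guid OrderId property; the class names OrderCompletedHandler and ClientExample are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Logging;

// Hypothetical handler name; logs the event published by the saga.
public class OrderCompletedHandler : IHandleMessages<OrderCompleted>
{
    static readonly ILog Log = LogManager.GetLogger<OrderCompletedHandler>();

    public Task Handle(OrderCompleted message, IMessageHandlerContext context)
    {
        Log.Info($"Received OrderCompleted for OrderId {message.OrderId}");
        return Task.CompletedTask;
    }
}

// Hypothetical helper; the session would be the started client endpoint.
public static class ClientExample
{
    public static Task SendStartOrder(IMessageSession session)
    {
        var startOrder = new StartOrder
        {
            OrderId = Guid.NewGuid()
        };

        // Destination is the Server endpoint name used in the persistence config.
        return session.Send("Samples.CosmosDB.Transactions.Server", startOrder);
    }
}
```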

Server projects

  • Receives the StartOrder message and initiates an OrderSaga.
  • OrderSaga requests a timeout, passing a CompleteOrder instance populated from the saga data.
  • Receives the OrderShipped message, which carries the order ID in a custom header.
  • OrderSaga publishes an OrderCompleted event when the CompleteOrder timeout fires.

Persistence config

Configure the endpoint to use Cosmos DB Persistence.

var endpointConfiguration = new EndpointConfiguration("Samples.CosmosDB.Transactions.Server");
endpointConfiguration.EnableOutbox();

var persistence = endpointConfiguration.UsePersistence<CosmosPersistence>();
var connection = @"AccountEndpoint = https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";
persistence.DatabaseName("Samples.CosmosDB.Transactions");
persistence.CosmosClient(new CosmosClient(connection));
persistence.DefaultContainer("Server", "/OrderId");

Because the order ID is used as the partition key, the container's partition key path (/OrderId) must point to the property that holds it.
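To make the relationship between the container and the partition key path concrete, here is a minimal sketch that creates the database and container directly with the Cosmos DB SDK, using the same names as the configuration above. The sample itself may rely on other means to create these resources; the ContainerSetup class and EnsureServerContainer method are hypothetical names.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public static class ContainerSetup
{
    public static async Task<Container> EnsureServerContainer(CosmosClient client)
    {
        // Same database name as passed to persistence.DatabaseName(...).
        Database database = await client.CreateDatabaseIfNotExistsAsync("Samples.CosmosDB.Transactions");

        // The partition key path is case-sensitive and must match the JSON
        // property name of the stored documents, here "OrderId".
        Container container = await database.CreateContainerIfNotExistsAsync(
            new ContainerProperties("Server", "/OrderId"));

        return container;
    }
}
```

Every document written to this container, including saga data and outbox records, is then grouped by its OrderId value, which is what allows them to be committed in a single transactional batch.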

Transaction Information

Most messages implement IProvideOrderId, so the order ID they carry can be used as the partition key.

var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractPartitionKeyFromMessage<IProvideOrderId>(provideOrderId =>
{
    Log.Info($"Found partition key '{provideOrderId.OrderId}' from '{nameof(IProvideOrderId)}'");
    return new PartitionKey(provideOrderId.OrderId.ToString());
});

One handler publishes an event that doesn't implement IProvideOrderId; instead, it adds a custom header that carries the order ID. The handler also creates an OrderShippingInformation record by participating in the transactional batch provided by NServiceBus.

transactionInformation.ExtractPartitionKeyFromHeader("Sample.CosmosDB.Transaction.OrderId", orderId =>
{
    Log.Info($"Found partition key '{orderId}' from header 'Sample.CosmosDB.Transaction.OrderId'");
    return orderId;
});
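The handler described above is not shown on this page. The following is a minimal sketch of what it could look like, assuming the CosmosPersistenceSession() extension method on the synchronized storage session is available in the package version in use. The names ShipOrderHandler and OrderShippingInformation's properties are hypothetical, as is the assumption that the handler processes the ShipOrder message sent by the saga.

```csharp
using System;
using System.Threading.Tasks;
using Newtonsoft.Json;
using NServiceBus;

// Hypothetical document shape; Cosmos DB requires a lowercase "id" property,
// and OrderId must match the container's partition key path.
public class OrderShippingInformation
{
    [JsonProperty("id")]
    public Guid Id { get; set; }
    public Guid OrderId { get; set; }
    public DateTimeOffset ShippedAt { get; set; }
}

public class ShipOrderHandler : IHandleMessages<ShipOrder>
{
    public Task Handle(ShipOrder message, IMessageHandlerContext context)
    {
        var shippedAt = DateTimeOffset.UtcNow;
        var shippingInformation = new OrderShippingInformation
        {
            Id = Guid.NewGuid(),
            OrderId = message.OrderId,
            ShippedAt = shippedAt
        };

        // Add the document to the batch that NServiceBus commits together
        // with the outbox record (assumed extension method, see lead-in).
        var session = context.SynchronizedStorageSession.CosmosPersistenceSession();
        session.Batch.CreateItem(shippingInformation);

        // OrderShipped does not implement IProvideOrderId, so the order ID
        // is passed in the custom header read by ExtractPartitionKeyFromHeader.
        var options = new PublishOptions();
        options.SetHeader("Sample.CosmosDB.Transaction.OrderId", message.OrderId.ToString());

        var orderShipped = new OrderShipped
        {
            ShippingDate = shippedAt
        };
        return context.Publish(orderShipped, options);
    }
}
```

Because the header value is written with OrderId.ToString(), it produces the same partition key string as the message-based extractor, so both paths address the same logical partition.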

Order saga data

public class OrderSagaData :
    ContainSagaData
{
    public Guid OrderId { get; set; }
    public string OrderDescription { get; set; }
}

Order saga

public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleMessages<OrderShipped>,
    IHandleTimeouts<CompleteOrder>
{
    static readonly ILog Log = LogManager.GetLogger<OrderSaga>();

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.OrderId)
            .ToMessage<StartOrder>(msg => msg.OrderId)
            .ToMessageHeader<OrderShipped>("Sample.CosmosDB.Transaction.OrderId");
    }

    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        var orderDescription = $"The saga for order {message.OrderId}";
        Data.OrderDescription = orderDescription;
        Log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");

        var shipOrder = new ShipOrder
        {
            OrderId = message.OrderId
        };

        Log.Info("Order will complete in 5 seconds");
        var timeoutData = new CompleteOrder
        {
            OrderDescription = orderDescription,
            OrderId = Data.OrderId,
        };

        return Task.WhenAll(
            context.SendLocal(shipOrder),
            RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData)
        );
    }

    public Task Handle(OrderShipped message, IMessageHandlerContext context)
    {
        Log.Info($"Order with OrderId {Data.OrderId} shipped on {message.ShippingDate}");
        return Task.CompletedTask;
    }

    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        Log.Info($"Saga with OrderId {Data.OrderId} completed");
        MarkAsComplete();
        var orderCompleted = new OrderCompleted
        {
            OrderId = Data.OrderId
        };
        return context.Publish(orderCompleted);
    }
}

Related Articles

  • Sagas
    NServiceBus uses event-driven architecture to include fault-tolerance and scalability in long-term business processes.
