CosmosDB Persistence Usage with transactions

Component: NServiceBus.Persistence.CosmosDB
NuGet Package NServiceBus.Persistence.CosmosDB (0.x)
This is a Preview project
Target NServiceBus Version: 7.x

This sample shows a client/server scenario that uses saga and outbox persistence to store records atomically by leveraging transactions.

Projects

SharedMessages

The shared message contracts used by all endpoints.

Client

  • Sends the StartOrder message to Server.
  • Receives and handles the OrderCompleted event.

Server projects

  • Receives the StartOrder message and initiates an OrderSaga.
  • OrderSaga requests a timeout, passing a CompleteOrder instance populated from the saga data.
  • Receives the OrderShipped message, which carries a custom header.
  • OrderSaga publishes an OrderCompleted event when the CompleteOrder timeout fires.

Persistence config

Configure the endpoint to use CosmosDB Persistence.

// Connection string for the Cosmos DB account at localhost:8081.
// NOTE(review): this looks like the local emulator endpoint and key — confirm, and never commit a real account key.
var connection = @"AccountEndpoint = https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";

var endpointConfiguration = new EndpointConfiguration("Samples.CosmosDB.Transactions.Server");
endpointConfiguration.EnableOutbox();

// Select CosmosDB persistence and tell it which client, database and default container to use.
var persistence = endpointConfiguration.UsePersistence<CosmosPersistence>();
persistence.CosmosClient(new CosmosClient(connection));
persistence.DatabaseName("Samples.CosmosDB.Transactions");
// The default container is "Server" and its partition key path is "/OrderId".
persistence.DefaultContainer("Server", "/OrderId");

Because the order ID is used as the partition key, the partition key path of the container must refer to it as well (/OrderId).

Behaviors

Most messages implement IProvideOrderId, so a behavior in the logical message stage of the pipeline can use the provided order ID as the partition key.

/// <summary>
/// Behavior for the incoming logical message stage that derives the partition key
/// from messages implementing <see cref="IProvideOrderId"/>.
/// </summary>
class OrderIdAsPartitionKeyBehavior : Behavior<IIncomingLogicalMessageContext>
{
    static readonly ILog Log = LogManager.GetLogger<OrderIdAsPartitionKeyBehavior>();

    /// <summary>
    /// Stores a <see cref="PartitionKey"/> built from the message's order id in the
    /// context extensions, then continues the pipeline.
    /// </summary>
    /// <param name="context">The incoming logical message context.</param>
    /// <param name="next">Delegate that invokes the remainder of the pipeline.</param>
    /// <returns>The task returned by <paramref name="next"/>.</returns>
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // Messages that do not expose an order id pass through untouched.
        if (!(context.Message.Instance is IProvideOrderId orderIdProvider))
        {
            return next();
        }

        var orderIdText = orderIdProvider.OrderId.ToString();
        Log.Info($"Found partition key '{orderIdText}' from '{nameof(IProvideOrderId)}'");

        context.Extensions.Set(new PartitionKey(orderIdText));

        return next();
    }

    /// <summary>
    /// Pipeline step registration that places this behavior before <c>LogicalOutboxBehavior</c>.
    /// </summary>
    public class Registration : RegisterStep
    {
        public Registration()
            : base(
                nameof(OrderIdAsPartitionKeyBehavior),
                typeof(OrderIdAsPartitionKeyBehavior),
                "Determines the PartitionKey from the logical message",
                builder => new OrderIdAsPartitionKeyBehavior())
        {
            InsertBefore(nameof(LogicalOutboxBehavior));
        }
    }
}

One handler publishes an event that doesn't implement IProvideOrderId; instead, the handler adds a custom header to the event that carries the order ID. The handler also stores an OrderShippingInformation item by participating in the transactional batch provided by NServiceBus.

/// <summary>
/// Handles <see cref="ShipOrder"/>: adds an <see cref="OrderShippingInformation"/> item to the
/// transactional batch and publishes <see cref="OrderShipped"/> with the order id in a custom header.
/// </summary>
public class ShipOrderHandler :
    IHandleMessages<ShipOrder>
{
    // readonly so the logger cannot be reassigned, matching the other loggers in this sample.
    static readonly ILog Log = LogManager.GetLogger<ShipOrderHandler>();

    /// <summary>
    /// Records the shipping information and publishes the <see cref="OrderShipped"/> event.
    /// </summary>
    /// <param name="message">The incoming ship order command.</param>
    /// <param name="context">The message handler context.</param>
    /// <returns>The task returned by the publish operation.</returns>
    public Task Handle(ShipOrder message, IMessageHandlerContext context)
    {
        var orderShippingInformation = new OrderShippingInformation
        {
            Id = Guid.NewGuid(),
            OrderId = message.OrderId,
            ShippedAt = DateTimeOffset.UtcNow,
        };

        Store(orderShippingInformation, context);

        Log.Info($"Order Shipped. OrderId {message.OrderId}");

        // OrderShipped is published with the order id in a header so that receivers can read it
        // from the message headers.
        var options = new PublishOptions();
        options.SetHeader("Sample.CosmosDB.Transaction.OrderId", message.OrderId.ToString());

        return context.Publish(new OrderShipped { OrderId = orderShippingInformation.OrderId, ShippingDate = orderShippingInformation.ShippedAt }, options);
    }

    /// <summary>
    /// Adds a create-item operation for the shipping information to the batch exposed by the
    /// CosmosDB synchronized storage session.
    /// </summary>
    private static void Store(OrderShippingInformation orderShippingInformation, IMessageHandlerContext context)
    {
        var session = context.SynchronizedStorageSession.CosmosPersistenceSession();
        var requestOptions = new TransactionalBatchItemRequestOptions
        {
            // The created item is not needed in the response.
            EnableContentResponseOnWrite = false,
        };

        session.Batch.CreateItem(orderShippingInformation, requestOptions);
    }
}

The header can then be used to determine the partition key in the transport receive context:

/// <summary>
/// Transport receive stage behavior that derives the partition key from the
/// "Sample.CosmosDB.Transaction.OrderId" message header when it is present.
/// </summary>
class OrderIdHeaderAsPartitionKeyBehavior : Behavior<ITransportReceiveContext>
{
    // Single definition of the header name so the lookup and the log message cannot drift apart.
    const string OrderIdHeaderKey = "Sample.CosmosDB.Transaction.OrderId";

    // Logger is created for this behavior's own type so log entries are attributed correctly.
    static readonly ILog Log = LogManager.GetLogger<OrderIdHeaderAsPartitionKeyBehavior>();

    /// <summary>
    /// Stores a <see cref="PartitionKey"/> built from the order id header in the context
    /// extensions (when the header exists), then continues the pipeline.
    /// </summary>
    /// <param name="context">The transport receive context.</param>
    /// <param name="next">Delegate that invokes the remainder of the pipeline.</param>
    /// <returns>The task returned by <paramref name="next"/>.</returns>
    public override Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        if (context.Message.Headers.TryGetValue(OrderIdHeaderKey, out var orderId))
        {
            Log.Info($"Found partition key '{orderId}' from header '{OrderIdHeaderKey}'");

            context.Extensions.Set(new PartitionKey(orderId));
        }

        return next();
    }
}

Finally, the above behaviors are registered in the pipeline.

// Register both partition key behaviors with the endpoint's pipeline.
var pipeline = endpointConfiguration.Pipeline;
pipeline.Register(new OrderIdHeaderAsPartitionKeyBehavior(), "Extracts a partition key from a header");
pipeline.Register(new OrderIdAsPartitionKeyBehavior.Registration());

Order saga data

/// <summary>
/// Saga data for the order saga.
/// </summary>
public class OrderSagaData : ContainSagaData
{
    /// <summary>Identifier of the order.</summary>
    public Guid OrderId { get; set; }

    /// <summary>Free-text description of the order.</summary>
    public string OrderDescription { get; set; }
}

Order saga

/// <summary>
/// Saga started by <see cref="StartOrder"/>. It sends a local <see cref="ShipOrder"/>, requests a
/// <see cref="CompleteOrder"/> timeout, and publishes <see cref="OrderCompleted"/> when that timeout fires.
/// </summary>
public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleMessages<OrderShipped>,
    IHandleTimeouts<CompleteOrder>
{
    static readonly ILog Log = LogManager.GetLogger<OrderSaga>();

    /// <summary>
    /// Maps the saga's OrderId to the StartOrder message property and to the
    /// order id header of OrderShipped.
    /// </summary>
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.MapSaga(sagaData => sagaData.OrderId)
            .ToMessage<StartOrder>(startOrder => startOrder.OrderId)
            .ToMessageHeader<OrderShipped>("Sample.CosmosDB.Transaction.OrderId");
    }

    /// <summary>
    /// Starts the saga: records the description, sends ShipOrder locally and requests the
    /// completion timeout.
    /// </summary>
    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        var description = $"The saga for order {message.OrderId}";
        Data.OrderDescription = description;
        Log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");

        var shipOrderCommand = new ShipOrder { OrderId = message.OrderId };

        Log.Info("Order will complete in 5 seconds");
        var completion = new CompleteOrder
        {
            OrderDescription = description,
            OrderId = Data.OrderId,
        };

        // Start both operations, then return a task that completes when both have completed.
        var sendShipOrder = context.SendLocal(shipOrderCommand);
        var requestCompletion = RequestTimeout(context, TimeSpan.FromSeconds(5), completion);

        return Task.WhenAll(sendShipOrder, requestCompletion);
    }

    /// <summary>
    /// Logs that the order has shipped; no saga state is changed.
    /// </summary>
    public Task Handle(OrderShipped message, IMessageHandlerContext context)
    {
        Log.Info($"Order with OrderId {Data.OrderId} shipped on {message.ShippingDate}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Completes the saga and publishes OrderCompleted for the saga's order.
    /// </summary>
    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        Log.Info($"Saga with OrderId {Data.OrderId} completed");
        MarkAsComplete();

        return context.Publish(new OrderCompleted { OrderId = Data.OrderId });
    }
}

Related Articles

  • Sagas
    NServiceBus uses event-driven architecture to include fault-tolerance and scalability in long-term business processes.

Last modified