Getting Started
Architecture
NServiceBus
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

Cosmos DB Persistence Usage with non-default container

NuGet Package: NServiceBus.Persistence.CosmosDB (2.x)
Target Version: NServiceBus 8.x

This sample shows a client/server scenario using a dynamic container configuration for certain saga types with a fallback to the default container.

Projects

SharedMessages

The shared message contracts used by all endpoints.

Client

  • Sends the StartOrder message to Server.
  • Receives and handles the OrderCompleted event.

Server projects

  • Receive the StartOrder message and initiate an OrderSaga.
  • OrderSaga sends a ShipOrder command to ShipOrderSaga
  • ShipOrderSaga requests a timeout with an instance of CompleteOrder with the saga data.
  • ShipOrderSaga replies with CompleteOrder when the CompleteOrder timeout fires.
  • OrderSaga publishes an OrderCompleted event when the CompleteOrder message arrives.

Persistence config

Configure the endpoint to use Cosmos DB Persistence.

var endpointConfiguration = new EndpointConfiguration("Samples.CosmosDB.Container.Server");

var persistence = endpointConfiguration.UsePersistence<CosmosPersistence>();
var connection = @"AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";
persistence.DatabaseName("Samples.CosmosDB.Container");
var cosmosClient = new CosmosClient(connection);
persistence.CosmosClient(cosmosClient);
persistence.DefaultContainer("OrderSagaData", "/id");

In the non-transactional mode the saga id is used as a partition key and thus the container needs to use /id as the partition key path.

Container Mapping

For ShipOrder messages destined to go to the ShipOrderSaga the container is overridden at runtime to use ShipOrderSagaData container.

var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromMessage<ShipOrder>(m =>
{
    Log.Info($"Message '{m.GetType().AssemblyQualifiedName}' destined to be handled by '{nameof(ShipOrderSaga)}' will use 'ShipOrderSagaData' container.");
    return new ContainerInformation("ShipOrderSagaData", new PartitionKeyPath("/id"));
});

For all messages that have a saga type header ShipOrderSaga the container is overriden to use ShipOrderSagaData container too.

transactionInformation.ExtractContainerInformationFromHeaders(headers =>
{
    if (headers.TryGetValue(Headers.SagaType, out var sagaTypeHeader) && sagaTypeHeader.Contains(nameof(ShipOrderSaga)))
    {
        Log.Info($"Message '{headers[Headers.EnclosedMessageTypes]}' destined to be handled by '{nameof(ShipOrderSaga)}' will use 'ShipOrderSagaData' container.");

        return new ContainerInformation("ShipOrderSagaData", new PartitionKeyPath("/id"));
    }
    return null;
});

Order saga data

public class OrderSagaData :
    ContainSagaData
{
    public Guid OrderId { get; set; }
    public string OrderDescription { get; set; }
}

Order saga

public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleMessages<CompleteOrder>
{
    static ILog log = LogManager.GetLogger<OrderSaga>();

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.OrderId)
            .ToMessage<StartOrder>(msg => msg.OrderId)
            .ToMessage<CompleteOrder>(msg => msg.OrderId);
    }

    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        Data.OrderId = message.OrderId;
        Data.OrderDescription = $"The saga for order {message.OrderId}";

        log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");

        var shipOrder = new ShipOrder
        {
            OrderId = message.OrderId
        };

        return context.SendLocal(shipOrder);
    }

    public Task Handle(CompleteOrder message, IMessageHandlerContext context)
    {
        log.Info($"Saga with OrderId {Data.OrderId} completed");
        MarkAsComplete();
        var orderCompleted = new OrderCompleted
        {
            OrderId = Data.OrderId
        };
        return context.Publish(orderCompleted);
    }
}

ShipOrder saga data

public class ShipOrderSagaData :
    ContainSagaData
{
    public Guid OrderId { get; set; }
}

ShipOrder saga

public class ShipOrderSaga :
    Saga<ShipOrderSagaData>,
    IAmStartedByMessages<ShipOrder>,
    IHandleTimeouts<CompleteOrder>
{
    static ILog log = LogManager.GetLogger<ShipOrderSaga>();

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ShipOrderSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.OrderId).ToMessage<ShipOrder>(msg => msg.OrderId);
    }

    public Task Handle(ShipOrder message, IMessageHandlerContext context)
    {
        log.Info($"Order Shipped. OrderId {message.OrderId}");
        Data.OrderId = message.OrderId;

        log.Info("Order will complete in 5 seconds");
        var timeoutData = new CompleteOrder();
        return RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData);
    }

    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        log.Info($"Saga with OrderId {Data.OrderId} about to complete");
        MarkAsComplete();

        state.OrderId = Data.OrderId;

        return ReplyToOriginator(context, state);
    }
}

Related Articles

  • Sagas
    Maintain statefulness in distributed systems with the saga pattern and NServiceBus' event-driven architecture with built-in fault-tolerance and scalability.