CosmosDB Persistence Usage with non-default container

Component: NServiceBus.Persistence.CosmosDB
NuGet Package NServiceBus.Persistence.CosmosDB (0.x)
This is a Preview project
Target NServiceBus Version: 7.x

This sample shows a client/server scenario using a dynamic container configuration for certain saga types with a fallback to the default container.

Projects

SharedMessages

The shared message contracts used by all endpoints.

Client

  • Sends the StartOrder message to Server.
  • Receives and handles the OrderCompleted event.

Server projects

  • Receive the StartOrder message and initiate an OrderSaga.
  • OrderSaga sends a ShipOrder command to ShipOrderSaga
  • ShipOrderSaga requests a timeout with an instance of CompleteOrder with the saga data.
  • ShipOrderSaga replies with CompleteOrder when the CompleteOrder timeout fires.
  • OrderSaga publishes an OrderCompleted event when the CompleteOrder message arrives.

Persistence config

Configure the endpoint to use CosmosDB Persistence.

var endpointConfiguration = new EndpointConfiguration("Samples.CosmosDB.Container.Server");

var persistence = endpointConfiguration.UsePersistence<CosmosPersistence>();
var connection = @"AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";
persistence.DatabaseName("Samples.CosmosDB.Container");
var cosmosClient = new CosmosClient(connection);
persistence.CosmosClient(cosmosClient);
persistence.DefaultContainer("OrderSagaData", "/id");

In the non-transactional mode the saga id is used as a partition key and thus the container needs to use /id as the partition key path.

Behaviors

For all messages destined to go to the ShipOrderSaga the container is overridden at runtime to use ShipOrderSagaData container.

class BehaviorProvidingDynamicContainer : Behavior<IIncomingLogicalMessageContext>
{
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        if (context.Message.Instance is ShipOrder ||
            context.Headers.TryGetValue(Headers.SagaType, out var sagaTypeHeader) && sagaTypeHeader.Contains(nameof(ShipOrderSaga)))
        {
            Log.Info($"Message '{context.Message.MessageType.FullName}' destined to be handled by '{nameof(ShipOrderSaga)}' will use 'ShipOrderSagaData' container.");

            context.Extensions.Set(new ContainerInformation("ShipOrderSagaData", new PartitionKeyPath("/id")));
        }
        else
        {
            Log.Info($"Message '{context.Message.MessageType.FullName}' destined to be handled by '{nameof(OrderSaga)}' will the default container.");
        }

        return next();
    }

    static readonly ILog Log = LogManager.GetLogger<BehaviorProvidingDynamicContainer>();
}

The behavior needs to be registered in the pipeline

endpointConfiguration.Pipeline.Register(new BehaviorProvidingDynamicContainer(), "Provides a non-default container for sagas started by ship order message");

Order saga data

public class OrderSagaData :
    ContainSagaData
{
    public Guid OrderId { get; set; }
    public string OrderDescription { get; set; }
}

Order saga

public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleMessages<CompleteOrder>
{
    static ILog log = LogManager.GetLogger<OrderSaga>();

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.OrderId)
            .ToMessage<StartOrder>(msg => msg.OrderId)
            .ToMessage<CompleteOrder>(msg => msg.OrderId);
    }

    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        Data.OrderId = message.OrderId;
        Data.OrderDescription = $"The saga for order {message.OrderId}";

        log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");

        var shipOrder = new ShipOrder
        {
            OrderId = message.OrderId
        };

        return context.SendLocal(shipOrder);
    }

    public Task Handle(CompleteOrder message, IMessageHandlerContext context)
    {
        log.Info($"Saga with OrderId {Data.OrderId} completed");
        MarkAsComplete();
        var orderCompleted = new OrderCompleted
        {
            OrderId = Data.OrderId
        };
        return context.Publish(orderCompleted);
    }
}

ShipOrder saga data

public class ShipOrderSagaData :
    ContainSagaData
{
    public Guid OrderId { get; set; }
}

ShipOrder saga

public class ShipOrderSaga :
    Saga<ShipOrderSagaData>,
    IAmStartedByMessages<ShipOrder>,
    IHandleTimeouts<CompleteOrder>
{
    static ILog log = LogManager.GetLogger<ShipOrderSaga>();

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ShipOrderSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.OrderId).ToMessage<ShipOrder>(msg => msg.OrderId);
    }

    public Task Handle(ShipOrder message, IMessageHandlerContext context)
    {
        log.Info($"Order Shipped. OrderId {message.OrderId}");
        Data.OrderId = message.OrderId;

        log.Info("Order will complete in 5 seconds");
        var timeoutData = new CompleteOrder();
        return RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData);
    }

    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        log.Info($"Saga with OrderId {Data.OrderId} about to complete");
        MarkAsComplete();

        state.OrderId = Data.OrderId;

        return ReplyToOriginator(context, state);
    }
}

Related Articles

  • Sagas
    NServiceBus uses event-driven architecture to include fault-tolerance and scalability in long-term business processes.

Last modified