Simple Cosmos DB Persistence Usage

NuGet Package: NServiceBus.Persistence.CosmosDB (3.x)
Target Version: NServiceBus 9.x

This sample shows a client/server scenario using non-transactional saga persistence.

Prerequisites

Ensure that an instance of the latest Azure Cosmos DB Emulator is running.

Sample structure

This sample contains three projects: SharedMessages, Client, and Server.

SharedMessages

The shared message contracts used by all endpoints.
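
The contract classes themselves are not listed on this page. As a rough sketch, their shapes can be inferred from how the saga in the Server project uses them. The property lists and the choice of the ICommand, IMessage, and IEvent marker interfaces below are assumptions, not the sample's actual source:

```csharp
using System;
using NServiceBus;

// Assumed shape: sent by Client to start the saga; the saga correlates on OrderId.
public class StartOrder : ICommand
{
    public Guid OrderId { get; set; }
}

// Assumed shape: sent locally by the saga while handling StartOrder.
public class ShipOrder : ICommand
{
    public Guid OrderId { get; set; }
}

// Assumed shape: state passed along with the saga timeout request.
public class CompleteOrder : IMessage
{
    public string OrderDescription { get; set; }
}

// Assumed shape: published by the saga once the timeout fires.
public class OrderCompleted : IEvent
{
    public Guid OrderId { get; set; }
}
```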

Client

  • Sends the StartOrder message to Server.
  • Receives and handles the OrderCompleted event.
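
The two client responsibilities above might look roughly like the following. This is a minimal sketch, not the sample's Client code: the OrderCompletedHandler and OrderStarter names are hypothetical, the StartOrder and OrderCompleted types come from the SharedMessages project, and the destination is assumed to be the Server endpoint name used in the Server configuration.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using NServiceBus;

// Hypothetical handler for the event published by the saga.
public class OrderCompletedHandler(ILogger<OrderCompletedHandler> logger) :
    IHandleMessages<OrderCompleted>
{
    public Task Handle(OrderCompleted message, IMessageHandlerContext context)
    {
        logger.LogInformation("Received OrderCompleted for OrderId {OrderId}", message.OrderId);
        return Task.CompletedTask;
    }
}

// Hypothetical helper that sends StartOrder to the Server endpoint.
public static class OrderStarter
{
    public static Task SendStartOrder(IMessageSession session)
    {
        var startOrder = new StartOrder
        {
            OrderId = Guid.NewGuid()
        };

        // Assumes the destination endpoint name matches the Server configuration.
        return session.Send("Samples.CosmosDB.Simple.Server", startOrder);
    }
}
```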

Server

  • Receives the StartOrder message and initiates an OrderSaga.
  • OrderSaga requests a timeout, passing a CompleteOrder instance that carries the order description.
  • OrderSaga publishes an OrderCompleted event when the CompleteOrder timeout fires.

Implementation highlights

Persistence config

In Program.cs of the Server project, the endpoint is configured to use Cosmos DB Persistence:

var endpointConfiguration = new EndpointConfiguration("Samples.CosmosDB.Simple.Server");

var persistence = endpointConfiguration.UsePersistence<CosmosPersistence>();
var connection = Environment.GetEnvironmentVariable("COSMOS_CONNECTION_STRING") 
    ?? @"AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";
persistence.DatabaseName("Samples.CosmosDB.Simple");
persistence.CosmosClient(new CosmosClient(connection));
persistence.DefaultContainer("Server", "/id");

In non-transactional mode, the saga ID is used as the partition key, so the container must use /id as the partition key path.
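
To illustrate the partition key requirement, a container with a matching path could be created up front with the Azure Cosmos DB SDK. This is a sketch under assumptions: the CosmosSetup helper is hypothetical, and this page does not show whether or how the sample creates the container itself.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

// Hypothetical helper; the database and container names mirror the Server configuration.
public static class CosmosSetup
{
    public static async Task EnsureContainerExists(CosmosClient client)
    {
        // Creates the database if it is missing, otherwise returns the existing one.
        Database database = await client.CreateDatabaseIfNotExistsAsync("Samples.CosmosDB.Simple");

        // The partition key path must be /id so that it matches the saga ID.
        await database.CreateContainerIfNotExistsAsync(
            new ContainerProperties(id: "Server", partitionKeyPath: "/id"));
    }
}
```

A container's partition key path cannot be changed after creation, so a container created with a different path has to be recreated.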

Order saga data

The data stored by the saga is defined in the OrderSagaData.cs file in the Server project:

public class OrderSagaData :
    ContainSagaData
{
    public Guid OrderId { get; set; }
    public string OrderDescription { get; set; }
}

Order saga

The saga that operates on this data is defined in the OrderSaga.cs file in the Server project:

public class OrderSaga(ILogger<OrderSaga> logger) :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleTimeouts<CompleteOrder>
{
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.OrderId).ToMessage<StartOrder>(msg => msg.OrderId);
    }

    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        var orderDescription = $"The saga for order {message.OrderId}";
        Data.OrderDescription = orderDescription;
        logger.LogInformation("Received StartOrder message {OrderId}. Starting Saga", Data.OrderId);

        var shipOrder = new ShipOrder
        {
            OrderId = message.OrderId
        };

        logger.LogInformation("Order will complete in 5 seconds");
        var timeoutData = new CompleteOrder
        {
            OrderDescription = orderDescription,
        };

        return Task.WhenAll(
            context.SendLocal(shipOrder),
            RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData)
        );
    }

    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        logger.LogInformation("Saga with OrderId {OrderId} completed", Data.OrderId);
        MarkAsComplete();
        var orderCompleted = new OrderCompleted
        {
            OrderId = Data.OrderId
        };
        return context.Publish(orderCompleted);
    }
}

Related Articles

  • Sagas
    Maintain statefulness in distributed systems with the saga pattern and NServiceBus' event-driven architecture with built-in fault-tolerance and scalability.