This sample shows a client/server scenario that uses saga and outbox persistence to store records atomically by leveraging transactions.
Projects
SharedMessages
The shared message contracts used by all endpoints.
Client
- Sends the StartOrder message to Server.
- Receives and handles the OrderCompleted event.
Server projects
- Receive the StartOrder message and initiate an OrderSaga.
- OrderSaga requests a timeout with an instance of CompleteOrder with the saga data.
- Receive the OrderShipped message with a custom header.
- OrderSaga publishes an OrderCompleted event when the CompleteOrder timeout fires.
Persistence config
Configure the endpoint to use Cosmos DB persistence. If the COSMOS_CONNECTION_STRING environment variable is not set, the sample falls back to the connection string of the Docker-hosted Cosmos DB emulator.
// Create the server endpoint and enable the outbox for it.
var endpointConfiguration = new EndpointConfiguration("Samples.CosmosDB.Transactions.Server");
endpointConfiguration.EnableOutbox();

var persistence = endpointConfiguration.UsePersistence<CosmosPersistence>();

// Use the connection string from the environment when present; otherwise use the
// hard-coded local emulator connection string.
var connection =
    Environment.GetEnvironmentVariable("COSMOS_CONNECTION_STRING")
    ?? """AccountEndpoint = http://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==""";

// Gateway mode prevents a BadRequest error on CosmosDB installation to local emulators.
var clientOptions = new CosmosClientOptions
{
    ConnectionMode = ConnectionMode.Gateway
};
var cosmosClient = new CosmosClient(connection, clientOptions);

persistence.CosmosClient(cosmosClient);
persistence.DatabaseName("Samples.CosmosDB.Transactions");

// OrderId is used as the partition key, so the container's partition key path must be "/OrderId" as well.
persistence.DefaultContainer("Server", "/OrderId");
Transaction Information
Most messages implement IProvideOrderId and thus it is possible to use the provided order identification (OrderId) as a partition key.
var transactionInformation = persistence.TransactionInformation();

// Messages implementing IProvideOrderId carry the order id directly; use it as the partition key.
transactionInformation.ExtractPartitionKeyFromMessage<IProvideOrderId>(provideOrderId =>
{
    // Constant message template with arguments (instead of an interpolated string) so the
    // logger receives the order id as a structured value (see analyzer rule CA2254).
    logger.LogInformation(
        "Found partition key '{OrderId}' from '{MessageContract}'",
        provideOrderId.OrderId,
        nameof(IProvideOrderId));

    return new PartitionKey(provideOrderId.OrderId.ToString());
});
One handler publishes an event that doesn't implement IProvideOrderId but adds a custom header to indicate the order identification. The handler also creates OrderShippingInformation by participating in the transactional batch provided by NServiceBus.
// Messages without an OrderId property supply the order id through a custom header;
// the header value is returned unchanged as the partition key.
transactionInformation.ExtractPartitionKeyFromHeader("Sample.CosmosDB.Transaction.OrderId", orderId =>
{
    // The header value is passed as a template argument rather than interpolated into the
    // template itself, so incoming header text is never treated as a format string (CA2254).
    logger.LogInformation(
        "Found partition key '{OrderId}' from header 'Sample.CosmosDB.Transaction.OrderId'",
        orderId);

    return orderId;
});
Order saga data
/// <summary>
/// State stored for each <c>OrderSaga</c> instance.
/// </summary>
public class OrderSagaData : ContainSagaData
{
    /// <summary>Identifier of the order; the saga is looked up by this value.</summary>
    public Guid OrderId { get; set; }

    /// <summary>Free-text description of the order.</summary>
    public string OrderDescription { get; set; }
}
Order saga
/// <summary>
/// Saga for a single order. It is started by <see cref="StartOrder"/>, handles
/// <see cref="OrderShipped"/>, and publishes <see cref="OrderCompleted"/> when the
/// <see cref="CompleteOrder"/> timeout fires.
/// </summary>
/// <param name="logger">Logger used to report the saga's progress.</param>
public class OrderSaga(ILogger<OrderSaga> logger) :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleMessages<OrderShipped>,
    IHandleTimeouts<CompleteOrder>
{
    // Header used to correlate OrderShipped messages with the saga instance.
    const string OrderIdHeader = "Sample.CosmosDB.Transaction.OrderId";

    /// <summary>
    /// Maps incoming messages to the saga instance via <see cref="OrderSagaData.OrderId"/>:
    /// <see cref="StartOrder"/> by its OrderId property, <see cref="OrderShipped"/> by header.
    /// </summary>
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.OrderId)
            .ToMessage<StartOrder>(msg => msg.OrderId)
            .ToMessageHeader<OrderShipped>(OrderIdHeader);
    }

    /// <summary>
    /// Records the order description, sends a local <see cref="ShipOrder"/> command and
    /// requests a <see cref="CompleteOrder"/> timeout five seconds from now.
    /// </summary>
    /// <returns>A task that completes when both the send and the timeout request have completed.</returns>
    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        var orderDescription = $"The saga for order {message.OrderId}";
        Data.OrderDescription = orderDescription;

        // Constant templates with arguments keep log events structured (CA2254).
        logger.LogInformation("Received StartOrder message {OrderId}. Starting Saga", Data.OrderId);

        var shipOrder = new ShipOrder
        {
            OrderId = message.OrderId
        };

        logger.LogInformation("Order will complete in 5 seconds");

        var timeoutData = new CompleteOrder
        {
            OrderDescription = orderDescription,
            OrderId = Data.OrderId,
        };

        return Task.WhenAll(
            context.SendLocal(shipOrder),
            RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData)
        );
    }

    /// <summary>
    /// Logs that the order was shipped; no saga state is changed.
    /// </summary>
    public Task Handle(OrderShipped message, IMessageHandlerContext context)
    {
        logger.LogInformation(
            "Order with OrderId {OrderId} shipped on {ShippingDate}",
            Data.OrderId,
            message.ShippingDate);

        return Task.CompletedTask;
    }

    /// <summary>
    /// Marks the saga as complete and publishes <see cref="OrderCompleted"/> for the order.
    /// </summary>
    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        logger.LogInformation("Saga with OrderId {OrderId} completed", Data.OrderId);

        MarkAsComplete();

        var orderCompleted = new OrderCompleted
        {
            OrderId = Data.OrderId
        };

        return context.Publish(orderCompleted);
    }
}