This sample shows a client/server scenario that uses saga and outbox persistence to store records atomically by leveraging transactions.
Projects
SharedMessages
The shared message contracts used by all endpoints.
Client
- Sends the `StartOrder` message to `Server`.
- Receives and handles the `OrderCompleted` event.
Server projects
- Receive the `StartOrder` message and initiate an `OrderSaga`.
- `OrderSaga` requests a timeout with an instance of `CompleteOrder` with the saga data.
- Receive the `OrderShipped` message with a custom header.
- `OrderSaga` publishes an `OrderCompleted` event when the `CompleteOrder` timeout fires.
Persistence config
Configure the endpoint to use Cosmos DB Persistence.
// Connection string for the Cosmos DB account; it targets localhost:8081
// (presumably the local Cosmos DB emulator - confirm before reusing elsewhere).
const string connectionString = @"AccountEndpoint = https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";

// Create the server endpoint and turn on the outbox.
var endpointConfiguration = new EndpointConfiguration("Samples.CosmosDB.Transactions.Server");
endpointConfiguration.EnableOutbox();

// Select Cosmos DB persistence and point it at the sample database.
var persistence = endpointConfiguration.UsePersistence<CosmosPersistence>();
persistence.DatabaseName("Samples.CosmosDB.Transactions");

var cosmosClient = new CosmosClient(connectionString);
persistence.CosmosClient(cosmosClient);

// The order id is used as the partition key, so the container's
// partition key path has to be "/OrderId" as well.
persistence.DefaultContainer("Server", "/OrderId");
Because the order id is used as a partition key it has to be used in the partition key path as well.
Transaction Information
Most messages implement `IProvideOrderId`, and thus it is possible to use the provided order identification as a partition key.
// Register how the partition key is derived from incoming messages.
var transactionInformation = persistence.TransactionInformation();

// Any message implementing IProvideOrderId carries the order id,
// which is used (as a string) as the partition key value.
transactionInformation.ExtractPartitionKeyFromMessage<IProvideOrderId>(message =>
{
    Log.Info($"Found partition key '{message.OrderId}' from '{nameof(IProvideOrderId)}'");

    var partitionKeyValue = message.OrderId.ToString();
    return new PartitionKey(partitionKeyValue);
});
One handler publishes an event that doesn't implement `IProvideOrderId` but adds a custom header to indicate the order identification. The handler also creates `OrderShippingInformation` by participating in the transactional batch provided by NServiceBus.
// Header that carries the order id for messages that do not implement IProvideOrderId.
// Declared once so the registration and the log message cannot drift apart
// (the log previously reported the truncated name 'Sample.CosmosDB.Transaction').
const string orderIdHeaderKey = "Sample.CosmosDB.Transaction.OrderId";

// Use the header value as-is as the partition key.
transactionInformation.ExtractPartitionKeyFromHeader(orderIdHeaderKey, orderId =>
{
    Log.Info($"Found partition key '{orderId}' from header '{orderIdHeaderKey}'");
    return orderId;
});
Order saga data
/// <summary>
/// State kept by <c>OrderSaga</c> for a single order.
/// </summary>
public class OrderSagaData : ContainSagaData
{
    /// <summary>
    /// Identifier of the order; <c>OrderSaga</c> maps incoming messages to a saga instance through this property.
    /// </summary>
    public Guid OrderId { get; set; }

    /// <summary>
    /// Free-text description of the order, set when the saga handles <c>StartOrder</c>.
    /// </summary>
    public string OrderDescription { get; set; }
}
Order saga
/// <summary>
/// Saga that is started by <c>StartOrder</c>, handles <c>OrderShipped</c>, and
/// publishes <c>OrderCompleted</c> when the <c>CompleteOrder</c> timeout fires.
/// </summary>
public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleMessages<OrderShipped>,
    IHandleTimeouts<CompleteOrder>
{
    // Header used to correlate OrderShipped, which is mapped by header rather than by a message property.
    const string OrderIdHeader = "Sample.CosmosDB.Transaction.OrderId";

    static readonly ILog Log = LogManager.GetLogger<OrderSaga>();

    /// <summary>
    /// Correlates messages to the saga through <c>OrderSagaData.OrderId</c>:
    /// <c>StartOrder</c> by its <c>OrderId</c> property, <c>OrderShipped</c> by the order id header.
    /// </summary>
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        var correlation = mapper.MapSaga(data => data.OrderId);

        correlation
            .ToMessage<StartOrder>(start => start.OrderId)
            .ToMessageHeader<OrderShipped>(OrderIdHeader);
    }

    /// <summary>
    /// Stores the order description, sends <c>ShipOrder</c> to the local endpoint
    /// and requests a <c>CompleteOrder</c> timeout five seconds from now.
    /// </summary>
    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        var description = $"The saga for order {message.OrderId}";
        Data.OrderDescription = description;
        Log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");

        var shipCommand = new ShipOrder { OrderId = message.OrderId };

        Log.Info("Order will complete in 5 seconds");
        var completion = new CompleteOrder
        {
            OrderDescription = description,
            OrderId = Data.OrderId
        };

        // Start both operations (send first, then the timeout request) and await them together.
        var sendShipOrder = context.SendLocal(shipCommand);
        var requestCompletion = RequestTimeout(context, TimeSpan.FromSeconds(5), completion);
        return Task.WhenAll(sendShipOrder, requestCompletion);
    }

    /// <summary>
    /// Logs that the order has shipped; no saga state is changed.
    /// </summary>
    public Task Handle(OrderShipped message, IMessageHandlerContext context)
    {
        Log.Info($"Order with OrderId {Data.OrderId} shipped on {message.ShippingDate}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Marks the saga as complete and publishes <c>OrderCompleted</c> for the order.
    /// </summary>
    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        Log.Info($"Saga with OrderId {Data.OrderId} completed");
        MarkAsComplete();

        return context.Publish(new OrderCompleted { OrderId = Data.OrderId });
    }
}