This sample shows a client/server scenario using a dynamic container configuration for certain saga types with a fallback to the default container.
Projects
SharedMessages
The shared message contracts used by all endpoints.
Client
- Sends the StartOrder message to Server.
- Receives and handles the OrderCompleted event.
Server projects
- Receive the StartOrder message and initiate an OrderSaga.
- OrderSaga sends a ShipOrder command to ShipOrderSaga.
- ShipOrderSaga requests a timeout with an instance of CompleteOrder with the saga data.
- ShipOrderSaga replies with CompleteOrder when the CompleteOrder timeout fires.
- OrderSaga publishes an OrderCompleted event when the CompleteOrder message arrives.
Persistence config
Configure the endpoint to use Cosmos DB Persistence.
// Create the server endpoint and select Cosmos DB as its persistence.
var endpointConfiguration = new EndpointConfiguration("Samples.CosmosDB.Container.Server");
var persistence = endpointConfiguration.UsePersistence<CosmosPersistence>();

// Read the connection string from the environment exactly once. A blank value is treated
// the same as an unset one so that an empty variable falls back to the local emulator
// instead of being handed to the CosmosClient constructor.
var configuredConnection = Environment.GetEnvironmentVariable("COSMOS_CONNECTION_STRING");

string connection;
string connectionSource;
if (string.IsNullOrWhiteSpace(configuredConnection))
{
    // Fallback: connection string of the local emulator.
    connection = @"AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";
    connectionSource = "Local emulator";
}
else
{
    connection = configuredConnection;
    connectionSource = "Azure service";
}

// Only the kind of connection is logged, never the connection string itself.
logger.LogInformation($"Using Cosmos DB connection: {connectionSource}");

persistence.DatabaseName("Samples.CosmosDB.Container");
var cosmosClient = new CosmosClient(connection);
persistence.CosmosClient(cosmosClient);

// Default container: "OrderSagaData" with "/id" as the partition key path.
persistence.DefaultContainer("OrderSagaData", "/id");
In the non-transactional mode the saga id is used as a partition key and thus the container needs to use /id as the partition key path.
Container Mapping
For ShipOrder messages destined to go to the ShipOrderSaga, the container is overridden at runtime to use the ShipOrderSagaData container.
// Container selection based on the message type: every ShipOrder message is mapped to the
// "ShipOrderSagaData" container with "/id" as the partition key path.
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractContainerInformationFromMessage<ShipOrder>(shipOrder =>
{
    var messageTypeName = shipOrder.GetType().AssemblyQualifiedName;
    logger.LogInformation($"Message '{messageTypeName}' destined to be handled by '{nameof(ShipOrderSaga)}' will use 'ShipOrderSagaData' container.");

    var partitionKeyPath = new PartitionKeyPath("/id");
    return new ContainerInformation("ShipOrderSagaData", partitionKeyPath);
});
For all messages whose saga type header refers to ShipOrderSaga, the container is also overridden to use the ShipOrderSagaData container.
// Container selection based on headers: messages whose saga type header mentions
// ShipOrderSaga are mapped to the "ShipOrderSagaData" container; for all other messages
// this extractor returns null.
transactionInformation.ExtractContainerInformationFromHeaders(headers =>
{
    var targetsShipOrderSaga =
        headers.TryGetValue(Headers.SagaType, out var sagaType)
        && sagaType.Contains(nameof(ShipOrderSaga));

    if (!targetsShipOrderSaga)
    {
        return null;
    }

    var enclosedMessageTypes = headers[Headers.EnclosedMessageTypes];
    logger.LogInformation($"Message '{enclosedMessageTypes}' destined to be handled by '{nameof(ShipOrderSaga)}' will use 'ShipOrderSagaData' container.");

    var partitionKeyPath = new PartitionKeyPath("/id");
    return new ContainerInformation("ShipOrderSagaData", partitionKeyPath);
});
Order saga data
/// <summary>
/// State held by <see cref="OrderSaga"/>.
/// </summary>
public class OrderSagaData : ContainSagaData
{
    /// <summary>Identifier of the order; the property the saga is correlated on.</summary>
    public Guid OrderId { get; set; }

    /// <summary>Free-text description, set by the saga when it is started.</summary>
    public string OrderDescription { get; set; }
}
Order saga
/// <summary>
/// Saga started by <see cref="StartOrder"/>. It sends a <see cref="ShipOrder"/> command to the
/// local endpoint and, once a <see cref="CompleteOrder"/> message arrives, marks itself as
/// complete and publishes <see cref="OrderCompleted"/>.
/// </summary>
public class OrderSaga(ILogger<OrderSaga> logger) :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleMessages<CompleteOrder>
{
    /// <summary>
    /// Correlates both handled message types to the saga through their <c>OrderId</c>.
    /// </summary>
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.MapSaga(sagaData => sagaData.OrderId)
            .ToMessage<StartOrder>(startOrder => startOrder.OrderId)
            .ToMessage<CompleteOrder>(completeOrder => completeOrder.OrderId);
    }

    /// <summary>
    /// Stores the order details in the saga data and sends a <see cref="ShipOrder"/> command
    /// to the local endpoint.
    /// </summary>
    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        var orderId = message.OrderId;

        Data.OrderId = orderId;
        Data.OrderDescription = $"The saga for order {orderId}";
        logger.LogInformation($"Received StartOrder message {orderId}. Starting Saga");

        return context.SendLocal(new ShipOrder { OrderId = orderId });
    }

    /// <summary>
    /// Marks the saga as complete and publishes <see cref="OrderCompleted"/> for the order.
    /// </summary>
    public Task Handle(CompleteOrder message, IMessageHandlerContext context)
    {
        var orderId = Data.OrderId;

        logger.LogInformation($"Saga with OrderId {orderId} completed");
        MarkAsComplete();

        return context.Publish(new OrderCompleted { OrderId = orderId });
    }
}
ShipOrder saga data
/// <summary>
/// State held by <see cref="ShipOrderSaga"/>.
/// </summary>
public class ShipOrderSagaData : ContainSagaData
{
    /// <summary>Identifier of the order being shipped; the property the saga is correlated on.</summary>
    public Guid OrderId { get; set; }
}
ShipOrder saga
/// <summary>
/// Saga started by <see cref="ShipOrder"/>. It requests a five-second
/// <see cref="CompleteOrder"/> timeout and, when that timeout fires, marks itself as complete
/// and replies to the originator with the <see cref="CompleteOrder"/> message.
/// </summary>
public class ShipOrderSaga(ILogger<ShipOrderSaga> logger) :
    Saga<ShipOrderSagaData>,
    IAmStartedByMessages<ShipOrder>,
    IHandleTimeouts<CompleteOrder>
{
    /// <summary>
    /// Correlates <see cref="ShipOrder"/> messages to the saga through their <c>OrderId</c>.
    /// </summary>
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ShipOrderSagaData> mapper)
    {
        mapper
            .MapSaga(sagaData => sagaData.OrderId)
            .ToMessage<ShipOrder>(shipOrder => shipOrder.OrderId);
    }

    /// <summary>
    /// Records the order id in the saga data and requests a <see cref="CompleteOrder"/>
    /// timeout five seconds from now.
    /// </summary>
    public Task Handle(ShipOrder message, IMessageHandlerContext context)
    {
        var orderId = message.OrderId;

        logger.LogInformation($"Order Shipped. OrderId {orderId}");
        Data.OrderId = orderId;
        logger.LogInformation("Order will complete in 5 seconds");

        return RequestTimeout(context, TimeSpan.FromSeconds(5), new CompleteOrder());
    }

    /// <summary>
    /// Copies the order id from the saga data into the timeout message, marks the saga as
    /// complete and replies to the originator with that message.
    /// </summary>
    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        logger.LogInformation($"Saga with OrderId {Data.OrderId} about to complete");

        state.OrderId = Data.OrderId;
        MarkAsComplete();

        return ReplyToOriginator(context, state);
    }
}