This sample shows a client/server scenario using a dynamic container configuration for certain saga types with a fallback to the default container.
Projects
SharedMessages
The shared message contracts used by all endpoints.
Client
- Sends the
StartOrder
message toServer
. - Receives and handles the
OrderCompleted
event.
Server projects
- Receive the
StartOrder
message and initiate anOrderSaga
. OrderSaga
sends aShipOrder
command toShipOrderSaga
ShipOrderSaga
requests a timeout with an instance ofCompleteOrder
with the saga data.ShipOrderSaga
replies withCompleteOrder
when theCompleteOrder
timeout fires.OrderSaga
publishes anOrderCompleted
event when theCompleteOrder
message arrives.
Persistence config
Configure the endpoint to use CosmosDB Persistence.
var endpointConfiguration = new EndpointConfiguration("Samples.CosmosDB.Container.Server");
var persistence = endpointConfiguration.UsePersistence<CosmosPersistence>();
var connection = @"AccountEndpoint=https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";
persistence.DatabaseName("Samples.CosmosDB.Container");
var cosmosClient = new CosmosClient(connection);
persistence.CosmosClient(cosmosClient);
persistence.DefaultContainer("OrderSagaData", "/id");
In the non-transactional mode the saga id is used as a partition key and thus the container needs to use /
as the partition key path.
Behaviors
For all messages destined to go to the ShipOrderSaga
the container is overridden at runtime to use ShipOrderSagaData
container.
class BehaviorProvidingDynamicContainer : Behavior<IIncomingLogicalMessageContext>
{
public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
{
if (context.Message.Instance is ShipOrder ||
context.Headers.TryGetValue(Headers.SagaType, out var sagaTypeHeader) && sagaTypeHeader.Contains(nameof(ShipOrderSaga)))
{
Log.Info($"Message '{context.Message.MessageType.FullName}' destined to be handled by '{nameof(ShipOrderSaga)}' will use 'ShipOrderSagaData' container.");
context.Extensions.Set(new ContainerInformation("ShipOrderSagaData", new PartitionKeyPath("/id")));
}
else
{
Log.Info($"Message '{context.Message.MessageType.FullName}' destined to be handled by '{nameof(OrderSaga)}' will the default container.");
}
return next();
}
static readonly ILog Log = LogManager.GetLogger<BehaviorProvidingDynamicContainer>();
}
The behavior needs to be registered in the pipeline
endpointConfiguration.Pipeline.Register(new BehaviorProvidingDynamicContainer(), "Provides a non-default container for sagas started by ship order message");
Order saga data
public class OrderSagaData :
ContainSagaData
{
public Guid OrderId { get; set; }
public string OrderDescription { get; set; }
}
Order saga
public class OrderSaga :
Saga<OrderSagaData>,
IAmStartedByMessages<StartOrder>,
IHandleMessages<CompleteOrder>
{
static ILog log = LogManager.GetLogger<OrderSaga>();
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
{
mapper.MapSaga(saga => saga.OrderId)
.ToMessage<StartOrder>(msg => msg.OrderId)
.ToMessage<CompleteOrder>(msg => msg.OrderId);
}
public Task Handle(StartOrder message, IMessageHandlerContext context)
{
Data.OrderId = message.OrderId;
Data.OrderDescription = $"The saga for order {message.OrderId}";
log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");
var shipOrder = new ShipOrder
{
OrderId = message.OrderId
};
return context.SendLocal(shipOrder);
}
public Task Handle(CompleteOrder message, IMessageHandlerContext context)
{
log.Info($"Saga with OrderId {Data.OrderId} completed");
MarkAsComplete();
var orderCompleted = new OrderCompleted
{
OrderId = Data.OrderId
};
return context.Publish(orderCompleted);
}
}
ShipOrder saga data
public class ShipOrderSagaData :
ContainSagaData
{
public Guid OrderId { get; set; }
}
ShipOrder saga
public class ShipOrderSaga :
Saga<ShipOrderSagaData>,
IAmStartedByMessages<ShipOrder>,
IHandleTimeouts<CompleteOrder>
{
static ILog log = LogManager.GetLogger<ShipOrderSaga>();
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ShipOrderSagaData> mapper)
{
mapper.MapSaga(saga => saga.OrderId).ToMessage<ShipOrder>(msg => msg.OrderId);
}
public Task Handle(ShipOrder message, IMessageHandlerContext context)
{
log.Info($"Order Shipped. OrderId {message.OrderId}");
Data.OrderId = message.OrderId;
log.Info("Order will complete in 5 seconds");
var timeoutData = new CompleteOrder();
return RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData);
}
public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
{
log.Info($"Saga with OrderId {Data.OrderId} about to complete");
MarkAsComplete();
state.OrderId = Data.OrderId;
return ReplyToOriginator(context, state);
}
}