This sample shows a client/server scenario using a dynamic table configuration for certain saga types with a fallback to the default table.
Prerequisites
Ensure that an instance of the latest Azure Storage Emulator or Azure Cosmos DB Emulator is running.
Projects
SharedMessages
- The shared message contracts used by all endpoints.
Client
- Sends the
StartOrdermessage toServer. - Receives and handles the
OrderCompletedevent.
Server
- Receive the
StartOrdermessage and initiate anOrderSaga. OrderSagasends aShipOrdercommand toShipOrderSagaShipOrderSagarequests a timeout with an instance ofCompleteOrderwith the saga data.ShipOrderSagareplies withCompleteOrderwhen theCompleteOrdertimeout fires.OrderSagapublishes anOrderCompletedevent when theCompleteOrdermessage arrives.
Persistence config
Configure the endpoint to use Azure Table Persistence.
var endpointConfiguration = new EndpointConfiguration("Samples.AzureTable.Table.Server");
var useStorageTable = true;
var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence>();
var connection = useStorageTable ? "UseDevelopmentStorage=true" :
"TableEndpoint=https://localhost:8081/;AccountName=AzureTableSamples;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";
var tableServiceClient = new TableServiceClient(connection);
persistence.UseTableServiceClient(tableServiceClient);
persistence.DefaultTable("OrderSagaData");
In the non-transactional mode the saga id is used as a partition key.
Behaviors
For all messages destined to go to the ShipOrderSaga the table is overridden at runtime to use the ShipOrderSagaData table.
class BehaviorProvidingDynamicTable(ILogger<BehaviorProvidingDynamicTable> logger) : Behavior<IIncomingLogicalMessageContext>
{
public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
{
if (context.Message.Instance is ShipOrder ||
context.Headers.TryGetValue(Headers.SagaType, out var sagaTypeHeader) && sagaTypeHeader.Contains(nameof(ShipOrderSaga)))
{
logger.LogInformation("Message '{MessageType}' destined to be handled by '{SagaType}' will use '{TableName}' table.", context.Message.MessageType.FullName, nameof(ShipOrderSaga), "ShipOrderSagaData");
context.Extensions.Set(new TableInformation("ShipOrderSagaData"));
}
else
{
logger.LogInformation("Message '{MessageType}' destined to be handled by '{SagaType}' will use the default table.", context.Message.MessageType.FullName, nameof(OrderSaga));
}
return next();
}
}
The behavior needs to be registered in the pipeline
var serviceProvider = builder.Services.BuildServiceProvider();
var logger = serviceProvider.GetRequiredService<ILogger<BehaviorProvidingDynamicTable>>();
endpointConfiguration.Pipeline.Register(new BehaviorProvidingDynamicTable(logger), "Provides a non-default table for sagas started by ship order message");
Order saga data
public class OrderSagaData :
ContainSagaData
{
public Guid OrderId { get; set; }
public string OrderDescription { get; set; }
}
Order saga
public class OrderSaga(ILogger<OrderSaga> logger) :
Saga<OrderSagaData>,
IAmStartedByMessages<StartOrder>,
IHandleMessages<CompleteOrder>
{
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
{
mapper.MapSaga(saga => saga.OrderId)
.ToMessage<StartOrder>(msg => msg.OrderId)
.ToMessage<CompleteOrder>(msg => msg.OrderId);
}
public Task Handle(StartOrder message, IMessageHandlerContext context)
{
Data.OrderId = message.OrderId;
Data.OrderDescription = $"The saga for order {message.OrderId}";
logger.LogInformation("Received StartOrder message {OrderId}. Starting Saga", Data.OrderId);
var shipOrder = new ShipOrder
{
OrderId = message.OrderId
};
return context.SendLocal(shipOrder);
}
public Task Handle(CompleteOrder message, IMessageHandlerContext context)
{
logger.LogInformation("Saga with OrderId {OrderId} completed", Data.OrderId);
MarkAsComplete();
var orderCompleted = new OrderCompleted
{
OrderId = Data.OrderId
};
return context.Publish(orderCompleted);
}
}
ShipOrder saga data
public class ShipOrderSagaData :
ContainSagaData
{
public Guid OrderId { get; set; }
}
ShipOrder saga
public class ShipOrderSaga(ILogger<ShipOrderSaga> logger) :
Saga<ShipOrderSagaData>,
IAmStartedByMessages<ShipOrder>,
IHandleTimeouts<CompleteOrder>
{
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ShipOrderSagaData> mapper)
{
mapper.MapSaga(saga => saga.OrderId).ToMessage<ShipOrder>(msg => msg.OrderId);
}
public Task Handle(ShipOrder message, IMessageHandlerContext context)
{
logger.LogInformation("Order Shipped. OrderId {OrderId}", message.OrderId);
Data.OrderId = message.OrderId;
logger.LogInformation("Order will complete in {Seconds} seconds", 5);
var timeoutData = new CompleteOrder();
return RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData);
}
public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
{
logger.LogInformation("Saga with OrderId {OrderId} about to complete", Data.OrderId);
MarkAsComplete();
state.OrderId = Data.OrderId;
return ReplyToOriginator(context, state);
}
}