This sample shows a client/server scenario using a dynamic table configuration for certain saga types with a fallback to the default table.
Prerequisites
Ensure that an instance of the latest Azure Storage Emulator or Azure Cosmos DB Emulator is running.
Projects
SharedMessages
- The shared message contracts used by all endpoints.
Client
- Sends the StartOrder message to Server.
- Receives and handles the OrderCompleted event.
Server
- Receives the StartOrder message and initiates an OrderSaga.
- OrderSaga sends a ShipOrder command to ShipOrderSaga.
- ShipOrderSaga requests a timeout with an instance of CompleteOrder with the saga data.
- ShipOrderSaga replies with CompleteOrder when the CompleteOrder timeout fires.
- OrderSaga publishes an OrderCompleted event when the CompleteOrder message arrives.
Persistence config
Configure the endpoint to use Azure Table Persistence.
// Endpoint configuration for the server that hosts the sagas.
var endpointConfiguration = new EndpointConfiguration("Samples.AzureTable.Table.Server");

// Flip to false to use the table endpoint on localhost:8081 instead of
// the development storage connection string.
var useStorageTable = true;

// Saga data is persisted with Azure Table Persistence.
var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence, StorageType.Sagas>();

string connection;
if (useStorageTable)
{
    connection = "UseDevelopmentStorage=true";
}
else
{
    connection = "TableEndpoint=https://localhost:8081/;AccountName=AzureTableSamples;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";
}

// Build the table client from the selected connection string and hand it to the persistence.
var cloudTableClient = CloudStorageAccount.Parse(connection).CreateCloudTableClient();
persistence.UseCloudTableClient(cloudTableClient);

// Table used whenever no table is supplied at runtime for a message.
persistence.DefaultTable("OrderSagaData");
In the non-transactional mode the saga id is used as a partition key.
Behaviors
For all messages destined to go to the ShipOrderSaga the table is overridden at runtime to use the ShipOrderSagaData table.
/// <summary>
/// Incoming-pipeline behavior that overrides the saga table at runtime.
/// Messages destined for <c>ShipOrderSaga</c> get a <c>TableInformation</c>
/// pointing at the ShipOrderSagaData table placed into the context extensions;
/// for every other message nothing is set, so the default table applies.
/// </summary>
class BehaviorProvidingDynamicTable : Behavior<IIncomingLogicalMessageContext>
{
    // Table that holds the ShipOrderSaga data instead of the default table.
    const string ShipOrderTableName = "ShipOrderSagaData";

    static readonly ILog Log = LogManager.GetLogger<BehaviorProvidingDynamicTable>();

    /// <summary>
    /// Decides which table the current message should use and then continues the pipeline.
    /// </summary>
    /// <param name="context">The incoming logical message context being processed.</param>
    /// <param name="next">Delegate that invokes the remainder of the pipeline.</param>
    /// <returns>The task returned by <paramref name="next"/>.</returns>
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // A message targets ShipOrderSaga when it is the ShipOrder message itself,
        // or when its saga type header names ShipOrderSaga.
        // The parentheses only make the existing && / || precedence explicit.
        if (context.Message.Instance is ShipOrder ||
            (context.Headers.TryGetValue(Headers.SagaType, out var sagaTypeHeader) && sagaTypeHeader.Contains(nameof(ShipOrderSaga))))
        {
            Log.Info($"Message '{context.Message.MessageType.FullName}' destined to be handled by '{nameof(ShipOrderSaga)}' will use '{ShipOrderTableName}' table.");
            context.Extensions.Set(new TableInformation(ShipOrderTableName));
        }
        else
        {
            // No TableInformation is set here, so the configured default table is used.
            Log.Info($"Message '{context.Message.MessageType.FullName}' destined to be handled by '{nameof(OrderSaga)}' will use the default table.");
        }

        return next();
    }
}
The behavior needs to be registered in the pipeline:
// Add the table-selecting behavior to the endpoint's pipeline.
var dynamicTableBehavior = new BehaviorProvidingDynamicTable();
endpointConfiguration.Pipeline.Register(
    dynamicTableBehavior,
    "Provides a non-default table for sagas started by ship order message");
Order saga data
/// <summary>
/// State persisted for an <c>OrderSaga</c> instance.
/// </summary>
public class OrderSagaData : ContainSagaData
{
    /// <summary>Identifier of the order this saga instance tracks.</summary>
    public Guid OrderId { get; set; }

    /// <summary>Free-text description of the order saga.</summary>
    public string OrderDescription { get; set; }
}
Order saga
/// <summary>
/// Saga started by <c>StartOrder</c>. It sends a <c>ShipOrder</c> command locally
/// and, once a <c>CompleteOrder</c> message arrives, completes itself and publishes
/// an <c>OrderCompleted</c> event.
/// </summary>
public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleMessages<CompleteOrder>
{
    // readonly: the logger is assigned once and never replaced.
    static readonly ILog log = LogManager.GetLogger<OrderSaga>();

    /// <summary>
    /// Correlates <c>StartOrder</c> and <c>CompleteOrder</c> messages to the saga
    /// through their <c>OrderId</c>.
    /// </summary>
    /// <param name="mapper">Mapper used to declare the correlation.</param>
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.OrderId)
            .ToMessage<StartOrder>(msg => msg.OrderId)
            .ToMessage<CompleteOrder>(msg => msg.OrderId);
    }

    /// <summary>
    /// Records the order on the saga data and sends a <c>ShipOrder</c> command
    /// for the same order to the local endpoint.
    /// </summary>
    /// <param name="message">The message that starts the saga.</param>
    /// <param name="context">Handler context used to send the command.</param>
    /// <returns>The task of the send operation.</returns>
    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        Data.OrderId = message.OrderId;
        Data.OrderDescription = $"The saga for order {message.OrderId}";
        log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");

        var shipOrder = new ShipOrder
        {
            OrderId = message.OrderId
        };
        return context.SendLocal(shipOrder);
    }

    /// <summary>
    /// Marks the saga as complete and publishes <c>OrderCompleted</c> for the order.
    /// </summary>
    /// <param name="message">The completion message.</param>
    /// <param name="context">Handler context used to publish the event.</param>
    /// <returns>The task of the publish operation.</returns>
    public Task Handle(CompleteOrder message, IMessageHandlerContext context)
    {
        log.Info($"Saga with OrderId {Data.OrderId} completed");
        MarkAsComplete();

        var orderCompleted = new OrderCompleted
        {
            OrderId = Data.OrderId
        };
        return context.Publish(orderCompleted);
    }
}
ShipOrder saga data
/// <summary>
/// State persisted for a <c>ShipOrderSaga</c> instance.
/// </summary>
public class ShipOrderSagaData : ContainSagaData
{
    /// <summary>Identifier of the order being shipped.</summary>
    public Guid OrderId { get; set; }
}
ShipOrder saga
/// <summary>
/// Saga started by <c>ShipOrder</c>. It requests a <c>CompleteOrder</c> timeout
/// and, when that timeout fires, completes itself and replies to the originator
/// with the <c>CompleteOrder</c> message.
/// </summary>
public class ShipOrderSaga :
    Saga<ShipOrderSagaData>,
    IAmStartedByMessages<ShipOrder>,
    IHandleTimeouts<CompleteOrder>
{
    // readonly: the logger is assigned once and never replaced.
    static readonly ILog log = LogManager.GetLogger<ShipOrderSaga>();

    /// <summary>
    /// Correlates <c>ShipOrder</c> messages to the saga through their <c>OrderId</c>.
    /// </summary>
    /// <param name="mapper">Mapper used to declare the correlation.</param>
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ShipOrderSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.OrderId).ToMessage<ShipOrder>(msg => msg.OrderId);
    }

    /// <summary>
    /// Stores the order id on the saga data and requests a five-second
    /// <c>CompleteOrder</c> timeout.
    /// </summary>
    /// <param name="message">The message that starts the saga.</param>
    /// <param name="context">Handler context used to request the timeout.</param>
    /// <returns>The task of the timeout request.</returns>
    public Task Handle(ShipOrder message, IMessageHandlerContext context)
    {
        log.Info($"Order Shipped. OrderId {message.OrderId}");
        Data.OrderId = message.OrderId;

        log.Info("Order will complete in 5 seconds");
        // The timeout state is sent empty; OrderId is filled in when the timeout fires.
        var timeoutData = new CompleteOrder();
        return RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData);
    }

    /// <summary>
    /// Marks the saga as complete, copies the order id onto the timeout state
    /// and replies to the originator with it.
    /// </summary>
    /// <param name="state">The timeout state that is reused as the reply message.</param>
    /// <param name="context">Handler context used to send the reply.</param>
    /// <returns>The task of the reply operation.</returns>
    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        log.Info($"Saga with OrderId {Data.OrderId} about to complete");
        MarkAsComplete();

        state.OrderId = Data.OrderId;
        return ReplyToOriginator(context, state);
    }
}