This sample shows a client/server scenario using saga and outbox persistences to store records atomically by leveraging transactions. The Saga ID is used as a partition key.
Projects
SharedMessages
The shared message contracts used by all endpoints.
Client
- Sends the
StartOrder
message toServer
. - Receives and handles the
OrderCompleted
event.
Server projects
- Receive the
StartOrder
message and initiate anOrderSaga
. OrderSaga
requests a timeout with an instance ofCompleteOrder
with the saga data.- Receive the
OrderShipped
message with a custom header. OrderSaga
publishes anOrderCompleted
event when theCompleteOrder
timeout fires.
Persistence config
Configure the endpoint to use Azure Table Persistence.
var endpointConfiguration = new EndpointConfiguration("Samples.AzureTable.Transactions.Server");
endpointConfiguration.EnableOutbox();
var useStorageTable = true;
var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence>();
var connection = useStorageTable ? "UseDevelopmentStorage=true" :
"TableEndpoint=https://localhost:8081/;AccountName=AzureTableSamples;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";
var account = CloudStorageAccount.Parse(connection);
var cloudTableClient = account.CreateCloudTableClient();
persistence.UseCloudTableClient(cloudTableClient);
persistence.DefaultTable("Server");
The order id is used to derive the saga id from.
Behaviors
Most messages implement IProvideOrderId
. By default Saga IDs are deterministically derived from the saga data, the correlation property name and the correlation property value. IProvidePartitionKeyFromSagaId
is a helper that can be injected into behaviors in the logical pipeline stage if the Saga ID should be used as a partition key.
class OrderIdAsPartitionKeyBehavior : Behavior<IIncomingLogicalMessageContext>
{
public OrderIdAsPartitionKeyBehavior(IProvidePartitionKeyFromSagaId partitionKeyFromSagaId)
{
partitionKeyFromSagaId1 = partitionKeyFromSagaId;
}
public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
{
var correlationProperty = SagaCorrelationProperty.None;
if (context.Message.Instance is IProvideOrderId provideOrderId)
{
var partitionKeyValue = provideOrderId.OrderId;
correlationProperty = new SagaCorrelationProperty("OrderId", partitionKeyValue);
}
await partitionKeyFromSagaId1.SetPartitionKey<OrderSagaData>(context, correlationProperty);
if (context.Headers.TryGetValue(Headers.SagaId, out var sagaIdHeader))
{
Log.Info($"Saga Id Header: {sagaIdHeader}");
}
if (context.Extensions.TryGet<TableInformation>(out var tableInformation))
{
Log.Info($"Table Information: {tableInformation.TableName}");
}
Log.Info($"Found partition key '{context.Extensions.Get<TableEntityPartitionKey>().PartitionKey}' from '{nameof(IProvideOrderId)}'");
await next();
}
public class Registration : RegisterStep
{
public Registration() :
base(nameof(OrderIdAsPartitionKeyBehavior),
typeof(OrderIdAsPartitionKeyBehavior),
"Determines the PartitionKey from the logical message",
b => new OrderIdAsPartitionKeyBehavior(b.Build<IProvidePartitionKeyFromSagaId>()))
{
InsertBefore(nameof(LogicalOutboxBehavior));
}
}
IProvidePartitionKeyFromSagaId partitionKeyFromSagaId1;
static readonly ILog Log = LogManager.GetLogger<OrderIdAsPartitionKeyBehavior>();
}
One handler replies with a message that doesn't implement IProvideOrderId
. Transactionality can still be achieved because messages that are part of a saga conversation flow will get the Saga ID set as a header. In such cases no correlation property information needs to be extracted to derive the Saga ID from.
Finally the above behavior is registered in the pipeline.
endpointConfiguration.Pipeline.Register(new OrderIdAsPartitionKeyBehavior.Registration());
Order saga data
public class OrderSagaData :
ContainSagaData
{
public Guid OrderId { get; set; }
public string OrderDescription { get; set; }
}
Order saga
public class OrderSaga :
Saga<OrderSagaData>,
IAmStartedByMessages<StartOrder>,
IHandleMessages<OrderShipped>,
IHandleTimeouts<CompleteOrder>
{
static readonly ILog Log = LogManager.GetLogger<OrderSaga>();
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
{
mapper.MapSaga(saga => saga.OrderId)
.ToMessage<StartOrder>(msg => msg.OrderId);
}
public Task Handle(StartOrder message, IMessageHandlerContext context)
{
var orderDescription = $"The saga for order {message.OrderId}";
Data.OrderDescription = orderDescription;
Log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");
var shipOrder = new ShipOrder
{
OrderId = message.OrderId
};
Log.Info("Order will complete in 5 seconds");
var timeoutData = new CompleteOrder
{
OrderDescription = orderDescription,
OrderId = Data.OrderId,
};
return Task.WhenAll(
context.SendLocal(shipOrder),
RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData)
);
}
public Task Handle(OrderShipped message, IMessageHandlerContext context)
{
Log.Info($"Order with OrderId {Data.OrderId} shipped on {message.ShippingDate}");
return Task.CompletedTask;
}
public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
{
Log.Info($"Saga with OrderId {Data.OrderId} completed");
MarkAsComplete();
var orderCompleted = new OrderCompleted
{
OrderId = Data.OrderId
};
return context.Publish(orderCompleted);
}
}