This sample shows a client/server scenario that uses saga and outbox persistence to store records atomically by leveraging transactions.
Projects
SharedMessages
The shared message contracts used by all endpoints.
Client
- Sends the StartOrder message to Server.
- Receives and handles the OrderCompleted event.
Server projects
- Receive the StartOrder message and initiate an OrderSaga.
- OrderSaga requests a timeout with an instance of CompleteOrder with the saga data.
- Receive the OrderShipped message with a custom header.
- OrderSaga publishes an OrderCompleted event when the CompleteOrder timeout fires.
Persistence config
Configure the endpoint to use Azure Table Persistence.
// Create the server endpoint configuration and enable the outbox.
var endpointConfiguration = new EndpointConfiguration("Samples.AzureTable.Transactions.Server");
endpointConfiguration.EnableOutbox();

// Toggle between the two connection strings used below.
var useStorageTable = true;

// Select Azure Table Persistence for this endpoint.
var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence>();

// Choose the connection string based on the toggle above.
string connection;
if (useStorageTable)
{
    connection = "UseDevelopmentStorage=true";
}
else
{
    connection = "TableEndpoint=https://localhost:8081/;AccountName=AzureTableSamples;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";
}

// Build the table client from the connection string and hand it to the persistence.
var account = CloudStorageAccount.Parse(connection);
var cloudTableClient = account.CreateCloudTableClient();
persistence.UseCloudTableClient(cloudTableClient);

// Use "Server" as the default table name.
persistence.DefaultTable("Server");
The order id is used as a partition key.
Behaviors
Most messages implement IProvideOrderId, and thus a logical behavior can use the provided order identification as a partition key.
/// <summary>
/// Logical-message pipeline behavior that asks <see cref="IProvidePartitionKeyFromSagaId"/> to set
/// the partition key for <see cref="OrderSagaData"/>, passing the order id as the correlation
/// property when the incoming message implements <see cref="IProvideOrderId"/>.
/// </summary>
class OrderIdAsPartitionKeyBehavior : Behavior<IIncomingLogicalMessageContext>
{
    static readonly ILog Log = LogManager.GetLogger<OrderIdAsPartitionKeyBehavior>();

    // Service used to set the partition key on the context; supplied through the constructor.
    IProvidePartitionKeyFromSagaId partitionKeyProvider;

    /// <summary>
    /// Creates the behavior with the service that sets the partition key.
    /// </summary>
    /// <param name="partitionKeyFromSagaId">The partition key provider to delegate to.</param>
    public OrderIdAsPartitionKeyBehavior(IProvidePartitionKeyFromSagaId partitionKeyFromSagaId)
    {
        partitionKeyProvider = partitionKeyFromSagaId;
    }

    /// <summary>
    /// Sets the partition key for the incoming message, logs diagnostic information and then
    /// continues with the rest of the pipeline.
    /// </summary>
    /// <param name="context">The incoming logical message context.</param>
    /// <param name="next">Continuation that invokes the remainder of the pipeline.</param>
    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // Messages carrying an order id correlate on "OrderId"; all others pass the "None" marker.
        var correlation = context.Message.Instance is IProvideOrderId withOrderId
            ? new SagaCorrelationProperty("OrderId", withOrderId.OrderId)
            : SagaCorrelationProperty.None;

        await partitionKeyProvider.SetPartitionKey<OrderSagaData>(context, correlation);

        // Diagnostics: saga id header, when the message carries one.
        if (context.Headers.TryGetValue(Headers.SagaId, out var sagaId))
        {
            Log.Info($"Saga Id Header: {sagaId}");
        }

        // Diagnostics: table information, when present in the context extensions.
        if (context.Extensions.TryGet<TableInformation>(out var table))
        {
            Log.Info($"Table Information: {table.TableName}");
        }

        var partitionKey = context.Extensions.Get<TableEntityPartitionKey>().PartitionKey;
        Log.Info($"Found partition key '{partitionKey}' from '{nameof(IProvideOrderId)}'");

        await next();
    }

    /// <summary>
    /// Pipeline registration that inserts this behavior before <c>LogicalOutboxBehavior</c> and
    /// resolves its dependency from the service provider.
    /// </summary>
    public class Registration : RegisterStep
    {
        public Registration()
            : base(
                nameof(OrderIdAsPartitionKeyBehavior),
                typeof(OrderIdAsPartitionKeyBehavior),
                "Determines the PartitionKey from the logical message",
                services => new OrderIdAsPartitionKeyBehavior(services.GetRequiredService<IProvidePartitionKeyFromSagaId>()))
        {
            InsertBefore(nameof(LogicalOutboxBehavior));
        }
    }
}
One handler publishes an event that doesn't implement IProvideOrderId but adds a custom header to indicate the order identification. The handler also creates OrderShippingInformation by participating in the transactional batch provided by NServiceBus.
/// <summary>
/// Handles <see cref="ShipOrder"/>: adds an <see cref="OrderShippingInformation"/> insert to the
/// Azure Table persistence session batch and publishes <see cref="OrderShipped"/> with the order
/// id attached as a custom header.
/// </summary>
public class ShipOrderHandler : IHandleMessages<ShipOrder>
{
    static ILog Log = LogManager.GetLogger<ShipOrderHandler>();

    /// <summary>
    /// Records the shipping information and publishes the shipped event.
    /// </summary>
    /// <param name="message">The ship order command.</param>
    /// <param name="context">The message handler context.</param>
    /// <returns>The task returned by the publish operation.</returns>
    public Task Handle(ShipOrder message, IMessageHandlerContext context)
    {
        var shippingRecord = new OrderShippingInformation
        {
            Id = Guid.NewGuid(),
            OrderId = message.OrderId,
            ShippedAt = DateTimeOffset.UtcNow,
        };

        // Give the record the session's partition key and add the insert to the session batch.
        var storageSession = context.SynchronizedStorageSession.AzureTablePersistenceSession();
        shippingRecord.PartitionKey = storageSession.PartitionKey;
        storageSession.Batch.Add(TableOperation.Insert(shippingRecord));

        Log.Info($"Order Shipped. OrderId {message.OrderId}");

        // The event is published with the order id in a custom header.
        var publishOptions = new PublishOptions();
        publishOptions.SetHeader("Sample.AzureTable.Transaction.OrderId", message.OrderId.ToString());

        var shippedEvent = new OrderShipped
        {
            OrderId = shippingRecord.OrderId,
            ShippingDate = shippingRecord.ShippedAt
        };
        return context.Publish(shippedEvent, publishOptions);
    }
}
The header can be used to determine the partition key in the transport receive context:
/// <summary>
/// Transport receive behavior that looks for the order id header on the incoming message and,
/// when it is present, stores its value in the context extensions as a
/// <see cref="TableEntityPartitionKey"/>.
/// </summary>
class OrderIdHeaderAsPartitionKeyBehavior : Behavior<ITransportReceiveContext>
{
    // Name of the header that carries the order id. Kept in a single constant so the lookup
    // and the log message always name the same header.
    const string OrderIdHeader = "Sample.AzureTable.Transaction.OrderId";

    static readonly ILog Log = LogManager.GetLogger<OrderIdHeaderAsPartitionKeyBehavior>();

    /// <summary>
    /// Sets the partition key from the order id header when available, then continues the pipeline.
    /// </summary>
    /// <param name="context">The transport receive context of the incoming message.</param>
    /// <param name="next">Continuation that invokes the remainder of the pipeline.</param>
    /// <returns>The task returned by <paramref name="next"/>.</returns>
    public override Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        if (context.Message.Headers.TryGetValue(OrderIdHeader, out var orderId))
        {
            // Log the full header name; the previous message omitted the ".OrderId" suffix.
            Log.Info($"Found partition key '{orderId}' from header '{OrderIdHeader}'");
            context.Extensions.Set(new TableEntityPartitionKey(orderId));
        }

        // Messages without the header pass through untouched.
        return next();
    }
}
Finally, the above behaviors are registered in the pipeline.
// Register both partition key behaviors with the endpoint's pipeline.
var pipeline = endpointConfiguration.Pipeline;

// Header-based behavior is registered as an instance together with a description.
pipeline.Register(new OrderIdHeaderAsPartitionKeyBehavior(), "Extracts a partition key from a header");

// Logical-message behavior is registered through its own registration step.
pipeline.Register(new OrderIdAsPartitionKeyBehavior.Registration());
Order saga data
/// <summary>
/// State stored for an order saga instance.
/// </summary>
public class OrderSagaData : ContainSagaData
{
    /// <summary>Identifier of the order.</summary>
    public Guid OrderId { get; set; }

    /// <summary>Free-text description of the order.</summary>
    public string OrderDescription { get; set; }
}
Order saga
/// <summary>
/// Saga started by <see cref="StartOrder"/>. It sends a local <see cref="ShipOrder"/>, requests a
/// <see cref="CompleteOrder"/> timeout, logs incoming <see cref="OrderShipped"/> events and
/// publishes <see cref="OrderCompleted"/> when the timeout fires.
/// </summary>
public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleMessages<OrderShipped>,
    IHandleTimeouts<CompleteOrder>
{
    static readonly ILog Log = LogManager.GetLogger<OrderSaga>();

    /// <summary>
    /// Correlates messages to the saga on <c>OrderId</c>: <see cref="StartOrder"/> through its
    /// <c>OrderId</c> property and <see cref="OrderShipped"/> through the order id header.
    /// </summary>
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        var correlation = mapper.MapSaga(saga => saga.OrderId);
        correlation
            .ToMessage<StartOrder>(msg => msg.OrderId)
            .ToMessageHeader<OrderShipped>("Sample.AzureTable.Transaction.OrderId");
    }

    /// <summary>
    /// Starts the saga: stores the description, sends <see cref="ShipOrder"/> locally and
    /// requests a timeout of five seconds.
    /// </summary>
    /// <returns>A task that completes when both the send and the timeout request complete.</returns>
    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        var description = $"The saga for order {message.OrderId}";
        Data.OrderDescription = description;

        Log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");
        Log.Info("Order will complete in 5 seconds");

        // Start the local send first, then the timeout request, and wait for both.
        var sendShipOrder = context.SendLocal(new ShipOrder
        {
            OrderId = message.OrderId
        });
        var requestCompletion = RequestTimeout(context, TimeSpan.FromSeconds(5), new CompleteOrder
        {
            OrderDescription = description,
            OrderId = Data.OrderId,
        });

        return Task.WhenAll(sendShipOrder, requestCompletion);
    }

    /// <summary>
    /// Logs that the order was shipped; no saga state is changed.
    /// </summary>
    public Task Handle(OrderShipped message, IMessageHandlerContext context)
    {
        Log.Info($"Order with OrderId {Data.OrderId} shipped on {message.ShippingDate}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Marks the saga as complete and publishes <see cref="OrderCompleted"/> for the order.
    /// </summary>
    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        Log.Info($"Saga with OrderId {Data.OrderId} completed");
        MarkAsComplete();

        return context.Publish(new OrderCompleted
        {
            OrderId = Data.OrderId
        });
    }
}