This sample shows a client/server scenario that uses saga and outbox persistence to store records atomically by leveraging transactions.
Projects
SharedMessages
The shared message contracts used by all endpoints.
Client
- Sends the `StartOrder` message to `Server`.
- Receives and handles the `OrderCompleted` event.
Server projects
- Receive the `StartOrder` message and initiate an `OrderSaga`.
- `OrderSaga` requests a timeout with an instance of `CompleteOrder` with the saga data.
- Receive the `OrderShipped` message with a custom header.
- `OrderSaga` publishes an `OrderCompleted` event when the `CompleteOrder` timeout fires.
Persistence config
Configure the endpoint to use CosmosDB Persistence.
// Connection string for the Cosmos DB account used by the sample. It targets an endpoint on
// localhost:8081 — presumably the local Cosmos DB emulator; confirm before reusing elsewhere.
var connection = @"AccountEndpoint = https://localhost:8081/;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";

var endpointConfiguration = new EndpointConfiguration("Samples.CosmosDB.Transactions.Server");
endpointConfiguration.EnableOutbox();

// Select Cosmos DB persistence and hand it the client, database and default container.
var persistence = endpointConfiguration.UsePersistence<CosmosPersistence>();
var cosmosClient = new CosmosClient(connection);
persistence.CosmosClient(cosmosClient);
persistence.DatabaseName("Samples.CosmosDB.Transactions");
// "/OrderId" is the partition key path of the "Server" container.
persistence.DefaultContainer("Server", "/OrderId");
Because the order id is used as a partition key it has to be used in the partition key path as well.
Behaviors
Most messages implement `IProvideOrderId`, so a behavior in the logical stage of the pipeline can use the provided order identifier as the partition key.
/// <summary>
/// Incoming logical-message behavior that turns the order id of a message implementing
/// <see cref="IProvideOrderId"/> into a <see cref="PartitionKey"/> and stores it in the
/// context extensions.
/// </summary>
class OrderIdAsPartitionKeyBehavior : Behavior<IIncomingLogicalMessageContext>
{
    static readonly ILog Log = LogManager.GetLogger<OrderIdAsPartitionKeyBehavior>();

    /// <summary>
    /// Sets the partition key when the message exposes an order id, then continues the pipeline.
    /// </summary>
    /// <param name="context">The incoming logical message context.</param>
    /// <param name="next">Delegate that invokes the remainder of the pipeline.</param>
    /// <returns>The task returned by <paramref name="next"/>.</returns>
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // Messages without an order id pass through untouched.
        if (!(context.Message.Instance is IProvideOrderId orderIdProvider))
        {
            return next();
        }

        var orderId = orderIdProvider.OrderId.ToString();
        Log.Info($"Found partition key '{orderId}' from '{nameof(IProvideOrderId)}'");
        context.Extensions.Set(new PartitionKey(orderId));

        return next();
    }

    /// <summary>
    /// Pipeline step registration that places this behavior before <c>LogicalOutboxBehavior</c>.
    /// </summary>
    public class Registration : RegisterStep
    {
        public Registration()
            : base(
                nameof(OrderIdAsPartitionKeyBehavior),
                typeof(OrderIdAsPartitionKeyBehavior),
                "Determines the PartitionKey from the logical message",
                builder => new OrderIdAsPartitionKeyBehavior())
        {
            InsertBefore(nameof(LogicalOutboxBehavior));
        }
    }
}
One handler publishes an event that doesn't implement `IProvideOrderId` but adds a custom header to indicate the order identifier. The handler also creates `OrderShippingInformation` by participating in the transactional batch provided by NServiceBus.
/// <summary>
/// Handles <see cref="ShipOrder"/>: adds an <see cref="OrderShippingInformation"/> item to the
/// Cosmos DB transactional batch of the current storage session and publishes
/// <see cref="OrderShipped"/> with the order id carried in a custom header.
/// </summary>
public class ShipOrderHandler :
    IHandleMessages<ShipOrder>
{
    // readonly, consistent with the other loggers in this sample; the field is never reassigned.
    static readonly ILog Log = LogManager.GetLogger<ShipOrderHandler>();

    /// <summary>
    /// Records the shipping information and publishes the <see cref="OrderShipped"/> event.
    /// </summary>
    /// <param name="message">The ship order request.</param>
    /// <param name="context">The message handler context.</param>
    /// <returns>The task returned by the publish operation.</returns>
    public Task Handle(ShipOrder message, IMessageHandlerContext context)
    {
        var orderShippingInformation = new OrderShippingInformation
        {
            Id = Guid.NewGuid(),
            OrderId = message.OrderId,
            ShippedAt = DateTimeOffset.UtcNow,
        };

        Store(orderShippingInformation, context);

        Log.Info($"Order Shipped. OrderId {message.OrderId}");

        // OrderShipped carries the order id in a header so that receivers can read it
        // from the message headers alone.
        var options = new PublishOptions();
        options.SetHeader("Sample.CosmosDB.Transaction.OrderId", message.OrderId.ToString());

        return context.Publish(new OrderShipped { OrderId = orderShippingInformation.OrderId, ShippingDate = orderShippingInformation.ShippedAt }, options);
    }

    /// <summary>
    /// Adds a create-item operation for the shipping information to the session's batch.
    /// </summary>
    static void Store(OrderShippingInformation orderShippingInformation, IMessageHandlerContext context)
    {
        var session = context.SynchronizedStorageSession.CosmosPersistenceSession();

        var requestOptions = new TransactionalBatchItemRequestOptions
        {
            EnableContentResponseOnWrite = false,
        };

        session.Batch.CreateItem(orderShippingInformation, requestOptions);
    }
}
The header can be used to determine the partition key in the transport receive context:
/// <summary>
/// Transport-receive behavior that reads the order id from a custom message header and stores
/// it in the context extensions as the <see cref="PartitionKey"/>.
/// </summary>
class OrderIdHeaderAsPartitionKeyBehavior : Behavior<ITransportReceiveContext>
{
    // Name of the header that carries the order id.
    const string OrderIdHeader = "Sample.CosmosDB.Transaction.OrderId";

    // Logger is created for this behavior so log entries are attributed to the correct type.
    static readonly ILog Log = LogManager.GetLogger<OrderIdHeaderAsPartitionKeyBehavior>();

    /// <summary>
    /// Sets the partition key when the order id header is present, then continues the pipeline.
    /// </summary>
    /// <param name="context">The transport receive context of the incoming message.</param>
    /// <param name="next">Delegate that invokes the remainder of the pipeline.</param>
    /// <returns>The task returned by <paramref name="next"/>.</returns>
    public override Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        if (context.Message.Headers.TryGetValue(OrderIdHeader, out var orderId))
        {
            // Log the full header name that was actually read.
            Log.Info($"Found partition key '{orderId}' from header '{OrderIdHeader}'");
            context.Extensions.Set(new PartitionKey(orderId));
        }

        return next();
    }
}
Finally the above behaviors are registered in the pipeline.
// Transport-receive behavior: takes the partition key from the "Sample.CosmosDB.Transaction.OrderId" header.
endpointConfiguration.Pipeline.Register(new OrderIdHeaderAsPartitionKeyBehavior(), "Extracts a partition key from a header");
// Logical-message behavior: registered through its Registration step, which inserts it before LogicalOutboxBehavior.
endpointConfiguration.Pipeline.Register(new OrderIdAsPartitionKeyBehavior.Registration());
Order saga data
/// <summary>
/// State stored for each <see cref="OrderSaga"/> instance.
/// </summary>
public class OrderSagaData :
ContainSagaData
{
/// <summary>Order identifier; <see cref="OrderSaga"/> maps incoming messages to saga instances through this property.</summary>
public Guid OrderId { get; set; }
/// <summary>Description of the order, set by the saga when it handles <c>StartOrder</c>.</summary>
public string OrderDescription { get; set; }
}
Order saga
/// <summary>
/// Saga coordinating a single order: started by <see cref="StartOrder"/>, notified by
/// <see cref="OrderShipped"/>, and completed when the <see cref="CompleteOrder"/> timeout fires.
/// </summary>
public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleMessages<OrderShipped>,
    IHandleTimeouts<CompleteOrder>
{
    static readonly ILog Log = LogManager.GetLogger<OrderSaga>();

    /// <summary>
    /// Correlates messages to saga instances by order id: <see cref="StartOrder"/> through its
    /// <c>OrderId</c> property, <see cref="OrderShipped"/> through the custom order id header.
    /// </summary>
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper
            .MapSaga(sagaData => sagaData.OrderId)
            .ToMessage<StartOrder>(startOrder => startOrder.OrderId)
            .ToMessageHeader<OrderShipped>("Sample.CosmosDB.Transaction.OrderId");
    }

    /// <summary>
    /// Starts the saga: stores the description, sends <see cref="ShipOrder"/> to the local
    /// endpoint and requests a <see cref="CompleteOrder"/> timeout five seconds out.
    /// </summary>
    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        var description = $"The saga for order {message.OrderId}";
        Data.OrderDescription = description;

        Log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");
        Log.Info("Order will complete in 5 seconds");

        // Both operations are started (send first, then timeout request) and awaited together.
        var sendShipOrder = context.SendLocal(new ShipOrder
        {
            OrderId = message.OrderId
        });
        var requestCompletion = RequestTimeout(context, TimeSpan.FromSeconds(5), new CompleteOrder
        {
            OrderDescription = description,
            OrderId = Data.OrderId,
        });

        return Task.WhenAll(sendShipOrder, requestCompletion);
    }

    /// <summary>
    /// Logs that the order was shipped; no further work is done here.
    /// </summary>
    public Task Handle(OrderShipped message, IMessageHandlerContext context)
    {
        Log.Info($"Order with OrderId {Data.OrderId} shipped on {message.ShippingDate}");

        return Task.CompletedTask;
    }

    /// <summary>
    /// Marks the saga as complete and publishes <see cref="OrderCompleted"/> for the order.
    /// </summary>
    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        Log.Info($"Saga with OrderId {Data.OrderId} completed");
        MarkAsComplete();

        return context.Publish(new OrderCompleted
        {
            OrderId = Data.OrderId
        });
    }
}