This sample shows a client/server scenario using saga and outbox persistence to store records atomically by leveraging transactions.
Prerequisites
The sample uses a DynamoDB local instance by default. See the AWS guidance on deploying DynamoDB local.
Projects
Client
- Sends the
StartOrder
message toServer
. - Receives and handles the
OrderCompleted
event.
Server projects
- Receive the
StartOrder
message and initiate anOrderSaga
. OrderSaga
requests a timeout with an instance ofCompleteOrder
with the saga data.- Receive the
OrderShipped
message with a custom header. OrderSaga
publishes anOrderCompleted
event when theCompleteOrder
timeout fires.
SharedMessages
Contains the shared message contracts used by all endpoints.
Persistence config
Configure the endpoint to use Cosmos DB Persistence.
var amazonDynamoDbClient = new AmazonDynamoDBClient(
new BasicAWSCredentials("localdb", "localdb"),
new AmazonDynamoDBConfig
{
ServiceURL = "http://localhost:8000"
});
var endpointConfiguration = new EndpointConfiguration("Samples.DynamoDB.Transactions.Server");
var persistence = endpointConfiguration.UsePersistence<DynamoPersistence>();
persistence.DynamoClient(amazonDynamoDbClient);
persistence.UseSharedTable(new TableConfiguration
{
TableName = "Samples.DynamoDB.Transactions"
});
endpointConfiguration.EnableOutbox();
The handler also creates OrderShippingInformation
by participating in the transactional batch provided by NServiceBus.
var orderShippingInformation = new OrderShippingInformation
{
Id = Guid.NewGuid(),
OrderId = message.OrderId,
ShippedAt = DateTimeOffset.UtcNow,
};
var session = context.SynchronizedStorageSession.DynamoPersistenceSession();
session.Add(new TransactWriteItem
{
Put = new Put
{
TableName = "Samples.DynamoDB.Transactions",
Item = orderShippingInformation.ToMap()
}
});
Order saga data
public class OrderSagaData :
ContainSagaData
{
public Guid OrderId { get; set; }
public string OrderDescription { get; set; }
}
Order saga
public class OrderSaga :
Saga<OrderSagaData>,
IAmStartedByMessages<StartOrder>,
IHandleMessages<OrderShipped>,
IHandleTimeouts<CompleteOrder>
{
static readonly ILog Log = LogManager.GetLogger<OrderSaga>();
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
{
mapper.MapSaga(saga => saga.OrderId)
.ToMessage<StartOrder>(msg => msg.OrderId)
.ToMessage<OrderShipped>(msg => msg.OrderId);
}
public async Task Handle(StartOrder message, IMessageHandlerContext context)
{
var orderDescription = $"The saga for order {message.OrderId}";
Data.OrderDescription = orderDescription;
Log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");
var shipOrder = new ShipOrder
{
OrderId = message.OrderId
};
Log.Info("Order will complete in 5 seconds");
var timeoutData = new CompleteOrder
{
OrderDescription = orderDescription,
OrderId = Data.OrderId,
};
await Task.WhenAll(
context.SendLocal(shipOrder),
RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData)
);
}
public Task Handle(OrderShipped message, IMessageHandlerContext context)
{
Log.Info($"Order with OrderId {Data.OrderId} shipped on {message.ShippingDate}");
return Task.CompletedTask;
}
public async Task Timeout(CompleteOrder state, IMessageHandlerContext context)
{
Log.Info($"Saga with OrderId {Data.OrderId} completed");
MarkAsComplete();
var orderCompleted = new OrderCompleted
{
OrderId = Data.OrderId
};
await context.Publish(orderCompleted);
}
}