Getting Started
Architecture
NServiceBus
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

DynamoDB Persistence Usage with transactions

NuGet Package: NServiceBus.Persistence.DynamoDB (1.x)
Target Version: NServiceBus 8.x

This sample shows a client/server scenario using saga and outbox persistence to store records atomically by leveraging transactions.

Prerequisites

The sample uses a DynamoDB local instance by default. See the AWS guidance on deploying DynamoDB local.

Alternatively with Docker installed locally, execute the following command in the solution directory:

docker-compose up -d

the data is only kept in memory and will be gone when the container is removed. It is possible to inspect the data written to DynamoDB by using the NoSQL Workbench for DynamoDB. The below screenshot shows the PK and SK created for the not yet completed Saga, the outbox entries and the transactionally inserted OrderShippingInformation.

Projects

Client

  • Sends the StartOrder message to Server.
  • Receives and handles the OrderCompleted event.

Server projects

  • Receive the StartOrder message and initiate an OrderSaga.
  • OrderSaga requests a timeout with an instance of CompleteOrder with the saga data.
  • Receive the OrderShipped message with a custom header.
  • OrderSaga publishes an OrderCompleted event when the CompleteOrder timeout fires.

SharedMessages

Contains the shared message contracts used by all endpoints.

Persistence config

Configure the endpoint to use DynamoDB Persistence.

var amazonDynamoDbClient = new AmazonDynamoDBClient(
    new BasicAWSCredentials("localdb", "localdb"),
    new AmazonDynamoDBConfig
    {
        ServiceURL = "http://localhost:8000"
    });

var endpointConfiguration = new EndpointConfiguration("Samples.DynamoDB.Transactions.Server");
endpointConfiguration.UseSerialization<SystemJsonSerializer>();

var persistence = endpointConfiguration.UsePersistence<DynamoPersistence>();
persistence.DynamoClient(amazonDynamoDbClient);
persistence.UseSharedTable(new TableConfiguration
{
    TableName = "Samples.DynamoDB.Transactions"
});

endpointConfiguration.EnableOutbox();

The handler also creates OrderShippingInformation by participating in the transactional batch provided by NServiceBus.

var orderShippingInformation = new OrderShippingInformation
{
    Id = Guid.NewGuid(),
    OrderId = message.OrderId,
    ShippedAt = DateTimeOffset.UtcNow,
};

var session = context.SynchronizedStorageSession.DynamoPersistenceSession();

session.Add(new TransactWriteItem
{
    Put = new Put
    {
        TableName = "Samples.DynamoDB.Transactions",
        Item = orderShippingInformation.ToMap()
    }
});

Order saga data

public class OrderSagaData :
    ContainSagaData
{
    public Guid OrderId { get; set; }
    public string OrderDescription { get; set; }
}

Order saga

public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleMessages<OrderShipped>,
    IHandleTimeouts<CompleteOrder>
{
    static readonly ILog Log = LogManager.GetLogger<OrderSaga>();

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.OrderId)
            .ToMessage<StartOrder>(msg => msg.OrderId)
            .ToMessage<OrderShipped>(msg => msg.OrderId);
    }

    public async Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        var orderDescription = $"The saga for order {message.OrderId}";
        Data.OrderDescription = orderDescription;
        Log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");

        var shipOrder = new ShipOrder
        {
            OrderId = message.OrderId
        };

        Log.Info("Order will complete in 5 seconds");
        var timeoutData = new CompleteOrder
        {
            OrderDescription = orderDescription,
            OrderId = Data.OrderId,
        };

        await Task.WhenAll(
            context.SendLocal(shipOrder),
            RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData)
        );
    }

    public Task Handle(OrderShipped message, IMessageHandlerContext context)
    {
        Log.Info($"Order with OrderId {Data.OrderId} shipped on {message.ShippingDate}");
        return Task.CompletedTask;
    }

    public async Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        Log.Info($"Saga with OrderId {Data.OrderId} completed");
        MarkAsComplete();
        var orderCompleted = new OrderCompleted
        {
            OrderId = Data.OrderId
        };
        await context.Publish(orderCompleted);
    }
}

Related Articles

  • Sagas
    Maintain statefulness in distributed systems with the saga pattern and NServiceBus' event-driven architecture with built-in fault-tolerance and scalability.