Getting Started
Architecture
NServiceBus
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

Azure Table Persistence Usage with Transactions

This sample demonstrates a client/server scenario that uses saga and outbox persistence to store records atomically by leveraging transactions.

Projects

SharedMessages

The shared message contracts used by all endpoints.

Client

  • Sends the StartOrder message to Server.
  • Receives and handles the OrderCompleted event.

Server projects

  • Receive the StartOrder message and initiate an OrderSaga.
  • OrderSaga requests a timeout with an instance of CompleteOrder with the saga data.
  • Receive the OrderShipped message with a custom header.
  • OrderSaga publishes an OrderCompleted event when the CompleteOrder timeout is triggered.

Persistence config

Configure the endpoint to use Azure Table Persistence.

// Create the server endpoint configuration and enable the outbox.
var endpointConfiguration = new EndpointConfiguration("Samples.AzureTable.Transactions.Server");
endpointConfiguration.EnableOutbox();

var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence>();

// Toggle that selects which of the two connection strings below is used.
var useStorageTable = true;

string connectionString;
if (useStorageTable)
{
    // Development storage connection string.
    connectionString = "UseDevelopmentStorage=true";
}
else
{
    // Explicit table endpoint on localhost:8081 with account name and key.
    connectionString = "TableEndpoint=https://localhost:8081/;AccountName=AzureTableSamples;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";
}

// Hand the table service client to the persistence and set "Server" as the default table.
persistence.UseTableServiceClient(new TableServiceClient(connectionString));
persistence.DefaultTable("Server");

The OrderId is used as the partition key.

Using Behaviors

The following shows two different ways to provide the OrderId to the saga using behaviors.

  1. Most messages implement IProvideOrderId allowing the OrderId to be used as the partition key.
/// <summary>
/// Logical-message pipeline behavior that sets the partition key for <see cref="OrderSagaData"/>.
/// When the incoming message implements <see cref="IProvideOrderId"/>, its OrderId is passed
/// along as the "OrderId" saga correlation property; otherwise
/// <see cref="SagaCorrelationProperty.None"/> is passed.
/// </summary>
class OrderIdAsPartitionKeyBehavior : Behavior<IIncomingLogicalMessageContext>
{
    /// <summary>
    /// Creates the behavior.
    /// </summary>
    /// <param name="partitionKeyFromSagaId">Service used to set the partition key on the context.</param>
    public OrderIdAsPartitionKeyBehavior(IProvidePartitionKeyFromSagaId partitionKeyFromSagaId)
    {
        partitionKeyProvider = partitionKeyFromSagaId;
    }

    /// <summary>
    /// Sets the partition key for the incoming message, logs diagnostic information,
    /// and then invokes the rest of the pipeline.
    /// </summary>
    /// <param name="context">The incoming logical message context.</param>
    /// <param name="next">Continuation that invokes the next pipeline step.</param>
    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        var correlationProperty = SagaCorrelationProperty.None;
        if (context.Message.Instance is IProvideOrderId provideOrderId)
        {
            var partitionKeyValue = provideOrderId.OrderId;
            correlationProperty = new SagaCorrelationProperty("OrderId", partitionKeyValue);
        }

        await partitionKeyProvider.SetPartitionKey<OrderSagaData>(context, correlationProperty);

        // The remaining lookups are diagnostics only: log whatever is available on the context.
        if (context.Headers.TryGetValue(Headers.SagaId, out var sagaIdHeader))
        {
            Log.Info($"Saga Id Header: {sagaIdHeader}");
        }

        if (context.Extensions.TryGet<TableInformation>(out var tableInformation))
        {
            Log.Info($"Table Information: {tableInformation.TableName}");
        }

        Log.Info($"Found partition key '{context.Extensions.Get<TableEntityPartitionKey>().PartitionKey}' from '{nameof(IProvideOrderId)}'");

        await next();
    }

    /// <summary>
    /// Pipeline registration that resolves the behavior's dependency from the service provider
    /// and inserts the step before <c>LogicalOutboxBehavior</c>.
    /// </summary>
    public class Registration : RegisterStep
    {
        public Registration() :
            base(nameof(OrderIdAsPartitionKeyBehavior),
                typeof(OrderIdAsPartitionKeyBehavior),
                "Determines the PartitionKey from the logical message",
                provider => new OrderIdAsPartitionKeyBehavior(provider.GetRequiredService<IProvidePartitionKeyFromSagaId>()))
        {
            InsertBefore(nameof(LogicalOutboxBehavior));
        }
    }

    // Assigned once in the constructor, hence readonly.
    readonly IProvidePartitionKeyFromSagaId partitionKeyProvider;
    static readonly ILog Log = LogManager.GetLogger<OrderIdAsPartitionKeyBehavior>();
}
  2. One handler publishes an event that does not implement IProvideOrderId but adds a custom header containing the OrderId. The handler also creates OrderShippingInformation as part of the transactional batch provided by NServiceBus.
/// <summary>
/// Handles <see cref="ShipOrder"/>: adds an <see cref="OrderShippingInformation"/> entity to the
/// Azure Table persistence session batch and publishes <see cref="OrderShipped"/> with a custom
/// header carrying the order id.
/// </summary>
public class ShipOrderHandler :
    IHandleMessages<ShipOrder>
{
    /// <summary>
    /// Records the shipping information and publishes the <see cref="OrderShipped"/> event.
    /// </summary>
    /// <param name="message">The incoming ship-order command.</param>
    /// <param name="context">The message handler context.</param>
    /// <returns>The task returned by the publish operation.</returns>
    public Task Handle(ShipOrder message, IMessageHandlerContext context)
    {
        var orderShippingInformation = new OrderShippingInformation
        {
            Id = Guid.NewGuid(),
            OrderId = message.OrderId,
            ShippedAt = DateTimeOffset.UtcNow,
        };

        Store(orderShippingInformation, context);

        Log.Info($"Order Shipped. OrderId {message.OrderId}");

        // OrderShipped is published with the order id in a custom header.
        var options = new PublishOptions();
        options.SetHeader("Sample.AzureTable.Transaction.OrderId", message.OrderId.ToString());

        return context.Publish(new OrderShipped { OrderId = orderShippingInformation.OrderId, ShippingDate = orderShippingInformation.ShippedAt }, options);
    }

    /// <summary>
    /// Adds the entity to the batch of the current Azure Table persistence session,
    /// using the session's partition key.
    /// </summary>
    private static void Store(OrderShippingInformation orderShippingInformation, IMessageHandlerContext context)
    {
        var session = context.SynchronizedStorageSession.AzureTablePersistenceSession();

        orderShippingInformation.PartitionKey = session.PartitionKey;

        session.Batch.Add(new TableTransactionAction(TableTransactionActionType.Add, orderShippingInformation));
    }

    // readonly for consistency with the loggers declared by the other sample classes.
    static readonly ILog Log = LogManager.GetLogger<ShipOrderHandler>();
}

The added custom header then allows the partition key to be determined within OrderIdHeaderAsPartitionKeyBehavior.

/// <summary>
/// Transport-receive pipeline behavior that reads the order id from a custom message header
/// and, when present, stores it on the context as the <see cref="TableEntityPartitionKey"/>.
/// </summary>
class OrderIdHeaderAsPartitionKeyBehavior : Behavior<ITransportReceiveContext>
{
    // Single definition of the header name so the lookup and the log message cannot drift apart.
    const string OrderIdHeader = "Sample.AzureTable.Transaction.OrderId";

    /// <summary>
    /// Sets the partition key from the order id header if the header exists, then continues the pipeline.
    /// </summary>
    /// <param name="context">The transport receive context of the incoming message.</param>
    /// <param name="next">Continuation that invokes the next pipeline step.</param>
    public override Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        if (context.Message.Headers.TryGetValue(OrderIdHeader, out var orderId))
        {
            // Report the header that was actually read.
            Log.Info($"Found partition key '{orderId}' from header '{OrderIdHeader}'");

            context.Extensions.Set(new TableEntityPartitionKey(orderId));
        }

        return next();
    }

    static readonly ILog Log = LogManager.GetLogger<OrderIdHeaderAsPartitionKeyBehavior>();
}

Finally, the above behaviors are registered in the pipeline.

// Register both partition-key behaviors with the endpoint's pipeline.
var pipeline = endpointConfiguration.Pipeline;
pipeline.Register(new OrderIdHeaderAsPartitionKeyBehavior(), "Extracts a partition key from a header");
pipeline.Register(new OrderIdAsPartitionKeyBehavior.Registration());

Order saga data

/// <summary>
/// State stored for an order saga instance.
/// </summary>
public class OrderSagaData : ContainSagaData
{
    /// <summary>Identifier of the order.</summary>
    public Guid OrderId { get; set; }

    /// <summary>Free-text description of the order.</summary>
    public string OrderDescription { get; set; }
}

Order saga

/// <summary>
/// Saga started by <see cref="StartOrder"/>. It sends a local <see cref="ShipOrder"/>, requests a
/// <see cref="CompleteOrder"/> timeout, handles <see cref="OrderShipped"/>, and publishes
/// <see cref="OrderCompleted"/> when the timeout fires.
/// </summary>
public class OrderSaga : Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleMessages<OrderShipped>,
    IHandleTimeouts<CompleteOrder>
{
    static readonly ILog Log = LogManager.GetLogger<OrderSaga>();

    /// <summary>
    /// Maps the saga's OrderId to the OrderId property of <see cref="StartOrder"/> and to the
    /// custom order id header of <see cref="OrderShipped"/>.
    /// </summary>
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        var orderIdCorrelation = mapper.MapSaga(saga => saga.OrderId);

        orderIdCorrelation
            .ToMessage<StartOrder>(msg => msg.OrderId)
            .ToMessageHeader<OrderShipped>("Sample.AzureTable.Transaction.OrderId");
    }

    /// <summary>
    /// Stores the order description, then sends <see cref="ShipOrder"/> locally and requests a
    /// five-second <see cref="CompleteOrder"/> timeout.
    /// </summary>
    /// <returns>A task that completes when both operations have completed.</returns>
    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        var description = $"The saga for order {message.OrderId}";
        Data.OrderDescription = description;

        Log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");
        Log.Info("Order will complete in 5 seconds");

        var shipCommand = new ShipOrder { OrderId = message.OrderId };
        var completion = new CompleteOrder
        {
            OrderDescription = description,
            OrderId = Data.OrderId,
        };

        // Start both operations in the same order as before and combine them into a single task.
        var sendTask = context.SendLocal(shipCommand);
        var timeoutTask = RequestTimeout(context, TimeSpan.FromSeconds(5), completion);

        return Task.WhenAll(sendTask, timeoutTask);
    }

    /// <summary>
    /// Logs that the order has shipped.
    /// </summary>
    public Task Handle(OrderShipped message, IMessageHandlerContext context)
    {
        Log.Info($"Order with OrderId {Data.OrderId} shipped on {message.ShippingDate}");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Marks the saga as complete and publishes <see cref="OrderCompleted"/> for the order.
    /// </summary>
    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        Log.Info($"Saga with OrderId {Data.OrderId} completed");
        MarkAsComplete();

        return context.Publish(new OrderCompleted { OrderId = Data.OrderId });
    }
}

Related Articles

  • Sagas
    Maintain statefulness in distributed systems with the saga pattern and NServiceBus' event-driven architecture with built-in fault-tolerance and scalability.