Azure Table Persistence Usage using Saga IDs as partition key

Component: Azure Table Persistence
NuGet Package NServiceBus.Persistence.AzureTable (4-pre)
This page targets a pre-release version and is subject to change prior to the final release.

This sample shows a client/server scenario that uses saga and outbox persistence to store records atomically in a single transaction. Because an Azure Table transaction can only span entities that share a partition key, the Saga ID is used as the partition key.

Projects

SharedMessages

The shared message contracts used by all endpoints.
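
The contracts themselves are not listed on this page. Judging from how the saga and the behavior further down use them, they might look roughly like the sketch below. The property types, the marker interfaces (IMessage, IEvent), and the choice of which messages implement IProvideOrderId are assumptions, not the sample's confirmed source.

```csharp
using System;
using NServiceBus;

// Assumed shape: the behavior reads the order ID through this interface.
public interface IProvideOrderId
{
    Guid OrderId { get; }
}

public class StartOrder : IMessage, IProvideOrderId
{
    public Guid OrderId { get; set; }
}

public class ShipOrder : IMessage, IProvideOrderId
{
    public Guid OrderId { get; set; }
}

// Assumed to be the reply that does not implement IProvideOrderId.
// The ShippingDate type is a guess.
public class OrderShipped : IMessage
{
    public DateTimeOffset ShippingDate { get; set; }
}

// Timeout state requested by the saga.
public class CompleteOrder : IMessage, IProvideOrderId
{
    public Guid OrderId { get; set; }
    public string OrderDescription { get; set; }
}

public class OrderCompleted : IEvent, IProvideOrderId
{
    public Guid OrderId { get; set; }
}
```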

Client

  • Sends the StartOrder message to Server.
  • Receives and handles the OrderCompleted event.
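
As a rough sketch of the two client responsibilities listed above, the code could look like the following. The names ClientSketch and OrderCompletedHandler are hypothetical, StartOrder and OrderCompleted are assumed to come from SharedMessages with an OrderId property of type Guid, and the session is assumed to belong to an already started client endpoint.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Logging;

public static class ClientSketch
{
    // endpointInstance is assumed to be a started client endpoint.
    public static Task SendStartOrder(IMessageSession endpointInstance)
    {
        var startOrder = new StartOrder
        {
            OrderId = Guid.NewGuid()
        };

        // The destination is the server endpoint name from the persistence config.
        return endpointInstance.Send("Samples.AzureTable.Transactions.Server", startOrder);
    }
}

public class OrderCompletedHandler :
    IHandleMessages<OrderCompleted>
{
    static readonly ILog Log = LogManager.GetLogger<OrderCompletedHandler>();

    public Task Handle(OrderCompleted message, IMessageHandlerContext context)
    {
        Log.Info($"Received OrderCompleted for OrderId {message.OrderId}");
        return Task.CompletedTask;
    }
}
```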

Server projects

  • Receives the StartOrder message and starts an OrderSaga.
  • OrderSaga requests a timeout, passing a CompleteOrder instance populated from the saga data.
  • Receives the OrderShipped message with a custom header.
  • OrderSaga publishes an OrderCompleted event when the CompleteOrder timeout fires.

Persistence config

Configure the endpoint to use Azure Table Persistence.

var endpointConfiguration = new EndpointConfiguration("Samples.AzureTable.Transactions.Server");
endpointConfiguration.EnableOutbox();

// true: Azure Storage Tables through the local storage emulator.
// false: Cosmos DB Table API through the local Cosmos DB emulator (well-known emulator key).
var useStorageTable = true;
var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence>();

var connection = useStorageTable ? "UseDevelopmentStorage=true" :
    "TableEndpoint=https://localhost:8081/;AccountName=AzureTableSamples;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";

var account = CloudStorageAccount.Parse(connection);
var cloudTableClient = account.CreateCloudTableClient();
persistence.UseCloudTableClient(cloudTableClient);

persistence.DefaultTable("Server");

The Saga ID is derived from the order ID.
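
To illustrate what deriving an ID deterministically means, here is a minimal sketch. It is not the algorithm the persistence uses; DeterministicIdSketch, its inputs, and the choice of SHA-256 are assumptions made purely for illustration. The point is only that the same inputs always yield the same Guid, so every message carrying the same order ID maps to the same partition.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper for illustration only; NOT the NServiceBus implementation.
public static class DeterministicIdSketch
{
    public static Guid Create(string sagaDataTypeName, string propertyName, object propertyValue)
    {
        // Combine the three inputs into one stable string.
        var input = $"{sagaDataTypeName}_{propertyName}_{propertyValue}";

        using (var sha = SHA256.Create())
        {
            var hash = sha.ComputeHash(Encoding.UTF8.GetBytes(input));

            // A Guid needs exactly 16 bytes, so keep the first 16 bytes of the hash.
            var guidBytes = new byte[16];
            Array.Copy(hash, guidBytes, 16);
            return new Guid(guidBytes);
        }
    }
}
```

Calling DeterministicIdSketch.Create("OrderSagaData", "OrderId", orderId) twice with the same orderId returns the same Guid, while a different orderId is expected to produce a different one.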

Behaviors

Most messages implement IProvideOrderId. By default, Saga IDs are deterministically derived from the saga data type, the correlation property name, and the correlation property value. IProvidePartitionKeyFromSagaId is a helper that can be injected into behaviors in the logical pipeline stage when the Saga ID should be used as the partition key.

class OrderIdAsPartitionKeyBehavior : Behavior<IIncomingLogicalMessageContext>
{
    public OrderIdAsPartitionKeyBehavior(IProvidePartitionKeyFromSagaId partitionKeyFromSagaId)
    {
        this.partitionKeyFromSagaId = partitionKeyFromSagaId;
    }

    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // Without an order ID on the message, no correlation property is supplied.
        var correlationProperty = SagaCorrelationProperty.None;
        if (context.Message.Instance is IProvideOrderId provideOrderId)
        {
            var partitionKeyValue = provideOrderId.OrderId;
            correlationProperty = new SagaCorrelationProperty("OrderId", partitionKeyValue);
        }

        // Sets the partition key (the Saga ID) in the context extensions.
        await partitionKeyFromSagaId.SetPartitionKey<OrderSagaData>(context, correlationProperty).ConfigureAwait(false);

        if (context.Headers.TryGetValue(Headers.SagaId, out var sagaIdHeader))
        {
            Log.Info($"Saga Id Header: {sagaIdHeader}");
        }

        if (context.Extensions.TryGet<TableInformation>(out var tableInformation))
        {
            Log.Info($"Table Information: {tableInformation.TableName}");
        }

        Log.Info($"Found partition key '{context.Extensions.Get<TableEntityPartitionKey>().PartitionKey}' from '{nameof(IProvideOrderId)}'");

        await next().ConfigureAwait(false);
    }

    public class Registration : RegisterStep
    {
        public Registration() :
            base(nameof(OrderIdAsPartitionKeyBehavior),
                typeof(OrderIdAsPartitionKeyBehavior),
                "Determines the PartitionKey from the logical message",
                provider => new OrderIdAsPartitionKeyBehavior(provider.GetRequiredService<IProvidePartitionKeyFromSagaId>()))
        {
            // The partition key must be known before the outbox behavior runs.
            InsertBefore(nameof(LogicalOutboxBehavior));
        }
    }

    readonly IProvidePartitionKeyFromSagaId partitionKeyFromSagaId;
    static readonly ILog Log = LogManager.GetLogger<OrderIdAsPartitionKeyBehavior>();
}

One handler replies with a message that doesn't implement IProvideOrderId. Transactionality can still be achieved because messages that are part of a saga conversation get the Saga ID set as a header. In such cases, the Saga ID is taken from the header, so no correlation property needs to be extracted.
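
The reply scenario described here could look roughly like the following sketch. The handler name ShipOrderHandler and the header key "Sample.OrderId" are hypothetical, and OrderShipped is assumed to expose a ShippingDate of type DateTimeOffset. Because the reply goes back to the saga that sent ShipOrder, the Saga ID header is what lets the behavior determine the partition key.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

public class ShipOrderHandler :
    IHandleMessages<ShipOrder>
{
    public Task Handle(ShipOrder message, IMessageHandlerContext context)
    {
        // OrderShipped carries no order ID, so it cannot implement IProvideOrderId.
        var orderShipped = new OrderShipped
        {
            ShippingDate = DateTimeOffset.UtcNow
        };

        var options = new ReplyOptions();
        // Hypothetical custom header key; the value is the order ID as a string.
        options.SetHeader("Sample.OrderId", message.OrderId.ToString());

        return context.Reply(orderShipped, options);
    }
}
```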

Finally, the OrderIdAsPartitionKeyBehavior is registered in the pipeline.

endpointConfiguration.Pipeline.Register(new OrderIdAsPartitionKeyBehavior.Registration());

Order saga data

public class OrderSagaData :
    ContainSagaData
{
    public Guid OrderId { get; set; }
    public string OrderDescription { get; set; }
}

Order saga

public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleMessages<OrderShipped>,
    IHandleTimeouts<CompleteOrder>
{
    static readonly ILog Log = LogManager.GetLogger<OrderSaga>();

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.OrderId)
            .ToMessage<StartOrder>(msg => msg.OrderId);
    }

    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        var orderDescription = $"The saga for order {message.OrderId}";
        Data.OrderDescription = orderDescription;
        Log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");

        var shipOrder = new ShipOrder
        {
            OrderId = message.OrderId
        };

        Log.Info("Order will complete in 5 seconds");
        var timeoutData = new CompleteOrder
        {
            OrderDescription = orderDescription,
            OrderId = Data.OrderId,
        };

        return Task.WhenAll(
            context.SendLocal(shipOrder),
            RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData)
        );
    }

    public Task Handle(OrderShipped message, IMessageHandlerContext context)
    {
        Log.Info($"Order with OrderId {Data.OrderId} shipped on {message.ShippingDate}");
        return Task.CompletedTask;
    }

    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        Log.Info($"Saga with OrderId {Data.OrderId} completed");
        MarkAsComplete();
        var orderCompleted = new OrderCompleted
        {
            OrderId = Data.OrderId
        };
        return context.Publish(orderCompleted);
    }
}

Related Articles

  • Sagas
    NServiceBus uses event-driven architecture to include fault-tolerance and scalability in long-term business processes.
