AzureTable Persistence Usage with non-default table

Component: Azure Table Persistence
NuGet Package NServiceBus.Persistence.AzureTable (4-pre)
This page targets a pre-release version. Pre-releases are subject to change and samples are not guaranteed to be fully functional.

This sample shows a client/server scenario using a dynamic table configuration for certain saga types with a fallback to the default table.

Prerequisites

Ensure that an instance of the latest Azure Storage Emulator or Azure Cosmos DB Emulator is running.

Projects

SharedMessages

The shared message contracts used by all endpoints.

Client

  • Sends the StartOrder message to Server.
  • Receives and handles the OrderCompleted event.

Server projects

  • Receive the StartOrder message and initiate an OrderSaga.
  • OrderSaga sends a ShipOrder command to ShipOrderSaga
  • ShipOrderSaga requests a timeout with an instance of CompleteOrder with the saga data.
  • ShipOrderSaga replies with CompleteOrder when the CompleteOrder timeout fires.
  • OrderSaga publishes an OrderCompleted event when the CompleteOrder message arrives.

Persistence config

Configure the endpoint to use Azure Table Persistence.

var endpointConfiguration = new EndpointConfiguration("Samples.AzureTable.Table.Server");

var useStorageTable = true;
var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence, StorageType.Sagas>();

var connection = useStorageTable ? "UseDevelopmentStorage=true" :
    "TableEndpoint=https://localhost:8081/;AccountName=AzureTableSamples;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";

var account = CloudStorageAccount.Parse(connection);
var cloudTableClient = account.CreateCloudTableClient();
persistence.UseCloudTableClient(cloudTableClient);
persistence.DefaultTable("OrderSagaData");

In the non-transactional mode the saga id is used as a partition.

Behaviors

For all messages destined to go to the ShipOrderSaga the table is overridden at runtime to use ShipOrderSagaData table.

class BehaviorProvidingDynamicTable : Behavior<IIncomingLogicalMessageContext>
{
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        if (context.Message.Instance is ShipOrder ||
            context.Headers.TryGetValue(Headers.SagaType, out var sagaTypeHeader) && sagaTypeHeader.Contains(nameof(ShipOrderSaga)))
        {
            Log.Info($"Message '{context.Message.MessageType.FullName}' destined to be handled by '{nameof(ShipOrderSaga)}' will use 'ShipOrderSagaData' table.");

            context.Extensions.Set(new TableInformation("ShipOrderSagaData"));
        }
        else
        {
            Log.Info($"Message '{context.Message.MessageType.FullName}' destined to be handled by '{nameof(OrderSaga)}' will the default table.");
        }

        return next();
    }

    static readonly ILog Log = LogManager.GetLogger<BehaviorProvidingDynamicTable>();
}

The behavior needs to be registered in the pipeline

endpointConfiguration.Pipeline.Register(new BehaviorProvidingDynamicTable(), "Provides a non-default table for sagas started by ship order message");

Order saga data

public class OrderSagaData :
    ContainSagaData
{
    public Guid OrderId { get; set; }
    public string OrderDescription { get; set; }
}

Order saga

public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleMessages<CompleteOrder>
{
    static ILog log = LogManager.GetLogger<OrderSaga>();

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.OrderId)
            .ToMessage<StartOrder>(msg => msg.OrderId)
            .ToMessage<CompleteOrder>(msg => msg.OrderId);
    }

    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        Data.OrderId = message.OrderId;
        Data.OrderDescription = $"The saga for order {message.OrderId}";

        log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");

        var shipOrder = new ShipOrder
        {
            OrderId = message.OrderId
        };

        return context.SendLocal(shipOrder);
    }

    public Task Handle(CompleteOrder message, IMessageHandlerContext context)
    {
        log.Info($"Saga with OrderId {Data.OrderId} completed");
        MarkAsComplete();
        var orderCompleted = new OrderCompleted
        {
            OrderId = Data.OrderId
        };
        return context.Publish(orderCompleted);
    }
}

ShipOrder saga data

public class ShipOrderSagaData :
    ContainSagaData
{
    public Guid OrderId { get; set; }
}

ShipOrder saga

public class ShipOrderSaga :
    Saga<ShipOrderSagaData>,
    IAmStartedByMessages<ShipOrder>,
    IHandleTimeouts<CompleteOrder>
{
    static ILog log = LogManager.GetLogger<ShipOrderSaga>();

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ShipOrderSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.OrderId).ToMessage<ShipOrder>(msg => msg.OrderId);
    }

    public Task Handle(ShipOrder message, IMessageHandlerContext context)
    {
        log.Info($"Order Shipped. OrderId {message.OrderId}");
        Data.OrderId = message.OrderId;

        log.Info("Order will complete in 5 seconds");
        var timeoutData = new CompleteOrder();
        return RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData);
    }

    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        log.Info($"Saga with OrderId {Data.OrderId} about to complete");
        MarkAsComplete();

        state.OrderId = Data.OrderId;

        return ReplyToOriginator(context, state);
    }
}

Related Articles

  • Sagas
    NServiceBus uses event-driven architecture to include fault-tolerance and scalability in long-term business processes.

Last modified