AzureTable Persistence Usage with non-default table

This sample shows a client/server scenario using a dynamic table configuration for certain saga types with a fallback to the default table.

Prerequisites

Ensure that an instance of the latest Azure Storage Emulator or Azure Cosmos DB Emulator is running.

Projects

SharedMessages

  • The shared message contracts used by all endpoints.
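The contracts themselves are not listed on this page. Judging only from how the sagas further down use them, a minimal sketch might look as follows. The class shapes are assumptions: each message is shown with just the OrderId property that the handlers read or write, and the NServiceBus marker interfaces (or an equivalent message convention) that real contracts would need are left out so the sketch compiles without any package references.

```csharp
using System;

// Sent by Client to Server; starts OrderSaga.
public class StartOrder
{
    public Guid OrderId { get; set; }
}

// Sent by OrderSaga to the local endpoint; starts ShipOrderSaga.
public class ShipOrder
{
    public Guid OrderId { get; set; }
}

// Used as the timeout state in ShipOrderSaga, then sent as a reply to OrderSaga.
public class CompleteOrder
{
    public Guid OrderId { get; set; }
}

// Published by OrderSaga and handled by Client.
public class OrderCompleted
{
    public Guid OrderId { get; set; }
}
```

All four messages carry the same OrderId, which is the value both sagas correlate on.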

Client

  • Sends the StartOrder message to Server.
  • Receives and handles the OrderCompleted event.

Server

  • Receives the StartOrder message and initiates an OrderSaga.
  • OrderSaga sends a ShipOrder command to ShipOrderSaga.
  • ShipOrderSaga requests a timeout, passing an instance of CompleteOrder as the timeout state.
  • ShipOrderSaga replies with CompleteOrder when the CompleteOrder timeout fires.
  • OrderSaga publishes an OrderCompleted event when the CompleteOrder message arrives.

Persistence config

Configure the endpoint to use Azure Table Persistence.

var endpointConfiguration = new EndpointConfiguration("Samples.AzureTable.Table.Server");

// true targets the Azure Storage Emulator; false targets the Azure Cosmos DB Emulator.
var useStorageTable = true;
var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence>();

var connection = useStorageTable ? "UseDevelopmentStorage=true" :
    "TableEndpoint=https://localhost:8081/;AccountName=AzureTableSamples;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";

var tableServiceClient = new TableServiceClient(connection);
persistence.UseTableServiceClient(tableServiceClient);
persistence.DefaultTable("OrderSagaData");

In non-transactional mode, the saga ID is used as the partition key.

Behaviors

For all messages destined for ShipOrderSaga, the table is overridden at runtime so that the ShipOrderSagaData table is used instead of the default.

class BehaviorProvidingDynamicTable : Behavior<IIncomingLogicalMessageContext>
{
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        if (context.Message.Instance is ShipOrder ||
            (context.Headers.TryGetValue(Headers.SagaType, out var sagaTypeHeader) && sagaTypeHeader.Contains(nameof(ShipOrderSaga))))
        {
            Log.Info($"Message '{context.Message.MessageType.FullName}' destined to be handled by '{nameof(ShipOrderSaga)}' will use 'ShipOrderSagaData' table.");

            context.Extensions.Set(new TableInformation("ShipOrderSagaData"));
        }
        else
        {
            Log.Info($"Message '{context.Message.MessageType.FullName}' destined to be handled by '{nameof(OrderSaga)}' will use the default table.");
        }

        return next();
    }

    static readonly ILog Log = LogManager.GetLogger<BehaviorProvidingDynamicTable>();
}

The behavior must be registered in the pipeline:

endpointConfiguration.Pipeline.Register(new BehaviorProvidingDynamicTable(), "Provides a non-default table for sagas started by ship order message");
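The table decision made by the behavior can be isolated into a plain function, which makes it easy to check without a running endpoint. The following is a minimal sketch, not part of the sample: TableSelector and SelectTable are hypothetical names, the message classes are empty stand-ins for the real contracts, and the header key is assumed to match the value of the Headers.SagaType constant.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for the sample's message types; only the type is inspected.
public class StartOrder { }
public class ShipOrder { }
public class CompleteOrder { }

public static class TableSelector
{
    // Assumed to equal the value of the NServiceBus Headers.SagaType constant.
    public const string SagaTypeHeader = "NServiceBus.SagaType";

    public const string DefaultTable = "OrderSagaData";
    public const string ShipOrderTable = "ShipOrderSagaData";

    // Mirrors the condition in BehaviorProvidingDynamicTable: a ShipOrder message,
    // or any message whose saga type header mentions ShipOrderSaga, uses the
    // non-default table; everything else falls back to the default table.
    public static string SelectTable(object message, IReadOnlyDictionary<string, string> headers)
    {
        var targetsShipOrderSaga =
            message is ShipOrder ||
            (headers.TryGetValue(SagaTypeHeader, out var sagaType) &&
             sagaType.Contains("ShipOrderSaga"));

        return targetsShipOrderSaga ? ShipOrderTable : DefaultTable;
    }
}
```

With these stand-ins, a StartOrder with no saga type header resolves to OrderSagaData. A ShipOrder resolves to ShipOrderSagaData, and so does any message whose saga type header mentions ShipOrderSaga, which is presumably how the CompleteOrder timeout finds its way back to the right table. The header check matters because the timeout message is not itself a ShipOrder.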

Order saga data

public class OrderSagaData :
    ContainSagaData
{
    public Guid OrderId { get; set; }
    public string OrderDescription { get; set; }
}

Order saga

public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleMessages<CompleteOrder>
{
    static readonly ILog log = LogManager.GetLogger<OrderSaga>();

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.OrderId)
            .ToMessage<StartOrder>(msg => msg.OrderId)
            .ToMessage<CompleteOrder>(msg => msg.OrderId);
    }

    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        Data.OrderId = message.OrderId;
        Data.OrderDescription = $"The saga for order {message.OrderId}";

        log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");

        var shipOrder = new ShipOrder
        {
            OrderId = message.OrderId
        };

        return context.SendLocal(shipOrder);
    }

    public Task Handle(CompleteOrder message, IMessageHandlerContext context)
    {
        log.Info($"Saga with OrderId {Data.OrderId} completed");
        MarkAsComplete();
        var orderCompleted = new OrderCompleted
        {
            OrderId = Data.OrderId
        };
        return context.Publish(orderCompleted);
    }
}

ShipOrder saga data

public class ShipOrderSagaData :
    ContainSagaData
{
    public Guid OrderId { get; set; }
}

ShipOrder saga

public class ShipOrderSaga :
    Saga<ShipOrderSagaData>,
    IAmStartedByMessages<ShipOrder>,
    IHandleTimeouts<CompleteOrder>
{
    static readonly ILog log = LogManager.GetLogger<ShipOrderSaga>();

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ShipOrderSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.OrderId).ToMessage<ShipOrder>(msg => msg.OrderId);
    }

    public Task Handle(ShipOrder message, IMessageHandlerContext context)
    {
        log.Info($"Order Shipped. OrderId {message.OrderId}");
        Data.OrderId = message.OrderId;

        log.Info("Order will complete in 5 seconds");
        var timeoutData = new CompleteOrder();
        return RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData);
    }

    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        log.Info($"Saga with OrderId {Data.OrderId} about to complete");
        MarkAsComplete();

        state.OrderId = Data.OrderId;

        return ReplyToOriginator(context, state);
    }
}

Related Articles

  • Sagas
    Maintain statefulness in distributed systems with the saga pattern and NServiceBus' event-driven architecture with built-in fault-tolerance and scalability.