AzureTable Persistence Usage with non-default table

This page targets a pre-release version. Pre-releases are subject to change and samples are not guaranteed to be fully functional.

This sample shows a client/server scenario using a dynamic table configuration for certain saga types with a fallback to the default table.

Prerequisites

Ensure that an instance of the latest Azure Storage Emulator or Azure Cosmos DB Emulator is running.

Projects

SharedMessages

  • The shared message contracts used by all endpoints.

Client

  • Sends the StartOrder message to Server.
  • Receives and handles the OrderCompleted event.

Server

  • Receives the StartOrder message and initiates an OrderSaga.
  • OrderSaga sends a ShipOrder command to ShipOrderSaga.
  • ShipOrderSaga requests a timeout, passing a CompleteOrder instance as the timeout state.
  • ShipOrderSaga replies to the originating OrderSaga with CompleteOrder when the timeout fires.
  • OrderSaga publishes an OrderCompleted event when the CompleteOrder message arrives.
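
The SharedMessages contracts are not listed on this page. As a rough sketch of their shape, assuming each contract carries only the OrderId that the sagas below read and write, they might look like the following. The marker interfaces or message conventions that NServiceBus needs to recognize these types as messages are deliberately left out, so treat this as an illustration rather than the sample's actual source.

```csharp
using System;

// Hypothetical sketch of the SharedMessages contracts.
// In a real endpoint each type would also be identified as a message,
// for example through NServiceBus marker interfaces or message conventions.

// Sent by Client to Server to start an OrderSaga.
public class StartOrder
{
    public Guid OrderId { get; set; }
}

// Sent by OrderSaga to start a ShipOrderSaga.
public class ShipOrder
{
    public Guid OrderId { get; set; }
}

// Used both as the ShipOrderSaga timeout state and as the reply to OrderSaga.
public class CompleteOrder
{
    public Guid OrderId { get; set; }
}

// Published by OrderSaga and handled by Client.
public class OrderCompleted
{
    public Guid OrderId { get; set; }
}
```

Using a Guid for OrderId matches the OrderId property on both saga data classes, which is what the saga mappings correlate on.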

Persistence config

Configure the endpoint to use Azure Table Persistence.

var endpointConfiguration = new EndpointConfiguration("Samples.AzureTable.Table.Server");

var useStorageTable = true;
var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence>();

var connection = useStorageTable ? "UseDevelopmentStorage=true" :
    "TableEndpoint=https://localhost:8081/;AccountName=AzureTableSamples;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==";

var tableServiceClient = new TableServiceClient(connection);
persistence.UseTableServiceClient(tableServiceClient);
persistence.DefaultTable("OrderSagaData");

In non-transactional mode, the saga ID is used as the partition key.

Behaviors

For all messages destined for ShipOrderSaga, the table is overridden at runtime to ShipOrderSagaData. All other messages fall back to the default table.

class BehaviorProvidingDynamicTable(ILogger<BehaviorProvidingDynamicTable> logger) : Behavior<IIncomingLogicalMessageContext>
{
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
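        // Use the ShipOrderSagaData table for the message that starts ShipOrderSaga
        // and for any message whose saga type header points at ShipOrderSaga (such as its timeout).
        if (context.Message.Instance is ShipOrder ||
            (context.Headers.TryGetValue(Headers.SagaType, out var sagaTypeHeader) && sagaTypeHeader.Contains(nameof(ShipOrderSaga))))
        {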
            logger.LogInformation("Message '{MessageType}' destined to be handled by '{SagaType}' will use '{TableName}' table.", context.Message.MessageType.FullName, nameof(ShipOrderSaga), "ShipOrderSagaData");

            context.Extensions.Set(new TableInformation("ShipOrderSagaData"));
        }
        else
        {
            logger.LogInformation("Message '{MessageType}' destined to be handled by '{SagaType}' will use the default table.", context.Message.MessageType.FullName, nameof(OrderSaga));
        }

        return next();
    }

}
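
The routing decision in the behavior above can be looked at in isolation. The following is a minimal sketch that does not depend on NServiceBus: the TableSelection class and its method are hypothetical, the header key is assumed to be the value behind Headers.SagaType, and the header values are assumed to resemble assembly-qualified type names. It shows why CompleteOrder ends up in different tables depending on whether it arrives as a timeout for ShipOrderSaga or as a reply to OrderSaga.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper that mirrors the condition used in BehaviorProvidingDynamicTable.
public static class TableSelection
{
    // Assumption: this is the string behind NServiceBus' Headers.SagaType constant.
    public const string SagaTypeHeader = "NServiceBus.SagaType";

    // Returns true when the message should use the ShipOrderSagaData table,
    // false when it should fall back to the default table.
    public static bool UsesShipOrderTable(
        string messageTypeName,
        IReadOnlyDictionary<string, string> headers)
    {
        // ShipOrder starts ShipOrderSaga, so it carries no saga type header yet.
        var startsShipOrderSaga = messageTypeName == "ShipOrder";

        // Timeouts requested by ShipOrderSaga are expected to name it in the header.
        var targetsShipOrderSaga =
            headers.TryGetValue(SagaTypeHeader, out var sagaType) &&
            sagaType.Contains("ShipOrderSaga");

        return startsShipOrderSaga || targetsShipOrderSaga;
    }
}
```

Under these assumptions, a CompleteOrder timeout whose header value is "ShipOrderSaga, Server" selects the ShipOrderSagaData table, while a CompleteOrder reply whose header value is "OrderSaga, Server" does not, because "OrderSaga" does not contain "ShipOrderSaga". Matching on the longer saga name is what keeps the two cases apart; a check for "OrderSaga" would match both.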

The behavior must be registered in the pipeline:

var serviceProvider = builder.Services.BuildServiceProvider();
var logger = serviceProvider.GetRequiredService<ILogger<BehaviorProvidingDynamicTable>>();
endpointConfiguration.Pipeline.Register(new BehaviorProvidingDynamicTable(logger), "Provides a non-default table for sagas started by ship order message");

Order saga data

public class OrderSagaData :
    ContainSagaData
{
    public Guid OrderId { get; set; }
    public string OrderDescription { get; set; }
}

Order saga

public class OrderSaga(ILogger<OrderSaga> logger) :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleMessages<CompleteOrder>
{

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.OrderId)
            .ToMessage<StartOrder>(msg => msg.OrderId)
            .ToMessage<CompleteOrder>(msg => msg.OrderId);
    }

    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        Data.OrderId = message.OrderId;
        Data.OrderDescription = $"The saga for order {message.OrderId}";

        logger.LogInformation("Received StartOrder message {OrderId}. Starting Saga", Data.OrderId);

        var shipOrder = new ShipOrder
        {
            OrderId = message.OrderId
        };

        return context.SendLocal(shipOrder);
    }

    public Task Handle(CompleteOrder message, IMessageHandlerContext context)
    {

        logger.LogInformation("Saga with OrderId {OrderId} completed", Data.OrderId);
        MarkAsComplete();
        var orderCompleted = new OrderCompleted
        {
            OrderId = Data.OrderId
        };
        return context.Publish(orderCompleted);
    }
}

ShipOrder saga data

public class ShipOrderSagaData :
    ContainSagaData
{
    public Guid OrderId { get; set; }
}

ShipOrder saga

public class ShipOrderSaga(ILogger<ShipOrderSaga> logger) :
    Saga<ShipOrderSagaData>,
    IAmStartedByMessages<ShipOrder>,
    IHandleTimeouts<CompleteOrder>
{

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ShipOrderSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.OrderId).ToMessage<ShipOrder>(msg => msg.OrderId);
    }

    public Task Handle(ShipOrder message, IMessageHandlerContext context)
    {
        logger.LogInformation("Order Shipped. OrderId {OrderId}", message.OrderId);
        Data.OrderId = message.OrderId;

        logger.LogInformation("Order will complete in {Seconds} seconds", 5);
        var timeoutData = new CompleteOrder();
        return RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData);
    }

    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        logger.LogInformation("Saga with OrderId {OrderId} about to complete", Data.OrderId);
        MarkAsComplete();

        state.OrderId = Data.OrderId;

        return ReplyToOriginator(context, state);
    }
}

Related Articles

  • Sagas
    Maintain statefulness in distributed systems with the saga pattern and NServiceBus' event-driven architecture with built-in fault-tolerance and scalability.