Azure Storage Persistence

Component: Azure Storage Persistence
NuGet Package: NServiceBus.Persistence.AzureStorage (1.x)
Target NServiceBus Version: 6.x

Prerequisites

Ensure that an instance of the latest Azure Storage Emulator is running.

Azure Storage Persistence

This sample uses Azure Storage Persistence.

Code walk-through

This sample shows a simple Client + Server scenario.

  • Client sends a StartOrder message to Server.
  • Server starts an OrderSaga.
  • OrderSaga requests a timeout with CompleteOrder data.
  • When the CompleteOrder timeout fires, the OrderSaga publishes an OrderCompleted event.
  • The Server then publishes a message that the Client subscribes to.
  • Client handles the OrderCompleted event.
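
The three types that flow through the steps above are not listed in this walk-through. As a minimal sketch, their shapes can be inferred from how the saga uses them: StartOrder and OrderCompleted carry an OrderId, and CompleteOrder carries an OrderDescription. The classes below are an assumption based on that usage, not the sample's actual source. To keep the block dependency-free they are plain classes; in an NServiceBus endpoint, message types are additionally identified through marker interfaces (such as IMessage or IEvent) or through conventions, and that wiring is omitted here.

```csharp
using System;

// Assumed shape: sent by the Client to start the saga.
public class StartOrder
{
    public Guid OrderId { get; set; }
}

// Assumed shape: state passed to RequestTimeout and handed back to Timeout.
public class CompleteOrder
{
    public string OrderDescription { get; set; }
}

// Assumed shape: published by the saga when the timeout fires.
public class OrderCompleted
{
    public Guid OrderId { get; set; }
}
```

The OrderId is the only value that travels the whole way from the Client's StartOrder to the OrderCompleted event it later handles, which is why the saga maps on it.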

Azure Storage configuration

The Server endpoint is configured to use the Azure Storage persistence in two locations.

The endpoint configuration

var endpointConfiguration = new EndpointConfiguration("Samples.Azure.StoragePersistence.Server");
var persistence = endpointConfiguration.UsePersistence<AzureStoragePersistence>();
persistence.ConnectionString("UseDevelopmentStorage=true");

endpointConfiguration.UsePersistence<AzureStoragePersistence, StorageType.Sagas>()
    .AssumeSecondaryIndicesExist();

Order Saga Data

public class OrderSagaData :
    IContainSagaData
{
    public Guid Id { get; set; }
    public string Originator { get; set; }
    public string OriginalMessageId { get; set; }
    public Guid OrderId { get; set; }
    public string OrderDescription { get; set; }
}

Order Saga

public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleTimeouts<CompleteOrder>
{
    static ILog log = LogManager.GetLogger<OrderSaga>();

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.ConfigureMapping<StartOrder>(message => message.OrderId)
            .ToSaga(sagaData => sagaData.OrderId);
    }

    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        Data.OrderId = message.OrderId;
        var orderDescription = $"The saga for order {message.OrderId}";
        Data.OrderDescription = orderDescription;
        log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");
        log.Info("Order will complete in 5 seconds");
        var timeoutData = new CompleteOrder
        {
            OrderDescription = orderDescription
        };

        return RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData);
    }

    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        log.Info($"Saga with OrderId {Data.OrderId} completed");
        var orderCompleted = new OrderCompleted
        {
            OrderId = Data.OrderId
        };
        MarkAsComplete();
        return context.Publish(orderCompleted);
    }
}

The Data in Azure Storage

The data in Azure Storage is stored in several locations.

Reading the data using code

There are several helper methods in AzureHelper.cs in the StorageReader project. These helpers produce the output shown below.

Writing table data

static void WriteOutTable(string tableName, bool decodeRowKey)
{
    var storageAccount = CloudStorageAccount.DevelopmentStorageAccount;
    var tableClient = storageAccount.CreateCloudTableClient();
    var table = tableClient.GetTableReference(tableName);
    var entities = table.ExecuteQuery(new TableQuery()).ToList();
    Debug.WriteLine($"'{tableName}' table contents");
    foreach (var entity in entities)
    {
        Debug.WriteLine($"  PartitionKey:= {entity.PartitionKey}");
        Debug.WriteLine($"    RowKey:= {entity.RowKey}");
        if (decodeRowKey)
        {
            var decodedRowKey = entity.RowKey.DecodeFromKey();
            Debug.WriteLine($"    DecodedRowKey:= {decodedRowKey}");
        }
        foreach (var property in entity.Properties)
        {
            var propertyAsObject = property.Value.PropertyAsObject
                .ToString().Truncate(50);
            Debug.WriteLine($"    {property.Key}:= {propertyAsObject}");
        }
    }
    Debug.WriteLine("");
}
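
WriteOutTable calls two extension methods, DecodeFromKey and Truncate, that are not listed here. The following is a minimal sketch of what they might look like. It assumes that row keys are Base64-encoded UTF-8 text and that truncated values get a "..." suffix; both assumptions are inferred from the output shown in this sample, and the real helpers in the sample may differ.

```csharp
using System;
using System.Text;

public static class StringExtensions
{
    // Assumption: the row key is standard Base64 over UTF-8 bytes.
    public static string DecodeFromKey(this string encodedKey)
    {
        var bytes = Convert.FromBase64String(encodedKey);
        return Encoding.UTF8.GetString(bytes);
    }

    // Cuts the value to maxLength characters and appends "..." when it was longer.
    public static string Truncate(this string value, int maxLength)
    {
        if (string.IsNullOrEmpty(value) || value.Length <= maxLength)
        {
            return value;
        }
        return value.Substring(0, maxLength) + "...";
    }
}
```

With these definitions, decoding the key c2FtcGxlcy1henVyZS1zdG9yYWdlcGVyc2lzdGVuY2UtY2xpZW50 yields samples-azure-storagepersistence-client, matching the DecodedRowKey in the subscription output.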

Writing blob data

static async Task WriteOutBlobContainer(string containerName)
{
    var storageAccount = CloudStorageAccount.DevelopmentStorageAccount;
    var blobClient = storageAccount.CreateCloudBlobClient();
    var container = blobClient.GetContainerReference(containerName);
    Debug.WriteLine($"'{containerName}' container contents");
    foreach (var blob in container.ListBlobs())
    {
        var name = blob.Uri.AbsolutePath.Split('/').Last();
        Debug.WriteLine($"  Blob:= {name}");
        var blockBlobReference = container.GetBlockBlobReference(name);
        var text = await blockBlobReference.DownloadTextAsync()
            .ConfigureAwait(false);
        Debug.WriteLine($"    {text}");
    }
    Debug.WriteLine("");
}

Using the helpers

WriteOutTable("OrderSagaData", false);
WriteOutTable("Subscription", true);
WriteOutTable("TimeoutDataTableName", false);
WriteOutTable("TimeoutManagerDataTable", false);
return WriteOutBlobContainer("timeoutstate");

The Saga Data

The contents of the 'OrderSagaData' table, which holds the saga data:

PartitionKey:= 0145ddc6-5aa5-4a84-8b40-a8020044e9fb
  RowKey:= 0145ddc6-5aa5-4a84-8b40-a8020044e9fb
  OrderId:= 696788ff-97a0-4721-b6dc-71cf1ad4a4ba
  Id:= 0145ddc6-5aa5-4a84-8b40-a8020044e9fb
  Originator:= samples-azure-storagepersistence-client
  OriginalMessageId:= 14c900c2-1188-49a6-8238-a8020044e91a
  OrderDescription:= The saga for order 696788ff-97a0-4721-b6dc-71cf1ad...
  NServiceBus_2ndIndexKey:= Index_OrderSagaData_OrderId_"696788ff-97a0-4721-b6...
PartitionKey:= Index_OrderSagaData_OrderId_"696788ff-97a0-4721-b6dc-71cf1ad4a4ba"
  RowKey:= 
  SagaId:= 0145ddc6-5aa5-4a84-8b40-a8020044e9fb
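
The output above contains two rows for one saga: the saga data itself, keyed by the saga Id, and a secondary index row whose PartitionKey is built from the saga data type, the correlated property name, and the quoted property value, and whose SagaId points back to the first row. A minimal sketch of that key layout follows. SecondaryIndexKey.Build is a hypothetical helper written to match the observed output; it is not the persistence's actual implementation, which may format other value types differently.

```csharp
using System;

public static class SecondaryIndexKey
{
    // Hypothetical helper: reproduces the key layout seen in the table output,
    // Index_<SagaDataType>_<PropertyName>_"<value>".
    public static string Build(string sagaDataType, string propertyName, object propertyValue)
    {
        return $"Index_{sagaDataType}_{propertyName}_\"{propertyValue}\"";
    }
}
```

For example, SecondaryIndexKey.Build("OrderSagaData", "OrderId", Guid.Parse("696788ff-97a0-4721-b6dc-71cf1ad4a4ba")) produces the same PartitionKey as the index row above.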

The Timeouts

The sample uses the Azure Storage Queues transport. In versions 7.4 and above, the transport uses native delayed delivery and does not rely on Azure Storage persistence for timeouts.

The Subscriptions

The Client endpoint is registered in the Subscription table:

PartitionKey:= Shared.OrderCompleted, Version=1.0.0.0
  RowKey:= c2FtcGxlcy1henVyZS1zdG9yYWdlcGVyc2lzdGVuY2UtY2xpZW50
  DecodedRowKey:= samples-azure-storagepersistence-client
  EndpointName:= Samples-Azure-StoragePersistence-Client

Related Articles

  • Azure Storage Persistence
    Using Azure Storage as persistence.
  • NServiceBus and Azure
    Using Azure for endpoint hosting and to provide Transports and Persistence.
  • Sagas
    NServiceBus uses event-driven architecture to include fault-tolerance and scalability in long-term business processes.
