Azure Storage Persistence

Component: Azure Storage Persistence
NuGet Package NServiceBus.Azure (6.x)
Target NServiceBus Version: 5.x

Prerequisites

Ensure that an instance of the latest Azure Storage Emulator is running.

Azure Storage Persistence

This sample utilizes the Azure Storage Persistence.

Code walk-through

This sample shows a simple Client + Server scenario.

  • Client sends a StartOrder message to Server
  • Server starts an OrderSaga.
  • OrderSaga requests a timeout with a CompleteOrder data.
  • When the CompleteOrder timeout fires the OrderSaga publishes a OrderCompleted event.
  • The Server then publishes a message that the client subscribes to.
  • Client handles OrderCompleted event.

Azure Storage configuration

The Server endpoint is configured to use the Azure Storage persistence in two locations.

The endpoint configuration

busConfiguration.EndpointName("Samples.Azure.StoragePersistence.Server");
busConfiguration.UsePersistence<AzureStoragePersistence>();

The app.config

Note the use of UseDevelopmentStorage to point to the Azure Storage Emulator.

<configSections>
  <section name="AzureSubscriptionStorageConfig" type="NServiceBus.Config.AzureSubscriptionStorageConfig, NServiceBus.Azure" />
  <section name="AzureSagaPersisterConfig" type="NServiceBus.Config.AzureSagaPersisterConfig, NServiceBus.Azure" />
  <section name="AzureTimeoutPersisterConfig" type="NServiceBus.Config.AzureTimeoutPersisterConfig, NServiceBus.Azure" />
</configSections>
<AzureSagaPersisterConfig ConnectionString="UseDevelopmentStorage=true" />
<AzureTimeoutPersisterConfig ConnectionString="UseDevelopmentStorage=true" />
<AzureSubscriptionStorageConfig ConnectionString="UseDevelopmentStorage=true" />

Order Saga Data

public class OrderSagaData :
    IContainSagaData
{
    public Guid Id { get; set; }
    public string Originator { get; set; }
    public string OriginalMessageId { get; set; }

    [Unique]
    public Guid OrderId { get; set; }

    public string OrderDescription { get; set; }

}

Order Saga

public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleTimeouts<CompleteOrder>
{
    IBus bus;
    static ILog log = LogManager.GetLogger<OrderSaga>();

    public OrderSaga(IBus bus)
    {
        this.bus = bus;
    }

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.ConfigureMapping<StartOrder>(message => message.OrderId)
            .ToSaga(sagaData => sagaData.OrderId);
    }

    public void Handle(StartOrder message)
    {
        Data.OrderId = message.OrderId;
        var orderDescription = $"The saga for order {message.OrderId}";
        Data.OrderDescription = orderDescription;
        log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");
        log.Info("Order will complete in 5 seconds");
        var timeoutData = new CompleteOrder
        {
            OrderDescription = orderDescription
        };

        RequestTimeout(TimeSpan.FromSeconds(5), timeoutData);
    }

    public void Timeout(CompleteOrder state)
    {
        log.Info($"Saga with OrderId {Data.OrderId} completed");
        var orderCompleted = new OrderCompleted
        {
            OrderId = Data.OrderId
        };
        bus.Publish(orderCompleted);
        MarkAsComplete();
    }

}

The Data in Azure Storage

The data in Azure Storage is stored in several locations.

Reading the data using code

There are several helper methods in AzureHelper.cs in the StorageReader projects. These helpers are used to output the data seen below.

Writing table data

static void WriteOutTable(string tableName, bool decodeRowKey)
{
    var storageAccount = CloudStorageAccount.DevelopmentStorageAccount;
    var tableClient = storageAccount.CreateCloudTableClient();
    var table = tableClient.GetTableReference(tableName);
    var entities = table.ExecuteQuery(new TableQuery()).ToList();
    Debug.WriteLine($"'{tableName}' table contents");
    foreach (var entity in entities)
    {
        Debug.WriteLine($"  PartitionKey:= {entity.PartitionKey}");
        Debug.WriteLine($"    RowKey:= {entity.RowKey}");
        if (decodeRowKey)
        {
            var decodedRowKey = entity.RowKey.DecodeFromKey();
            Debug.WriteLine($"    DecodedRowKey:= {decodedRowKey}");
        }
        foreach (var property in entity.Properties)
        {
            var propertyAsObject = property.Value.PropertyAsObject
                .ToString().Truncate(50);
            Debug.WriteLine($"    {property.Key}:= {propertyAsObject}");
        }
    }
    Debug.WriteLine("");
}

Writing blob data

static async Task WriteOutBlobContainer(string containerName)
{
    var storageAccount = CloudStorageAccount.DevelopmentStorageAccount;
    var tableClient = storageAccount.CreateCloudBlobClient();
    var container = tableClient.GetContainerReference(containerName);
    Debug.WriteLine($"'{containerName}' container contents");
    foreach (var blob in container.ListBlobs())
    {
        var name = blob.Uri.AbsolutePath.Split('/').Last();
        Debug.WriteLine($"  Blob:= {name}");
        var blockBlobReference = container.GetBlockBlobReference(name);
        var text = await blockBlobReference.DownloadTextAsync()
            .ConfigureAwait(false);
        Debug.WriteLine($"    {text}");
    }
    Debug.WriteLine("");
}

Using the helpers

WriteOutTable("OrderSagaData", false);
WriteOutTable("Subscription", true);
WriteOutTable("TimeoutDataTableName", false);
WriteOutTable("TimeoutManagerDataTable", false);
return WriteOutBlobContainer("timeoutstate");

The Saga Data

The saga data from the 'OrderSagaData' table contents

PartitionKey:= 21a6f7ed-65d2-42ff-a4d3-a50e00ea76ba
  RowKey:= 21a6f7ed-65d2-42ff-a4d3-a50e00ea76ba
  Id:= 21a6f7ed-65d2-42ff-a4d3-a50e00ea76ba
  Originator:= Samples.Azure.StoragePersistence.Client@RETINA
  OriginalMessageId:= 0d574aa7-0d39-4e93-8233-a50e00ea764f
  OrderId:= 79cc2072-c724-4cc0-9202-b6c4918a3de2
  OrderDescription:= The saga for order 79cc2072-c724-4cc0-9202-b6c4918...

The Timeouts

The timeout data from the TimeoutDataTableName table

  PartitionKey:= 2015090918
    RowKey:= 06800d44-9fc4-49b5-a9e9-a50e00ea76c0
    Destination:= Samples.Azure.StoragePersistence.Server@RETINA
    Headers:= {"NServiceBus.MessageId":"06800d44-9fc4-49b5-a9e9-...
    OwningTimeoutManager:= Samples.Azure.StoragePersistence.Server
    SagaId:= 21a6f7ed-65d2-42ff-a4d3-a50e00ea76ba
    StateAddress:= 06800d44-9fc4-49b5-a9e9-a50e00ea76c0
    Time:= 9/09/2015 6:06:59 PM

The timeout serialized message from the timeoutstate blob container.

'timeoutstate' container contents
  Blob:= 06800d44-9fc4-49b5-a9e9-a50e00ea76c0
    {"OrderDescription":"The saga for order 79cc2072-c724-4cc0-9202-b6c4918a3de2"}

The Subscriptions

The Client endpoint registered in the Subscription table contents

PartitionKey:= OrderCompleted, Version=0.0.0.0
  RowKey:= U2FtcGxlcy5BenVyZS5TdG9yYWdlUGVyc2lzdGVuY2UuQ2xpZW50QFJFVElOQQ==
  DecodedRowKey:= Samples.Azure.StoragePersistence.Client@RETINA

Related Articles

  • Azure Storage Persistence
    Using Azure Storage as persistence.
  • NServiceBus and Azure
    Using Azure for endpoint hosting and to provide Transports and Persistence.
  • Sagas
    NServiceBus uses event-driven architecture to include fault-tolerance and scalability in long-term business processes.

Last modified