Azure Storage Queues Transport

Component: Azure Storage Queues Transport
NuGet Package: NServiceBus.Azure.Transports.WindowsAzureStorageQueues (8-pre)
Target NServiceBus Version: 7.x
This page targets a pre-release version and is subject to change prior to the final release.

Prerequisites

Ensure an instance of the Azure Storage Emulator is running.

Azure Storage Queues Transport

This sample utilizes the Azure Storage Queues Transport.

Code walk-through

This sample shows a simple two-endpoint scenario:

  • Endpoint1 sends a Message1 message to Endpoint2.
  • Endpoint2 replies to Endpoint1 with a Message2 message.

Azure Storage configuration

Each endpoint is configured to use the Azure Storage Queues transport, with a connection string that points at the local Azure Storage Emulator.

The endpoint configuration

var transport = endpointConfiguration.UseTransport<AzureStorageQueueTransport>();
transport.ConnectionString("UseDevelopmentStorage=true");

Sanitization

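One of the endpoints uses a name that is longer than the 63 characters allowed for a queue name and contains characters other than lowercase letters, digits, and dashes, so it must be sanitized.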

var endpointName = "Samples.Azure.StorageQueues.Endpoint1.With.A.Very.Long.Name.And.Invalid.Characters";
var endpointConfiguration = new EndpointConfiguration(endpointName);

To remain backward compatible with older versions of the transport, MD5-based sanitization is registered. The sample also includes SHA1-based sanitization, which is suitable for endpoints that used transport version 7.x configured to shorten queue names with the SHA1 hashing algorithm.

transport.SanitizeQueueNamesWith(BackwardsCompatibleQueueNameSanitizer.WithMd5Shortener);

The full sanitization code is shown at the end of this document.
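
To see why the endpoint name above needs sanitizing, the following stand-alone sketch applies the same lowercase-and-replace rules as the sample's sanitizer and compares the result with the 63-character limit. The class QueueNameCheck and its two helper methods are hypothetical names introduced only for this illustration; they are not part of the sample or of the transport.

```csharp
using System;
using System.Text.RegularExpressions;

public static class QueueNameCheck
{
    // Same patterns as the sample's sanitizer: anything other than a-z, 0-9 or '-' is invalid
    static readonly Regex invalidCharacters = new Regex(@"[^a-z0-9\-]");
    static readonly Regex multipleDashes = new Regex(@"\-+");

    // Hypothetical helper: lowercases the name, replaces invalid characters with a dash
    // and collapses any run of dashes into a single dash
    public static string ReplaceInvalidCharacters(string endpointName)
    {
        var lowered = endpointName.ToLowerInvariant();
        var dashed = invalidCharacters.Replace(lowered, "-");
        return multipleDashes.Replace(dashed, "-");
    }

    // Hypothetical helper: true when the cleaned-up name is still longer than 63 characters
    public static bool NeedsShortening(string endpointName)
    {
        return ReplaceInvalidCharacters(endpointName).Length > 63;
    }

    public static void Main()
    {
        var endpointName = "Samples.Azure.StorageQueues.Endpoint1.With.A.Very.Long.Name.And.Invalid.Characters";
        var cleaned = ReplaceInvalidCharacters(endpointName);
        Console.WriteLine(cleaned);
        Console.WriteLine($"Length: {cleaned.Length}, needs shortening: {NeedsShortening(endpointName)}");
    }
}
```

Replacing the dots alone leaves an 82-character name, which is why the registered sanitizer also appends a hash and truncates the readable prefix.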

The Data in Azure Storage

The queues for the two endpoints can be seen in the Server Explorer of Visual Studio.

Reading the data using code

There are several helper methods in AzureHelper.cs in the StorageReader project. These helpers are used to output the data seen below.

Writing Queue Messages

This helper peeks the first message from a given queue and writes out the contents of that message.

static async Task WriteOutQueue(string queueName)
{
    var storageAccount = CloudStorageAccount.DevelopmentStorageAccount;
    var queueClient = storageAccount.CreateCloudQueueClient();
    var queue = queueClient.GetQueueReference(queueName);
    var message = await queue.PeekMessageAsync()
        .ConfigureAwait(false);
    if (message != null)
    {
        Debug.WriteLine("Message contents");
        WriteOutMessage(message);
        return;
    }

    Console.WriteLine($"No messages found in the '{queueName}' queue. Execute 'Endpoint1' without running 'Endpoint2' and then try again.");
}

static void WriteOutMessage(CloudQueueMessage message)
{
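    var json = message.AsString;
    // The payload may start with a UTF-8 byte order mark, which must be removed before parsing
    var byteOrderMarkUtf8 = Encoding.UTF8.GetString(Encoding.UTF8.GetPreamble());
    // Compare ordinally: a culture-sensitive comparison can treat the byte order mark as ignorable
    if (json.StartsWith(byteOrderMarkUtf8, StringComparison.Ordinal))
    {
        json = json.Remove(0, byteOrderMarkUtf8.Length);
    }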
    dynamic parsedJson = JsonConvert.DeserializeObject(json);
    Debug.WriteLine("CloudQueueMessage contents:");
    Debug.WriteLine(JsonConvert.SerializeObject((object) parsedJson, Formatting.Indented));
    var body = (string)parsedJson.Body;
    Debug.WriteLine("Serialized message body:");
    Debug.WriteLine(body.Base64Decode());
}

Using the helper

await WriteOutQueue("samples-azure-storagequeues-endpoint2")
    .ConfigureAwait(false);

The Message Data

Run only Endpoint1 and send a message. Notice the contents of the message in the samples-azure-storagequeues-endpoint2 queue.

CloudQueueMessage contents

{
  "IdForCorrelation": null,
  "Id": "bb6ec79c-984f-4d51-8dd6-a50e010564a5",
  "MessageIntent": 1,
  "ReplyToAddress": "Samples.Azure.StorageQueues.Endpoint1@UseDevelopmentStorage=true",
  "TimeToBeReceived": "10675199.02:48:05.4775807",
  "Headers": {
    "NServiceBus.MessageId": "bb6ec79c-984f-4d51-8dd6-a50e010564a5",
    "NServiceBus.CorrelationId": "bb6ec79c-984f-4d51-8dd6-a50e010564a5",
    "NServiceBus.MessageIntent": "Send",
    "NServiceBus.Version": "5.2.5",
    "NServiceBus.TimeSent": "2015-09-09 05:51:42:197915 Z",
    "NServiceBus.ContentType": "application/json",
    "NServiceBus.EnclosedMessageTypes": "Message1, Shared, Version=0.0.0.0, Culture=neutral, PublicKeyToken=null",
    "NServiceBus.ConversationId": "a496f0ce-a3c8-4f30-9598-a50e010564a5",
    "NServiceBus.OriginatingMachine": "RETINA",
    "NServiceBus.OriginatingEndpoint": "Samples.Azure.StorageQueues.Endpoint1",
    "$.diagnostics.originating.hostid": "658a1d15fed47c77cd63a3e63da15cc6"
  },
  "Body": "77u/eyJQcm9wZXJ0eSI6IkhlbGxvIGZyb20gRW5kcG9pbnQxIn0=",
  "CorrelationId": "bb6ec79c-984f-4d51-8dd6-a50e010564a5",
  "Recoverable": true
}

Decoded Body

Note that the Body property above is Base64 encoded. Decoding it produces the following:

{"Property":"Hello from Endpoint1"}
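
The helper shown earlier calls a Base64Decode extension method whose source is not listed on this page. As a rough stand-alone sketch of what such decoding could look like, the following assumes the body is Base64-encoded UTF-8 text that may begin with a byte order mark; the BodyDecoder class and its Decode method are hypothetical names, not the sample's own implementation.

```csharp
using System;
using System.Text;

public static class BodyDecoder
{
    // Hypothetical helper: Base64-decodes a message body and strips a leading UTF-8 byte order mark
    public static string Decode(string base64Body)
    {
        var bytes = Convert.FromBase64String(base64Body);
        var text = Encoding.UTF8.GetString(bytes);
        // Encoding.UTF8.GetString keeps the byte order mark as U+FEFF, so trim it explicitly
        return text.TrimStart('\uFEFF');
    }

    public static void Main()
    {
        // The Body value taken from the CloudQueueMessage contents above
        var body = "77u/eyJQcm9wZXJ0eSI6IkhlbGxvIGZyb20gRW5kcG9pbnQxIn0=";
        Console.WriteLine(Decode(body));
    }
}
```

The leading "77u/" in the encoded value is the Base64 form of the three byte order mark bytes (EF BB BF), which is why the decoded text has to be trimmed before it is treated as JSON.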

Sanitizer source code

public static class BackwardsCompatibleQueueNameSanitizer
{
    public static string WithMd5Shortener(string queueName)
    {
        return Sanitize(queueName, useMd5Hashing: true);
    }

    public static string WithSha1Shortener(string queueName)
    {
        return Sanitize(queueName, useMd5Hashing: false);
    }

    static string Sanitize(string queueName, bool useMd5Hashing = true)
    {
        var queueNameInLowerCase = queueName.ToLowerInvariant();
        return ShortenQueueNameIfNecessary(queueNameInLowerCase, SanitizeQueueName(queueNameInLowerCase), useMd5Hashing);
    }

    static string ShortenQueueNameIfNecessary(string queueName, string hashedQueueName, bool useMd5Hashing)
    {
        if (hashedQueueName.Length <= 63)
        {
            return hashedQueueName;
        }

        var shortenedName = useMd5Hashing ? ShortenWithMd5(queueName) : ShortenWithSha1(queueName);

        return $"{hashedQueueName.Substring(0, 63 - shortenedName.Length - 1).Trim('-')}-{shortenedName}";
    }

    static string SanitizeQueueName(string queueName)
    {
        // this can lead to multiple '-' occurrences in a row
        var sanitized = invalidCharacters.Replace(queueName, "-");
        return multipleDashes.Replace(sanitized, "-");
    }

    static string ShortenWithMd5(string test)
    {
        //use MD5 hash to get a 16-byte hash of the string
        using (var provider = MD5.Create())
        {
            var inputBytes = Encoding.Default.GetBytes(test);
            var hashBytes = provider.ComputeHash(inputBytes);
            //generate a guid from the hash:
            return new Guid(hashBytes).ToString();
        }
    }

    static string ShortenWithSha1(string queueName)
    {
        using (var provider = SHA1.Create())
        {
            var inputBytes = Encoding.Default.GetBytes(queueName);
            var hashBytes = provider.ComputeHash(inputBytes);

            return ToChars(hashBytes);
        }
    }

    static string ToChars(byte[] hashBytes)
    {
        var chars = new char[hashBytes.Length * 2];
        for (var i = 0; i < chars.Length; i += 2)
        {
            var byteIndex = i / 2;
            chars[i] = HexToChar((byte)(hashBytes[byteIndex] >> 4));
            chars[i + 1] = HexToChar(hashBytes[byteIndex]);
        }

        return new string(chars);
    }

    static char HexToChar(byte a)
    {
        a &= 15;
        return a > 9 ? (char)(a - 10 + 97) : (char)(a + 48);
    }

    static Regex invalidCharacters = new Regex(@"[^a-z0-9\-]", RegexOptions.Compiled);
    static Regex multipleDashes = new Regex(@"\-+", RegexOptions.Compiled);
}
