
Azure Storage Queues Transport

Prerequisites

Ensure an instance of the Azurite Emulator is running.

Azure Storage Queues Transport

This sample uses the Azure Storage Queues Transport.

Code walk-through

This sample shows a simple two-endpoint scenario.

  • Endpoint1 sends a Message1 message to Endpoint2.
  • Endpoint2 replies to Endpoint1 with a Message2 message.
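
The reply step can be sketched as a message handler hosted in Endpoint2. This is a minimal sketch, not the sample's actual source: the class name Message1Handler, the Property member on Message2, and the reply text are assumptions made for illustration. The Property member on Message1 is inferred from the decoded message body shown later in this document. The sketch depends on the NServiceBus package for IMessage, IHandleMessages<T>, and IMessageHandlerContext.

```csharp
using System.Threading.Tasks;
using NServiceBus;

// Message contracts. Message1.Property matches the decoded body shown in this
// document; Message2.Property is an assumption for illustration.
public class Message1 : IMessage
{
    public string Property { get; set; }
}

public class Message2 : IMessage
{
    public string Property { get; set; }
}

// Hypothetical handler hosted in Endpoint2.
public class Message1Handler : IHandleMessages<Message1>
{
    public Task Handle(Message1 message, IMessageHandlerContext context)
    {
        // Reply sends the response to the address in the incoming
        // message's reply-to header, which here is Endpoint1's queue.
        var reply = new Message2 { Property = "Hello from Endpoint2" };
        return context.Reply(reply);
    }
}
```

Because the handler uses Reply rather than Send, Endpoint2 needs no routing configuration for Message2.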

Transport configuration

Each endpoint is configured to use the Azure Storage Queues transport, connecting to the local emulator through the development storage connection string.

The endpoint configuration

var transport = new AzureStorageQueueTransport("UseDevelopmentStorage=true");
var routingSettings = endpointConfiguration.UseTransport(transport);

Sanitization

One of the endpoints is using a long name which needs to be sanitized.

var endpointName = "Samples.Azure.StorageQueues.Endpoint1.With.A.Very.Long.Name.And.Invalid.Characters";
var endpointConfiguration = new EndpointConfiguration(endpointName);

To remain backward compatible with older versions of the transport, the sample registers the MD5-based sanitizer. The sample also includes a SHA1-based sanitizer, which suits endpoints that ran transport version 7.x and shortened queue names with the SHA1 hashing algorithm.

transport.QueueNameSanitizer = BackwardsCompatibleQueueNameSanitizer.WithMd5Shortener;
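
To see why the long endpoint name needs sanitizing, the following self-contained sketch applies the same steps as the sanitizer listed at the end of this document: lowercase the name, replace invalid characters with dashes, and shorten anything over 63 characters (the Azure Storage limit for queue names) with an MD5-derived suffix. The class name QueueNameDemo and its method names are hypothetical. The sketch uses Encoding.UTF8 where the sample uses Encoding.Default; for a sanitized name, which contains only a-z, 0-9 and dashes, both should produce the same bytes.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;
using System.Text.RegularExpressions;

public static class QueueNameDemo
{
    // Azure Storage queue names are limited to 63 characters.
    const int MaxLength = 63;

    // Lowercase, replace anything outside a-z, 0-9 and '-' with '-',
    // then collapse runs of dashes into a single dash.
    public static string Sanitize(string name)
    {
        var lower = name.ToLowerInvariant();
        var dashed = Regex.Replace(lower, @"[^a-z0-9\-]", "-");
        return Regex.Replace(dashed, @"\-+", "-");
    }

    // Keep a readable prefix and append a GUID built from the MD5 hash,
    // so the result never exceeds MaxLength.
    public static string ShortenWithMd5(string sanitized)
    {
        if (sanitized.Length <= MaxLength)
        {
            return sanitized;
        }

        using (var md5 = MD5.Create())
        {
            var hashBytes = md5.ComputeHash(Encoding.UTF8.GetBytes(sanitized));
            var suffix = new Guid(hashBytes).ToString(); // 36 characters
            var prefix = sanitized.Substring(0, MaxLength - suffix.Length - 1).Trim('-');
            return $"{prefix}-{suffix}";
        }
    }

    public static void Main()
    {
        var endpointName = "Samples.Azure.StorageQueues.Endpoint1.With.A.Very.Long.Name.And.Invalid.Characters";
        var sanitized = Sanitize(endpointName);
        Console.WriteLine($"{sanitized} ({sanitized.Length} characters)");

        var queueName = ShortenWithMd5(sanitized);
        Console.WriteLine($"{queueName} ({queueName.Length} characters)");
    }
}
```

The sanitized name is 82 characters long, so it is cut to the 26-character prefix samples-azure-storagequeue followed by a dash and the 36-character GUID, giving exactly 63 characters.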

The full sanitizer source code is shown at the end of this document.

The data in Azure Storage

The queues for the two endpoints can be seen in the Server Explorer of Visual Studio.

Reading the data using code

There are several helper methods in AzureHelper.cs in the StorageReader project. These helpers produce the output shown below.

Writing Queue Messages

This helper peeks the first message from a given queue and writes out the contents of that message.

static async Task WriteOutQueue(string queueName)
{
    var queueClient = new QueueClient("UseDevelopmentStorage=true", queueName);
    // Peek rather than receive, so the message stays in the queue.
    PeekedMessage[] messages = await queueClient.PeekMessagesAsync(1);
    // Guard against an empty result as well as null before indexing.
    if (messages != null && messages.Length > 0)
    {
        WriteOutMessage(messages[0]);
        return;
    }

    Console.WriteLine($"No messages found in the '{queueName}' queue. Execute 'Endpoint1' without running 'Endpoint2' and then try again.");
}

static void WriteOutMessage(PeekedMessage message)
{
    // The message text is a Base64-encoded JSON envelope.
    var bytes = Convert.FromBase64String(message.MessageText);
    var json = Encoding.UTF8.GetString(bytes);
    // Strip a leading UTF-8 byte order mark, if present, before parsing.
    var byteOrderMarkUtf8 = Encoding.UTF8.GetString(Encoding.UTF8.GetPreamble());
    if (json.StartsWith(byteOrderMarkUtf8, StringComparison.Ordinal))
    {
        json = json.Remove(0, byteOrderMarkUtf8.Length);
    }
    dynamic parsedJson = JsonConvert.DeserializeObject(json);
    Debug.WriteLine("Message contents:");
    Debug.WriteLine(JsonConvert.SerializeObject((object) parsedJson, Formatting.Indented));
    // The Body property is itself Base64-encoded.
    var body = (string)parsedJson.Body;
    Debug.WriteLine("Deserialized message body:");
    Debug.WriteLine(body.Base64Decode());
}

Using the helper

await WriteOutQueue("samples-azure-storagequeues-endpoint2");

The Message Data

Run only Endpoint1 and send a message. Notice the contents of the message in the samples-azure-storagequeues-endpoint2 queue.

CloudQueueMessage contents

{
  "IdForCorrelation": null,
  "Id": "5957b746-6636-43c7-89b9-ac6e01853eae",
  "MessageIntent": 1,
  "ReplyToAddress": "samples-azure-storagequeues-endpoint1",
  "TimeToBeReceived": "00:00:00",
  "Headers": {
    "NServiceBus.MessageId": "5957b746-6636-43c7-89b9-ac6e01853eae",
    "NServiceBus.MessageIntent": "Send",
    "NServiceBus.ConversationId": "b01e3778-23a1-42b5-bcbd-ac6e01853eaf",
    "NServiceBus.CorrelationId": "5957b746-6636-43c7-89b9-ac6e01853eae",
    "NServiceBus.ReplyToAddress": "samples-azure-storagequeues-endpoint1",
    "NServiceBus.OriginatingMachine": "BEAST",
    "NServiceBus.OriginatingEndpoint": "Samples-Azure-StorageQueues-Endpoint1",
    "$.diagnostics.originating.hostid": "27bfc91ba004f906eed90fc507597a11",
    "NServiceBus.ContentType": "application/json",
    "NServiceBus.EnclosedMessageTypes": "Message1, Shared, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
    "NServiceBus.Version": "7.4.4",
    "NServiceBus.TimeSent": "2020-11-09 23:37:11:901738 Z"
  },
  "Body": "77u/eyJQcm9wZXJ0eSI6IkhlbGxvIGZyb20gRW5kcG9pbnQxIn0=",
  "CorrelationId": "5957b746-6636-43c7-89b9-ac6e01853eae",
  "Recoverable": true
}

Decoded Body

The message above contains a Base64-encoded Body property. Decoding it produces the following.

{"Property":"Hello from Endpoint1"}
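
The decoding step can be reproduced with a few lines of code. The sketch below is self-contained and uses the Body value from the message above. The class name BodyDecoder and the method name DecodeBody are hypothetical and are not the sample's own Base64Decode helper, whose source is not shown here.

```csharp
using System;
using System.Text;

public static class BodyDecoder
{
    // Decodes a Base64-encoded, UTF-8 message body into a string.
    public static string DecodeBody(string base64Body)
    {
        var bytes = Convert.FromBase64String(base64Body);
        var text = Encoding.UTF8.GetString(bytes);
        // GetString does not strip a leading byte order mark;
        // it shows up as U+FEFF at the start of the string.
        return text.TrimStart('\uFEFF');
    }

    public static void Main()
    {
        var body = "77u/eyJQcm9wZXJ0eSI6IkhlbGxvIGZyb20gRW5kcG9pbnQxIn0=";
        Console.WriteLine(DecodeBody(body)); // prints {"Property":"Hello from Endpoint1"}
    }
}
```

The first four characters of the encoded body, 77u/, are the Base64 form of the UTF-8 byte order mark (EF BB BF), which is why the mark is removed before the JSON is used.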

Sanitizer source code

public static class BackwardsCompatibleQueueNameSanitizer
{
    public static string WithMd5Shortener(string queueName)
    {
        return Sanitize(queueName, useMd5Hashing: true);
    }

    public static string WithSha1Shortener(string queueName)
    {
        return Sanitize(queueName, useMd5Hashing: false);
    }

    static string Sanitize(string queueName, bool useMd5Hashing = true)
    {
        var queueNameInLowerCase = queueName.ToLowerInvariant();
        return ShortenQueueNameIfNecessary(SanitizeQueueName(queueNameInLowerCase), useMd5Hashing);
    }

    static string ShortenQueueNameIfNecessary(string sanitizedQueueName, bool useMd5Hashing)
    {
        if (sanitizedQueueName.Length <= 63)
        {
            return sanitizedQueueName;
        }

        var shortenedName = useMd5Hashing ? ShortenWithMd5(sanitizedQueueName) : ShortenWithSha1(sanitizedQueueName);

        return $"{sanitizedQueueName.Substring(0, 63 - shortenedName.Length - 1).Trim('-')}-{shortenedName}";
    }

    static string SanitizeQueueName(string queueName)
    {
        // this can lead to multiple '-' occurrences in a row
        var sanitized = invalidCharacters.Replace(queueName, "-");
        return multipleDashes.Replace(sanitized, "-");
    }

    static string ShortenWithMd5(string queueName)
    {
        // Use an MD5 hash to get a 16-byte hash of the string
        using (var provider = MD5.Create())
        {
            var inputBytes = Encoding.Default.GetBytes(queueName);
            var hashBytes = provider.ComputeHash(inputBytes);
            // Generate a GUID from the hash
            return new Guid(hashBytes).ToString();
        }
    }

    static string ShortenWithSha1(string queueName)
    {
        using (var provider = SHA1.Create())
        {
            var inputBytes = Encoding.Default.GetBytes(queueName);
            var hashBytes = provider.ComputeHash(inputBytes);

            return ToChars(hashBytes);
        }
    }

    static string ToChars(byte[] hashBytes)
    {
        var chars = new char[hashBytes.Length * 2];
        for (var i = 0; i < chars.Length; i += 2)
        {
            var byteIndex = i / 2;
            chars[i] = HexToChar((byte)(hashBytes[byteIndex] >> 4));
            chars[i + 1] = HexToChar(hashBytes[byteIndex]);
        }

        return new string(chars);
    }

    static char HexToChar(byte a)
    {
        a &= 15;
        return a > 9 ? (char)(a - 10 + 97) : (char)(a + 48);
    }

    static readonly Regex invalidCharacters = new Regex(@"[^a-z0-9\-]", RegexOptions.Compiled);
    static readonly Regex multipleDashes = new Regex(@"\-+", RegexOptions.Compiled);
}
