Azure Storage Queues Transport Configuration

Configuration parameters

The Azure Storage Queues Transport can be configured using the following parameters:

ConnectionString

Defaults: none

PeekInterval

The amount of time that the transport waits before polling the input queue, in milliseconds.

Defaults: 50 ms

MaximumWaitTimeWhenIdle

To reduce the cost of storage transactions, the transport adjusts its wait times to the observed load. When no messages are found on the queue, the transport backs off. The wait time increases linearly, but it never exceeds the value specified here, in milliseconds.

Defaults: 1000 ms (i.e. 1 second)
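The back-off described above can be pictured with a minimal sketch. The helper name NextWait and the 250 ms step are assumptions made for illustration; the text only states that the wait grows linearly and is capped by MaximumWaitTimeWhenIdle.

```csharp
using System;

static class BackoffSketch
{
    // Hypothetical helper: grows the idle wait by a fixed step and caps it at the maximum.
    static TimeSpan NextWait(TimeSpan current, TimeSpan step, TimeSpan maximum)
    {
        var next = current + step;
        return next > maximum ? maximum : next;
    }

    static void Main()
    {
        var maximum = TimeSpan.FromMilliseconds(1000); // MaximumWaitTimeWhenIdle default
        var step = TimeSpan.FromMilliseconds(250);     // assumed increment, not taken from the transport
        var wait = TimeSpan.Zero;

        // Simulate six consecutive polls that find no messages.
        for (var emptyPoll = 1; emptyPoll <= 6; emptyPoll++)
        {
            wait = NextWait(wait, step, maximum);
            Console.WriteLine($"empty poll {emptyPoll}: wait {wait.TotalMilliseconds} ms");
        }
    }
}
```

With these assumed values the wait reaches the 1000 ms cap on the fourth empty poll and stays there.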

PurgeOnStartup

Instructs the transport to remove any existing messages from the input queue on startup.

Defaults: false, i.e. messages are not removed when the endpoint starts.

MessageInvisibleTime

The visibility timeout mechanism supported by Azure Storage Queues makes a message invisible to other receivers for a specified period after it has been read. If the receiver fails to delete the message within that period, the message reappears on the queue and can be retried by another process.

Defaults: 30000 ms (i.e. 30 seconds)

BatchSize

The number of messages that the transport tries to pull at once from the storage queue. Depending on the expected load, the value should vary between 1 and 32 (the maximum).

Defaults:

  • 10 in Version 6 and below
  • 32 in Version 7

SerializeMessageWrapperWith

Messages are wrapped in a transport-specific structure containing message metadata. By default, the Azure Storage Queues Transport uses the same serializer for the message wrapper as is configured for the contained message. A different serializer can be configured for the wrapper using the SerializeMessageWrapperWith option:

// serialize the messages using the XML serializer:
endpointConfiguration.UseSerialization<XmlSerializer>();
var transport = endpointConfiguration.UseTransport<AzureStorageQueueTransport>();
// wrap messages in JSON
transport.SerializeMessageWrapperWith<JsonSerializer>();

All endpoints in the same system must use the same serializer for the message wrapper. This can be achieved either by using the same message serializer on every endpoint or by setting the wrapper serializer explicitly with the SerializeMessageWrapperWith API shown above.

DegreeOfReceiveParallelism

The number of parallel receive operations that the transport issues against the storage queue to pull messages from it.

The value is calculated dynamically from the endpoint's message processing concurrency limit, using the following equation:

Degree of parallelism = square root of MaxConcurrency

MaxConcurrency    DegreeOfReceiveParallelism
1                 1
10                3
20                4
50                7
100               10 [default]
200               14
1000              32 [max]

This means that DegreeOfReceiveParallelism receive loops run in parallel, each pulling up to the configured BatchSize number of messages at a time. For example, with the default BatchSize of 32 and the default degree of parallelism of 10, the transport can receive up to 320 messages from the storage queue at the same time.
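The relationship between the concurrency limit, the degree of receive parallelism, and the batch size can be worked through in a short sketch. Rounding to the nearest integer and clamping to the range 1 to 32 are assumptions inferred from the table values; the class and method names are hypothetical and not part of the transport.

```csharp
using System;

static class ReceiveParallelismSketch
{
    const int MaxDegree = 32; // upper bound listed in the table

    // Rounded square root of the concurrency limit, kept within 1..32 (assumed rounding).
    static int DegreeOfReceiveParallelism(int maxConcurrency)
    {
        var degree = (int)Math.Round(Math.Sqrt(maxConcurrency));
        return Math.Max(1, Math.Min(MaxDegree, degree));
    }

    static void Main()
    {
        const int batchSize = 32; // default BatchSize in Version 7

        foreach (var maxConcurrency in new[] { 1, 10, 20, 50, 100, 200, 1000 })
        {
            var degree = DegreeOfReceiveParallelism(maxConcurrency);
            // Each receive loop can pull up to batchSize messages per request.
            Console.WriteLine($"{maxConcurrency} -> {degree} loops, up to {degree * batchSize} messages in flight");
        }
    }
}
```

For a concurrency limit of 100 this yields 10 loops and up to 320 messages, matching the example above.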

Changing the value of DegreeOfReceiveParallelism influences the total number of storage operations against Azure Storage Services and can result in higher costs.

The values of BatchSize, DegreeOfReceiveParallelism, the concurrency limit, the ServicePointManager settings, and other parameters such as MaximumWaitTimeWhenIdle must be chosen carefully to get the desired throughput from the transport without exceeding the allowed number of operations per second.

Settings can be overridden only through the configuration API:

var transport = endpointConfiguration.UseTransport<AzureStorageQueueTransport>();
transport.ConnectionString("DefaultEndpointsProtocol=https;AccountName=[ACCOUNT];AccountKey=[KEY];");
transport.BatchSize(20);
transport.MaximumWaitTimeWhenIdle(TimeSpan.FromSeconds(1));
transport.DegreeOfReceiveParallelism(16);
transport.PeekInterval(TimeSpan.FromMilliseconds(100));
transport.MessageInvisibleTime(TimeSpan.FromSeconds(30));

Connection strings

Note that multiple connection string formats apply when working with Azure storage services. When running against the emulated environment, the format is UseDevelopmentStorage=true. When running against a cloud-hosted storage account, the format is DefaultEndpointsProtocol=https;AccountName=myAccount;AccountKey=myKey;

For more details, refer to the Configuring Azure Connection Strings document.
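As an illustration of the two formats, the fragment below picks the cloud connection string when one is supplied and otherwise falls back to the emulator. The environment variable name is hypothetical, and endpointConfiguration is assumed to be the endpoint's existing EndpointConfiguration instance, as in the other snippets on this page.

```csharp
// Hypothetical variable name; any configuration source would work equally well.
var connectionString = Environment.GetEnvironmentVariable("AZURE_STORAGE_CONNECTION_STRING")
                       ?? "UseDevelopmentStorage=true"; // fall back to the storage emulator

var transport = endpointConfiguration.UseTransport<AzureStorageQueueTransport>();
transport.ConnectionString(connectionString);
```

Keeping the real connection string out of source code also reduces the chance of leaking the account key.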

Using aliases for connection strings to storage accounts

It is possible to accidentally leak sensitive information in the connection string for a storage account if it is not properly secured. For example the information can be leaked if an error occurs when communicating across untrusted boundaries, or if the error information is logged to an unsecured log file.

To prevent this, the transport can be restricted to aliases only. Each alias is mapped to a physical connection string, and connection strings are always referred to by their alias. If an error occurs or information is logged, only the alias can be leaked.

This feature can be enabled when configuring the AzureStorageQueueTransport:

var transport = endpointConfiguration.UseTransport<AzureStorageQueueTransport>();
transport.UseAccountAliasesInsteadOfConnectionStrings();

See also Using aliases instead of connection strings with respect to multinamespace support.
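The idea behind aliases can be shown with a conceptual sketch that is independent of the transport. All names here (AliasSketch, Accounts, Resolve, the "orders" alias) are hypothetical; the point is only that lookups and error messages refer to the alias, never to the connection string.

```csharp
using System;
using System.Collections.Generic;

static class AliasSketch
{
    // Alias -> physical connection string; only this map ever holds the secret.
    static readonly Dictionary<string, string> Accounts = new Dictionary<string, string>
    {
        ["orders"] = "DefaultEndpointsProtocol=https;AccountName=[ACCOUNT];AccountKey=[KEY];"
    };

    static string Resolve(string alias)
    {
        string connectionString;
        if (!Accounts.TryGetValue(alias, out connectionString))
        {
            // The error names the alias only, so logging it cannot expose an account key.
            throw new InvalidOperationException($"No storage account registered for alias '{alias}'.");
        }
        return connectionString;
    }

    static void Main()
    {
        Console.WriteLine(Resolve("orders").Length > 0); // known alias resolves

        try
        {
            Resolve("billing"); // unknown alias
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```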

Shortening

If a queue name is longer than 63 characters, the Azure Storage Queues Transport uses a hashing algorithm to rename it. The default algorithm is MD5. In order to use SHA1, apply the following configuration:

var transport = endpointConfiguration.UseTransport<AzureStorageQueueTransport>();
transport.UseSha1ForShortening();
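The shortening step can be sketched roughly as follows. This is an illustration of hashing an over-long name with MD5, not the transport's actual implementation: the Shorten helper is hypothetical, and the exact name format the transport produces may differ.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

static class QueueNameShorteningSketch
{
    const int MaxQueueNameLength = 63; // limit stated above

    // Hypothetical helper: names within the limit pass through, longer ones are replaced by a hash.
    static string Shorten(string queueName)
    {
        if (queueName.Length <= MaxQueueNameLength)
        {
            return queueName;
        }

        using (var md5 = MD5.Create())
        {
            var hash = md5.ComputeHash(Encoding.UTF8.GetBytes(queueName));
            // 16 hash bytes rendered as 32 lowercase hexadecimal characters.
            return BitConverter.ToString(hash).Replace("-", "").ToLowerInvariant();
        }
    }

    static void Main()
    {
        var longName = "sales-" + new string('x', 70); // 76 characters, over the limit

        Console.WriteLine(Shorten("sales"));          // unchanged
        Console.WriteLine(Shorten(longName).Length);  // length of the hashed replacement
    }
}
```

Because the hash is deterministic, every endpoint that shortens the same long name arrives at the same queue name, provided all endpoints use the same algorithm.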

Serialization

Azure Storage Queues Transport changes the default serializer to JSON. The serializer can be changed using the serialization API.

Custom envelope unwrapper

Azure Storage Queues lacks native header support. NServiceBus solves this by wrapping headers and message body in a custom envelope structure. This envelope is serialized using the configured serializer for the endpoint before being sent.

Creating this envelope can add unneeded complexity when headers are not required, for example in native integration scenarios. For this reason, Version 7.1 and above support configuring a custom envelope unwrapper.

The snippet below shows custom unwrapping logic that enables both NServiceBus formatted and plain JSON messages to be consumed.

7.1 NServiceBus.Azure.Transports.WindowsAzureStorageQueues
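var transport = endpointConfiguration.UseTransport<AzureStorageQueueTransport>();
// Newtonsoft.Json serializer used below to probe incoming messages for the NServiceBus envelope
var jsonSerializer = new Newtonsoft.Json.JsonSerializer();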

transport.UnwrapMessagesWith(cloudQueueMessage =>
{
    using (var stream = new MemoryStream(cloudQueueMessage.AsBytes))
    using (var streamReader = new StreamReader(stream))
    using (var textReader = new JsonTextReader(streamReader))
    {
        // try to deserialize to an NServiceBus envelope first
        var wrapper = jsonSerializer.Deserialize<MessageWrapper>(textReader);

        // a null wrapper (e.g. an empty payload) is treated as a native message
        if (wrapper?.Id != null)
        {
            // this was an envelope message
            return wrapper;
        }

        // this was a native message; return the body as-is with no headers
        return new MessageWrapper
        {
            Id = cloudQueueMessage.Id,
            Headers = new Dictionary<string, string>(),
            Body = cloudQueueMessage.AsBytes
        };
    }
});

This feature is currently not compatible with ServiceControl. A ServiceControl transport adapter is required in order to use both together.
