SQS Transport Scripting

Component: Amazon SQS Transport
NuGet Package: NServiceBus.AmazonSQS (5.x)
Target NServiceBus Version: 7.x

The following are example code and scripts to facilitate deployment and operations against the SQS Transport.

Requirements

The PowerShell scripts require the AWS Tools for PowerShell (the PowerShell SDK) to be installed and properly configured. For more information, refer to the PowerShell getting set up guide.

For all operations that create resources in AWS the corresponding rights must be granted. For more information refer to the IAM policies guide.

QueueNameHelper

In C#

using System;
using System.Text;

public static class QueueNameHelper
{
    public static string GetSqsQueueName(string destination, string queueNamePrefix = null)
    {
        if (string.IsNullOrWhiteSpace(destination))
        {
            throw new ArgumentNullException(nameof(destination));
        }

        var s = queueNamePrefix + destination;

        if (s.Length > 80)
        {
            throw new ArgumentException(
                $"Address {destination} with configured prefix {queueNamePrefix} is longer than 80 characters and therefore cannot be used to create an SQS queue. Use a shorter endpoint name or a shorter queue name prefix.");
        }

        // SQS queue names can only have alphanumeric characters, hyphens and underscores.
        // Any other characters will be replaced with a hyphen.
        var skipCharacters = s.EndsWith(".fifo") ? 5 : 0;
        var queueNameBuilder = new StringBuilder(s);

        for (var i = 0; i < queueNameBuilder.Length - skipCharacters; ++i)
        {
            var c = queueNameBuilder[i];
            if (!char.IsLetterOrDigit(c)
                && c != '-'
                && c != '_')
            {
                queueNameBuilder[i] = '-';
            }
        }

        return queueNameBuilder.ToString();
    }
}

The above QueueNameHelper makes sure that queues follow the proper naming guidelines for SQS.

In PowerShell

Add-Type @'
    using System;

    public static class QueueNameHelper
    {
        public static string GetSqsQueueName(string destination, string queueNamePrefix = null)
        {
            if (string.IsNullOrWhiteSpace(destination))
            {
                throw new ArgumentNullException("destination");
            }

            var s = queueNamePrefix + destination;

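            // SQS queue names can only have alphanumeric characters, hyphens and underscores.
            // Any other characters will be replaced with a hyphen.
            // The ".fifo" suffix of FIFO queues is preserved, as in the C# helper.
            var skipCharacters = s.EndsWith(".fifo") ? 5 : 0;
            var chars = s.ToCharArray();
            for (var i = 0; i < chars.Length - skipCharacters; ++i)
            {
                var c = chars[i];
                if (!char.IsLetterOrDigit(c)
                    && c != '-'
                    && c != '_')
                {
                    chars[i] = '-';
                }
            }
            s = new string(chars);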

            if (s.Length > 80)
            {
                throw new Exception(
                    string.Format("Address {0} with configured prefix {1} is longer than 80 characters and therefore cannot be used to create an SQS queue. Use a shorter queue name.", destination, queueNamePrefix));
            }

            return s;
        }
    }
'@
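In a POSIX shell

For shell-based deployment scripts, the same sanitization can be approximated without .NET. The following is a minimal sketch and not part of the transport: `sqs_queue_name` is a hypothetical function name, and it treats only ASCII letters and digits as alphanumeric, whereas `char.IsLetterOrDigit` in the C# helper also accepts non-ASCII letters and digits.

```shell
# Hypothetical POSIX shell port of QueueNameHelper.GetSqsQueueName (ASCII-only approximation).
# Usage: sqs_queue_name <destination> [queue-name-prefix]
# The body runs in a subshell, so its variables do not leak into the caller.
sqs_queue_name() (
    destination=${1:-}
    prefix=${2:-}

    # Reject empty or whitespace-only destinations, like the C# helper.
    case "$destination" in
        *[![:space:]]*) ;;
        *) echo "destination must not be empty" >&2; exit 1 ;;
    esac

    name="${prefix}${destination}"
    if [ "${#name}" -gt 80 ]; then
        echo "'${name}' is longer than 80 characters and cannot be used as an SQS queue name" >&2
        exit 1
    fi

    # Keep the ".fifo" suffix of FIFO queues untouched.
    suffix=""
    case "$name" in
        *.fifo) suffix=".fifo"; name=${name%.fifo} ;;
    esac

    # Replace everything except ASCII letters, digits, '-' and '_' with '-'.
    name=$(printf '%s' "$name" | LC_ALL=C tr -c 'A-Za-z0-9_-' '-')
    printf '%s%s\n' "$name" "$suffix"
)
```

For example, `sqs_queue_name "Sales.Orders" "dev-"` prints `dev-Sales-Orders`, and `sqs_queue_name "orders.v2.fifo"` prints `orders-v2.fifo`.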

Native Send

The native send helper methods

A send involves the following actions:

  • Create and serialize the payload including headers.
  • Write a message body directly to SQS Transport.

In C#

public static async Task SendMessage(IAmazonSQS sqsClient, string queue, string messageBody, Dictionary<string, string> headers)
{
    var bodyBytes = Encoding.UTF8.GetBytes(messageBody);
    var base64Body = Convert.ToBase64String(bodyBytes);
    var serializedMessage = Newtonsoft.Json.JsonConvert.SerializeObject(new
    {
        Headers = headers,
        Body = base64Body,
    });
    var queueUrlResponse = await sqsClient.GetQueueUrlAsync(QueueNameHelper.GetSqsQueueName(queue))
        .ConfigureAwait(false);
    await sqsClient.SendMessageAsync(queueUrlResponse.QueueUrl, serializedMessage)
        .ConfigureAwait(false);
}

In PowerShell

Function SendMessage
{
    param(
        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $QueueName,

        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $MessageBody,

        [Parameter(Mandatory=$true)]
        [ValidateNotNull()]
        [HashTable] $Headers
    )

	$bodyBytes = [System.Text.Encoding]::UTF8.GetBytes($MessageBody)
	$base64Body = [System.Convert]::ToBase64String($bodyBytes)
	[string]$name = [QueueNameHelper]::GetSqsQueueName($QueueName)
	$serializedMessage = @{ Headers = $Headers; Body =  $base64Body } | ConvertTo-Json
	$queueUrl = Get-SQSQueueUrl $name
	Send-SQSMessage -QueueUrl $queueUrl -MessageBody $serializedMessage -Force
}
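In a POSIX shell

Where neither .NET nor PowerShell is available, the same envelope can be produced from a shell script and sent with the AWS CLI. This is a sketch under stated assumptions: `build_envelope` and `send_native_message` are hypothetical names, the header values are assumed to need no JSON escaping, and the queue name is assumed to be sanitized already.

```shell
# Hypothetical helper: wrap a body and the two required headers in the same JSON
# envelope that the C# and PowerShell helpers produce.
# Usage: build_envelope <message-type> <message-id> <body>
build_envelope() {
    # The body is Base64-encoded; any line wrapping added by base64 is removed.
    encoded_body=$(printf '%s' "$3" | base64 | tr -d '\n')
    printf '{"Headers":{"NServiceBus.EnclosedMessageTypes":"%s","NServiceBus.MessageId":"%s"},"Body":"%s"}\n' \
        "$1" "$2" "$encoded_body"
}

# Hypothetical helper: resolve the queue URL and send the envelope with the AWS CLI.
# Usage: send_native_message <queue-name> <message-type> <message-id> <body>
send_native_message() {
    queue_url=$(aws sqs get-queue-url --queue-name "$1" --query QueueUrl --output text) || return 1
    aws sqs send-message --queue-url "$queue_url" \
        --message-body "$(build_envelope "$2" "$3" "$4")"
}
```

`build_envelope` has no AWS dependency, so the generated JSON can be inspected locally before anything is sent.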

Using the native send helper methods

await SendMessage(
    sqsClient: client,
    queue: "samples-sqs-nativeintegration",
    messageBody: "{Property:'PropertyValue'}",
    headers: new Dictionary<string, string>
    {
        {"NServiceBus.EnclosedMessageTypes", "MessageTypeToSend"},
        {"NServiceBus.MessageId", "99C7320B-A645-4C74-95E8-857EAB98F4F9"}
    }
)
.ConfigureAwait(false);

The headers must contain the message type that is sent as a fully qualified assembly name as well as the message id header. See the headers documentation for more information on the EnclosedMessageTypes and MessageId headers.

Native Send Large Messages

The native send helper methods

A send involves the following actions:

  • Create a unique S3 key containing the S3Prefix and the MessageId.
  • Upload the body of the message to the S3 bucket.
  • Create and serialize the message with an empty payload including headers and the S3BodyKey.
  • Write a message directly to SQS Transport.

In C#

public static async Task SendLargeMessage(IAmazonSQS sqsClient, IAmazonS3 s3Client, string queue, string s3Prefix, string bucketName, string messageBody, Dictionary<string, string> headers)
{
    var bodyBytes = Encoding.UTF8.GetBytes(messageBody);
    var key = $"{s3Prefix}/{headers["NServiceBus.MessageId"]}";
    using (var bodyStream = new MemoryStream(bodyBytes))
    {
        await s3Client.PutObjectAsync(new PutObjectRequest
        {
            BucketName = bucketName,
            InputStream = bodyStream,
            Key = key
        }).ConfigureAwait(false);
    }
    var serializedMessage = Newtonsoft.Json.JsonConvert.SerializeObject(new
    {
        Headers = headers,
        Body = string.Empty,
        S3BodyKey = key
    });
    var queueUrlResponse = await sqsClient.GetQueueUrlAsync(QueueNameHelper.GetSqsQueueName(queue))
        .ConfigureAwait(false);
    await sqsClient.SendMessageAsync(queueUrlResponse.QueueUrl, serializedMessage)
        .ConfigureAwait(false);
}

In PowerShell

Function SendLargeMessage
{
	param(
        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $QueueName,

		[Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $S3Prefix,

		[Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $BucketName,

        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $MessageBody,

        [Parameter(Mandatory=$true)]
        [ValidateNotNull()]
        [HashTable] $Headers
    )
	
	$key = "$($s3Prefix)/$($Headers['NServiceBus.MessageId'])"
    Write-S3Object -BucketName $BucketName -Key $key -Content $MessageBody -Force
	[string]$name = [QueueNameHelper]::GetSqsQueueName($QueueName)
	$serializedMessage = @{ Headers = $Headers; Body =  [System.String]::Empty; S3BodyKey = $key } | ConvertTo-Json
	$queueUrl = Get-SQSQueueUrl $name
	Send-SQSMessage -QueueUrl $queueUrl -MessageBody $serializedMessage -Force
}
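In a POSIX shell

The large-message steps can be sketched the same way with the AWS CLI. The names `build_large_envelope` and `send_large_message` are hypothetical, header values are assumed to need no JSON escaping, and the bucket is assumed to exist already.

```shell
# Hypothetical helper: envelope for a large message whose body is stored in S3.
# Usage: build_large_envelope <message-type> <message-id> <s3-prefix>
build_large_envelope() {
    key="$3/$2"   # same key layout as the helpers above: <S3Prefix>/<MessageId>
    printf '{"Headers":{"NServiceBus.EnclosedMessageTypes":"%s","NServiceBus.MessageId":"%s"},"Body":"","S3BodyKey":"%s"}\n' \
        "$1" "$2" "$key"
}

# Usage: send_large_message <queue-name> <bucket> <s3-prefix> <message-type> <message-id> <body>
send_large_message() {
    # Upload the raw body first so the receiver can always resolve S3BodyKey.
    printf '%s' "$6" | aws s3 cp - "s3://$2/$3/$5" || return 1
    queue_url=$(aws sqs get-queue-url --queue-name "$1" --query QueueUrl --output text) || return 1
    aws sqs send-message --queue-url "$queue_url" \
        --message-body "$(build_large_envelope "$4" "$5" "$3")"
}
```

Uploading before sending means a failed upload never leaves a message in the queue that points at a missing S3 object.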

Using the native send helper methods

await SendLargeMessage(
        sqsClient: client,
        s3Client: s3Client,
        queue: "samples-sqs-nativeintegration-large",
        s3Prefix: "s3prefix",
        bucketName: "bucketname",
        messageBody: "{Property:'PropertyValue'}",
        headers: new Dictionary<string, string>
        {
            {"NServiceBus.EnclosedMessageTypes", "MessageTypeToSend"},
            {"NServiceBus.MessageId", "99C7320B-A645-4C74-95E8-857EAB98F4F9"}
        }
    )
    .ConfigureAwait(false);

The headers must contain the message type that is sent as a fully qualified assembly name as well as the message id header. See the headers documentation for more information on the EnclosedMessageTypes and MessageId headers.

The S3 bucket name and the S3 prefix must be provided as defined in the transport configuration of the endpoint that will be receiving the message.

Create resources

In order to provision the resources required by an endpoint, use the sqs-transport command line (CLI) tool.

The tool can be obtained from NuGet and installed using the following command:

dotnet tool install -g NServiceBus.AmazonSQS.CommandLine

Once installed, the sqs-transport command line tool will be available for use.

sqs-transport <command> [arguments] [options]

Available commands

  • endpoint create
  • endpoint add delay-delivery-support
  • endpoint add large-message-support
  • endpoint subscribe

sqs-transport endpoint create

Create a new endpoint using:

sqs-transport endpoint create [name]
                              [--access-key-id]
                              [--secret]
                              [--region]
                              [--prefix]
                              [--retention]

arguments

name : Name of the endpoint (required)

options

-i | --access-key-id : Overrides the environment variable 'AWS_ACCESS_KEY_ID'

-s | --secret : Overrides the environment variable 'AWS_SECRET_ACCESS_KEY'

-r | --region: Overrides the environment variable 'AWS_REGION'

-p | --prefix: Prefix to prepend to the endpoint queue

-t | --retention: Retention Period in seconds (defaults to 345600)

sqs-transport endpoint add delay-delivery-support

Add delay delivery support to an endpoint using:

sqs-transport endpoint add [name] delay-delivery-support
                              [--access-key-id]
                              [--secret]
                              [--region]
                              [--prefix]
                              [--retention]

arguments

name : Name of the endpoint (required)

options

-i | --access-key-id : Overrides the environment variable 'AWS_ACCESS_KEY_ID'

-s | --secret : Overrides the environment variable 'AWS_SECRET_ACCESS_KEY'

-r | --region: Overrides the environment variable 'AWS_REGION'

-p | --prefix: Prefix to prepend to the delay delivery queue

-t | --retention: Retention period in seconds (defaults to 345600)

sqs-transport endpoint add large-message-support

Add large message support to an endpoint using:

sqs-transport endpoint add [name] large-message-support [bucket-name]
                              [--access-key-id]
                              [--secret]
                              [--region]
                              [--key-prefix]
                              [--expiration]

arguments

name : Name of the endpoint (required)

bucket-name : Name of the S3 bucket (required)

options

-i | --access-key-id : Overrides the environment variable 'AWS_ACCESS_KEY_ID'

-s | --secret : Overrides the environment variable 'AWS_SECRET_ACCESS_KEY'

-r | --region: Overrides the environment variable 'AWS_REGION'

-p | --key-prefix: S3 Key prefix to prepend to all bucket keys

-e | --expiration: Expiration time in days (defaults to 4)

sqs-transport endpoint subscribe event-type

Subscribe an endpoint to an event type using:

sqs-transport endpoint subscribe [name] [event-type]
                              [--access-key-id]
                              [--secret]
                              [--region]
                              [--prefix]

arguments

name : Name of the endpoint (required)

event-type : Full name of the event to subscribe to (e.g. MyNamespace.MyMessage) (required)

options

-i | --access-key-id : Overrides the environment variable 'AWS_ACCESS_KEY_ID'

-s | --secret : Overrides the environment variable 'AWS_SECRET_ACCESS_KEY'

-r | --region: Overrides the environment variable 'AWS_REGION'

-p | --prefix: Prefix to prepend to the topic provisioned for the event type and the subscribing queue
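The commands above can be combined into a single provisioning script. The following is a sketch, not an official script: `run` and `provision_endpoint` are hypothetical helper names, option values are assumed to follow the option name separated by a space, and credentials and region are assumed to come from the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION environment variables.

```shell
# With DRY_RUN=1 the commands are printed instead of executed.
run() {
    if [ "${DRY_RUN:-0}" = "1" ]; then
        echo "$*"
    else
        "$@"
    fi
}

# Provision everything one endpoint needs; stops at the first failing command.
# Usage: provision_endpoint <endpoint> <bucket> <event-type> <prefix>
provision_endpoint() {
    run sqs-transport endpoint create "$1" --prefix "$4" &&
    run sqs-transport endpoint add "$1" delay-delivery-support --prefix "$4" &&
    run sqs-transport endpoint add "$1" large-message-support "$2" &&
    run sqs-transport endpoint subscribe "$1" "$3" --prefix "$4"
}
```

Running `DRY_RUN=1 provision_endpoint Sales my-bucket MyNamespace.MyMessage dev-` prints the four commands without touching AWS, which makes it easy to review them before a real run.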

Delete resources

In order to de-provision the resources required by an endpoint, the sqs-transport command line (CLI) tool can be used.

Available commands

  • endpoint unsubscribe
  • endpoint remove delay-delivery-support
  • endpoint remove large-message-support
  • endpoint delete

sqs-transport endpoint unsubscribe

Unsubscribe an endpoint from an event type using:

sqs-transport endpoint unsubscribe [name] [event-type]
                              [--access-key-id]
                              [--secret]
                              [--region]
                              [--prefix]
                              [--remove-shared-resources]

arguments

name : Name of the endpoint (required)

event-type : Full name of the event to unsubscribe from (e.g. MyNamespace.MyMessage) (required)

options

-i | --access-key-id : Overrides the environment variable 'AWS_ACCESS_KEY_ID'

-s | --secret : Overrides the environment variable 'AWS_SECRET_ACCESS_KEY'

-r | --region: Overrides the environment variable 'AWS_REGION'

-p | --prefix: Prefix prepended to the topic provisioned for the event type and the subscribing queue

-f | --remove-shared-resources: Remove shared resources (the topic provisioned for the event type)

sqs-transport endpoint remove delay-delivery-support

Remove delay delivery support from an endpoint using:

sqs-transport endpoint remove [name] delay-delivery-support
                              [--access-key-id]
                              [--secret]
                              [--region]
                              [--prefix]

arguments

name : Name of the endpoint (required)

options

-i | --access-key-id : Overrides the environment variable 'AWS_ACCESS_KEY_ID'

-s | --secret : Overrides the environment variable 'AWS_SECRET_ACCESS_KEY'

-r | --region: Overrides the environment variable 'AWS_REGION'

-p | --prefix: Prefix to prepend to the delay delivery queue

sqs-transport endpoint remove large-message-support

Remove large message support from an endpoint using:

sqs-transport endpoint remove [name] large-message-support [bucket-name]
                              [--access-key-id]
                              [--secret]
                              [--region]
                              [--remove-shared-resources]

arguments

name : Name of the endpoint (required)

bucket-name : Name of the S3 bucket (required)

options

-i | --access-key-id : Overrides the environment variable 'AWS_ACCESS_KEY_ID'

-s | --secret : Overrides the environment variable 'AWS_SECRET_ACCESS_KEY'

-r | --region: Overrides the environment variable 'AWS_REGION'

-e | --remove-shared-resources: Remove shared resources (S3 Bucket)

sqs-transport endpoint delete

Delete an existing endpoint using:

sqs-transport endpoint delete [name]
                              [--access-key-id]
                              [--secret]
                              [--region]
                              [--prefix]

arguments

name : Name of the endpoint (required)

options

-i | --access-key-id : Overrides the environment variable 'AWS_ACCESS_KEY_ID'

-s | --secret : Overrides the environment variable 'AWS_SECRET_ACCESS_KEY'

-r | --region: Overrides the environment variable 'AWS_REGION'

-p | --prefix: Prefix to prepend to the endpoint queue
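A teardown script can call the removal commands in the order they are listed above. This is a sketch: `run` and `deprovision_endpoint` are hypothetical helper names, and option values are assumed to follow the option name separated by a space. Because `--remove-shared-resources` is not passed, the topic and the S3 bucket are left in place.

```shell
# With DRY_RUN=1 the commands are printed instead of executed.
run() {
    if [ "${DRY_RUN:-0}" = "1" ]; then
        echo "$*"
    else
        "$@"
    fi
}

# Remove subscriptions and optional features first, then the endpoint queue itself.
# Usage: deprovision_endpoint <endpoint> <bucket> <event-type> <prefix>
deprovision_endpoint() {
    run sqs-transport endpoint unsubscribe "$1" "$3" --prefix "$4" &&
    run sqs-transport endpoint remove "$1" delay-delivery-support --prefix "$4" &&
    run sqs-transport endpoint remove "$1" large-message-support "$2" &&
    run sqs-transport endpoint delete "$1" --prefix "$4"
}
```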

Return message to source queue

The retry helper methods

A retry involves the following actions:

  • Read a message from the error queue.
  • Forward that message to another queue to be retried.

In C#

public static async Task ReturnMessageToSourceQueue(string errorQueueName, string messageId)
{
    var path = QueueNameHelper.GetSqsQueueName(errorQueueName);
    using (var client = ClientFactory.CreateSqsClient())
    {
        var queueUrlResponse = await client.GetQueueUrlAsync(path)
            .ConfigureAwait(false);
        var queueUrl = queueUrlResponse.QueueUrl;

        await InspectMessagesUntilFound(client, messageId, queueUrl)
            .ConfigureAwait(false);
    }
}

static async Task InspectMessagesUntilFound(IAmazonSQS client, string messageId, string queueUrl)
{
    var receivedMessages = await client.ReceiveMessageAsync(new ReceiveMessageRequest
        {
            QueueUrl = queueUrl,
            WaitTimeSeconds = 20,
            MaxNumberOfMessages = 10,
        })
        .ConfigureAwait(false);

    if (receivedMessages.Messages.Count == 0)
    {
        return;
    }

    var foundMessage = receivedMessages.Messages
        .SingleOrDefault(message => message.MessageId == messageId);

    if (foundMessage != null)
    {
        var failedQueueName = ReadFailedQueueHeader(foundMessage);
        var failedQueueUrlResponse = await client.GetQueueUrlAsync(failedQueueName)
            .ConfigureAwait(false);
        var failedQueueUrl = failedQueueUrlResponse.QueueUrl;
        // Large messages don't need to be handled separately since the S3BodyKey is preserved
        await client.SendMessageAsync(new SendMessageRequest(failedQueueUrl, foundMessage.Body)) // what to do with the attributes?
            .ConfigureAwait(false);
        await client.DeleteMessageAsync(queueUrl, foundMessage.ReceiptHandle)
            .ConfigureAwait(false);

        return;
    }

    await InspectMessagesUntilFound(client, messageId, queueUrl)
        .ConfigureAwait(false);
}

static string ReadFailedQueueHeader(Message message)
{
    var headers = ExtractHeaders(message);
    var queueName = headers.Single(x => x.Key == "NServiceBus.FailedQ").Value;
    return QueueNameHelper.GetSqsQueueName(queueName);
}

static Dictionary<string, string> ExtractHeaders(Message message)
{
    var transportMessage = JsonConvert.DeserializeObject<TransportMessage>(message.Body);
    return transportMessage.Headers;
}

class TransportMessage
{
    public Dictionary<string, string> Headers { get; set; }

    public string Body { get; set; }

    public string S3BodyKey { get; set; }
}

public class HeaderInfo
{
    public string Key { get; set; }
    public string Value { get; set; }
}

This example code will receive other messages from the error queue while it searches for the desired message. All messages received by this code will remain invisible to other consumers until the visibility timeout expires.
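One way to shorten that window from an operations script is to reset the visibility timeout of a message that was received but is not the one being looked for. The sketch below uses the AWS CLI; `release_message` is a hypothetical helper name, and the receipt handle is assumed to come from the receive call that returned the message.

```shell
# Hypothetical helper: make a received message visible again immediately
# instead of waiting for the queue's visibility timeout to expire.
# Usage: release_message <queue-url> <receipt-handle>
release_message() {
    aws sqs change-message-visibility \
        --queue-url "$1" \
        --receipt-handle "$2" \
        --visibility-timeout 0
}
```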

Using the retry helper methods

await ReturnMessageToSourceQueue(
        errorQueueName: "error",
        messageId: "c390a6fb-4fb5-46da-927d-a156f75739eb")
    .ConfigureAwait(false);
