Scripting

Component: Amazon SQS Transport
NuGet Package NServiceBus.AmazonSQS (4-pre)
Target NServiceBus Version: 7.x
This page targets a pre-release version and is subject to change prior to the final release.

The following are example code and scripts to facilitate deployment and operations against the SQS Transport.

Requirements

The PowerShell scripts require the AWS Tools for PowerShell to be installed and properly configured. For more information, refer to the PowerShell getting set up guide.

For all operations that create resources in AWS, the corresponding rights must be granted. For more information, refer to the IAM policies guide.

QueueNameHelper

In C#

public static class QueueNameHelper
{
    public static string GetSqsQueueName(string destination, string queueNamePrefix = null)
    {
        if (string.IsNullOrWhiteSpace(destination))
        {
            throw new ArgumentNullException(nameof(destination));
        }

        var s = queueNamePrefix + destination;

        // SQS queue names can only have alphanumeric characters, hyphens and underscores.
        // Any other characters will be replaced with a hyphen.
        for (var i = 0; i < s.Length; ++i)
        {
            var c = s[i];
            if (!char.IsLetterOrDigit(c)
                && c != '-'
                && c != '_')
            {
                s = s.Replace(c, '-');
            }
        }

        if (s.Length > 80)
        {
            throw new ArgumentException(
                $"Address {destination} with configured prefix {queueNamePrefix} is longer than 80 characters and therefore cannot be used to create an SQS queue. Use a shorter endpoint name or a shorter queue name prefix.");
        }

        return s;
    }
}

The above QueueNameHelper makes sure that queues follow the proper naming guidelines for SQS.
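
As an illustration of the replacement rule, the following is a minimal sketch of a condensed equivalent of the loop in QueueNameHelper. The class name QueueNameSanitizer and the method Sanitize are hypothetical and exist only for this example; the length check of the real helper is left out.

```csharp
using System;
using System.Linq;

public static class QueueNameSanitizer
{
    // Illustrative equivalent of the replacement loop in QueueNameHelper:
    // every character that is not a letter, digit, hyphen or underscore
    // is replaced with a hyphen. The 80 character check is omitted here.
    public static string Sanitize(string name)
    {
        return new string(name
            .Select(c => char.IsLetterOrDigit(c) || c == '-' || c == '_' ? c : '-')
            .ToArray());
    }
}
```

For example, Sanitize("PROD" + "myendpoint.Timeouts") returns PRODmyendpoint-Timeouts, because the dot is not an allowed character.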

In PowerShell

Add-Type @'
    using System;

    public static class QueueNameHelper
    {
        public static string GetSqsQueueName(string destination, string queueNamePrefix = null)
        {
            if (string.IsNullOrWhiteSpace(destination))
            {
                throw new ArgumentNullException("destination");
            }

            var s = queueNamePrefix + destination;

            // SQS queue names can only have alphanumeric characters, hyphens and underscores.
            // Any other characters will be replaced with a hyphen.
            for (var i = 0; i < s.Length; ++i)
            {
                var c = s[i];
                if (!char.IsLetterOrDigit(c)
                    && c != '-'
                    && c != '_')
                {
                    s = s.Replace(c, '-');
                }
            }

            if (s.Length > 80)
            {
                throw new Exception(
                    string.Format("Address {0} with configured prefix {1} is longer than 80 characters and therefore cannot be used to create an SQS queue. Use a shorter queue name.", destination, queueNamePrefix));
            }

            return s;
        }
    }
'@

Native Send

The native send helper methods

A send involves the following actions:

  • Create and serialize the payload including headers.
  • Write a message body directly to SQS Transport.

In C#

public static async Task SendMessage(IAmazonSQS sqsClient, string queue, string messageBody, Dictionary<string, string> headers)
{
    var bodyBytes = Encoding.UTF8.GetBytes(messageBody);
    var base64Body = Convert.ToBase64String(bodyBytes);
    var serializedMessage = Newtonsoft.Json.JsonConvert.SerializeObject(new
    {
        Headers = headers,
        Body = base64Body,
    });
    var queueUrlResponse = await sqsClient.GetQueueUrlAsync(QueueNameHelper.GetSqsQueueName(queue))
        .ConfigureAwait(false);
    await sqsClient.SendMessageAsync(queueUrlResponse.QueueUrl, serializedMessage)
        .ConfigureAwait(false);
}

In PowerShell

Function SendMessage
{
    param(
        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $QueueName,

        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $MessageBody,

        [Parameter(Mandatory=$true)]
        [ValidateNotNull()]
        [HashTable] $Headers
    )

    $bodyBytes = [System.Text.Encoding]::UTF8.GetBytes($MessageBody)
    $base64Body = [System.Convert]::ToBase64String($bodyBytes)
    [string]$name = [QueueNameHelper]::GetSqsQueueName($QueueName)
    $serializedMessage = @{ Headers = $Headers; Body = $base64Body } | ConvertTo-Json
    $queueUrl = Get-SQSQueueUrl $name
    Send-SQSMessage -QueueUrl $queueUrl -MessageBody $serializedMessage -Force
}

Using the native send helper methods

await SendMessage(
    sqsClient: client,
    queue: "samples-sqs-nativeintegration",
    messageBody: "{Property:'PropertyValue'}",
    headers: new Dictionary<string, string>
    {
        {"NServiceBus.EnclosedMessageTypes", "MessageTypeToSend"},
        {"NServiceBus.MessageId", "99C7320B-A645-4C74-95E8-857EAB98F4F9"}
    }
)
.ConfigureAwait(false);

The headers must contain the message type that is sent as a fully qualified assembly name as well as the message id header. See the headers documentation for more information on the EnclosedMessageTypes and MessageId headers.
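
The two required headers could be built as in the following sketch. The class NativeSendHeaders and the message type MessageTypeToSend are hypothetical and used only for illustration. The sketch uses the full type name and assumes the receiving endpoint can resolve the type from it; Type.AssemblyQualifiedName is an alternative when the assembly must be included.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message type, present only to make the sketch self-contained.
public class MessageTypeToSend
{
    public string Property { get; set; }
}

public static class NativeSendHeaders
{
    // Builds the two headers described above for the given message type.
    // Assumption: the receiver can resolve the type from its full name.
    public static Dictionary<string, string> Create(Type messageType)
    {
        return new Dictionary<string, string>
        {
            {"NServiceBus.EnclosedMessageTypes", messageType.FullName},
            // A new message id is generated for every call.
            {"NServiceBus.MessageId", Guid.NewGuid().ToString()}
        };
    }
}
```

The resulting dictionary can be passed as the headers argument of SendMessage.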

Native Send Large Messages

The native send helper methods

A send involves the following actions:

  • Create a unique S3 key containing the S3Prefix and the MessageId.
  • Upload the body of the message to the S3 bucket.
  • Create and serialize the message with an empty payload including headers and the S3BodyKey.
  • Write a message directly to SQS Transport.
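
Whether a message needs the S3 route depends on its serialized size. The following is a minimal sketch of such a check together with the key layout from the first step. The class MessageSizeCheck and its members are hypothetical, and the 256 KiB limit is an assumption based on the classic SQS per-message limit; verify it against the limit in effect for the account and the receiving endpoint.

```csharp
using System;
using System.Text;

public static class MessageSizeCheck
{
    // Assumption: 256 KiB per message. Adjust to the limit actually in effect.
    public const int AssumedSqsLimitInBytes = 256 * 1024;

    // Returns true when the UTF-8 size of the serialized message
    // (headers plus Base64 body) exceeds the given limit.
    public static bool RequiresS3(string serializedMessage, int limitInBytes = AssumedSqsLimitInBytes)
    {
        return Encoding.UTF8.GetByteCount(serializedMessage) > limitInBytes;
    }

    // Builds the S3 key as "<prefix>/<message id>".
    public static string BuildS3Key(string s3Prefix, string messageId)
    {
        return $"{s3Prefix}/{messageId}";
    }
}
```

The size is measured on the serialized message rather than on the raw body, since Base64 encoding and the headers both add to what is written to the queue.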

In C#

public static async Task SendLargeMessage(IAmazonSQS sqsClient, IAmazonS3 s3Client, string queue, string s3Prefix, string bucketName, string messageBody, Dictionary<string, string> headers)
{
    var bodyBytes = Encoding.UTF8.GetBytes(messageBody);
    var key = $"{s3Prefix}/{headers["NServiceBus.MessageId"]}";
    using (var bodyStream = new MemoryStream(bodyBytes))
    {
        await s3Client.PutObjectAsync(new PutObjectRequest
        {
            BucketName = bucketName,
            InputStream = bodyStream,
            Key = key
        }).ConfigureAwait(false);
    }
    var serializedMessage = Newtonsoft.Json.JsonConvert.SerializeObject(new
    {
        Headers = headers,
        Body = string.Empty,
        S3BodyKey = key
    });
    var queueUrlResponse = await sqsClient.GetQueueUrlAsync(QueueNameHelper.GetSqsQueueName(queue))
        .ConfigureAwait(false);
    await sqsClient.SendMessageAsync(queueUrlResponse.QueueUrl, serializedMessage)
        .ConfigureAwait(false);
}

In PowerShell

Function SendLargeMessage
{
    param(
        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $QueueName,

        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $S3Prefix,

        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $BucketName,

        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $MessageBody,

        [Parameter(Mandatory=$true)]
        [ValidateNotNull()]
        [HashTable] $Headers
    )

    $key = "$($S3Prefix)/$($Headers['NServiceBus.MessageId'])"
    Write-S3Object -BucketName $BucketName -Key $key -Content $MessageBody -Force
    [string]$name = [QueueNameHelper]::GetSqsQueueName($QueueName)
    $serializedMessage = @{ Headers = $Headers; Body = [System.String]::Empty; S3BodyKey = $key } | ConvertTo-Json
    $queueUrl = Get-SQSQueueUrl $name
    Send-SQSMessage -QueueUrl $queueUrl -MessageBody $serializedMessage -Force
}

Using the native send helper methods

await SendLargeMessage(
        sqsClient: client,
        s3Client: s3Client,
        queue: "samples-sqs-nativeintegration-large",
        s3Prefix: "s3prefix",
        bucketName: "bucketname",
        messageBody: "{Property:'PropertyValue'}",
        headers: new Dictionary<string, string>
        {
            {"NServiceBus.EnclosedMessageTypes", "MessageTypeToSend"},
            {"NServiceBus.MessageId", "99C7320B-A645-4C74-95E8-857EAB98F4F9"}
        }
    )
    .ConfigureAwait(false);

The headers must contain the message type that is sent as a fully qualified assembly name as well as the message id header. See the headers documentation for more information on the EnclosedMessageTypes and MessageId headers.

The S3 bucket name and the S3 prefix must be provided as defined in the transport configuration of the endpoint that will be receiving the message.

Create queues

Queues can be created for a specific endpoint or as queues shared between multiple endpoints.

The create queue helper methods

In C#

public static class QueueCreationUtils
{
    public static TimeSpan DefaultTimeToLive = TimeSpan.FromDays(4);

    public static async Task CreateQueue(string queueName, TimeSpan? maxTimeToLive = null, string queueNamePrefix = null)
    {
        using (var client = ClientFactory.CreateSqsClient())
        {
            var sqsRequest = new CreateQueueRequest
            {
                QueueName = QueueNameHelper.GetSqsQueueName(queueName, queueNamePrefix)
            };
            var value = Convert.ToInt32((maxTimeToLive ?? DefaultTimeToLive).TotalSeconds).ToString();
            sqsRequest.Attributes.Add(QueueAttributeName.MessageRetentionPeriod, value);
            try
            {
                await client.CreateQueueAsync(sqsRequest)
                    .ConfigureAwait(false);
            }
            catch (QueueNameExistsException)
            {
            }
        }
    }
}

In PowerShell

Function CreateQueue
{
    param(
        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $QueueName,

        [Parameter(Mandatory=$false)]
        [string] $QueueNamePrefix,

        [Parameter(Mandatory=$false)]
        [ValidateNotNullOrEmpty()]
        [ValidateScript({ValidateMaxTimeToLive -MaxTimeToLive $_})]
        [string] $MaxTimeToLive = "4:0:0:0"
    )

    $timeToLive = [System.Convert]::ToInt32([System.TimeSpan]::Parse($MaxTimeToLive, [System.Globalization.CultureInfo]::InvariantCulture).TotalSeconds).ToString()
    [string]$name = [QueueNameHelper]::GetSqsQueueName($QueueName, $QueueNamePrefix)
    New-SQSQueue -QueueName $name -Attributes @{"MessageRetentionPeriod" = $timeToLive;} -Force
}

Function ValidateMaxTimeToLive {
    param(
        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $MaxTimeToLive
    )

    # Test MaxTimeToLive is valid
    try {
        $maxTimeToLive = [System.TimeSpan]::Parse($MaxTimeToLive, [System.Globalization.CultureInfo]::InvariantCulture)
        return $true
    }
    catch [System.FormatException] {
        Write-Warning "$MaxTimeToLive is not a valid TimeSpan"
        return $false
    }
}
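
The MaxTimeToLive strings accepted by the PowerShell functions are parsed with TimeSpan.Parse, so the number of colon-separated parts changes the meaning of the value. The following sketch mirrors that conversion in C#; the class MaxTimeToLiveExamples and the method ToRetentionSeconds are hypothetical names used only for illustration.

```csharp
using System;
using System.Globalization;

public static class MaxTimeToLiveExamples
{
    // Mirrors the conversion used by CreateQueue:
    // parse with the invariant culture, then convert the total seconds to an integer.
    public static int ToRetentionSeconds(string maxTimeToLive)
    {
        var timeSpan = TimeSpan.Parse(maxTimeToLive, CultureInfo.InvariantCulture);
        return Convert.ToInt32(timeSpan.TotalSeconds);
    }
}
```

With this conversion, "4:0:0:0" (days:hours:minutes:seconds) yields 345600 seconds, "2:0:0:0" yields 172800 seconds, and "2:0:0" (hours:minutes:seconds) yields only 7200 seconds, which is two hours rather than two days.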

In CloudFormation

public static class QueueCreationUtilsCloudFormation
{
    static TimeSpan DefaultTimeToLive = TimeSpan.FromDays(4);

    public static async Task CreateQueue(string queueName, string templatePath, TimeSpan? maxTimeToLive = null, string queueNamePrefix = null)
    {
        using (var client = ClientFactory.CreateCloudFormationClient())
        {
            var sqsQueueName = QueueNameHelper.GetSqsQueueName(queueName, queueNamePrefix);
            var request = new CreateStackRequest
            {
                StackName = sqsQueueName,
                Parameters = new List<Parameter>
                {
                    new Parameter
                    {
                        ParameterKey = "QueueName",
                        ParameterValue = sqsQueueName
                    },
                    new Parameter
                    {
                        ParameterKey = "MaxTimeToLive",
                        ParameterValue = Convert.ToInt32((maxTimeToLive ?? DefaultTimeToLive).TotalSeconds).ToString()
                    }
                },
                TemplateBody = File.ReadAllText(templatePath)
            };

            await client.CreateStackAsync(request)
                .ConfigureAwait(false);

            var describeRequest = new DescribeStacksRequest
            {
                StackName = sqsQueueName
            };
            StackStatus currentStatus = string.Empty;
            while (currentStatus != StackStatus.CREATE_COMPLETE)
            {
                var response = await client.DescribeStacksAsync(describeRequest)
                    .ConfigureAwait(false);
                var stack = response.Stacks.SingleOrDefault();
                currentStatus = stack?.StackStatus;
            }
        }
    }
}

The template used

{
  "AWSTemplateFormatVersion": "2010-09-09",
  "Parameters": {
    "QueueName": {
      "Type": "String",
      "Description": "Enter queue name."
    },
    "MaxTimeToLive": {
      "Type": "Number",
      "Description": "Maximum time to live."
    }
  },
  "Resources": {
    "CreatedQueue": {
      "Properties": {
        "QueueName": { "Ref": "QueueName" },
        "MessageRetentionPeriod": { "Ref": "MaxTimeToLive" }
      },
      "Type": "AWS::SQS::Queue"
    }
  },
  "Outputs": {
    "QueueName": {
      "Description": "The name of the queue",
      "Value": {
        "Fn::GetAtt": [
          "CreatedQueue",
          "QueueName"
        ]
      }
    },
    "QueueURL": {
      "Description": "The URL of the queue",
      "Value": {
        "Ref": "CreatedQueue"
      }
    },
    "QueueARN": {
      "Description": "The ARN of the queue",
      "Value": {
        "Fn::GetAtt": [
          "CreatedQueue",
          "Arn"
        ]
      }
    }
  }
}
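
The CloudFormation helper above queries the stack status in a loop without pausing and only leaves the loop on CREATE_COMPLETE. The following is a minimal sketch of a bounded alternative, not the method used by the helper. The class StackPolling and the method WaitForStatus are hypothetical, and the status lookup is passed in as a delegate so the sketch does not depend on the AWS SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class StackPolling
{
    // Hypothetical helper: polls getStatus until it returns targetStatus.
    // Throws when a status listed in failureStatuses is observed,
    // or when maxAttempts polls did not reach the target status.
    public static async Task WaitForStatus(
        Func<Task<string>> getStatus,
        string targetStatus,
        ICollection<string> failureStatuses,
        TimeSpan delay,
        int maxAttempts)
    {
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            var status = await getStatus().ConfigureAwait(false);
            if (status == targetStatus)
            {
                return;
            }
            if (failureStatuses.Contains(status))
            {
                throw new InvalidOperationException(
                    $"Stack reached status {status} while waiting for {targetStatus}.");
            }
            // Pause between polls to avoid flooding the service with requests.
            await Task.Delay(delay).ConfigureAwait(false);
        }
        throw new TimeoutException(
            $"Status {targetStatus} was not reached after {maxAttempts} attempts.");
    }
}
```

With the CloudFormation client, getStatus could wrap the DescribeStacksAsync call and return the status of the single stack as a string, and failureStatuses could contain values such as CREATE_FAILED and ROLLBACK_COMPLETE.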

Creating queues for an endpoint

The following helpers create all queues for a given endpoint name.

In C#

public static async Task CreateQueuesForEndpoint(string endpointName, TimeSpan? maxTimeToLive = null, string queueNamePrefix = null, bool includeRetries = false)
{
    // timeout dispatcher queue
    // This queue is created first because it has the longest name. 
    // If the endpointName and queueNamePrefix are too long this call will throw and no queues will be created. 
    // In this event, a shorter value for endpointName or queueNamePrefix should be used.
    await QueueCreationUtils.CreateQueue($"{endpointName}.TimeoutsDispatcher", maxTimeToLive, queueNamePrefix)
        .ConfigureAwait(false);

    // main queue
    await QueueCreationUtils.CreateQueue(endpointName, maxTimeToLive, queueNamePrefix)
        .ConfigureAwait(false);

    // timeout queue
    await QueueCreationUtils.CreateQueue($"{endpointName}.Timeouts", maxTimeToLive, queueNamePrefix)
        .ConfigureAwait(false);
    
    // retries queue
    if (includeRetries)
    {
        await QueueCreationUtils.CreateQueue($"{endpointName}.Retries", maxTimeToLive, queueNamePrefix)
            .ConfigureAwait(false);
    }
}

In PowerShell

Function CreateQueuesForEndpoint
{
    param(
        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $EndpointName,

        [Parameter(Mandatory=$false)]
        [ValidateNotNullOrEmpty()]
        [ValidateScript({ValidateMaxTimeToLive -MaxTimeToLive $_})]
        [string] $MaxTimeToLive = "4:0:0:0",

        [Parameter(Mandatory=$false)]
        [string] $QueueNamePrefix,

        [Parameter(HelpMessage="Only required for NSB Versions 5 and below")]
        [Switch] $IncludeRetries
    )

    # timeout dispatcher queue
    # This queue is created first because it has the longest name.
    # If the endpointName and queueNamePrefix are too long this call will throw and no queues will be created.
    # In this event, a shorter value for endpointName or queueNamePrefix should be used.
    CreateQueue -QueueName "$EndpointName.TimeoutsDispatcher" -QueueNamePrefix $QueueNamePrefix -MaxTimeToLive $MaxTimeToLive

    # main queue
    CreateQueue -QueueName $EndpointName -QueueNamePrefix $QueueNamePrefix -MaxTimeToLive $MaxTimeToLive

    # timeout queue
    CreateQueue -QueueName "$EndpointName.Timeouts" -QueueNamePrefix $QueueNamePrefix -MaxTimeToLive $MaxTimeToLive

    # retries queue
    if ($IncludeRetries) {
        CreateQueue -QueueName "$EndpointName.Retries" -QueueNamePrefix $QueueNamePrefix -MaxTimeToLive $MaxTimeToLive
    }
}

In CloudFormation

public static async Task CreateQueuesForEndpoint(string endpointName, string templatePath, TimeSpan? maxTimeToLive = null, string queueNamePrefix = null, bool includeRetries = false)
{
    using (var client = ClientFactory.CreateCloudFormationClient())
    {
        var endpointNameWithPrefix = QueueNameHelper.GetSqsQueueName(endpointName, queueNamePrefix);
        var request = new CreateStackRequest
        {
            StackName = endpointNameWithPrefix,
            Parameters = new List<Parameter>
            {
                new Parameter
                {
                    ParameterKey = "EndpointName",
                    ParameterValue = endpointNameWithPrefix
                },
                new Parameter
                {
                    ParameterKey = "MaxTimeToLive",
                    ParameterValue = Convert.ToInt32((maxTimeToLive ?? QueueCreationUtils.DefaultTimeToLive).TotalSeconds).ToString()
                },
                new Parameter
                {
                    ParameterKey = "IncludeRetries",
                    ParameterValue = includeRetries.ToString()
                },
            },
            TemplateBody = File.ReadAllText(templatePath)
        };

        await client.CreateStackAsync(request)
            .ConfigureAwait(false);

        var describeRequest = new DescribeStacksRequest
        {
            StackName = endpointNameWithPrefix
        };
        StackStatus currentStatus = string.Empty;
        while (currentStatus != StackStatus.CREATE_COMPLETE)
        {
            var response = await client.DescribeStacksAsync(describeRequest)
                .ConfigureAwait(false);
            var stack = response.Stacks.SingleOrDefault();
            currentStatus = stack?.StackStatus;
        }
    }
}

The template used

{
  "AWSTemplateFormatVersion": "2010-09-09",
  "Parameters": {
    "EndpointName": {
      "Type": "String",
      "Description": "Enter endpoint name."
    },
    "MaxTimeToLive": {
      "Type": "Number",
      "Description": "Maximum time to live."
    },
    "IncludeRetries": {
      "Type": "String",
      "Description": "Include Retries Queue."
    }
  },
  "Conditions": {
    "IncludeRetriesResources": {
      "Fn::Equals": [
        { "Ref": "IncludeRetries" },
        "True"
      ]
    }
  },
  "Resources": {
    "EndpointQueue": {
      "Properties": {
        "QueueName": { "Ref": "EndpointName" },
        "MessageRetentionPeriod": { "Ref": "MaxTimeToLive" }
      },
      "Type": "AWS::SQS::Queue"
    },
    "TimeoutsQueue": {
      "Properties": {
        "QueueName": {
          "Fn::Join": [
            "-",
            [
              { "Ref": "EndpointName" },
              "Timeouts"
            ]
          ]
        },
        "MessageRetentionPeriod": { "Ref": "MaxTimeToLive" }
      },
      "Type": "AWS::SQS::Queue"
    },
    "TimeoutsDispatcherQueue": {
      "Properties": {
        "QueueName": {
          "Fn::Join": [
            "-",
            [
              { "Ref": "EndpointName" },
              "TimeoutsDispatcher"
            ]
          ]
        },
        "MessageRetentionPeriod": { "Ref": "MaxTimeToLive" }
      },
      "Type": "AWS::SQS::Queue"
    },
    "RetriesQueue": {
      "Properties": {
        "QueueName": {
          "Fn::Join": [
            "-",
            [
              { "Ref": "EndpointName" },
              "Retries"
            ]
          ]
        },
        "MessageRetentionPeriod": { "Ref": "MaxTimeToLive" }
      },
      "Condition": "IncludeRetriesResources",
      "Type": "AWS::SQS::Queue"
    }
  },
  "Outputs": {
    "EndpointQueueURL": {
      "Description": "The URL of the endpoint queue",
      "Value": {
        "Ref": "EndpointQueue"
      }
    },
    "TimeoutsQueueURL": {
      "Description": "The URL of the timeouts queue",
      "Value": {
        "Ref": "TimeoutsQueue"
      }
    },
    "TimeoutsDispatcherQueueURL": {
      "Description": "The URL of the timeouts dispatcher queue",
      "Value": {
        "Ref": "TimeoutsDispatcherQueue"
      }
    },
    "RetriesQueueURL": {
      "Description": "The URL of the retries queue",
      "Value": {
        "Ref": "RetriesQueue"
      }
    }
  }
}

Using the create endpoint queues helper methods

In C#

await CreateQueuesForEndpoint(
        endpointName: "myendpoint",
        maxTimeToLive: TimeSpan.FromDays(2),
        queueNamePrefix: "PROD",
        includeRetries: true /* required for V5 and below */)
    .ConfigureAwait(false);

In PowerShell

# For NServiceBus 6 Endpoints
CreateQueuesForEndpoint -EndpointName "myendpoint" -QueueNamePrefix "PROD" -MaxTimeToLive "2:0:0:0"

# For NServiceBus 5 and below Endpoints
CreateQueuesForEndpoint -EndpointName "myendpoint" -QueueNamePrefix "PROD" -MaxTimeToLive "2:0:0:0" -IncludeRetries

In CloudFormation

await CreateQueuesForEndpoint(
        endpointName: "myendpoint",
        templatePath: @".\CreateQueuesEndpoint.json",
        maxTimeToLive: TimeSpan.FromDays(2),
        queueNamePrefix: "PROD",
        includeRetries: true /* required for V5 and below */)
    .ConfigureAwait(false);

To create shared queues

In C#

await QueueCreationUtils.CreateQueue(
        queueName: "error",
        maxTimeToLive: TimeSpan.FromDays(2),
        queueNamePrefix: "PROD")
    .ConfigureAwait(false);

await QueueCreationUtils.CreateQueue(
        queueName: "audit",
        maxTimeToLive: TimeSpan.FromDays(2),
        queueNamePrefix: "PROD")
    .ConfigureAwait(false);

In PowerShell

CreateQueue -QueueName "error" -QueueNamePrefix "PROD" -MaxTimeToLive "2:0:0:0"
CreateQueue -QueueName "audit" -QueueNamePrefix "PROD" -MaxTimeToLive "2:0:0:0"

In CloudFormation

await QueueCreationUtilsCloudFormation.CreateQueue(
        queueName: "error",
        templatePath: @".\QueueCreation.json",
        maxTimeToLive: TimeSpan.FromDays(2),
        queueNamePrefix: "PROD")
    .ConfigureAwait(false);

await QueueCreationUtilsCloudFormation.CreateQueue(
        queueName: "audit",
        templatePath: @".\QueueCreation.json",
        maxTimeToLive: TimeSpan.FromDays(2),
        queueNamePrefix: "PROD")
    .ConfigureAwait(false);

Delete queues

The delete queue helper methods

In C#

public static async Task DeleteQueue(string queueName, string queueNamePrefix = null)
{
    try
    {
        using (var client = ClientFactory.CreateSqsClient())
        {
            var sqsQueueName = QueueNameHelper.GetSqsQueueName(queueName, queueNamePrefix);
            var queueUrlResponse = await client.GetQueueUrlAsync(sqsQueueName)
                .ConfigureAwait(false);
            await client.DeleteQueueAsync(queueUrlResponse.QueueUrl)
                .ConfigureAwait(false);
        }
    }
    catch (QueueDoesNotExistException)
    {
    }
}

In PowerShell

Function DeleteQueue
{
    param(
        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $QueueName,

        [Parameter(Mandatory=$false)]
        [string] $QueueNamePrefix
    )

    [string]$name = [QueueNameHelper]::GetSqsQueueName($QueueName, $QueueNamePrefix)
    $queueUrl = Get-SQSQueueUrl $name
    Remove-SQSQueue -QueueUrl $queueUrl -Force
}

In CloudFormation

public static class QueueDeletionUtilsCloudFormation
{
    public static async Task DeleteQueue(string queueName, string queueNamePrefix = null)
    {
        using (var client = ClientFactory.CreateCloudFormationClient())
        {
            var sqsQueueName = QueueNameHelper.GetSqsQueueName(queueName, queueNamePrefix);

            var request = new DeleteStackRequest
            {
                StackName = sqsQueueName,
            };


            await client.DeleteStackAsync(request)
                .ConfigureAwait(false);

            var describeRequest = new DescribeStacksRequest
            {
                StackName = sqsQueueName
            };
            StackStatus currentStatus = string.Empty;
            while (currentStatus != StackStatus.DELETE_COMPLETE)
            {
                try
                {
                    var response = await client.DescribeStacksAsync(describeRequest)
                        .ConfigureAwait(false);
                    var stack = response.Stacks.SingleOrDefault();
                    currentStatus = stack?.StackStatus;
                }
                catch (AmazonCloudFormationException)
                {
                    Console.WriteLine("Stack does not exist");
                    return;
                }
            }
        }
    }
}

To delete all queues for a given endpoint

In C#

public static async Task DeleteQueuesForEndpoint(string endpointName, string queueNamePrefix = null, bool includeRetries = false)
{
    // main queue
    await QueueDeletionUtils.DeleteQueue(endpointName, queueNamePrefix)
        .ConfigureAwait(false);

    // timeout queue
    await QueueDeletionUtils.DeleteQueue($"{endpointName}.Timeouts", queueNamePrefix)
        .ConfigureAwait(false);

    // timeout dispatcher queue
    await QueueDeletionUtils.DeleteQueue($"{endpointName}.TimeoutsDispatcher", queueNamePrefix)
        .ConfigureAwait(false);

    // retries queue
    if (includeRetries)
    {
        await QueueDeletionUtils.DeleteQueue($"{endpointName}.Retries", queueNamePrefix)
            .ConfigureAwait(false);
    }
}

await DeleteQueuesForEndpoint(
    endpointName: "myendpoint", 
    queueNamePrefix: "PROD",
    includeRetries: true /* required for V5 and below */)
.ConfigureAwait(false);

In PowerShell

Function DeleteQueuesForEndpoint
{
    param(
        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $EndpointName,

        [Parameter(Mandatory=$false)]
        [string] $QueueNamePrefix,

        [Parameter(HelpMessage="Only required for NSB Versions 5 and below")]
        [Switch] $IncludeRetries
    )

    # main queue
    DeleteQueue -QueueName $EndpointName -QueueNamePrefix $QueueNamePrefix

    # timeout queue
    DeleteQueue -QueueName "$EndpointName.Timeouts" -QueueNamePrefix $QueueNamePrefix

    # timeout dispatcher queue
    DeleteQueue -QueueName "$EndpointName.TimeoutsDispatcher" -QueueNamePrefix $QueueNamePrefix

    # retries queue
    if ($IncludeRetries) {
        DeleteQueue -QueueName "$EndpointName.Retries" -QueueNamePrefix $QueueNamePrefix
    }
}

# For NServiceBus 6 Endpoints
DeleteQueuesForEndpoint -EndpointName "myendpoint" -QueueNamePrefix "PROD"

# For NServiceBus 5 and below Endpoints
DeleteQueuesForEndpoint -EndpointName "myendpoint" -QueueNamePrefix "PROD" -IncludeRetries

In CloudFormation

public static async Task DeleteQueuesForEndpoint(string endpointName, string queueNamePrefix = null)
{
    using (var client = ClientFactory.CreateCloudFormationClient())
    {
        var endpointNameWithPrefix = QueueNameHelper.GetSqsQueueName(endpointName, queueNamePrefix);

        var request = new DeleteStackRequest
        {
            StackName = endpointNameWithPrefix,
        };


        await client.DeleteStackAsync(request)
            .ConfigureAwait(false);


        var describeRequest = new DescribeStacksRequest
        {
            StackName = endpointNameWithPrefix
        };
        StackStatus currentStatus = string.Empty;
        while (currentStatus != StackStatus.DELETE_COMPLETE)
        {
            try
            {
                var response = await client.DescribeStacksAsync(describeRequest)
                    .ConfigureAwait(false);
                var stack = response.Stacks.SingleOrDefault();
                currentStatus = stack?.StackStatus;
            }
            catch (AmazonCloudFormationException)
            {
                Console.WriteLine("Stack does not exist");
                return;
            }
        }
    }
}

await DeleteQueuesForEndpoint(
        endpointName: "myendpoint",
        queueNamePrefix: "PROD")
    .ConfigureAwait(false);

To delete all queues with a given prefix

In C#

public static async Task DeleteAllQueues(string queueNamePrefix = null)
{
    using (var client = ClientFactory.CreateSqsClient())
    {
        await DeleteQueues(client, queueNamePrefix)
            .ConfigureAwait(false);
    }
}

static async Task DeleteQueues(IAmazonSQS client, string queueNamePrefix = null)
{
    var queuesToDelete = await client.ListQueuesAsync(queueNamePrefix)
        .ConfigureAwait(false);
    var numberOfQueuesFound = queuesToDelete.QueueUrls.Count;
    var deletionTasks = new Task[numberOfQueuesFound + 1];

    for (var i = 0; i < numberOfQueuesFound; i++)
    {
        deletionTasks[i] = RetryDeleteOnThrottle(client, queuesToDelete.QueueUrls[i], TimeSpan.FromSeconds(10), 6);
    }

    // queue deletion can take up to 60 seconds
    deletionTasks[numberOfQueuesFound] = numberOfQueuesFound > 0 ? Task.Delay(TimeSpan.FromSeconds(60)) : Task.FromResult(0);

    await Task.WhenAll(deletionTasks)
        .ConfigureAwait(false);

    if (numberOfQueuesFound == 1000)
    {
        await DeleteQueues(client, queueNamePrefix)
            .ConfigureAwait(false);
    }
}

static async Task RetryDeleteOnThrottle(IAmazonSQS client, string queueUrl, TimeSpan delay, int maxRetryAttempts, int retryAttempts = 0)
{
    try
    {
        await client.DeleteQueueAsync(queueUrl).ConfigureAwait(false);
    }
    catch (AmazonServiceException ex) when (ex.ErrorCode == "AWS.SimpleQueueService.NonExistentQueue")
    {
        // ignore
    }
    catch (AmazonServiceException ex) when (ex.ErrorCode == "RequestThrottled")
    {
        if (retryAttempts < maxRetryAttempts)
        {
            var attempts = TimeSpan.FromMilliseconds(Convert.ToInt32(delay.TotalMilliseconds * (retryAttempts + 1)));
            Console.WriteLine($"Retry {queueUrl} {retryAttempts}/{maxRetryAttempts} with delay {attempts}.");
            await Task.Delay(attempts).ConfigureAwait(false);
            await RetryDeleteOnThrottle(client, queueUrl, delay, maxRetryAttempts, ++retryAttempts).ConfigureAwait(false);
        }
        else
        {
            Console.WriteLine($"Unable to delete {queueUrl}. Max retry attempts reached.");
        }
    }
    catch (AmazonServiceException ex)
    {
        Console.WriteLine($"Unable to delete {queueUrl} on {retryAttempts}/{maxRetryAttempts}. Reason: {ex.Message}");
    }
}

In PowerShell

Function DeleteAllQueues
{
    param(
        [Parameter(Mandatory=$false)]
        [string] $QueueNamePrefix
    )

    Get-SQSQueue $QueueNamePrefix | Remove-SQSQueue -Force
}

To delete shared queues

In C#

await QueueDeletionUtils.DeleteQueue(queueName: "error", queueNamePrefix: "PROD")
    .ConfigureAwait(false);
await QueueDeletionUtils.DeleteQueue(queueName: "audit", queueNamePrefix: "PROD")
    .ConfigureAwait(false);

In PowerShell

DeleteQueue -QueueName "error" -QueueNamePrefix "PROD"
DeleteQueue -QueueName "audit" -QueueNamePrefix "PROD"

In CloudFormation

await QueueDeletionUtilsCloudFormation.DeleteQueue(queueName: "error", queueNamePrefix: "PROD")
    .ConfigureAwait(false);
await QueueDeletionUtilsCloudFormation.DeleteQueue(queueName: "audit", queueNamePrefix: "PROD")
    .ConfigureAwait(false);

Return message to source queue

The retry helper methods

A retry involves the following actions:

  • Read a message from the error queue.
  • Forward that message to another queue to be retried.

public static async Task ReturnMessageToSourceQueue(string errorQueueName, string messageId)
{
    var path = QueueNameHelper.GetSqsQueueName(errorQueueName);
    using (var client = ClientFactory.CreateSqsClient())
    {
        var queueUrlResponse = await client.GetQueueUrlAsync(path)
            .ConfigureAwait(false);
        var queueUrl = queueUrlResponse.QueueUrl;

        await InspectMessagesUntilFound(client, messageId, queueUrl)
            .ConfigureAwait(false);
    }
}

static async Task InspectMessagesUntilFound(IAmazonSQS client, string messageId, string queueUrl)
{
    var receivedMessages = await client.ReceiveMessageAsync(new ReceiveMessageRequest
        {
            QueueUrl = queueUrl,
            WaitTimeSeconds = 20,
            MaxNumberOfMessages = 10,
        })
        .ConfigureAwait(false);

    if (receivedMessages.Messages.Count == 0)
    {
        return;
    }

    var foundMessage = receivedMessages.Messages
        .SingleOrDefault(message => message.MessageId == messageId);

    if (foundMessage != null)
    {
        var failedQueueName = ReadFailedQueueHeader(foundMessage);
        var failedQueueUrlResponse = await client.GetQueueUrlAsync(failedQueueName)
            .ConfigureAwait(false);
        var failedQueueUrl = failedQueueUrlResponse.QueueUrl;
        // Large messages don't need to be handled separately since the S3BodyKey is preserved
        await client.SendMessageAsync(new SendMessageRequest(failedQueueUrl, foundMessage.Body)) // what to do with the attributes?
            .ConfigureAwait(false);
        await client.DeleteMessageAsync(queueUrl, foundMessage.ReceiptHandle)
            .ConfigureAwait(false);

        return;
    }

    await InspectMessagesUntilFound(client, messageId, queueUrl)
        .ConfigureAwait(false);
}

static string ReadFailedQueueHeader(Message message)
{
    var headers = ExtractHeaders(message);
    var queueName = headers.Single(x => x.Key == "NServiceBus.FailedQ").Value;
    return QueueNameHelper.GetSqsQueueName(queueName);
}

static Dictionary<string, string> ExtractHeaders(Message message)
{
    var transportMessage = JsonConvert.DeserializeObject<TransportMessage>(message.Body);
    return transportMessage.Headers;
}

class TransportMessage
{
    public Dictionary<string, string> Headers { get; set; }

    public string Body { get; set; }

    public string S3BodyKey { get; set; }
}

public class HeaderInfo
{
    public string Key { get; set; }
    public string Value { get; set; }
}

This example code receives other messages from the error queue while it searches for the desired message. Every message it receives stays invisible to other consumers until its visibility timeout expires.

Using the retry helper methods

await ReturnMessageToSourceQueue(
        errorQueueName: "error",
        messageId: "c390a6fb-4fb5-46da-927d-a156f75739eb")
    .ConfigureAwait(false);
