The following are example code and scripts in C# and PowerShell to facilitate deployment and operations against the SQS Transport.
Requirements
If using PowerShell, the AWS Tools for PowerShell must be installed and properly configured.
For all operations that create resources in AWS, the corresponding rights must be granted. For more information, refer to the IAM policies guide.
QueueNameHelper
In C#
public static class QueueNameHelper
{
public static string GetSqsQueueName(string destination, string queueNamePrefix = null)
{
if (string.IsNullOrWhiteSpace(destination))
{
throw new ArgumentNullException(nameof(destination));
}
var s = queueNamePrefix + destination;
if (s.Length > 80)
{
throw new ArgumentException(
$"Address {destination} with configured prefix {queueNamePrefix} is longer than 80 characters and therefore cannot be used to create an SQS queue. Use a shorter endpoint name or a shorter queue name prefix.");
}
// SQS queue names can only have alphanumeric characters, hyphens and underscores.
// Any other characters will be replaced with a hyphen.
var skipCharacters = s.EndsWith(".fifo") ? 5 : 0;
var queueNameBuilder = new StringBuilder(s);
for (var i = 0; i < queueNameBuilder.Length - skipCharacters; ++i)
{
var c = queueNameBuilder[i];
if (!char.IsLetterOrDigit(c)
&& c != '-'
&& c != '_')
{
queueNameBuilder[i] = '-';
}
}
return queueNameBuilder.ToString();
}
}
The above QueueNameHelper makes sure that queues follow the proper naming guidelines for SQS.
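To make the effect of the naming rule concrete, the sketch below restates it in condensed form and prints a few results. It is an illustration only: `Sanitize` is a hypothetical name, and the block assumes a .NET project that supports top-level statements.

```csharp
using System;
using System.Text;

// Condensed restatement of the naming rule above; Sanitize is a hypothetical name.
static string Sanitize(string destination, string queueNamePrefix = null)
{
    var s = queueNamePrefix + destination;
    if (s.Length > 80)
    {
        throw new ArgumentException("Queue name is longer than 80 characters.");
    }
    // Leave a trailing ".fifo" untouched so FIFO queue names remain valid.
    var skip = s.EndsWith(".fifo", StringComparison.Ordinal) ? 5 : 0;
    var builder = new StringBuilder(s);
    for (var i = 0; i < builder.Length - skip; i++)
    {
        var c = builder[i];
        if (!char.IsLetterOrDigit(c) && c != '-' && c != '_')
        {
            builder[i] = '-';
        }
    }
    return builder.ToString();
}

Console.WriteLine(Sanitize("Sales.Orders"));                  // Sales-Orders
Console.WriteLine(Sanitize("myendpoint.Retries", "PROD"));    // PRODmyendpoint-Retries
Console.WriteLine(Sanitize("myendpoint-delay.fifo", "PROD")); // PRODmyendpoint-delay.fifo
```

Note that the prefix is concatenated without a separator, so a prefix such as `PROD-` may be preferable when readable queue names matter.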
In PowerShell
Add-Type @'
using System;
public static class QueueNameHelper
{
public static string GetSqsQueueName(string destination, string queueNamePrefix = null)
{
if (string.IsNullOrWhiteSpace(destination))
{
throw new ArgumentNullException("destination");
}
var s = queueNamePrefix + destination;
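// SQS queue names can only have alphanumeric characters, hyphens and underscores.
// Any other characters will be replaced with a hyphen.
// A trailing ".fifo" suffix is left intact so that FIFO queue names stay valid.
var skipCharacters = s.EndsWith(".fifo") ? 5 : 0;
var queueNameBuilder = new System.Text.StringBuilder(s);
for (var i = 0; i < queueNameBuilder.Length - skipCharacters; ++i)
{
var c = queueNameBuilder[i];
if (!char.IsLetterOrDigit(c)
&& c != '-'
&& c != '_')
{
queueNameBuilder[i] = '-';
}
}
s = queueNameBuilder.ToString();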
if (s.Length > 80)
{
throw new Exception(
string.Format("Address {0} with configured prefix {1} is longer than 80 characters and therefore cannot be used to create an SQS queue. Use a shorter queue name.", destination, queueNamePrefix));
}
return s;
}
}
'@
Native Send
The native send helper methods
A send involves the following actions:
- Create and serialize the payload including headers.
- Write a message body directly to SQS Transport.
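The first step can be looked at in isolation. The sketch below builds the JSON envelope (headers plus a Base64-encoded body) and reads it back. It is a minimal illustration, not the transport's implementation: `BuildEnvelope` is a hypothetical name, and `System.Text.Json` is used here instead of the Newtonsoft.Json serializer used elsewhere on this page so that the block has no external dependencies.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

// Builds the JSON envelope that wraps a native message; BuildEnvelope is a hypothetical name.
static string BuildEnvelope(string messageBody, Dictionary<string, string> headers)
{
    var base64Body = Convert.ToBase64String(Encoding.UTF8.GetBytes(messageBody));
    return JsonSerializer.Serialize(new Dictionary<string, object>
    {
        ["Headers"] = headers,
        ["Body"] = base64Body
    });
}

var envelope = BuildEnvelope(
    "{\"Property\":\"PropertyValue\"}",
    new Dictionary<string, string>
    {
        { "NServiceBus.EnclosedMessageTypes", "MessageTypeToSend" },
        { "NServiceBus.MessageId", "99C7320B-A645-4C74-95E8-857EAB98F4F9" }
    });
Console.WriteLine(envelope);

// Reading the envelope back recovers the original body.
using var document = JsonDocument.Parse(envelope);
var decoded = Encoding.UTF8.GetString(
    Convert.FromBase64String(document.RootElement.GetProperty("Body").GetString() ?? string.Empty));
Console.WriteLine(decoded);
```

The resulting string is what ends up as the SQS message body; the queue lookup and the actual send are left to the helper methods.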
In C#
public static async Task SendMessage(IAmazonSQS sqsClient, string queue, string messageBody, Dictionary<string, string> headers)
{
var bodyBytes = Encoding.UTF8.GetBytes(messageBody);
var base64Body = Convert.ToBase64String(bodyBytes);
var serializedMessage = Newtonsoft.Json.JsonConvert.SerializeObject(new
{
Headers = headers,
Body = base64Body,
});
var queueUrlResponse = await sqsClient.GetQueueUrlAsync(QueueNameHelper.GetSqsQueueName(queue));
await sqsClient.SendMessageAsync(queueUrlResponse.QueueUrl, serializedMessage);
}
In PowerShell
Function SendMessage
{
param(
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $QueueName,
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $MessageBody,
[Parameter(Mandatory=$true)]
[ValidateNotNull()]
[HashTable] $Headers
)
$bodyBytes = [System.Text.Encoding]::UTF8.GetBytes($MessageBody)
$base64Body = [System.Convert]::ToBase64String($bodyBytes)
[string]$name = [QueueNameHelper]::GetSqsQueueName($QueueName)
$serializedMessage = @{ Headers = $Headers; Body = $base64Body } | ConvertTo-Json
$queueUrl = Get-SQSQueueUrl $name
Send-SQSMessage -QueueUrl $queueUrl -MessageBody $serializedMessage -Force
}
Using the native send helper methods
await SendMessage(
sqsClient: client,
queue: "samples-sqs-nativeintegration",
messageBody: "{Property:'PropertyValue'}",
headers: new Dictionary<string, string>
{
{"NServiceBus.EnclosedMessageTypes", "MessageTypeToSend"},
{"NServiceBus.MessageId", "99C7320B-A645-4C74-95E8-857EAB98F4F9"}
}
);
The headers must contain the message type that is sent as a fully qualified assembly name as well as the message id header. See the headers documentation for more information on the EnclosedMessageTypes and MessageId headers.
Native Send Large Messages
The native send helper methods
A send involves the following actions:
- Create a unique S3 key containing the S3Prefix and the MessageId.
- Upload the body of the message to the S3 bucket.
- Create and serialize the message with an empty payload including headers and the S3BodyKey.
- Write a message directly to SQS Transport.
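The key construction and the envelope for a large message can be sketched without any AWS calls. This is an illustration under assumptions: `BuildLargeMessageEnvelope` is a hypothetical name, the S3 upload is omitted, and `System.Text.Json` stands in for the Newtonsoft.Json serializer used elsewhere on this page.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Builds the S3 key and the envelope for a large message; the function name is hypothetical.
static (string Key, string Envelope) BuildLargeMessageEnvelope(string s3Prefix, Dictionary<string, string> headers)
{
    // The key combines the configured prefix with the message ID header.
    var s3Key = $"{s3Prefix}/{headers["NServiceBus.MessageId"]}";
    var serialized = JsonSerializer.Serialize(new Dictionary<string, object>
    {
        ["Headers"] = headers,
        ["Body"] = string.Empty, // the actual body is stored in S3 under s3Key
        ["S3BodyKey"] = s3Key
    });
    return (s3Key, serialized);
}

var messageHeaders = new Dictionary<string, string>
{
    { "NServiceBus.EnclosedMessageTypes", "MessageTypeToSend" },
    { "NServiceBus.MessageId", "99C7320B-A645-4C74-95E8-857EAB98F4F9" }
};
var result = BuildLargeMessageEnvelope("s3prefix", messageHeaders);
Console.WriteLine(result.Key);
Console.WriteLine(result.Envelope);
```

Because the key is derived from the message ID header, that header has to be present before the key is built; a missing header causes a `KeyNotFoundException` in this sketch.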
In C#
public static async Task SendLargeMessage(IAmazonSQS sqsClient, IAmazonS3 s3Client, string queue, string s3Prefix, string bucketName, string messageBody, Dictionary<string, string> headers)
{
var bodyBytes = Encoding.UTF8.GetBytes(messageBody);
var key = $"{s3Prefix}/{headers["NServiceBus.MessageId"]}";
using (var bodyStream = new MemoryStream(bodyBytes))
{
await s3Client.PutObjectAsync(new PutObjectRequest
{
BucketName = bucketName,
InputStream = bodyStream,
Key = key
});
}
var serializedMessage = Newtonsoft.Json.JsonConvert.SerializeObject(new
{
Headers = headers,
Body = string.Empty,
S3BodyKey = key
});
var queueUrlResponse = await sqsClient.GetQueueUrlAsync(QueueNameHelper.GetSqsQueueName(queue));
await sqsClient.SendMessageAsync(queueUrlResponse.QueueUrl, serializedMessage);
}
In PowerShell
Function SendLargeMessage
{
param(
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $QueueName,
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $S3Prefix,
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $BucketName,
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $MessageBody,
[Parameter(Mandatory=$true)]
[ValidateNotNull()]
[HashTable] $Headers
)
$key = "$($s3Prefix)/$($Headers['NServiceBus.MessageId'])"
Write-S3Object -BucketName $BucketName -Key $key -Content $MessageBody -Force
[string]$name = [QueueNameHelper]::GetSqsQueueName($QueueName)
$serializedMessage = @{ Headers = $Headers; Body = [System.String]::Empty; S3BodyKey = $key } | ConvertTo-Json
$queueUrl = Get-SQSQueueUrl $name
Send-SQSMessage -QueueUrl $queueUrl -MessageBody $serializedMessage -Force
}
Using the native send helper methods
await SendLargeMessage(
sqsClient: client,
s3Client: s3Client,
queue: "samples-sqs-nativeintegration-large",
s3Prefix: "s3prefix",
bucketName: "bucketname",
messageBody: "{Property:'PropertyValue'}",
headers: new Dictionary<string, string>
{
{"NServiceBus.EnclosedMessageTypes", "MessageTypeToSend"},
{"NServiceBus.MessageId", "99C7320B-A645-4C74-95E8-857EAB98F4F9"}
}
);
The headers must contain the message type that is sent as a fully qualified assembly name as well as the message id header. See the headers documentation for more information on the EnclosedMessageTypes and MessageId headers.
The S3 bucket name and the S3 prefix must be provided as defined in the transport configuration of the endpoint that will be receiving the message.
Create queues
Queues can be created for a specific endpoint or shared between multiple endpoints.
The create queue helper methods
In C#
public static class QueueCreationUtils
{
public static TimeSpan DefaultTimeToLive = TimeSpan.FromDays(4);
public static async Task CreateQueue(string queueName, TimeSpan? maxTimeToLive = null, string queueNamePrefix = null)
{
using (var client = ClientFactory.CreateSqsClient())
{
var sqsRequest = new CreateQueueRequest
{
QueueName = QueueNameHelper.GetSqsQueueName(queueName, queueNamePrefix),
};
sqsRequest.Attributes.Add(QueueAttributeName.MessageRetentionPeriod, Convert.ToInt32((maxTimeToLive ?? DefaultTimeToLive).TotalSeconds).ToString());
if (queueName.EndsWith(".fifo"))
{
sqsRequest.Attributes.Add(QueueAttributeName.FifoQueue, "true");
sqsRequest.Attributes.Add(QueueAttributeName.DelaySeconds, "900");
}
try
{
var createQueueResponse = await client.CreateQueueAsync(sqsRequest);
}
catch (QueueNameExistsException)
{
// A queue with this name already exists with different attributes; it is left unchanged.
}
}
}
}
In PowerShell
Function CreateQueue
{
param(
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $QueueName,
[Parameter(Mandatory=$false)]
[string] $QueueNamePrefix,
[Parameter(Mandatory=$false)]
[ValidateNotNullOrEmpty()]
[ValidateScript({ValidateMaxTimeToLive -MaxTimeToLive $_})]
[string] $MaxTimeToLive = "4:0:0:0"
)
$timeToLive = [System.Convert]::ToInt32([System.TimeSpan]::Parse($MaxTimeToLive, [System.Globalization.CultureInfo]::InvariantCulture).TotalSeconds).ToString()
[string]$name = [QueueNameHelper]::GetSqsQueueName($QueueName, $QueueNamePrefix)
$attributes = @{ "MessageRetentionPeriod" = $timeToLive; }
if($name -like "*.fifo") {
$attributes.Add("FifoQueue", "true")
$attributes.Add("DelaySeconds", "900")
}
New-SQSQueue -QueueName $name -Attributes $attributes -Force
}
Function ValidateMaxTimeToLive {
param(
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $MaxTimeToLive
)
# Test MaxTimeToLive is valid
try {
$maxTimeToLive = [System.TimeSpan]::Parse($MaxTimeToLive, [System.Globalization.CultureInfo]::InvariantCulture)
return $true
}
catch [System.FormatException] {
Write-Warning "$MaxTimeToLive is not a valid TimeSpan"
return $false
}
}
In CloudFormation
public static class QueueCreationUtilsCloudFormation
{
static TimeSpan DefaultTimeToLive = TimeSpan.FromDays(4);
public static async Task CreateQueue(string queueName, string templatePath, TimeSpan? maxTimeToLive = null, string queueNamePrefix = null)
{
using (var client = ClientFactory.CreateCloudFormationClient())
{
var sqsQueueName = QueueNameHelper.GetSqsQueueName(queueName, queueNamePrefix);
var request = new CreateStackRequest
{
StackName = sqsQueueName,
Parameters = new List<Parameter>
{
new Parameter
{
ParameterKey = "QueueName",
ParameterValue = sqsQueueName
},
new Parameter
{
ParameterKey = "MaxTimeToLive",
ParameterValue = Convert.ToInt32((maxTimeToLive ?? DefaultTimeToLive).TotalSeconds).ToString()
}
},
TemplateBody = CloudFormationHelper.ConvertToValidJson(templatePath)
};
await client.CreateStackAsync(request);
var describeRequest = new DescribeStacksRequest
{
StackName = sqsQueueName
};
StackStatus currentStatus = string.Empty;
while (currentStatus != StackStatus.CREATE_COMPLETE)
{
var response = await client.DescribeStacksAsync(describeRequest);
var stack = response.Stacks.SingleOrDefault();
currentStatus = stack?.StackStatus;
await Task.Delay(1000);
}
}
}
}
The template used
{
"AWSTemplateFormatVersion": "2010-09-09",
"Parameters": {
"QueueName": {
"Type": "String",
"Description": "Enter queue name."
},
"MaxTimeToLive": {
"Type": "Number",
"Description": "Maximum time to live."
}
},
"Resources": {
"CreatedQueue": {
"Properties": {
"QueueName": { "Ref": "QueueName" },
"MessageRetentionPeriod": { "Ref": "MaxTimeToLive" }
},
"Type": "AWS::SQS::Queue"
}
},
"Outputs": {
"QueueName": {
"Description": "The name of the queue",
"Value": {
"Fn::GetAtt": [
"CreatedQueue",
"QueueName"
]
}
},
"QueueURL": {
"Description": "The URL of the queue",
"Value": {
"Ref": "CreatedQueue"
}
},
"QueueARN": {
"Description": "The ARN of the queue",
"Value": {
"Fn::GetAtt": [
"CreatedQueue",
"Arn"
]
}
}
}
}
Creating queues for an endpoint
The following helpers create all queues for a given endpoint name.
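Which queues belong to an endpoint depends on the delayed delivery method and on whether a retries queue is requested. The sketch below only lists the logical queue names, before sanitization and without contacting AWS. `QueueNamesForEndpoint` is a hypothetical helper, written to mirror the order used by the helpers in this section.

```csharp
using System;
using System.Collections.Generic;

// Lists the logical queue names for an endpoint; QueueNamesForEndpoint is a hypothetical name.
static List<string> QueueNamesForEndpoint(string endpointName, bool includeRetries = false, string delayedDeliveryMethod = "Native")
{
    var names = new List<string>();
    switch (delayedDeliveryMethod)
    {
        case "TimeoutManager":
            // The longest name comes first so that a name-length failure happens early.
            names.Add($"{endpointName}.TimeoutsDispatcher");
            names.Add($"{endpointName}.Timeouts");
            break;
        case "UnrestrictedDelayedDelivery":
            names.Add($"{endpointName}-delay.fifo");
            break;
    }
    names.Add(endpointName);
    if (includeRetries)
    {
        names.Add($"{endpointName}.Retries");
    }
    return names;
}

foreach (var queue in QueueNamesForEndpoint("myendpoint", includeRetries: true, delayedDeliveryMethod: "TimeoutManager"))
{
    Console.WriteLine(queue);
}
```

With the default `"Native"` method and no retries, only the main endpoint queue is listed.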
In C#
public static async Task CreateQueuesForEndpoint(string endpointName, TimeSpan? maxTimeToLive = null, string queueNamePrefix = null, bool includeRetries = false, string delayedDeliveryMethod = "Native")
{
switch (delayedDeliveryMethod)
{
case "TimeoutManager":
// timeout dispatcher queue
// This queue is created first because it has the longest name.
// If the endpointName and queueNamePrefix are too long this call will throw and no queues will be created.
// In this event, a shorter value for endpointName or queueNamePrefix should be used.
await QueueCreationUtils.CreateQueue($"{endpointName}.TimeoutsDispatcher", maxTimeToLive, queueNamePrefix);
// timeout queue
await QueueCreationUtils.CreateQueue($"{endpointName}.Timeouts", maxTimeToLive, queueNamePrefix);
break;
case "UnrestrictedDelayedDelivery":
await QueueCreationUtils.CreateQueue($"{endpointName}-delay.fifo", maxTimeToLive, queueNamePrefix);
break;
}
// main queue
await QueueCreationUtils.CreateQueue(endpointName, maxTimeToLive, queueNamePrefix);
// retries queue
if (includeRetries)
{
await QueueCreationUtils.CreateQueue($"{endpointName}.Retries", maxTimeToLive, queueNamePrefix);
}
}
In PowerShell
Function CreateQueuesForEndpoint
{
param(
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $EndpointName,
[Parameter(Mandatory=$false)]
[ValidateNotNullOrEmpty()]
[ValidateScript({ValidateMaxTimeToLive -MaxTimeToLive $_})]
[string] $MaxTimeToLive = "4:0:0:0",
[Parameter(Mandatory=$false)]
[string] $QueueNamePrefix,
[Parameter(HelpMessage="Only required for NSB Versions 5 and below")]
[Switch] $IncludeRetries,
[Parameter(HelpMessage="'TimeoutManager' for timeout manager queues (V1), 'Native' for V2-V3, 'UnrestrictedDelayedDelivery' for unrestricted delayed delivery (V4 and higher)", Mandatory=$false)]
[ValidateNotNullOrEmpty()]
[string] $DelayedDeliveryMethod = "Native"
)
switch($DelayedDeliveryMethod) {
"TimeoutManager" {
# timeout dispatcher queue
# This queue is created first because it has the longest name.
# If the endpointName and queueNamePrefix are too long this call will throw and no queues will be created.
# In this event, a shorter value for endpointName or queueNamePrefix should be used.
CreateQueue -QueueName "$EndpointName.TimeoutsDispatcher" -QueueNamePrefix $QueueNamePrefix -MaxTimeToLive $MaxTimeToLive
# timeout queue
CreateQueue -QueueName "$EndpointName.Timeouts" -QueueNamePrefix $QueueNamePrefix -MaxTimeToLive $MaxTimeToLive
}
"UnrestrictedDelayedDelivery" {
CreateQueue -QueueName "$EndpointName-delay.fifo" -QueueNamePrefix $QueueNamePrefix -MaxTimeToLive $MaxTimeToLive
}
}
# main queue
CreateQueue -QueueName $EndpointName -QueueNamePrefix $QueueNamePrefix -MaxTimeToLive $MaxTimeToLive
# retries queue
if ($IncludeRetries) {
CreateQueue -QueueName "$EndpointName.Retries" -QueueNamePrefix $QueueNamePrefix -MaxTimeToLive $MaxTimeToLive
}
}
In CloudFormation
public static async Task CreateQueuesForEndpoint(string endpointName, string templatePath, TimeSpan? maxTimeToLive = null, string queueNamePrefix = null, bool includeRetries = false, string delayedDeliveryMethod = "Native")
{
using (var client = ClientFactory.CreateCloudFormationClient())
{
var endpointNameWithPrefix = QueueNameHelper.GetSqsQueueName(endpointName, queueNamePrefix);
var request = new CreateStackRequest
{
StackName = endpointNameWithPrefix,
Parameters = new List<Parameter>
{
new Parameter
{
ParameterKey = "EndpointName",
ParameterValue = endpointNameWithPrefix
},
new Parameter
{
ParameterKey = "MaxTimeToLive",
ParameterValue = Convert.ToInt32((maxTimeToLive ?? QueueCreationUtils.DefaultTimeToLive).TotalSeconds).ToString()
},
new Parameter
{
ParameterKey = "IncludeRetries",
ParameterValue = includeRetries.ToString()
},
new Parameter
{
ParameterKey = "DelayedDeliveryMethod",
ParameterValue = delayedDeliveryMethod
},
},
TemplateBody = CloudFormationHelper.ConvertToValidJson(templatePath)
};
await client.CreateStackAsync(request);
var describeRequest = new DescribeStacksRequest
{
StackName = endpointNameWithPrefix
};
StackStatus currentStatus = string.Empty;
while (currentStatus != StackStatus.CREATE_COMPLETE)
{
var response = await client.DescribeStacksAsync(describeRequest);
var stack = response.Stacks.SingleOrDefault();
currentStatus = stack?.StackStatus;
await Task.Delay(1000);
}
}
}
The template used
{
"AWSTemplateFormatVersion": "2010-09-09",
"Parameters": {
"EndpointName": {
"Type": "String",
"Description": "Enter endpoint name."
},
"MaxTimeToLive": {
"Type": "Number",
"Description": "Maximum time to live."
},
"IncludeRetries": {
"Type": "String",
"Description": "Include Retries Queue."
},
"DelayedDeliveryMethod": {
"Type": "String",
"Description": "'TimeoutManager' for timeout manager queues (V1), 'Native' for V2-V3, 'UnrestrictedDelayedDelivery' for unrestricted delayed delivery (V4 and higher)"
}
},
"Conditions": {
"IncludeRetriesResources": {
"Fn::Equals": [
{ "Ref": "IncludeRetries" },
"True"
]
},
"IncludeTimeoutManagerResources": {
"Fn::Equals": [
{ "Ref": "DelayedDeliveryMethod" },
"TimeoutManager"
]
},
"IncludeUnrestrictedDelayedDeliveryResources": {
"Fn::Equals": [
{ "Ref": "DelayedDeliveryMethod" },
"UnrestrictedDelayedDelivery"
]
}
},
"Resources": {
"EndpointQueue": {
"Properties": {
"QueueName": { "Ref": "EndpointName" },
"MessageRetentionPeriod": { "Ref": "MaxTimeToLive" }
},
"Type": "AWS::SQS::Queue"
},
"TimeoutsQueue": {
"Properties": {
"QueueName": {
"Fn::Join": [
"-",
[
{ "Ref": "EndpointName" },
"Timeouts"
]
]
},
"MessageRetentionPeriod": { "Ref": "MaxTimeToLive" }
},
"Condition": "IncludeTimeoutManagerResources",
"Type": "AWS::SQS::Queue"
},
"TimeoutsDispatcherQueue": {
"Properties": {
"QueueName": {
"Fn::Join": [
"-",
[
{ "Ref": "EndpointName" },
"TimeoutsDispatcher"
]
]
},
"MessageRetentionPeriod": { "Ref": "MaxTimeToLive" }
},
"Condition": "IncludeTimeoutManagerResources",
"Type": "AWS::SQS::Queue"
},
"RetriesQueue": {
"Properties": {
"QueueName": {
"Fn::Join": [
"-",
[
{ "Ref": "EndpointName" },
"Retries"
]
]
},
"MessageRetentionPeriod": { "Ref": "MaxTimeToLive" }
},
"Condition": "IncludeRetriesResources",
"Type": "AWS::SQS::Queue"
},
"UnrestrictedDelayedDeliveryQueue": {
"Properties": {
"QueueName": {
"Fn::Join": [
"-",
[
{ "Ref": "EndpointName" },
"delay.fifo"
]
]
},
"MessageRetentionPeriod": { "Ref": "MaxTimeToLive" },
"FifoQueue": "true",
"DelaySeconds": "900"
},
"Condition": "IncludeUnrestrictedDelayedDeliveryResources",
"Type": "AWS::SQS::Queue"
}
},
"Outputs": {
"EndpointQueueURL": {
"Description": "The URL of the endpoint queue",
"Value": {
"Ref": "EndpointQueue"
}
},
"TimeoutsQueueURL": {
"Description": "The URL of the timeouts queue",
"Value": {
"Ref": "TimeoutsQueue"
},
"Condition": "IncludeTimeoutManagerResources"
},
"TimeoutsDispatcherQueueURL": {
"Description": "The URL of the timeouts dispatcher queue",
"Value": {
"Ref": "TimeoutsDispatcherQueue"
},
"Condition": "IncludeTimeoutManagerResources"
},
"RetriesQueueURL": {
"Description": "The URL of the retries queue",
"Value": {
"Ref": "RetriesQueue"
},
"Condition": "IncludeRetriesResources"
},
"UnrestrictedDelayedDeliveryQueueURL": {
"Description": "The URL of the unrestricted delayed delivery queue",
"Value": {
"Ref": "UnrestrictedDelayedDeliveryQueue"
},
"Condition": "IncludeUnrestrictedDelayedDeliveryResources"
}
}
}
Using the create endpoint queues helper methods
In C#
await CreateQueuesForEndpoint(
endpointName: "myendpoint",
maxTimeToLive: TimeSpan.FromDays(2),
queueNamePrefix: "PROD",
includeRetries: true /* required for V5 and below */);
In PowerShell
# MaxTimeToLive uses the d:h:m:s format; "2:0:0:0" is two days ("2:0:0" would be parsed as two hours)
# For NServiceBus 6 Endpoints
CreateQueuesForEndpoint -EndpointName "myendpoint" -QueueNamePrefix "PROD" -MaxTimeToLive "2:0:0:0"
# For NServiceBus 5 and below Endpoints
CreateQueuesForEndpoint -EndpointName "myendpoint" -QueueNamePrefix "PROD" -MaxTimeToLive "2:0:0:0" -IncludeRetries
In CloudFormation
await CreateQueuesForEndpoint(
endpointName: "myendpoint",
templatePath: @".\CreateQueuesEndpoint.json",
maxTimeToLive: TimeSpan.FromDays(2),
queueNamePrefix: "PROD",
includeRetries: true /* required for V5 and below */);
To create shared queues
In C#
In PowerShell
In CloudFormation
Delete queues
The delete queue helper methods
In C#
public static async Task DeleteQueue(string queueName, string queueNamePrefix = null)
{
try
{
using (var client = ClientFactory.CreateSqsClient())
{
var sqsQueueName = QueueNameHelper.GetSqsQueueName(queueName, queueNamePrefix);
var queueUrlResponse = await client.GetQueueUrlAsync(sqsQueueName);
await client.DeleteQueueAsync(queueUrlResponse.QueueUrl);
}
}
catch (QueueDoesNotExistException)
{
}
}
In PowerShell
Function DeleteQueue
{
param(
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $QueueName,
[Parameter(Mandatory=$false)]
[string] $QueueNamePrefix
)
[string]$name = [QueueNameHelper]::GetSqsQueueName($QueueName, $QueueNamePrefix)
$queueUrl = Get-SQSQueueUrl $name
Remove-SQSQueue -QueueUrl $queueUrl -Force
}
In CloudFormation
public static class QueueDeletionUtilsCloudFormation
{
public static async Task DeleteQueue(string queueName, string queueNamePrefix = null)
{
using (var client = ClientFactory.CreateCloudFormationClient())
{
var sqsQueueName = QueueNameHelper.GetSqsQueueName(queueName, queueNamePrefix);
var request = new DeleteStackRequest
{
StackName = sqsQueueName,
};
await client.DeleteStackAsync(request);
var describeRequest = new DescribeStacksRequest
{
StackName = sqsQueueName
};
StackStatus currentStatus = string.Empty;
while (currentStatus != StackStatus.DELETE_IN_PROGRESS) // in progress is enough, no need to wait for completion
{
try
{
var response = await client.DescribeStacksAsync(describeRequest);
var stack = response.Stacks.SingleOrDefault();
currentStatus = stack?.StackStatus;
await Task.Delay(1000);
}
catch (AmazonCloudFormationException)
{
Console.WriteLine("Stack does not exist");
return;
}
}
}
}
}
To delete all queues for a given endpoint
In C#
public static async Task DeleteQueuesForEndpoint(string endpointName, string queueNamePrefix = null, bool includeRetries = false, string delayedDeliveryMethod = "Native")
{
switch (delayedDeliveryMethod)
{
case "TimeoutManager":
// timeout queue
await QueueDeletionUtils.DeleteQueue($"{endpointName}.Timeouts", queueNamePrefix);
// timeout dispatcher queue
await QueueDeletionUtils.DeleteQueue($"{endpointName}.TimeoutsDispatcher", queueNamePrefix);
break;
case "UnrestrictedDelayedDelivery":
await QueueDeletionUtils.DeleteQueue($"{endpointName}-delay.fifo", queueNamePrefix);
break;
}
// main queue
await QueueDeletionUtils.DeleteQueue(endpointName, queueNamePrefix);
// retries queue
if (includeRetries)
{
await QueueDeletionUtils.DeleteQueue($"{endpointName}.Retries", queueNamePrefix);
}
}
await DeleteQueuesForEndpoint(
endpointName: "myendpoint",
queueNamePrefix: "PROD",
includeRetries: true /* required for V5 and below */);
In PowerShell
Function DeleteQueuesForEndpoint
{
param(
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $EndpointName,
[Parameter(Mandatory=$false)]
[string] $QueueNamePrefix,
[Parameter(HelpMessage="Only required for NSB Versions 5 and below")]
[Switch] $IncludeRetries,
[Parameter(HelpMessage="'TimeoutManager' for timeout manager queues (V1), 'Native' for V2-V3, 'UnrestrictedDelayedDelivery' for unrestricted delayed delivery (V4 and higher)", Mandatory=$false)]
[ValidateNotNullOrEmpty()]
[string] $DelayedDeliveryMethod = "Native"
)
switch($DelayedDeliveryMethod) {
"TimeoutManager" {
# timeout dispatcher queue
DeleteQueue -QueueName "$EndpointName.TimeoutsDispatcher" -QueueNamePrefix $QueueNamePrefix
# timeout queue
DeleteQueue -QueueName "$EndpointName.Timeouts" -QueueNamePrefix $QueueNamePrefix
}
"UnrestrictedDelayedDelivery" {
DeleteQueue -QueueName "$EndpointName-delay.fifo" -QueueNamePrefix $QueueNamePrefix
}
}
# main queue
DeleteQueue -QueueName $EndpointName -QueueNamePrefix $QueueNamePrefix
# retries queue
if ($IncludeRetries) {
DeleteQueue -QueueName "$EndpointName.Retries" -QueueNamePrefix $QueueNamePrefix
}
}
# For NServiceBus 6 Endpoints
DeleteQueuesForEndpoint -EndpointName "myendpoint" -QueueNamePrefix "PROD"
# For NServiceBus 5 and below Endpoints
DeleteQueuesForEndpoint -EndpointName "myendpoint" -QueueNamePrefix "PROD" -IncludeRetries
In CloudFormation
public static async Task DeleteQueuesForEndpoint(string endpointName, string queueNamePrefix = null)
{
using (var client = ClientFactory.CreateCloudFormationClient())
{
var endpointNameWithPrefix = QueueNameHelper.GetSqsQueueName(endpointName, queueNamePrefix);
var request = new DeleteStackRequest
{
StackName = endpointNameWithPrefix,
};
await client.DeleteStackAsync(request);
var describeRequest = new DescribeStacksRequest
{
StackName = endpointNameWithPrefix
};
StackStatus currentStatus = string.Empty;
while (currentStatus != StackStatus.DELETE_IN_PROGRESS) // in progress is enough, no need to wait for completion
{
try
{
var response = await client.DescribeStacksAsync(describeRequest);
var stack = response.Stacks.SingleOrDefault();
currentStatus = stack?.StackStatus;
await Task.Delay(1000);
}
catch (AmazonCloudFormationException)
{
Console.WriteLine("Stack does not exist");
return;
}
}
}
}
await DeleteQueuesForEndpoint(
endpointName: "myendpoint",
queueNamePrefix: "PROD");
To delete all queues with a given prefix
In C#
public static async Task DeleteAllQueues(string queueNamePrefix = null)
{
using (var client = ClientFactory.CreateSqsClient())
{
await DeleteQueues(client, queueNamePrefix);
}
}
static async Task DeleteQueues(IAmazonSQS client, string queueNamePrefix = null)
{
var queuesToDelete = await client.ListQueuesAsync(queueNamePrefix);
var numberOfQueuesFound = queuesToDelete.QueueUrls.Count;
var deletionTasks = new Task[numberOfQueuesFound + 1];
for (var i = 0; i < numberOfQueuesFound; i++)
{
deletionTasks[i] = RetryDeleteOnThrottle(client, queuesToDelete.QueueUrls[i], TimeSpan.FromSeconds(10), 6);
}
// queue deletion can take up to 60 seconds
deletionTasks[numberOfQueuesFound] = numberOfQueuesFound > 0 ? Task.Delay(TimeSpan.FromSeconds(60)) : Task.CompletedTask;
await Task.WhenAll(deletionTasks);
if (numberOfQueuesFound == 1000)
{
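// Pass the prefix along; without it the next pass would list and delete every queue in the account.
await DeleteQueues(client, queueNamePrefix);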
}
}
static async Task RetryDeleteOnThrottle(IAmazonSQS client, string queueUrl, TimeSpan delay, int maxRetryAttempts, int retryAttempts = 0)
{
try
{
await client.DeleteQueueAsync(queueUrl);
}
catch (AmazonServiceException ex) when (ex.ErrorCode == "AWS.SimpleQueueService.NonExistentQueue")
{
// ignore
}
catch (AmazonServiceException ex) when (ex.ErrorCode == "RequestThrottled")
{
if (retryAttempts < maxRetryAttempts)
{
var attempts = TimeSpan.FromMilliseconds(Convert.ToInt32(delay.TotalMilliseconds * (retryAttempts + 1)));
Console.WriteLine($"Retry {queueUrl} {retryAttempts}/{maxRetryAttempts} with delay {attempts}.");
await Task.Delay(attempts);
await RetryDeleteOnThrottle(client, queueUrl, delay, maxRetryAttempts, ++retryAttempts);
}
else
{
Console.WriteLine($"Unable to delete {queueUrl}. Max retry attempts reached.");
}
}
catch (AmazonServiceException ex)
{
Console.WriteLine($"Unable to delete {queueUrl} on {retryAttempts}/{maxRetryAttempts}. Reason: {ex.Message}");
}
}
In PowerShell
Function DeleteAllQueues
{
param(
[Parameter(Mandatory=$false)]
[string] $QueueNamePrefix
)
Get-SQSQueue $QueueNamePrefix | Remove-SQSQueue -Force
}
To delete shared queues
In C#
In PowerShell
In CloudFormation
Return message to source queue
The retry helper methods
A retry involves the following actions:
- Read a message from the error queue.
- Forward that message to another queue to be retried.
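The destination of the forwarded message comes from the `NServiceBus.FailedQ` header inside the message envelope. The sketch below shows that lookup on its own. It is an illustration only: `ReadFailedQueue` is a hypothetical name, the sample envelope is made up, and `System.Text.Json` is used instead of the Newtonsoft.Json serializer used by the helper methods.

```csharp
using System;
using System.Text.Json;

// Reads the NServiceBus.FailedQ header from a message envelope; ReadFailedQueue is a hypothetical name.
// GetProperty throws a KeyNotFoundException when the header is missing.
static string ReadFailedQueue(string envelopeJson)
{
    using var document = JsonDocument.Parse(envelopeJson);
    var headers = document.RootElement.GetProperty("Headers");
    return headers.GetProperty("NServiceBus.FailedQ").GetString() ?? string.Empty;
}

// Made-up sample envelope with an empty body.
var sample = "{\"Headers\":{\"NServiceBus.FailedQ\":\"Sales.Orders\"},\"Body\":\"\"}";
Console.WriteLine(ReadFailedQueue(sample)); // Sales.Orders
```

The header holds the logical queue name, so it still has to go through QueueNameHelper before it can be resolved to a queue URL.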
public static async Task ReturnMessageToSourceQueue(string errorQueueName, string messageId)
{
var path = QueueNameHelper.GetSqsQueueName(errorQueueName);
using (var client = ClientFactory.CreateSqsClient())
{
var queueUrlResponse = await client.GetQueueUrlAsync(path);
var queueUrl = queueUrlResponse.QueueUrl;
await InspectMessagesUntilFound(client, messageId, queueUrl);
}
}
static async Task InspectMessagesUntilFound(IAmazonSQS client, string messageId, string queueUrl)
{
var receivedMessages = await client.ReceiveMessageAsync(new ReceiveMessageRequest
{
QueueUrl = queueUrl,
WaitTimeSeconds = 20,
MaxNumberOfMessages = 10,
});
if (receivedMessages.Messages.Count == 0)
{
return;
}
var foundMessage = receivedMessages.Messages
.SingleOrDefault(message => message.MessageId == messageId);
if (foundMessage != null)
{
var failedQueueName = ReadFailedQueueHeader(foundMessage);
var failedQueueUrlResponse = await client.GetQueueUrlAsync(failedQueueName);
var failedQueueUrl = failedQueueUrlResponse.QueueUrl;
// Large messages don't need to be handled separately since the S3BodyKey is preserved
await client.SendMessageAsync(new SendMessageRequest(failedQueueUrl, foundMessage.Body)); // what to do with the attributes?
await client.DeleteMessageAsync(queueUrl, foundMessage.ReceiptHandle);
return;
}
await InspectMessagesUntilFound(client, messageId, queueUrl);
}
static string ReadFailedQueueHeader(Message message)
{
var headers = ExtractHeaders(message);
var queueName = headers.Single(x => x.Key == "NServiceBus.FailedQ").Value;
return QueueNameHelper.GetSqsQueueName(queueName);
}
static Dictionary<string, string> ExtractHeaders(Message message)
{
var transportMessage = JsonConvert.DeserializeObject<TransportMessage>(message.Body);
return transportMessage.Headers;
}
class TransportMessage
{
public Dictionary<string, string> Headers { get; set; }
public string Body { get; set; }
public string S3BodyKey { get; set; }
}
public class HeaderInfo
{
public string Key { get; set; }
public string Value { get; set; }
}
This example code also receives other messages from the error queue while it searches for the desired message. Every message received this way remains invisible to other consumers until its visibility timeout expires.
Using the retry helper methods
await ReturnMessageToSourceQueue(
errorQueueName: "error",
messageId: "c390a6fb-4fb5-46da-927d-a156f75739eb");