The following C# and PowerShell code examples and scripts facilitate deployment and operations against the SQS Transport.
Requirements
If using PowerShell, the AWS Tools for PowerShell must be installed and properly configured.
All operations that create resources in AWS require the corresponding permissions to be granted. For more information, refer to the IAM policies guide.
QueueNameHelper
In C#
using System;
using System.Text;
public static class QueueNameHelper
{
public static string GetSqsQueueName(string destination, string queueNamePrefix = null)
{
if (string.IsNullOrWhiteSpace(destination))
{
throw new ArgumentNullException(nameof(destination));
}
var s = queueNamePrefix + destination;
if (s.Length > 80)
{
throw new ArgumentException(
$"Address {destination} with configured prefix {queueNamePrefix} is longer than 80 characters and therefore cannot be used to create an SQS queue. Use a shorter endpoint name or a shorter queue name prefix.");
}
// SQS queue names can only have alphanumeric characters, hyphens and underscores.
// Any other characters will be replaced with a hyphen.
var skipCharacters = s.EndsWith(".fifo") ? 5 : 0;
var queueNameBuilder = new StringBuilder(s);
for (var i = 0; i < queueNameBuilder.Length - skipCharacters; ++i)
{
var c = queueNameBuilder[i];
if (!char.IsLetterOrDigit(c)
&& c != '-'
&& c != '_')
{
queueNameBuilder[i] = '-';
}
}
return queueNameBuilder.ToString();
}
}
The QueueNameHelper above makes sure that queue names follow the naming guidelines for SQS.
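To illustrate the effect of the sanitization rule, the following minimal sketch applies it to a few sample names. It assumes C# 9 top-level statements. The local function Sanitize is a hypothetical, condensed restatement of the replacement loop in QueueNameHelper, repeated only so the sketch runs on its own. The sample names are made up.

```csharp
using System;
using System.Text;

// Hypothetical condensed copy of the replacement rule in QueueNameHelper
// (no prefix handling and no length check).
static string Sanitize(string name)
{
    // Leave a trailing ".fifo" untouched so FIFO queue names keep their suffix.
    var skip = name.EndsWith(".fifo", StringComparison.Ordinal) ? 5 : 0;
    var builder = new StringBuilder(name);
    for (var i = 0; i < builder.Length - skip; i++)
    {
        var c = builder[i];
        if (!char.IsLetterOrDigit(c) && c != '-' && c != '_')
        {
            builder[i] = '-';
        }
    }
    return builder.ToString();
}

Console.WriteLine(Sanitize("dev-Sales.Orders"));  // dev-Sales-Orders
Console.WriteLine(Sanitize("Sales.Orders.fifo")); // Sales-Orders.fifo
Console.WriteLine(Sanitize("audit queue"));       // audit-queue
```

Because dots are replaced, an endpoint named Sales.Orders ends up with a queue named Sales-Orders. Any tool that looks up the queue by name needs to apply the same rule.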
In PowerShell
Add-Type @'
using System;
public static class QueueNameHelper
{
public static string GetSqsQueueName(string destination, string queueNamePrefix = null)
{
if (string.IsNullOrWhiteSpace(destination))
{
throw new ArgumentNullException("destination");
}
var s = queueNamePrefix + destination;
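// SQS queue names can only have alphanumeric characters, hyphens and underscores.
// Any other characters will be replaced with a hyphen.
// A trailing ".fifo" suffix is left untouched, matching the C# helper.
var skipCharacters = s.EndsWith(".fifo") ? 5 : 0;
var chars = s.ToCharArray();
for (var i = 0; i < chars.Length - skipCharacters; ++i)
{
var c = chars[i];
if (!char.IsLetterOrDigit(c)
&& c != '-'
&& c != '_')
{
chars[i] = '-';
}
}
s = new string(chars);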
if (s.Length > 80)
{
throw new Exception(
string.Format("Address {0} with configured prefix {1} is longer than 80 characters and therefore cannot be used to create an SQS queue. Use a shorter queue name.", destination, queueNamePrefix));
}
return s;
}
}
'@
Native Send
The native send helper methods
A send involves the following actions:
- Create and serialize the payload including headers.
- Write a message body directly to SQS Transport.
In C#
public static async Task SendMessage(IAmazonSQS sqsClient, string queue, string messageBody, Dictionary<string, string> headers)
{
var bodyBytes = Encoding.UTF8.GetBytes(messageBody);
var base64Body = Convert.ToBase64String(bodyBytes);
var serializedMessage = Newtonsoft.Json.JsonConvert.SerializeObject(new
{
Headers = headers,
Body = base64Body,
});
var queueUrlResponse = await sqsClient.GetQueueUrlAsync(QueueNameHelper.GetSqsQueueName(queue));
await sqsClient.SendMessageAsync(queueUrlResponse.QueueUrl, serializedMessage);
}
In PowerShell
Function SendMessage
{
param(
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $QueueName,
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $MessageBody,
[Parameter(Mandatory=$true)]
[ValidateNotNull()]
[HashTable] $Headers
)
$bodyBytes = [System.Text.Encoding]::UTF8.GetBytes($MessageBody)
$base64Body = [System.Convert]::ToBase64String($bodyBytes)
[string]$name = [QueueNameHelper]::GetSqsQueueName($QueueName)
$serializedMessage = @{ Headers = $Headers; Body = $base64Body } | ConvertTo-Json
$queueUrl = Get-SQSQueueUrl $name
Send-SQSMessage -QueueUrl $queueUrl -MessageBody $serializedMessage -Force
}
Using the native send helper methods
await SendMessage(
sqsClient: client,
queue: "samples-sqs-nativeintegration",
messageBody: "{Property:'PropertyValue'}",
headers: new Dictionary<string, string>
{
{"NServiceBus.EnclosedMessageTypes", "MessageTypeToSend"},
{"NServiceBus.MessageId", "99C7320B-A645-4C74-95E8-857EAB98F4F9"}
}
);
The headers must contain the message type that is sent as a fully qualified assembly name as well as the message ID header. See the headers documentation for more information on the EnclosedMessageTypes and MessageId headers.
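To make the wire format concrete, the following minimal sketch builds the same envelope without calling AWS and then decodes the body again. It assumes C# 9 top-level statements and uses System.Text.Json in place of Newtonsoft.Json so that no extra package is needed. The exact escaping in the printed JSON may differ between the two serializers.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

var headers = new Dictionary<string, string>
{
    ["NServiceBus.EnclosedMessageTypes"] = "MessageTypeToSend",
    ["NServiceBus.MessageId"] = "99C7320B-A645-4C74-95E8-857EAB98F4F9"
};
var messageBody = "{Property:'PropertyValue'}";

// The body travels as base64-encoded UTF-8 inside a JSON envelope with the headers.
var base64Body = Convert.ToBase64String(Encoding.UTF8.GetBytes(messageBody));
var envelope = JsonSerializer.Serialize(new { Headers = headers, Body = base64Body });
Console.WriteLine(envelope);

// Reading the envelope back: parse the JSON and base64-decode the Body property.
using var doc = JsonDocument.Parse(envelope);
var encoded = doc.RootElement.GetProperty("Body").GetString();
var decodedBody = Encoding.UTF8.GetString(Convert.FromBase64String(encoded));
Console.WriteLine(decodedBody);
```

The decoded text matches the original messageBody, which is a quick way to check an envelope before sending it to a queue.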
Native Send Large Messages
The native send helper methods
A send involves the following actions:
- Create a unique S3 key containing the S3Prefix and the MessageId.
- Upload the body of the message to the S3 bucket.
- Create and serialize the message with an empty payload, including headers and the S3BodyKey.
- Write a message directly to SQS Transport.
In C#
public static async Task SendLargeMessage(IAmazonSQS sqsClient, IAmazonS3 s3Client, string queue, string s3Prefix, string bucketName, string messageBody, Dictionary<string, string> headers)
{
var bodyBytes = Encoding.UTF8.GetBytes(messageBody);
var key = $"{s3Prefix}/{headers["NServiceBus.MessageId"]}";
using (var bodyStream = new MemoryStream(bodyBytes))
{
await s3Client.PutObjectAsync(new PutObjectRequest
{
BucketName = bucketName,
InputStream = bodyStream,
Key = key
});
}
var serializedMessage = Newtonsoft.Json.JsonConvert.SerializeObject(new
{
Headers = headers,
Body = string.Empty,
S3BodyKey = key
});
var queueUrlResponse = await sqsClient.GetQueueUrlAsync(QueueNameHelper.GetSqsQueueName(queue));
await sqsClient.SendMessageAsync(queueUrlResponse.QueueUrl, serializedMessage);
}
In PowerShell
Function SendLargeMessage
{
param(
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $QueueName,
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $S3Prefix,
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $BucketName,
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $MessageBody,
[Parameter(Mandatory=$true)]
[ValidateNotNull()]
[HashTable] $Headers
)
$key = "$($s3Prefix)/$($Headers['NServiceBus.MessageId'])"
Write-S3Object -BucketName $BucketName -Key $key -Content $MessageBody -Force
[string]$name = [QueueNameHelper]::GetSqsQueueName($QueueName)
$serializedMessage = @{ Headers = $Headers; Body = [System.String]::Empty; S3BodyKey = $key } | ConvertTo-Json
$queueUrl = Get-SQSQueueUrl $name
Send-SQSMessage -QueueUrl $queueUrl -MessageBody $serializedMessage -Force
}
Using the native send helper methods
await SendLargeMessage(
sqsClient: client,
s3Client: s3Client,
queue: "samples-sqs-nativeintegration-large",
s3Prefix: "s3prefix",
bucketName: "bucketname",
messageBody: "{Property:'PropertyValue'}",
headers: new Dictionary<string, string>
{
{"NServiceBus.EnclosedMessageTypes", "MessageTypeToSend"},
{"NServiceBus.MessageId", "99C7320B-A645-4C74-95E8-857EAB98F4F9"}
}
);
The headers must contain the message type that is sent as a fully qualified assembly name as well as the message ID header. See the headers documentation for more information on the EnclosedMessageTypes and MessageId headers.
The S3 bucket name and the S3 prefix must be provided as defined in the transport configuration of the endpoint that will be receiving the message.
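The relationship between the S3 prefix, the message ID, and the envelope can be sketched without calling AWS. The following is a minimal illustration, assuming C# 9 top-level statements and System.Text.Json instead of Newtonsoft.Json. The prefix value is the placeholder from the usage example, not a recommended setting.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Placeholder value; it has to match the receiving endpoint's transport configuration.
var s3Prefix = "s3prefix";

var headers = new Dictionary<string, string>
{
    ["NServiceBus.EnclosedMessageTypes"] = "MessageTypeToSend",
    ["NServiceBus.MessageId"] = "99C7320B-A645-4C74-95E8-857EAB98F4F9"
};

// The S3 key is the prefix and the message ID joined by a slash.
var messageId = headers["NServiceBus.MessageId"];
var key = $"{s3Prefix}/{messageId}";

// The envelope carries an empty Body; the receiver locates the payload via S3BodyKey.
var envelope = JsonSerializer.Serialize(new
{
    Headers = headers,
    Body = string.Empty,
    S3BodyKey = key
});

Console.WriteLine(key); // s3prefix/99C7320B-A645-4C74-95E8-857EAB98F4F9
Console.WriteLine(envelope);
```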
Create resources
To provision the resources required by an endpoint, use the sqs-transport command line (CLI) tool.
The tool can be obtained from NuGet and installed using the following command:
dotnet tool install -g NServiceBus.AmazonSQS.CommandLine
Once installed, the sqs-transport command line tool will be available for use.
sqs-transport
Available commands
endpoint create
endpoint add delay-delivery-support
endpoint add large-message-support
endpoint subscribe
endpoint set-policy events
endpoint set-policy wildcard
endpoint list-policy
sqs-transport endpoint create
Create a new endpoint using:
sqs-transport endpoint create [name]
[--access-key-id]
[--secret]
[--region]
[--prefix]
[--retention]
arguments
name
: Name of the endpoint (required)
options
-i
| --access-key-id
: Overrides the environment variable 'AWS_ACCESS_KEY_ID'
-s
| --secret
: Overrides the environment variable 'AWS_SECRET_ACCESS_KEY'
-r
| --region
: Overrides the environment variable 'AWS_REGION'
-p
| --prefix
: Prefix to prepend to the endpoint queue
-t
| --retention
: Retention period in seconds (defaults to 345600)
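The retention option takes a value in seconds, and the default of 345600 corresponds to 4 days. The following minimal sketch shows the conversion, assuming C# 9 top-level statements. ToRetentionSeconds is a hypothetical helper and not part of the CLI. The range check reflects the limits of Amazon SQS itself (60 seconds to 14 days); whether the CLI validates the value is not covered here.

```csharp
using System;

// Hypothetical helper: converts a retention period to the whole number of
// seconds expected by the --retention option.
static int ToRetentionSeconds(TimeSpan retention)
{
    // Amazon SQS accepts retention periods from 60 seconds up to 14 days.
    if (retention < TimeSpan.FromSeconds(60) || retention > TimeSpan.FromDays(14))
    {
        throw new ArgumentOutOfRangeException(nameof(retention));
    }
    return (int)retention.TotalSeconds;
}

Console.WriteLine(ToRetentionSeconds(TimeSpan.FromDays(4))); // 345600, the documented default
Console.WriteLine(ToRetentionSeconds(TimeSpan.FromDays(7))); // 604800
```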
sqs-transport endpoint add delay-delivery-support
Add delay delivery support to an endpoint using:
sqs-transport endpoint add [name] delay-delivery-support
[--access-key-id]
[--secret]
[--region]
[--prefix]
[--retention]
arguments
name
: Name of the endpoint (required)
options
-i
| --access-key-id
: Overrides the environment variable 'AWS_ACCESS_KEY_ID'
-s
| --secret
: Overrides the environment variable 'AWS_SECRET_ACCESS_KEY'
-r
| --region
: Overrides the environment variable 'AWS_REGION'
-p
| --prefix
: Prefix to prepend to the delay delivery queue
-t
| --retention
: Retention period in seconds (defaults to 345600)
sqs-transport endpoint add large-message-support
Add large message support to an endpoint using:
sqs-transport endpoint add [name] large-message-support [bucket-name]
[--access-key-id]
[--secret]
[--region]
[--key-prefix]
[--expiration]
arguments
name
: Name of the endpoint (required)
bucket-name
: Name of the S3 bucket (required)
options
-i
| --access-key-id
: Overrides the environment variable 'AWS_ACCESS_KEY_ID'
-s
| --secret
: Overrides the environment variable 'AWS_SECRET_ACCESS_KEY'
-r
| --region
: Overrides the environment variable 'AWS_REGION'
-p
| --key-prefix
: S3 Key prefix to prepend to all bucket keys
-e
| --expiration
: Expiration time in days (defaults to 4)
sqs-transport endpoint subscribe event-type
Subscribe an endpoint to an event type using:
sqs-transport endpoint subscribe [name] [event-type]
[--access-key-id]
[--secret]
[--region]
[--prefix]
This command only sets up the subscription from the topic representing the event type to the input queue of the endpoint. It does not set up the IAM policy that allows messages to flow from the topic to the input queue. To set up the IAM policy, refer to the sqs-transport endpoint set-policy events or sqs-transport endpoint set-policy wildcard command.
arguments
name
: Name of the endpoint (required)
event-type
: Full name of the event to subscribe to (e.g. MyNamespace.MyMessage) (required)
options
-i
| --access-key-id
: Overrides the environment variable 'AWS_ACCESS_KEY_ID'
-s
| --secret
: Overrides the environment variable 'AWS_SECRET_ACCESS_KEY'
-r
| --region
: Overrides the environment variable 'AWS_REGION'
-p
| --prefix
: Prefix to prepend to the topic provisioned for the event type and the subscribing queue
sqs-transport endpoint set-policy events
Set the IAM policy on the input queue of an endpoint based on the event types the endpoint subscribed to using:
sqs-transport endpoint set-policy [name] events
[--event-type]
[--access-key-id]
[--secret]
[--region]
[--prefix]
arguments
name
: Name of the endpoint (required)
options
-evt
| --event-type
: Full name of the event allowed through the IAM policy (e.g. MyNamespace.MyMessage); can be repeated multiple times to allow multiple event types to pass.
-i
| --access-key-id
: Overrides the environment variable 'AWS_ACCESS_KEY_ID'
-s
| --secret
: Overrides the environment variable 'AWS_SECRET_ACCESS_KEY'
-r
| --region
: Overrides the environment variable 'AWS_REGION'
-p
| --prefix
: Prefix to prepend to the topic provisioned for the event type and the subscribing queue
sqs-transport endpoint set-policy wildcard
Set the IAM policy on the input queue of an endpoint based on wildcard conditions using:
sqs-transport endpoint set-policy [name] wildcard
[--account-condition]
[--namespace-condition]
[--prefix-condition]
[--remove-event-type]
[--access-key-id]
[--secret]
[--region]
[--prefix]
arguments
name
: Name of the endpoint (required)
options
-ac
| --account-condition
: Allow all messages from any topic in the account to pass. If no value is provided, the account name will be derived from the endpoint input queue.
-pc
| --prefix-condition
: Allow all messages from any topic with the given prefix to pass. If no value is provided, the prefix from the -p | --prefix option will be used.
-nc
| --namespace-condition
: Allow all messages whose event type is in one of the specified namespaces to pass
-revt
| --remove-event-type
: Existing event type conditions on the policy are not removed by default. Specify a value for this option to remove an existing event type condition that is already covered implicitly by the wildcard policy. Can be repeated multiple times to remove multiple event types.
-i
| --access-key-id
: Overrides the environment variable 'AWS_ACCESS_KEY_ID'
-s
| --secret
: Overrides the environment variable 'AWS_SECRET_ACCESS_KEY'
-r
| --region
: Overrides the environment variable 'AWS_REGION'
-p
| --prefix
: Prefix to prepend to the topic provisioned for the event type and the subscribing queue
sqs-transport endpoint list-policy
List the existing IAM policy on the input queue of an endpoint using:
sqs-transport endpoint list-policy [name]
[--access-key-id]
[--secret]
[--region]
[--prefix]
arguments
name
: Name of the endpoint (required)
options
-i
| --access-key-id
: Overrides the environment variable 'AWS_ACCESS_KEY_ID'
-s
| --secret
: Overrides the environment variable 'AWS_SECRET_ACCESS_KEY'
-r
| --region
: Overrides the environment variable 'AWS_REGION'
-p
| --prefix
: Prefix to prepend to the topic provisioned for the event type and the subscribing queue
Delete resources
To de-provision the resources required by an endpoint, use the sqs-transport command line (CLI) tool.
Available commands
endpoint unsubscribe
endpoint remove delay-delivery-support
endpoint remove large-message-support
endpoint delete
sqs-transport endpoint unsubscribe
Unsubscribe an endpoint from an event type using:
sqs-transport endpoint unsubscribe [name] [event-type]
[--access-key-id]
[--secret]
[--region]
[--prefix]
[--remove-shared-resources]
arguments
name
: Name of the endpoint (required)
event-type
: Full name of the event to unsubscribe from (e.g. MyNamespace.MyMessage) (required)
options
-i
| --access-key-id
: Overrides the environment variable 'AWS_ACCESS_KEY_ID'
-s
| --secret
: Overrides the environment variable 'AWS_SECRET_ACCESS_KEY'
-r
| --region
: Overrides the environment variable 'AWS_REGION'
-p
| --prefix
: Prefix prepended to the topic provisioned for the event type and the subscribing queue
-f
| --remove-shared-resources
: Remove shared resources (the topic provisioned for the event type)
sqs-transport endpoint remove delay-delivery-support
Remove delay delivery support from an endpoint using:
sqs-transport endpoint remove [name] delay-delivery-support
[--access-key-id]
[--secret]
[--region]
[--prefix]
arguments
name
: Name of the endpoint (required)
options
-i
| --access-key-id
: Overrides the environment variable 'AWS_ACCESS_KEY_ID'
-s
| --secret
: Overrides the environment variable 'AWS_SECRET_ACCESS_KEY'
-r
| --region
: Overrides the environment variable 'AWS_REGION'
-p
| --prefix
: Prefix to prepend to the delay delivery queue
sqs-transport endpoint remove large-message-support
Remove large message support from an endpoint using:
sqs-transport endpoint remove [name] large-message-support [bucket-name]
[--access-key-id]
[--secret]
[--region]
[--remove-shared-resources]
arguments
name
: Name of the endpoint (required)
bucket-name
: Name of the S3 bucket (required)
options
-i
| --access-key-id
: Overrides the environment variable 'AWS_ACCESS_KEY_ID'
-s
| --secret
: Overrides the environment variable 'AWS_SECRET_ACCESS_KEY'
-r
| --region
: Overrides the environment variable 'AWS_REGION'
-e
| --remove-shared-resources
: Remove shared resources (S3 Bucket)
sqs-transport endpoint delete
Delete an existing endpoint using:
sqs-transport endpoint delete [name]
[--access-key-id]
[--secret]
[--region]
[--prefix]
arguments
name
: Name of the endpoint (required)
options
-i
| --access-key-id
: Overrides the environment variable 'AWS_ACCESS_KEY_ID'
-s
| --secret
: Overrides the environment variable 'AWS_SECRET_ACCESS_KEY'
-r
| --region
: Overrides the environment variable 'AWS_REGION'
-p
| --prefix
: Prefix to prepend to the endpoint queue
Return message to source queue
The retry helper methods
A retry involves the following actions:
- Read a message from the error queue.
- Forward that message to another queue to be retried.
public static async Task ReturnMessageToSourceQueue(string errorQueueName, string messageId)
{
var path = QueueNameHelper.GetSqsQueueName(errorQueueName);
using (var client = ClientFactory.CreateSqsClient())
{
var queueUrlResponse = await client.GetQueueUrlAsync(path);
var queueUrl = queueUrlResponse.QueueUrl;
await InspectMessagesUntilFound(client, messageId, queueUrl);
}
}
static async Task InspectMessagesUntilFound(IAmazonSQS client, string messageId, string queueUrl)
{
var receivedMessages = await client.ReceiveMessageAsync(new ReceiveMessageRequest
{
QueueUrl = queueUrl,
WaitTimeSeconds = 20,
MaxNumberOfMessages = 10,
});
if (receivedMessages.Messages.Count == 0)
{
return;
}
var foundMessage = receivedMessages.Messages
.SingleOrDefault(message => message.MessageId == messageId);
if (foundMessage != null)
{
var failedQueueName = ReadFailedQueueHeader(foundMessage);
var failedQueueUrlResponse = await client.GetQueueUrlAsync(failedQueueName);
var failedQueueUrl = failedQueueUrlResponse.QueueUrl;
// Large messages don't need to be handled separately since the S3BodyKey is preserved in the message body.
// Note: this example does not copy the message attributes of the original message.
await client.SendMessageAsync(new SendMessageRequest(failedQueueUrl, foundMessage.Body));
await client.DeleteMessageAsync(queueUrl, foundMessage.ReceiptHandle);
return;
}
await InspectMessagesUntilFound(client, messageId, queueUrl);
}
static string ReadFailedQueueHeader(Message message)
{
var headers = ExtractHeaders(message);
var queueName = headers.Single(x => x.Key == "NServiceBus.FailedQ").Value;
return QueueNameHelper.GetSqsQueueName(queueName);
}
static Dictionary<string, string> ExtractHeaders(Message message)
{
var transportMessage = JsonConvert.DeserializeObject<TransportMessage>(message.Body);
return transportMessage.Headers;
}
class TransportMessage
{
public Dictionary<string, string> Headers { get; set; }
public string Body { get; set; }
public string S3BodyKey { get; set; }
}
public class HeaderInfo
{
public string Key { get; set; }
public string Value { get; set; }
}
While searching for the desired message, this example code also receives other messages from the error queue. All messages received by this code remain invisible to other consumers until their visibility timeout expires.
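The header lookup at the heart of the retry helper can be tried in isolation. The following minimal sketch reads the NServiceBus.FailedQ header from a message body without calling AWS. It assumes C# 9 top-level statements and System.Text.Json in place of Newtonsoft.Json. The message body and the queue name Samples.Sqs.Sales are hypothetical.

```csharp
using System;
using System.Text.Json;

// Hypothetical error-queue message body: a JSON envelope with headers and a base64 body.
var body =
    "{\"Headers\":{" +
    "\"NServiceBus.MessageId\":\"c390a6fb-4fb5-46da-927d-a156f75739eb\"," +
    "\"NServiceBus.FailedQ\":\"Samples.Sqs.Sales\"}," +
    "\"Body\":\"e30=\"}";

using var doc = JsonDocument.Parse(body);
var headers = doc.RootElement.GetProperty("Headers");

// The FailedQ header names the queue in which the message originally failed.
string failedQueue = headers.TryGetProperty("NServiceBus.FailedQ", out var value)
    ? value.GetString()
    : null;

Console.WriteLine(failedQueue); // Samples.Sqs.Sales
```

The retry helper then passes this value through QueueNameHelper.GetSqsQueueName before resolving the queue URL, because the header holds the logical address rather than the sanitized SQS queue name.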
Using the retry helper methods
await ReturnMessageToSourceQueue(
errorQueueName: "error",
messageId: "c390a6fb-4fb5-46da-927d-a156f75739eb");