AmazonSQS transport native integration sample

NuGet Package: NServiceBus.AmazonSQS (9-pre)
This page targets a pre-release version. Pre-releases are subject to change and samples are not guaranteed to be fully functional.

This sample demonstrates how to enable an NServiceBus endpoint to receive messages sent by a native (i.e. non-NServiceBus-based) implementation.

AWS setup

Security and access configuration

Store the AWS Access Key ID, AWS Secret Access Key, and default region in the following environment variables:

  • Access Key ID in AWS_ACCESS_KEY_ID
  • Secret Access Key in AWS_SECRET_ACCESS_KEY
  • Default Region in AWS_REGION
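
Before starting the sample, it can help to confirm that all three variables are set. The following is a minimal sketch, not part of the sample itself; the AwsEnvironmentCheck class and its FindMissing method are hypothetical names introduced here for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of the sample or of any AWS/NServiceBus API.
public static class AwsEnvironmentCheck
{
    // The three variable names listed above.
    static readonly string[] RequiredVariables =
    {
        "AWS_ACCESS_KEY_ID",
        "AWS_SECRET_ACCESS_KEY",
        "AWS_REGION"
    };

    // Returns the names of required variables that are unset or blank.
    // The lookup is injected so the check can run without touching the real environment.
    public static IReadOnlyList<string> FindMissing(Func<string, string> lookup)
    {
        return RequiredVariables
            .Where(name => string.IsNullOrWhiteSpace(lookup(name)))
            .ToList();
    }

    // Convenience overload that reads the current process environment.
    public static IReadOnlyList<string> FindMissing()
    {
        return FindMissing(Environment.GetEnvironmentVariable);
    }
}
```

Calling AwsEnvironmentCheck.FindMissing() at start-up and logging any returned names gives a clearer failure than a credentials error raised later by the SDK.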

See also AWS Account Identifiers, Managing Access Keys for an AWS Account, and IAM Security Credentials.

See also AWS Regions for a list of available regions.

SQS

Several Amazon SQS queues are required to run this sample. These will be created at start-up via the installer mechanism of NServiceBus. The queues can be seen in the SQS management UI.

  • Samples-Sqs-SimpleReceiver: The main message processing queue.
  • Samples-Sqs-SimpleReceiver-delay.fifo: Queue used for delayed retries.
  • error: Queue used for error handling.

Code walk-through

In this sample, an external system sends a message to an SQS queue using the Amazon SQS .NET SDK.

var MessageToSend = @"{""$type"" : ""NativeIntegration.Receiver.SomeNativeMessage, Receiver"", ""ThisIsTheMessage"": ""Hello world!""}";
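
The hand-escaped string above is what the sample uses. As an alternative sketch, a native sender could build the same body with System.Text.Json so that quoting is handled by the library. The NativePayload class below is a hypothetical name, and the approach is an assumption rather than the sample's method. The "$type" property is written first because Json.NET, by default, only reads type metadata that appears at the start of an object.

```csharp
using System.Text.Json.Nodes;

// Hypothetical helper, not part of the sample.
public static class NativePayload
{
    // Builds the JSON body for SomeNativeMessage.
    // JsonObject keeps properties in insertion order, so "$type" stays first.
    public static string Build(string text)
    {
        var body = new JsonObject
        {
            ["$type"] = "NativeIntegration.Receiver.SomeNativeMessage, Receiver",
            ["ThisIsTheMessage"] = text
        };

        return body.ToJsonString();
    }
}
```

NativePayload.Build("Hello world!") yields a body with the same two properties as MessageToSend.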

Message attributes are included to demonstrate how to access them from handlers or pipeline behaviors.

await SendTo(new Dictionary<string, MessageAttributeValue>
{
    // optional attributes that the receiver might need
    {"SomeKey", new MessageAttributeValue {DataType = "String", StringValue = "something"}}
}, MessageToSend);
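
The SendTo helper is not shown on this page. The following is a minimal sketch of what such a helper might look like with the Amazon SQS .NET SDK, assuming the target is the Samples-Sqs-SimpleReceiver queue listed earlier; the NativeSender class name is hypothetical and the actual sample may differ.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

// Hypothetical wrapper class; the sample's own helper may be structured differently.
public static class NativeSender
{
    public static async Task SendTo(Dictionary<string, MessageAttributeValue> attributes, string body)
    {
        // The parameterless constructor resolves credentials and region
        // from the environment variables configured during AWS setup.
        using var client = new AmazonSQSClient();

        // Assumption: the message goes to the endpoint's main input queue.
        var queueUrlResponse = await client.GetQueueUrlAsync("Samples-Sqs-SimpleReceiver");

        await client.SendMessageAsync(new SendMessageRequest
        {
            QueueUrl = queueUrlResponse.QueueUrl,
            MessageBody = body,
            MessageAttributes = attributes
        });
    }
}
```

The queue must already exist, so the receiving endpoint has to be started at least once before the sender runs.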

On the receiving end, an NServiceBus endpoint is listening to the queue and has a handler in place to handle messages of type SomeNativeMessage.
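
The message contracts are not shown on this page. A plausible sketch, assuming the receiver project compiles to an assembly named Receiver and uses the NativeIntegration.Receiver namespace (both taken from the type name in the payload), is given below. Marking the classes with IMessage is an assumption; the sample may identify messages differently.

```csharp
using NServiceBus;

namespace NativeIntegration.Receiver
{
    // Assumed shape: the property name matches the "ThisIsTheMessage" key in the native payload.
    public class SomeNativeMessage : IMessage
    {
        public string ThisIsTheMessage { get; set; }
    }

    // Assumed shape: an empty reply sent back when a reply-to address is present.
    public class SomeReply : IMessage
    {
    }
}
```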

The payload shown earlier carries a $type annotation naming the message type and its assembly (NativeIntegration.Receiver.SomeNativeMessage, Receiver), which tells the endpoint which type to deserialize the body into. The serializer must be configured to handle this annotation:

var serialization = endpointConfiguration.UseSerialization<NewtonsoftJsonSerializer>();
serialization.Settings(new JsonSerializerSettings
{
    TypeNameHandling = TypeNameHandling.Auto
});

First, a pipeline behavior intercepts the message in the incoming logical message context:

class AccessToAmazonSqsNativeMessageBehavior(ILogger<AccessToAmazonSqsNativeMessageBehavior> logger) : Behavior<IIncomingLogicalMessageContext>
{
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // get the native Amazon SQS message
        var nativeMessage = context.Extensions.Get<Message>();
        var nativeAttributeFound = nativeMessage.MessageAttributes.TryGetValue("SomeKey", out var attributeValue);

        //do something useful with the native message
        if (nativeAttributeFound)
        {
            logger.LogInformation("Intercepted the native message and found attribute 'SomeKey' with value '{AttributeValue}'", attributeValue.StringValue);
        }

        return next();
    }
}

The behavior is registered in the pipeline as follows:

var serviceProvider = builder.Services.BuildServiceProvider();
var logger = serviceProvider.GetRequiredService<ILogger<AccessToAmazonSqsNativeMessageBehavior>>();
endpointConfiguration.Pipeline.Register(new AccessToAmazonSqsNativeMessageBehavior(logger), "Demonstrates how to access the native message from a pipeline behavior");

Next, the handler is invoked. The handler code can also access the native message and its attributes.

public class SomeNativeMessageHandler(ILogger<SomeNativeMessageHandler> logger) : IHandleMessages<SomeNativeMessage>
{
    public async Task Handle(SomeNativeMessage eventMessage, IMessageHandlerContext context)
    {
        var nativeMessage = context.Extensions.Get<Message>();
        var nativeAttributeFound = nativeMessage.MessageAttributes.TryGetValue("SomeKey", out var attributeValue);

        logger.LogInformation("Received {MessageType} with message {Message}", nameof(SomeNativeMessage), eventMessage.ThisIsTheMessage);

        if (nativeAttributeFound)
        {
            logger.LogInformation("Found attribute 'SomeKey' with value '{AttributeValue}'", attributeValue.StringValue);
        }

        if (context.ReplyToAddress != null)
        {
            logger.LogInformation("Sending reply to '{ReplyToAddress}'", context.ReplyToAddress);

            await context.Reply(new SomeReply());
        }
    }
}
