Getting Started
Architecture
NServiceBus
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Modernization
Samples

AmazonSQS transport native integration sample

NuGet Package: NServiceBus.AmazonSQS (7.x)
Target Version: NServiceBus 9.x

This sample demonstrates how to enable an NServiceBus endpoint to receive messages sent by a native (i.e. non-NServiceBus-based) implementation.

In this sample, an external system sends a message to an SQS queue using the Amazon SQS SDK.

var MessageToSend = @"{""$type"" : ""NativeIntegration.Receiver.SomeNativeMessage, Receiver"", ""ThisIsTheMessage"": ""Hello world!""}";

Attributes are included to demonstrate how to access those from handlers or behaviors in the pipeline.

await SendTo(new Dictionary<string, MessageAttributeValue>
        {
            {"SomeKey", new MessageAttributeValue {DataType = "String", StringValue = "something"}}, //optional attributes that the receiver might need
        }, MessageToSend);

On the receiving end, an NServiceBus endpoint is listening to the queue and has a handler in place to handle messages of type SomeNativeMessage.

var MessageToSend = @"{""$type"" : ""NativeIntegration.Receiver.SomeNativeMessage, Receiver"", ""ThisIsTheMessage"": ""Hello world!""}";

The serializer must be configured to handle this annotation:

var serialization = endpointConfiguration.UseSerialization<NewtonsoftJsonSerializer>();
serialization.Settings(new JsonSerializerSettings
{
    TypeNameHandling = TypeNameHandling.Auto
});

First, the message will be intercepted in the incoming logical message context as there is a behavior in place:

class AccessToAmazonSqsNativeMessageBehavior(ILogger<AccessToAmazonSqsNativeMessageBehavior> logger) : Behavior<IIncomingLogicalMessageContext>
{
 
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // get the native Amazon SQS message
        var nativeMessage = context.Extensions.Get<Message>();
        var nativeAttributeFound = nativeMessage.MessageAttributes.TryGetValue("SomeKey", out var attributeValue);

        //do something useful with the native message
        if (nativeAttributeFound)
        {
            logger.LogInformation($"Intercepted the native message and found attribute 'SomeKey' with value '{attributeValue.StringValue}'");
        }

        return next();
    }
}

The code to register the above behavior is:

var serviceProvider = builder.Services.BuildServiceProvider();
var logger = serviceProvider.GetRequiredService<ILogger<AccessToAmazonSqsNativeMessageBehavior>>();
endpointConfiguration.Pipeline.Register(new AccessToAmazonSqsNativeMessageBehavior(logger), "Demonstrates how to access the native message from a pipeline behavior");

Next, the handler is invoked. The handler code can also access the native message and its attributes.

public class SomeNativeMessageHandler(ILogger<SomeNativeMessageHandler> logger) : IHandleMessages<SomeNativeMessage>
{
    public async Task Handle(SomeNativeMessage eventMessage, IMessageHandlerContext context)
    {
        var nativeMessage = context.Extensions.Get<Message>();
        var nativeAttributeFound = nativeMessage.MessageAttributes.TryGetValue("SomeKey", out var attributeValue);

        logger.LogInformation($"Received {nameof(SomeNativeMessage)} with message {eventMessage.ThisIsTheMessage}.");

        if (nativeAttributeFound)
        {
            logger.LogInformation($"Found attribute 'SomeKey' with value '{attributeValue.StringValue}'");
        }

        if (context.ReplyToAddress != null)
        {
            logger.LogInformation($"Sending reply to '{context.ReplyToAddress}'");

            await context.Reply(new SomeReply());
        }
    }
}

Related Articles