Native integration with Amazon SQS Transport

Component: Amazon SQS Transport
NuGet Package: NServiceBus.AmazonSQS (5.x)
Target NServiceBus Version: 7.x

This sample demonstrates how to enable an NServiceBus endpoint to receive messages sent by a native (i.e. non-NServiceBus-based) implementation.

In this sample, an external system sends a message to an SQS queue using the Amazon SQS SDK. For NServiceBus to consume this message, a MessageTypeFullName message attribute must be present; its value is the full name of the message type. Other attributes are included to demonstrate how to access them from handlers or behaviors in the pipeline.

await SendTo(new Dictionary<string, MessageAttributeValue>
{
    {"MessageTypeFullName", new MessageAttributeValue {DataType = "String", StringValue = "NativeIntegration.Receiver.SomeNativeMessage"}}, // required for native integration
    //{"S3BodyKey", new MessageAttributeValue {DataType = "String", StringValue = "s3bodykey"}}, // optional for native integration
    {"SomeRandomKey", new MessageAttributeValue {DataType = "String", StringValue = "something-random"}},
    {"AnotherRandomKey", new MessageAttributeValue {DataType = "String", StringValue = "something-else-thats-random"}},
}, MessageToSend);
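
The SendTo helper called above is not shown in this sample text. A minimal sketch of what such a helper could look like with the Amazon SQS SDK follows. The class name NativeSender and the queue name are hypothetical, and the message body must be in a format the receiving endpoint's serializer can read.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

static class NativeSender
{
    // Hypothetical queue name; use the name of the receiving endpoint's input queue.
    const string QueueName = "NativeIntegration-Receiver";

    public static async Task SendTo(Dictionary<string, MessageAttributeValue> messageAttributes, string messageBody)
    {
        // Credentials and region are resolved by the AWS SDK from the environment.
        using (var sqsClient = new AmazonSQSClient())
        {
            // Look up the queue URL from the queue name.
            var queueUrlResponse = await sqsClient.GetQueueUrlAsync(QueueName).ConfigureAwait(false);

            var request = new SendMessageRequest
            {
                QueueUrl = queueUrlResponse.QueueUrl,
                MessageAttributes = messageAttributes,
                MessageBody = messageBody
            };

            await sqsClient.SendMessageAsync(request).ConfigureAwait(false);
        }
    }
}
```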

On the receiving end, an NServiceBus endpoint is listening to the queue and has a handler in place to handle messages of type SomeNativeMessage.
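
For reference, the message type might be declared roughly as follows. The namespace and class name are taken from the MessageTypeFullName attribute value, and the ThisIsTheMessage property from the handler code; marking the class with IMessage and typing the property as string are assumptions.

```csharp
using NServiceBus;

namespace NativeIntegration.Receiver
{
    // Full name: NativeIntegration.Receiver.SomeNativeMessage,
    // which matches the MessageTypeFullName attribute set by the sender.
    public class SomeNativeMessage : IMessage // IMessage marker is an assumption
    {
        // Property type is assumed to be string.
        public string ThisIsTheMessage { get; set; }
    }
}
```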

First, a behavior registered in the pipeline intercepts the message in the incoming logical message stage:

class AccessToAmazonSqsNativeMessageBehavior : Behavior<IIncomingLogicalMessageContext>
{
    static ILog log = LogManager.GetLogger<AccessToAmazonSqsNativeMessageBehavior>();

    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // get the native Amazon SQS message
        var nativeMessage = context.Extensions.Get<Message>();
        var nativeAttributeFound = nativeMessage.MessageAttributes.TryGetValue("AnotherRandomKey", out var randomAttributeKey);

        //do something useful with the native message
        if (nativeAttributeFound)
        {
            log.Info($"Intercepted the native message and found attribute 'AnotherRandomKey' with value '{randomAttributeKey.StringValue}'");
        }

        return next();
    }
}

The code to register the above behavior is:

endpointConfiguration.Pipeline.Register(new AccessToAmazonSqsNativeMessageBehavior(), "access-to-native-msg");
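
To show where this registration fits, here is a minimal sketch of an endpoint setup for NServiceBus 7 with the SQS transport. The ReceiverHost class and the endpoint name are hypothetical, and a real endpoint will likely need further settings (serializer, error queue, and so on). The sketch relies on the AccessToAmazonSqsNativeMessageBehavior class shown earlier.

```csharp
using System.Threading.Tasks;
using NServiceBus;

static class ReceiverHost
{
    public static async Task<IEndpointInstance> Start()
    {
        // Hypothetical endpoint name; the input queue name is derived from it.
        var endpointConfiguration = new EndpointConfiguration("NativeIntegration.Receiver");

        // Receive messages from Amazon SQS.
        endpointConfiguration.UseTransport<SqsTransport>();

        // Let the endpoint create its queues on startup.
        endpointConfiguration.EnableInstallers();

        // Register the behavior that accesses the native SQS message.
        endpointConfiguration.Pipeline.Register(new AccessToAmazonSqsNativeMessageBehavior(), "access-to-native-msg");

        return await Endpoint.Start(endpointConfiguration).ConfigureAwait(false);
    }
}
```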

Next, the handler is invoked. The handler code can also access the native message and its attributes.

In recoverability scenarios, the MessageTypeFullName message attribute may no longer be available in the MessageAttributes collection. Instead, it will be part of the Headers collection.

public class SomeNativeMessageHandler : IHandleMessages<SomeNativeMessage>
{
    static ILog log = LogManager.GetLogger<SomeNativeMessageHandler>();

    public Task Handle(SomeNativeMessage eventMessage, IMessageHandlerContext context)
    {
        var nativeMessage = context.Extensions.Get<Message>();
        var nativeAttributeFound = nativeMessage.MessageAttributes.TryGetValue("SomeRandomKey", out var randomAttributeKey);

        log.Info($"Received {nameof(SomeNativeMessage)} with message {eventMessage.ThisIsTheMessage}.");

        if (nativeAttributeFound)
        {
            log.Info($"Found attribute 'SomeRandomKey' with value '{randomAttributeKey.StringValue}'");
        }

        return Task.CompletedTask;
    }
}
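
Because MessageTypeFullName may move from the message attributes to the headers in recoverability scenarios, code that needs the value could check both places. The helper below is a sketch only: the NativeMessageTypeReader name is hypothetical, and it assumes the header uses the same key as the message attribute, which should be verified against the transport version in use.

```csharp
using System.Collections.Generic;
using Amazon.SQS.Model;

static class NativeMessageTypeReader
{
    // Assumption: the header key is the same as the message attribute name.
    const string MessageTypeKey = "MessageTypeFullName";

    public static string GetMessageTypeFullName(Message nativeMessage, IReadOnlyDictionary<string, string> headers)
    {
        // Prefer the native message attribute when it is still present.
        if (nativeMessage.MessageAttributes.TryGetValue(MessageTypeKey, out var attribute))
        {
            return attribute.StringValue;
        }

        // Fall back to the headers; returns null when neither source has the value.
        return headers.TryGetValue(MessageTypeKey, out var headerValue) ? headerValue : null;
    }
}
```

Inside a handler, this could be called as NativeMessageTypeReader.GetMessageTypeFullName(context.Extensions.Get<Message>(), context.MessageHeaders).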
