AmazonSQS transport native integration sample

NuGet Package: NServiceBus.AmazonSQS (5.x)
Target Version: NServiceBus 7.x

This sample demonstrates how to enable an NServiceBus endpoint to receive messages sent by a native (i.e. non-NServiceBus-based) implementation.

In this sample, an external system sends a message to an SQS queue using the Amazon SQS SDK. For NServiceBus to consume this message, the message must carry a MessageTypeFullName message attribute whose value is the full name of the message type. Other attributes are included to demonstrate how to access them from handlers or from behaviors in the pipeline.

await SendTo(new Dictionary<string, MessageAttributeValue>
{
    {"MessageTypeFullName", new MessageAttributeValue {DataType = "String", StringValue = "NativeIntegration.Receiver.SomeNativeMessage"}}, // required for native integration
    //{"S3BodyKey", new MessageAttributeValue {DataType = "String", StringValue = "s3bodykey"}}, // optional for native integration
    {"SomeRandomKey", new MessageAttributeValue {DataType = "String", StringValue = "something-random"}},
    {"AnotherRandomKey", new MessageAttributeValue {DataType = "String", StringValue = "something-else-thats-random"}},
    //{"ReplyToAddress", new MessageAttributeValue {DataType = "String", StringValue = "my-native-endpoint"}}, //optional, used to demo replying back to the native endpoint, make sure to create the queue manually in AWS
}, MessageToSend);
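
The SendTo helper called above is not shown. A minimal sketch of what such a helper could look like with the AWS SDK for .NET follows. The class name, the queue name, and the Base64 encoding of the body are assumptions made for illustration, not details confirmed by this sample; check them against the sample source and the transport version in use.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

static class NativeSender
{
    // Hypothetical queue name; it must match the input queue of the receiving endpoint.
    const string QueueName = "my-receiver-endpoint";

    public static async Task SendTo(Dictionary<string, MessageAttributeValue> attributes, string message)
    {
        // The parameterless constructor resolves credentials and region from the environment.
        using (var client = new AmazonSQSClient())
        {
            // Look up the queue URL from the queue name.
            var queueUrl = (await client.GetQueueUrlAsync(QueueName)).QueueUrl;

            var request = new SendMessageRequest
            {
                QueueUrl = queueUrl,
                MessageAttributes = attributes,
                // Assumption: this sketch Base64-encodes the body. Confirm whether the
                // transport version in use expects an encoded or a plain body.
                MessageBody = Convert.ToBase64String(Encoding.UTF8.GetBytes(message))
            };

            await client.SendMessageAsync(request);
        }
    }
}
```

The message argument is expected to be the serialized form of the message in whatever format the receiving endpoint's serializer understands.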

On the receiving end, an NServiceBus endpoint is listening to the queue and has a handler in place to handle messages of type SomeNativeMessage.
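
For reference, the message types used by the receiver might be declared roughly as follows. The namespace and class name are taken from the MessageTypeFullName value sent above, and the ThisIsTheMessage property is the one the handler reads; marking the types with IMessage (rather than using unobtrusive conventions) and the empty SomeReply class are assumptions.

```csharp
using NServiceBus;

namespace NativeIntegration.Receiver
{
    // Namespace plus class name must equal the MessageTypeFullName attribute value:
    // "NativeIntegration.Receiver.SomeNativeMessage".
    public class SomeNativeMessage : IMessage
    {
        // Populated from the body of the native message.
        public string ThisIsTheMessage { get; set; }
    }

    // Assumed shape of the reply sent back to the native endpoint.
    public class SomeReply : IMessage
    {
    }
}
```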

First, a pipeline behavior intercepts the message in the incoming logical message stage:

class AccessToAmazonSqsNativeMessageBehavior : Behavior<IIncomingLogicalMessageContext>
{
    static ILog log = LogManager.GetLogger<AccessToAmazonSqsNativeMessageBehavior>();

    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // get the native Amazon SQS message
        var nativeMessage = context.Extensions.Get<Message>();
        var nativeAttributeFound = nativeMessage.MessageAttributes.TryGetValue("AnotherRandomKey", out var randomAttributeKey);

        //do something useful with the native message
        if (nativeAttributeFound)
        {
            log.Info($"Intercepted the native message and found attribute 'AnotherRandomKey' with value '{randomAttributeKey.StringValue}'");
        }

        return next();
    }
}

The following code registers this behavior, together with the PopulateReplyToAddressBehavior described under Replies:

endpointConfiguration.Pipeline.Register(new AccessToAmazonSqsNativeMessageBehavior(), "Demonstrates how to access the native message from a pipeline behavior");
endpointConfiguration.Pipeline.Register(new PopulateReplyToAddressBehavior(), "Behavior to enable replies back to the native endpoint");

Next, the handler is invoked. The handler code can also access the native message and its attributes.

Note: In recoverability scenarios, the MessageTypeFullName message attribute might no longer be available in the MessageAttributes collection. Instead, it will be part of the Headers collection.

public class SomeNativeMessageHandler : IHandleMessages<SomeNativeMessage>
{
    static ILog log = LogManager.GetLogger<SomeNativeMessageHandler>();

    public async Task Handle(SomeNativeMessage eventMessage, IMessageHandlerContext context)
    {
        var nativeMessage = context.Extensions.Get<Message>();
        var nativeAttributeFound = nativeMessage.MessageAttributes.TryGetValue("SomeRandomKey", out var randomAttributeKey);

        log.Info($"Received {nameof(SomeNativeMessage)} with message {eventMessage.ThisIsTheMessage}.");

        if (nativeAttributeFound)
        {
            log.Info($"Found attribute 'SomeRandomKey' with value '{randomAttributeKey.StringValue}'");
        }

        if (context.ReplyToAddress != null)
        {
            log.Info($"Sending reply to '{context.ReplyToAddress}'");

            await context.Reply(new SomeReply());
        }
    }
}
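
Because MessageTypeFullName may move from the native attributes to the headers in recoverability scenarios, code that needs the type name could check both places. The helper below is a sketch of that fallback; the class name is hypothetical, and the assumption that the header uses the same MessageTypeFullName key as the attribute should be verified against the transport version in use.

```csharp
using System.Collections.Generic;
using Amazon.SQS.Model;

static class NativeMessageType
{
    const string Key = "MessageTypeFullName";

    // Returns the message type name, or null when it is found in neither place.
    public static string Resolve(Message nativeMessage, IReadOnlyDictionary<string, string> headers)
    {
        // Normal case: the attribute is still present on the native message.
        if (nativeMessage.MessageAttributes.TryGetValue(Key, out var attribute))
        {
            return attribute.StringValue;
        }

        // Recoverability case. Assumption: the header key matches the attribute name.
        return headers.TryGetValue(Key, out var header) ? header : null;
    }
}
```

In a handler, this could be called as NativeMessageType.Resolve(context.Extensions.Get<Message>(), context.MessageHeaders).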

Replies

To enable the NServiceBus endpoint to reply to a native queue, a reply-to address must be provided. To see this in action, manually create a queue called my-native-endpoint in AWS and uncomment the line in the sender that adds the ReplyToAddress message attribute. The receiver uses a behavior to copy the reply-to address from the native attribute to the NServiceBus.ReplyToAddress header, which NServiceBus uses to route replies:

class PopulateReplyToAddressBehavior : Behavior<ITransportReceiveContext>
{
    static ILog log = LogManager.GetLogger<PopulateReplyToAddressBehavior>();

    public override Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        // get the native Amazon SQS message
        var nativeMessage = context.Extensions.Get<Message>();
        var nativeReplyToAddressFound = nativeMessage.MessageAttributes.TryGetValue("ReplyToAddress", out var nativeReplyToAddressKey);

        // populate the `NServiceBus.ReplyToAddress` header to enable replies from message handlers to be routed back to the native endpoint
        if (nativeReplyToAddressFound)
        {
            log.Info($"Found native attribute 'ReplyToAddress' with value '{nativeReplyToAddressKey.StringValue}' that will be used to route replies");

            context.Message.Headers[Headers.ReplyToAddress] = nativeReplyToAddressKey.StringValue;
        }

        return next();
    }
}

After running the sample, use the AWS Management Console to view the replies sent back to the native queue.
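
As an alternative to the console, the replies could also be read with the AWS SDK for .NET. The following is only a sketch: the class and method names are hypothetical, and it prints the raw message body without assuming anything about how NServiceBus formats the reply.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

static class ReplyReader
{
    public static async Task PrintReplies()
    {
        using (var client = new AmazonSQSClient())
        {
            // The queue created manually for the replies.
            var queueUrl = (await client.GetQueueUrlAsync("my-native-endpoint")).QueueUrl;

            var response = await client.ReceiveMessageAsync(new ReceiveMessageRequest
            {
                QueueUrl = queueUrl,
                MaxNumberOfMessages = 10,
                WaitTimeSeconds = 10, // long polling
                MessageAttributeNames = new List<string> { "All" }
            });

            // Some SDK versions return null instead of an empty list when no messages arrive.
            foreach (var message in response.Messages ?? new List<Message>())
            {
                Console.WriteLine(message.Body);
            }
        }
    }
}
```

The messages are not deleted here, so they become visible in the queue again once the visibility timeout expires.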
