
AmazonSQS transport native integration sample

NuGet Package: NServiceBus.AmazonSQS (6.x)
Target Version: NServiceBus 8.x

This sample demonstrates how to enable an NServiceBus endpoint to receive messages sent by a native (i.e. non-NServiceBus-based) implementation.

In this sample, an external system sends a message to an SQS queue using the Amazon SQS SDK. For NServiceBus to consume this message, an NServiceBus.AmazonSQS.Headers message attribute containing an empty JSON object ("{}") must be present. Other attributes are included to demonstrate how to access them from handlers or behaviors in the pipeline.

This approach applies to NServiceBus endpoints running NServiceBus.AmazonSQS version 6.1 and above. For compatibility with earlier versions of the transport, use the MessageTypeFullName message attribute instead.

await SendTo(new Dictionary<string, MessageAttributeValue>
{
    {"NServiceBus.AmazonSQS.Headers", new MessageAttributeValue {DataType = "String", StringValue = "{}"}}, // required for native integration
    {"SomeRandomKey", new MessageAttributeValue {DataType = "String", StringValue = "something-random"}}, // other optional attributes that the receiver might need
}, MessageToSend);
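
For receivers on an earlier transport version, the MessageTypeFullName attribute mentioned above takes the place of the headers attribute. The following is a minimal sketch of building such an attribute set, not code from the sample. The class name LegacyNativeAttributes and its Build method are hypothetical, and the sketch assumes the AWSSDK.SQS package is referenced.

```csharp
using System.Collections.Generic;
using Amazon.SQS.Model; // from the AWSSDK.SQS NuGet package

// Hypothetical helper, not part of the sample.
public static class LegacyNativeAttributes
{
    // Builds the message attributes for a receiver running a transport
    // version earlier than 6.1. messageTypeFullName is the
    // namespace-qualified name of the message class on the receiver.
    public static Dictionary<string, MessageAttributeValue> Build(string messageTypeFullName)
    {
        return new Dictionary<string, MessageAttributeValue>
        {
            {
                "MessageTypeFullName",
                new MessageAttributeValue { DataType = "String", StringValue = messageTypeFullName }
            },
        };
    }
}
```

The result could then be passed to the sample's SendTo method in place of the dictionary shown above, for example SendTo(LegacyNativeAttributes.Build("MyNamespace.SomeNativeMessage"), MessageToSend), where MyNamespace is a placeholder for the real namespace of the message class.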

On the receiving end, an NServiceBus endpoint is listening to the queue and has a handler in place to handle messages of type SomeNativeMessage.

For NServiceBus to deserialize the message successfully, the sender must include the full name of the message class.
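
How the sample's sender conveys the type name is not shown here. One option, sketched below under assumptions, is to put the standard NServiceBus.EnclosedMessageTypes header into the JSON carried by the NServiceBus.AmazonSQS.Headers attribute instead of sending "{}". The class name NativeHeaders and the namespace MyNamespace are hypothetical. The sketch uses System.Text.Json only to stay dependency-free.

```csharp
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper, not part of the sample.
public static class NativeHeaders
{
    // Serializes a header dictionary that names the message type.
    // The returned string is intended as the StringValue of the
    // NServiceBus.AmazonSQS.Headers message attribute.
    public static string Create(string messageTypeFullName)
    {
        var headers = new Dictionary<string, string>
        {
            // Assumption: the receiver resolves the message type from this header.
            { "NServiceBus.EnclosedMessageTypes", messageTypeFullName },
        };

        return JsonSerializer.Serialize(headers);
    }
}
```

Calling NativeHeaders.Create("MyNamespace.SomeNativeMessage") returns a compact JSON object with a single property. Whether the header is needed depends on whether the configured serializer can already infer the type from the message body.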

First, a pipeline behavior registered for the incoming logical message context intercepts the message:

class AccessToAmazonSqsNativeMessageBehavior : Behavior<IIncomingLogicalMessageContext>
{
    static ILog log = LogManager.GetLogger<AccessToAmazonSqsNativeMessageBehavior>();

    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // Get the native Amazon SQS message (Amazon.SQS.Model.Message) that the transport stored in the context
        var nativeMessage = context.Extensions.Get<Message>();
        var nativeAttributeFound = nativeMessage.MessageAttributes.TryGetValue("SomeRandomKey", out var randomAttributeKey);

        //do something useful with the native message
        if (nativeAttributeFound)
        {
            log.Info($"Intercepted the native message and found attribute 'SomeRandomKey' with value '{randomAttributeKey.StringValue}'");
        }

        return next();
    }
}

The code to register the above behavior is:

endpointConfiguration.Pipeline.Register(new AccessToAmazonSqsNativeMessageBehavior(), "Demonstrates how to access the native message from a pipeline behavior");

Next, the handler is invoked. The handler code can also access the native message and its attributes.

public class SomeNativeMessageHandler : IHandleMessages<SomeNativeMessage>
{
    static ILog log = LogManager.GetLogger<SomeNativeMessageHandler>();

    public async Task Handle(SomeNativeMessage eventMessage, IMessageHandlerContext context)
    {
        var nativeMessage = context.Extensions.Get<Message>();
        var nativeAttributeFound = nativeMessage.MessageAttributes.TryGetValue("SomeRandomKey", out var randomAttributeKey);

        log.Info($"Received {nameof(SomeNativeMessage)} with message {eventMessage.ThisIsTheMessage}.");

        if (nativeAttributeFound)
        {
            log.Info($"Found attribute 'SomeRandomKey' with value '{randomAttributeKey.StringValue}'");
        }

        if (context.ReplyToAddress != null)
        {
            log.Info($"Sending reply to '{context.ReplyToAddress}'");

            await context.Reply(new SomeReply());
        }
    }
}

Replies

To enable the NServiceBus endpoint to reply to a native queue, the sender must provide a reply-to address. To see this in action, manually create a queue called my-native-endpoint in AWS, then use the CreateHeadersWithReply() method in the sender to include the NServiceBus.ReplyToAddress header in the NServiceBus.AmazonSQS.Headers message attribute.

static string CreateHeadersWithReply()
{
    var nsbHeaders = new Dictionary<string, string>
    {
        { "NServiceBus.ReplyToAddress", "my-native-endpoint" }, // optional: used to demo replying back to the native endpoint
    };

    return JsonConvert.SerializeObject(nsbHeaders, Formatting.None);
}

After running the sample, use the AWS Management Console to view the replies sent back to the native queue.
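
As an alternative to the console, the replies could also be read with the Amazon SQS SDK. The following is a rough sketch, not part of the sample. The class name NativeReplyReader and its methods are hypothetical, and it assumes the AWSSDK.SQS package plus credentials and a region configured in the environment. The body is returned exactly as received, since the format in which the transport writes replies is not covered here.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

// Hypothetical helper, not part of the sample.
public static class NativeReplyReader
{
    // Formats one received message for display.
    public static string Describe(Message message) =>
        $"{message.MessageId}: {message.Body}";

    // Reads up to ten messages from the given queue without deleting them.
    public static async Task<List<string>> ReadRepliesAsync(IAmazonSQS sqsClient, string queueName)
    {
        var queueUrl = (await sqsClient.GetQueueUrlAsync(queueName)).QueueUrl;

        var response = await sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest
        {
            QueueUrl = queueUrl,
            MaxNumberOfMessages = 10,
            WaitTimeSeconds = 5, // long polling, so an empty queue returns after five seconds
            MessageAttributeNames = new List<string> { "All" },
        });

        var descriptions = new List<string>();
        // Messages can be null when nothing was received, depending on the SDK version.
        foreach (var message in response.Messages ?? new List<Message>())
        {
            descriptions.Add(Describe(message));
        }

        return descriptions;
    }
}
```

A call such as ReadRepliesAsync(new AmazonSQSClient(), "my-native-endpoint") would list whatever is currently in the native queue. Because the messages are not deleted, they become visible again once the queue's visibility timeout expires.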
