
AmazonSQS transport native integration sample

NuGet Package: NServiceBus.AmazonSQS (7-pre)
Target Version: NServiceBus 9.x
This page targets a pre-release version. Pre-releases are subject to change and samples are not guaranteed to be fully functional.

This sample demonstrates how to enable an NServiceBus endpoint to receive messages sent by a native (i.e. non-NServiceBus-based) implementation.

In this sample, an external system sends a message to an SQS queue using the Amazon SQS SDK.

var MessageToSend = @"{""$type"" : ""NativeIntegration.Receiver.SomeNativeMessage, Receiver"", ""ThisIsTheMessage"": ""Hello world!""}";

Message attributes are included to demonstrate how to access them from handlers or pipeline behaviors.

await SendTo(new Dictionary<string, MessageAttributeValue>
        {
            {"SomeKey", new MessageAttributeValue {DataType = "String", StringValue = "something"}}, //optional attributes that the receiver might need
        }, MessageToSend);
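
The SendTo helper is not shown on this page. A minimal sketch of what such a helper might look like, assuming the AWSSDK.SQS package, default AWS credentials and region, and a hypothetical queue name. NativeSender, BuildRequest, and QueueName are illustrative names, not part of the sample:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

public static class NativeSender
{
    // Hypothetical queue name (assumption): use the input queue of the receiving endpoint.
    const string QueueName = "my-receiver-endpoint-queue";

    // Builds the SQS request without touching the network.
    public static SendMessageRequest BuildRequest(
        string queueUrl,
        Dictionary<string, MessageAttributeValue> attributes,
        string body)
    {
        return new SendMessageRequest
        {
            QueueUrl = queueUrl,
            MessageAttributes = attributes,
            MessageBody = body
        };
    }

    // Resolves the queue URL from its name and sends the message.
    public static async Task SendTo(
        Dictionary<string, MessageAttributeValue> attributes,
        string body)
    {
        // Parameterless constructor: credentials and region come from the environment.
        using var client = new AmazonSQSClient();

        var queueUrlResponse = await client.GetQueueUrlAsync(QueueName);

        await client.SendMessageAsync(
            BuildRequest(queueUrlResponse.QueueUrl, attributes, body));
    }
}
```

Keeping the request construction separate from the network call makes it possible to inspect the outgoing body and attributes without an AWS account.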

On the receiving end, an NServiceBus endpoint is listening to the queue and has a handler in place to handle messages of type SomeNativeMessage.

For the message to be successfully deserialized by NServiceBus, the sender must include the full name of the message class, followed by its assembly name, in the special $type property recognized by the Newtonsoft JSON serializer:

var MessageToSend = @"{""$type"" : ""NativeIntegration.Receiver.SomeNativeMessage, Receiver"", ""ThisIsTheMessage"": ""Hello world!""}";
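
A sender that prefers not to hand-write the JSON string could assemble the same body programmatically. The following is a sketch only, assuming .NET 6 or later for System.Text.Json.Nodes. NativeBody and Build are hypothetical names introduced for illustration:

```csharp
using System.Text.Json.Nodes;

public static class NativeBody
{
    // Builds the JSON body for a native message.
    // typeFullName and assemblyName must match the message class on the receiver.
    public static string Build(string typeFullName, string assemblyName, string text)
    {
        var body = new JsonObject
        {
            // "$type" uses the "Namespace.ClassName, AssemblyName" form.
            // It is added first because Newtonsoft.Json, by default, only
            // honors "$type" when it is the first property of the object.
            ["$type"] = $"{typeFullName}, {assemblyName}",
            ["ThisIsTheMessage"] = text
        };

        // JsonObject preserves insertion order when serialized.
        return body.ToJsonString();
    }
}
```

For the message used in this sample, the call would be NativeBody.Build("NativeIntegration.Receiver.SomeNativeMessage", "Receiver", "Hello world!").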

The serializer must be configured to handle this annotation:

var serialization = endpointConfiguration.UseSerialization<NewtonsoftJsonSerializer>();
serialization.Settings(new JsonSerializerSettings
{
    TypeNameHandling = TypeNameHandling.Auto
});

Custom serializers are also supported.

When a message arrives, it is first intercepted by a behavior registered in the incoming logical message stage of the pipeline:

class AccessToAmazonSqsNativeMessageBehavior : Behavior<IIncomingLogicalMessageContext>
{
    static readonly ILog log = LogManager.GetLogger<AccessToAmazonSqsNativeMessageBehavior>();

    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // get the native Amazon SQS message
        var nativeMessage = context.Extensions.Get<Message>();
        var nativeAttributeFound = nativeMessage.MessageAttributes.TryGetValue("SomeKey", out var attributeValue);

        //do something useful with the native message
        if (nativeAttributeFound)
        {
            log.Info($"Intercepted the native message and found attribute 'SomeKey' with value '{attributeValue.StringValue}'");
        }

        return next();
    }
}

The code to register the above behavior is:

endpointConfiguration.Pipeline.Register(new AccessToAmazonSqsNativeMessageBehavior(), "Demonstrates how to access the native message from a pipeline behavior");

Next, the handler is invoked. The handler code can also access the native message and its attributes.

public class SomeNativeMessageHandler : IHandleMessages<SomeNativeMessage>
{
    static readonly ILog log = LogManager.GetLogger<SomeNativeMessageHandler>();

    public async Task Handle(SomeNativeMessage eventMessage, IMessageHandlerContext context)
    {
        var nativeMessage = context.Extensions.Get<Message>();
        var nativeAttributeFound = nativeMessage.MessageAttributes.TryGetValue("SomeKey", out var attributeValue);

        log.Info($"Received {nameof(SomeNativeMessage)} with message {eventMessage.ThisIsTheMessage}.");

        if (nativeAttributeFound)
        {
            log.Info($"Found attribute 'SomeKey' with value '{attributeValue.StringValue}'");
        }

        if (context.ReplyToAddress != null)
        {
            log.Info($"Sending reply to '{context.ReplyToAddress}'");

            await context.Reply(new SomeReply());
        }
    }
}
