This sample demonstrates how to enable an NServiceBus endpoint to receive messages sent by a native (i.e. non-NServiceBus-based) implementation.
In this sample, an external system sends a message to an SQS queue using the Amazon SQS SDK.
var MessageToSend = @"{""$type"" : ""NativeIntegration.Receiver.SomeNativeMessage, Receiver"", ""ThisIsTheMessage"": ""Hello world!""}";
Attributes are included to demonstrate how to access those from handlers or behaviors in the pipeline.
await SendTo(new Dictionary<string, MessageAttributeValue>
{
{"SomeKey", new MessageAttributeValue {DataType = "String", StringValue = "something"}}, //optional attributes that the receiver might need
}, MessageToSend);
On the receiving end, an NServiceBus endpoint is listening to the queue and has a handler in place to handle messages of type SomeNativeMessage
.
For the message to be successfully deserialized by NServiceBus, the sender must include the full name of the message class in the $type
special attribute recognized by the Newtonsoft JSON serializer.
var MessageToSend = @"{""$type"" : ""NativeIntegration.Receiver.SomeNativeMessage, Receiver"", ""ThisIsTheMessage"": ""Hello world!""}";
The serializer must be configured to handle this annotation:
var serialization = endpointConfiguration.UseSerialization<NewtonsoftJsonSerializer>();
serialization.Settings(new JsonSerializerSettings
{
TypeNameHandling = TypeNameHandling.Auto
});
Custom serializers are also supported
First, the message will be intercepted in the incoming logical message context as there is a behavior in place:
class AccessToAmazonSqsNativeMessageBehavior : Behavior<IIncomingLogicalMessageContext>
{
static readonly ILog log = LogManager.GetLogger<AccessToAmazonSqsNativeMessageBehavior>();
public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
{
// get the native Amazon SQS message
var nativeMessage = context.Extensions.Get<Message>();
var nativeAttributeFound = nativeMessage.MessageAttributes.TryGetValue("SomeKey", out var attributeValue);
//do something useful with the native message
if (nativeAttributeFound)
{
log.Info($"Intercepted the native message and found attribute 'SomeKey' with value '{attributeValue.StringValue}'");
}
return next();
}
}
The code to register the above behavior is:
endpointConfiguration.Pipeline.Register(new AccessToAmazonSqsNativeMessageBehavior(), "Demonstrates how to access the native message from a pipeline behavior");
Next, the handler is invoked. The handler code can also access the native message and its attributes.
public class SomeNativeMessageHandler : IHandleMessages<SomeNativeMessage>
{
static readonly ILog log = LogManager.GetLogger<SomeNativeMessageHandler>();
public async Task Handle(SomeNativeMessage eventMessage, IMessageHandlerContext context)
{
var nativeMessage = context.Extensions.Get<Message>();
var nativeAttributeFound = nativeMessage.MessageAttributes.TryGetValue("SomeKey", out var attributeValue);
log.Info($"Received {nameof(SomeNativeMessage)} with message {eventMessage.ThisIsTheMessage}.");
if (nativeAttributeFound)
{
log.Info($"Found attribute 'SomeKey' with value '{attributeValue.StringValue}'");
}
if (context.ReplyToAddress != null)
{
log.Info($"Sending reply to '{context.ReplyToAddress}'");
await context.Reply(new SomeReply());
}
}
}