This sample demonstrates how to enable an NServiceBus endpoint to receive messages sent by a native (i.e. non-NServiceBus-based) implementation.
In this sample, an external system sends a message to an SQS queue using the Amazon SQS SDK. For NServiceBus to consume this message, an NServiceBus.AmazonSQS.Headers message attribute containing an empty JSON string ("{}") must be present. Other attributes are also included to demonstrate how to access them from handlers or behaviors in the pipeline.
A MessageTypeFullName message attribute can also be set for compatibility with earlier versions of the transport.
await SendTo(new Dictionary<string, MessageAttributeValue>
{
    {"NServiceBus.AmazonSQS.Headers", new MessageAttributeValue {DataType = "String", StringValue = "{}"}}, //required for native integration
    {"SomeRandomKey", new MessageAttributeValue {DataType = "String", StringValue = "something-random"}}, //other optional attributes that the receiver might need
}, MessageToSend);
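The SendTo helper itself is not shown above. A minimal sketch of what such a helper could look like with the Amazon SQS SDK follows. The class name NativeSender, the BuildRequest helper and the queue name are assumptions made for illustration, and the message body must be in a format that the serializer configured on the receiving endpoint can read.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

static class NativeSender
{
    // Hypothetical queue name: replace with the input queue of the receiving endpoint.
    const string QueueName = "my-nservicebus-endpoint";

    // Builds the request on its own so it can be inspected without calling AWS.
    public static SendMessageRequest BuildRequest(string queueUrl, Dictionary<string, MessageAttributeValue> attributes, string body)
    {
        return new SendMessageRequest
        {
            QueueUrl = queueUrl,
            MessageAttributes = attributes,
            MessageBody = body
        };
    }

    public static async Task SendTo(Dictionary<string, MessageAttributeValue> attributes, string body)
    {
        // Region and credentials are resolved by the SDK from the environment.
        using (var client = new AmazonSQSClient())
        {
            // Look up the queue URL by name, then send the native message.
            var queueUrlResponse = await client.GetQueueUrlAsync(QueueName);
            var request = BuildRequest(queueUrlResponse.QueueUrl, attributes, body);
            await client.SendMessageAsync(request);
        }
    }
}
```

Keeping the request construction separate from the network call is a design choice of this sketch only; it makes the attributes easy to check in isolation.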
On the receiving end, an NServiceBus endpoint is listening to the queue and has a handler in place to handle messages of type SomeNativeMessage.
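The message types are not listed in this sample text. A minimal sketch of how they might be declared is shown here. Only the ThisIsTheMessage property and the SomeReply type name appear in the handler code, so the string property type and the use of the IMessage marker interface are assumptions.

```csharp
using NServiceBus;

// Assumed shape: ThisIsTheMessage is the only member referenced by the handler.
public class SomeNativeMessage : IMessage
{
    // Assumed to be a string; the sample text does not state the type.
    public string ThisIsTheMessage { get; set; }
}

// Sent back by the handler when a reply-to address is present.
public class SomeReply : IMessage
{
}
```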
First, a pipeline behavior intercepts the message in the incoming logical message context:
class AccessToAmazonSqsNativeMessageBehavior : Behavior<IIncomingLogicalMessageContext>
{
    static ILog log = LogManager.GetLogger<AccessToAmazonSqsNativeMessageBehavior>();

    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // get the native Amazon SQS message
        var nativeMessage = context.Extensions.Get<Message>();
        var nativeAttributeFound = nativeMessage.MessageAttributes.TryGetValue("SomeRandomKey", out var randomAttributeKey);

        //do something useful with the native message
        if (nativeAttributeFound)
        {
            log.Info($"Intercepted the native message and found attribute 'SomeRandomKey' with value '{randomAttributeKey.StringValue}'");
        }

        return next();
    }
}
The code to register the above behavior is:
endpointConfiguration.Pipeline.Register(new AccessToAmazonSqsNativeMessageBehavior(), "Demonstrates how to access the native message from a pipeline behavior");
Next, the handler is invoked. The handler code can also access the native message and its attributes.
public class SomeNativeMessageHandler : IHandleMessages<SomeNativeMessage>
{
    static ILog log = LogManager.GetLogger<SomeNativeMessageHandler>();

    public async Task Handle(SomeNativeMessage eventMessage, IMessageHandlerContext context)
    {
        var nativeMessage = context.Extensions.Get<Message>();
        var nativeAttributeFound = nativeMessage.MessageAttributes.TryGetValue("SomeRandomKey", out var randomAttributeKey);

        log.Info($"Received {nameof(SomeNativeMessage)} with message {eventMessage.ThisIsTheMessage}.");

        if (nativeAttributeFound)
        {
            log.Info($"Found attribute 'SomeRandomKey' with value '{randomAttributeKey.StringValue}'");
        }

        if (context.ReplyToAddress != null)
        {
            log.Info($"Sending reply to '{context.ReplyToAddress}'");
            await context.Reply(new SomeReply());
        }
    }
}
Replies
To enable the NServiceBus endpoint to reply to a native queue, a reply-to address must be provided. To see this in action, manually create a queue called my-native-endpoint in AWS and use the CreateHeadersWithReply() method in the sender to include the NServiceBus.ReplyToAddress header in the NServiceBus.AmazonSQS.Headers message attribute.
static string CreateHeadersWithReply()
{
    var nsbHeaders = new Dictionary<string, string>
    {
        { "NServiceBus.ReplyToAddress", "my-native-endpoint" }, //optional - used to demo replying back to the native endpoint
    };

    return JsonConvert.SerializeObject(nsbHeaders, Formatting.None);
}
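To show how the serialized headers fit into the native message, the following sketch builds the message attributes with the output of CreateHeadersWithReply() in place of the empty JSON string. The ReplyAttributes class and its Build method are hypothetical names used only for this illustration; CreateHeadersWithReply() is repeated so that the snippet stands on its own.

```csharp
using System.Collections.Generic;
using Amazon.SQS.Model;
using Newtonsoft.Json;

static class ReplyAttributes
{
    // Same helper as in the sample, repeated to keep the sketch self-contained.
    static string CreateHeadersWithReply()
    {
        var nsbHeaders = new Dictionary<string, string>
        {
            { "NServiceBus.ReplyToAddress", "my-native-endpoint" },
        };

        return JsonConvert.SerializeObject(nsbHeaders, Formatting.None);
    }

    // Hypothetical helper: the attributes to pass to the sender.
    public static Dictionary<string, MessageAttributeValue> Build()
    {
        return new Dictionary<string, MessageAttributeValue>
        {
            // The headers attribute now carries the reply-to address instead of "{}".
            {"NServiceBus.AmazonSQS.Headers", new MessageAttributeValue {DataType = "String", StringValue = CreateHeadersWithReply()}},
            {"SomeRandomKey", new MessageAttributeValue {DataType = "String", StringValue = "something-random"}},
        };
    }
}
```

With these attributes, the value of NServiceBus.AmazonSQS.Headers is the JSON string {"NServiceBus.ReplyToAddress":"my-native-endpoint"}.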
After running the sample, use the AWS Management Console to view the replies sent back to the native queue.
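As an alternative to the console, the replies could also be read with the Amazon SQS SDK. The following is a minimal sketch and not part of the sample: the ReplyReader class and its methods are hypothetical names, and the raw body is printed as received without interpreting the layout chosen by the transport.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

static class ReplyReader
{
    // Builds the receive request on its own so it can be inspected without calling AWS.
    public static ReceiveMessageRequest BuildRequest(string queueUrl)
    {
        return new ReceiveMessageRequest
        {
            QueueUrl = queueUrl,
            MaxNumberOfMessages = 10,
            WaitTimeSeconds = 20, // long polling: wait up to 20 seconds for messages
            MessageAttributeNames = new List<string> { "All" } // return all message attributes
        };
    }

    public static async Task PrintReplies()
    {
        // Region and credentials are resolved by the SDK from the environment.
        using (var client = new AmazonSQSClient())
        {
            var queueUrlResponse = await client.GetQueueUrlAsync("my-native-endpoint");
            var response = await client.ReceiveMessageAsync(BuildRequest(queueUrlResponse.QueueUrl));

            // Messages may be null or empty when nothing arrived within the wait time.
            foreach (var message in response.Messages ?? new List<Message>())
            {
                Console.WriteLine(message.Body);
            }
        }
    }
}
```

The sketch does not delete the received messages, so they become visible in the queue again once the visibility timeout expires.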