This sample demonstrates how to enable an NServiceBus endpoint to receive messages sent by a native (i.e. non-NServiceBus-based) implementation.
AWS setup
Security and access configuration
Add the AWS Access Key ID, AWS Secret Access Key, and default region to the following environment variables:
- Access Key ID in AWS_ACCESS_KEY_ID
- Secret Access Key in AWS_SECRET_ACCESS_KEY
- Default Region in AWS_REGION
See also AWS Account Identifiers, Managing Access Keys for an AWS Account, and IAM Security Credentials.
See also AWS Regions for a list of available regions.
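The variables listed above can be verified from code before the endpoint starts. The following is a minimal sketch, not part of the sample itself; the class name AwsEnvironmentCheck and the method FindMissing are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of the sample): reports which of the
// environment variables named above are unset or blank.
public static class AwsEnvironmentCheck
{
    // Variable names taken from the list above
    static readonly string[] RequiredVariables =
    {
        "AWS_ACCESS_KEY_ID",
        "AWS_SECRET_ACCESS_KEY",
        "AWS_REGION"
    };

    // Returns the names of required variables that are missing or whitespace-only
    public static IReadOnlyList<string> FindMissing()
    {
        return RequiredVariables
            .Where(name => string.IsNullOrWhiteSpace(Environment.GetEnvironmentVariable(name)))
            .ToList();
    }
}
```

Calling AwsEnvironmentCheck.FindMissing() at start-up and failing fast when the result is not empty gives a clearer error than a credential or region exception raised later by the AWS SDK.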
SQS
Several Amazon SQS queues are required to run this sample. These will be created at start-up via the installer mechanism of NServiceBus. The queues can be seen in the SQS management UI.
- Samples-Sqs-SimpleReceiver: The main message processing queue.
- Samples-Sqs-SimpleReceiver-delay.fifo: Queue used for delayed retries.
- error: Queue used for error handling.
Code walk-through
In this sample, an external system sends a message to an SQS queue using the Amazon SQS .NET SDK.
var MessageToSend = @"{""$type"" : ""NativeIntegration.Receiver.SomeNativeMessage, Receiver"", ""ThisIsTheMessage"": ""Hello world!""}";
Message attributes are included to demonstrate how to access them from handlers or from behaviors in the pipeline.
await SendTo(new Dictionary<string, MessageAttributeValue>
{
    {"SomeKey", new MessageAttributeValue {DataType = "String", StringValue = "something"}}, // optional attributes that the receiver might need
}, MessageToSend);
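The SendTo helper called above is not shown in this walk-through. The following is a minimal sketch of what such a helper might look like with the Amazon SQS .NET SDK (the AWSSDK.SQS package). The class name NativeSender is hypothetical, and the sketch assumes that the target is the Samples-Sqs-SimpleReceiver queue and that credentials and region come from the environment variables described earlier.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

// Hypothetical sender class; the actual sample may structure this differently.
static class NativeSender
{
    // Assumption: the receiving endpoint's input queue, as listed in the SQS section
    const string QueueName = "Samples-Sqs-SimpleReceiver";

    public static async Task SendTo(Dictionary<string, MessageAttributeValue> attributes, string body)
    {
        // The parameterless constructor resolves credentials and region
        // through the SDK's default chain, which includes environment variables
        using var client = new AmazonSQSClient();

        // Look up the queue URL from the queue name
        var queueUrlResponse = await client.GetQueueUrlAsync(QueueName);

        // Send the raw JSON body together with the optional native attributes
        await client.SendMessageAsync(new SendMessageRequest
        {
            QueueUrl = queueUrlResponse.QueueUrl,
            MessageBody = body,
            MessageAttributes = attributes
        });
    }
}
```

The sketch sends the JSON body as-is, without any NServiceBus headers, which is the point of a native sender.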
On the receiving end, an NServiceBus endpoint is listening to the queue and has a handler in place to handle messages of type SomeNativeMessage.
For the message to be successfully deserialized by NServiceBus, the sender must include the full type name of the message class, followed by its assembly name, in the $type special attribute recognized by the Newtonsoft JSON serializer.
var MessageToSend = @"{""$type"" : ""NativeIntegration.Receiver.SomeNativeMessage, Receiver"", ""ThisIsTheMessage"": ""Hello world!""}";
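For reference, the $type value above would resolve to a class along the following lines on the receiver side. This is a sketch under assumptions: the IMessage marker interface is one way to identify message types (the sample could equally rely on conventions), and the contents of SomeReply are not shown in this walk-through, so it is left empty here.

```csharp
using NServiceBus;

namespace NativeIntegration.Receiver
{
    // Namespace and class name match the type part of the $type value;
    // the project is assumed to compile to an assembly named "Receiver".
    public class SomeNativeMessage : IMessage
    {
        // Property name matches the JSON key in the native payload
        public string ThisIsTheMessage { get; set; }
    }

    // Reply type used by the handler; shown empty here as an assumption
    public class SomeReply : IMessage
    {
    }
}
```

If the namespace, class name, or assembly name differs from what the sender puts in $type, the serializer cannot resolve the type and deserialization fails.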
The serializer must be configured to handle this annotation:
var serialization = endpointConfiguration.UseSerialization<NewtonsoftJsonSerializer>();
serialization.Settings(new JsonSerializerSettings
{
    TypeNameHandling = TypeNameHandling.Auto
});
Custom serializers are also supported.
First, a behavior registered in the incoming logical message stage of the pipeline intercepts the message:
class AccessToAmazonSqsNativeMessageBehavior(ILogger<AccessToAmazonSqsNativeMessageBehavior> logger) : Behavior<IIncomingLogicalMessageContext>
{
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // get the native Amazon SQS message
        var nativeMessage = context.Extensions.Get<Message>();
        var nativeAttributeFound = nativeMessage.MessageAttributes.TryGetValue("SomeKey", out var attributeValue);

        // do something useful with the native message
        if (nativeAttributeFound)
        {
            logger.LogInformation("Intercepted the native message and found attribute 'SomeKey' with value '{AttributeValue}'", attributeValue.StringValue);
        }

        return next();
    }
}
The code to register the above behavior is:
var serviceProvider = builder.Services.BuildServiceProvider();
var logger = serviceProvider.GetRequiredService<ILogger<AccessToAmazonSqsNativeMessageBehavior>>();
endpointConfiguration.Pipeline.Register(new AccessToAmazonSqsNativeMessageBehavior(logger), "Demonstrates how to access the native message from a pipeline behavior");
Next, the handler is invoked. The handler code can also access the native message and its attributes.
public class SomeNativeMessageHandler(ILogger<SomeNativeMessageHandler> logger) : IHandleMessages<SomeNativeMessage>
{
    public async Task Handle(SomeNativeMessage eventMessage, IMessageHandlerContext context)
    {
        var nativeMessage = context.Extensions.Get<Message>();
        var nativeAttributeFound = nativeMessage.MessageAttributes.TryGetValue("SomeKey", out var attributeValue);

        logger.LogInformation("Received {MessageType} with message {Message}", nameof(SomeNativeMessage), eventMessage.ThisIsTheMessage);

        if (nativeAttributeFound)
        {
            logger.LogInformation("Found attribute 'SomeKey' with value '{AttributeValue}'", attributeValue.StringValue);
        }

        if (context.ReplyToAddress != null)
        {
            logger.LogInformation("Sending reply to '{ReplyToAddress}'", context.ReplyToAddress);
            await context.Reply(new SomeReply());
        }
    }
}