This sample demonstrates how to enable an NServiceBus endpoint to receive messages sent by a native (i.e. non-NServiceBus-based) implementation.
AWS setup
Security and access configuration
Store the AWS Access Key ID, the AWS Secret Access Key, and the default region in the following environment variables:
- Access Key ID in AWS_ACCESS_KEY_ID
- Secret Access Key in AWS_SECRET_ACCESS_KEY
- Default Region in AWS_REGION
See also AWS Account Identifiers, Managing Access Keys for an AWS Account, and IAM Security Credentials.
See also AWS Regions for a list of available regions.
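A missing variable typically only surfaces once the first SQS call fails, so it can help to check the configuration up front. The following is a minimal sketch, not part of the sample; the class name AwsEnvironmentCheck and the method FindMissing are hypothetical, and the lookup is passed in as a delegate so the check can be exercised without touching the real environment.

```csharp
using System;

// Hypothetical start-up check (not part of the sample).
static class AwsEnvironmentCheck
{
    // The three variables listed above.
    static readonly string[] Required =
    {
        "AWS_ACCESS_KEY_ID",
        "AWS_SECRET_ACCESS_KEY",
        "AWS_REGION"
    };

    // Returns the names of required variables that are unset or blank.
    // 'lookup' is normally Environment.GetEnvironmentVariable.
    public static string[] FindMissing(Func<string, string> lookup) =>
        Array.FindAll(Required, name => string.IsNullOrWhiteSpace(lookup(name)));
}
```

Calling AwsEnvironmentCheck.FindMissing(Environment.GetEnvironmentVariable) before starting the endpoint returns an empty array when all three variables are set.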
SQS
Several Amazon SQS queues are required to run this sample. These will be created at start-up via the installer mechanism of NServiceBus. The queues can be seen in the SQS management UI.
- Samples-Sqs-SimpleReceiver: The main message processing queue.
- Samples-Sqs-SimpleReceiver-delay.fifo: Queue used for delayed retries.
- error: Queue used for error handling.
Code walk-through
In this sample, an external system sends a message to an SQS queue using the Amazon SQS .NET SDK.
var MessageToSend = @"{""$type"" : ""NativeIntegration.Receiver.SomeNativeMessage, Receiver"", ""ThisIsTheMessage"": ""Hello world!""}";
Message attributes are included to demonstrate how handlers and pipeline behaviors can access them.
await SendTo(new Dictionary<string, MessageAttributeValue>
{
    {"SomeKey", new MessageAttributeValue {DataType = "String", StringValue = "something"}}, // optional attributes that the receiver might need
}, MessageToSend);
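The body of SendTo is not shown here. A minimal sketch of what such a helper might look like with the Amazon SQS .NET SDK follows; the class name NativeSender is hypothetical, the queue name is taken from the queue list above, and sending the JSON as a raw (not Base64-encoded) body is an assumption that may depend on the transport version in use.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;

// Hypothetical helper: an illustration of a native sender, not the sample's own code.
static class NativeSender
{
    public static async Task SendTo(Dictionary<string, MessageAttributeValue> attributes, string body)
    {
        // The parameterless constructor resolves credentials and region
        // from the environment variables described in the AWS setup section.
        using var client = new AmazonSQSClient();

        // Resolve the queue URL from the name of the endpoint's input queue.
        var queueUrlResponse = await client.GetQueueUrlAsync("Samples-Sqs-SimpleReceiver");

        await client.SendMessageAsync(new SendMessageRequest
        {
            QueueUrl = queueUrlResponse.QueueUrl,
            MessageBody = body,             // assumption: raw JSON body
            MessageAttributes = attributes  // native attributes such as "SomeKey"
        });
    }
}
```

Running this requires the AWSSDK.SQS package and an existing queue, so the receiving endpoint should be started at least once beforehand to let the installers create it.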
On the receiving end, an NServiceBus endpoint listens to the queue and has a handler in place for messages of type SomeNativeMessage.
For the message to be successfully deserialized by NServiceBus, the sender must include the full name of the message class in the $type special attribute recognized by the Newtonsoft JSON serializer.
var MessageToSend = @"{""$type"" : ""NativeIntegration.Receiver.SomeNativeMessage, Receiver"", ""ThisIsTheMessage"": ""Hello world!""}";
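The $type value pairs the full type name with the assembly name. As a rough illustration, the sketch below shows a message contract matching the payload and a hypothetical helper, TypeAnnotation.For, that derives the annotation from the type instead of hard-coding it. The class shape is inferred from the JSON above; in the real endpoint the type must also be recognized as a message by NServiceBus (for example through a marker interface or conventions), which is omitted here to keep the block free of dependencies.

```csharp
using System;

namespace NativeIntegration.Receiver
{
    // Sketch of the message contract; the property name matches the JSON payload.
    public class SomeNativeMessage
    {
        public string ThisIsTheMessage { get; set; }
    }

    // Hypothetical helper (not part of the sample).
    public static class TypeAnnotation
    {
        // Builds "Namespace.TypeName, AssemblyName", the form used in the $type attribute.
        public static string For(Type type) =>
            $"{type.FullName}, {type.Assembly.GetName().Name}";
    }
}
```

When SomeNativeMessage is compiled into an assembly named Receiver, TypeAnnotation.For(typeof(SomeNativeMessage)) yields the same string that the sender hard-codes in the payload.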
The serializer must be configured to handle this annotation:
var serialization = endpointConfiguration.UseSerialization<NewtonsoftJsonSerializer>();
serialization.Settings(new JsonSerializerSettings
{
    TypeNameHandling = TypeNameHandling.Auto
});
Custom serializers are also supported.
First, a pipeline behavior intercepts the message in the incoming logical message context:
class AccessToAmazonSqsNativeMessageBehavior(ILogger<AccessToAmazonSqsNativeMessageBehavior> logger) : Behavior<IIncomingLogicalMessageContext>
{
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // get the native Amazon SQS message
        var nativeMessage = context.Extensions.Get<Message>();
        var nativeAttributeFound = nativeMessage.MessageAttributes.TryGetValue("SomeKey", out var attributeValue);

        // do something useful with the native message
        if (nativeAttributeFound)
        {
            logger.LogInformation("Intercepted the native message and found attribute 'SomeKey' with value '{AttributeValue}'", attributeValue.StringValue);
        }

        return next();
    }
}
The code to register the above behavior is:
var serviceProvider = builder.Services.BuildServiceProvider();
var logger = serviceProvider.GetRequiredService<ILogger<AccessToAmazonSqsNativeMessageBehavior>>();
endpointConfiguration.Pipeline.Register(new AccessToAmazonSqsNativeMessageBehavior(logger), "Demonstrates how to access the native message from a pipeline behavior");
Next, the handler is invoked. The handler code can also access the native message and its attributes.
public class SomeNativeMessageHandler(ILogger<SomeNativeMessageHandler> logger) : IHandleMessages<SomeNativeMessage>
{
    public async Task Handle(SomeNativeMessage eventMessage, IMessageHandlerContext context)
    {
        var nativeMessage = context.Extensions.Get<Message>();
        var nativeAttributeFound = nativeMessage.MessageAttributes.TryGetValue("SomeKey", out var attributeValue);

        logger.LogInformation("Received {MessageType} with message {Message}", nameof(SomeNativeMessage), eventMessage.ThisIsTheMessage);

        if (nativeAttributeFound)
        {
            logger.LogInformation("Found attribute 'SomeKey' with value '{AttributeValue}'", attributeValue.StringValue);
        }

        if (context.ReplyToAddress != null)
        {
            logger.LogInformation("Sending reply to '{ReplyToAddress}'", context.ReplyToAddress);
            await context.Reply(new SomeReply());
        }
    }
}