Amazon SQS native integration

Target Version: NServiceBus 9.x

This document describes how to consume messages from non-NServiceBus endpoints via Amazon SQS in integration scenarios.

To send messages to non-NServiceBus endpoints, configure the sender endpoint so that it does not wrap outgoing messages in a transport envelope. For more information, refer to the transport configuration options.
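As a minimal sketch of the sender configuration described above, the following assumes the `DoNotWrapOutgoingMessages` property listed under the transport configuration options; the endpoint name and the choice of serializer are placeholders for illustration only.

```csharp
using NServiceBus;

// Hypothetical endpoint name, used only for illustration.
var endpointConfiguration = new EndpointConfiguration("Samples.Sqs.NativeIntegration.Sender");

// Assumption: System.Text.Json serialization; any configured serializer could be used instead.
endpointConfiguration.UseSerialization<SystemJsonSerializer>();

// Ask the transport to send the serialized message body as-is,
// without wrapping it in the NServiceBus transport envelope.
var transport = new SqsTransport
{
    DoNotWrapOutgoingMessages = true
};

endpointConfiguration.UseTransport(transport);
```

With this setting, a non-NServiceBus consumer would read the serialized message directly from the SQS message body instead of having to unwrap an envelope first.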

Accessing the native Amazon SQS message

It is sometimes useful to access the native Amazon SQS message from behaviors and handlers. When a message is received, the transport adds the native message, an instance of Amazon.SQS.Model.Message, to the message processing context. For example, the native message may be accessed from a pipeline behavior:

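using System;
using System.Threading.Tasks;
using Amazon.SQS.Model;
using NServiceBus.Pipeline;

class AccessToAmazonSqsNativeMessage : Behavior<IIncomingContext>
{
    public override Task Invoke(IIncomingContext context, Func<Task> next)
    {
        // get the native Amazon SQS message added to the context by the transport
        var message = context.Extensions.Get<Message>();

        // do something useful with the native message

        // continue with the rest of the pipeline
        return next();
    }
}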
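The same lookup can be sketched from a message handler, since the handler context also exposes the extensions bag. The `OrderAccepted` message type and the handler name below are hypothetical and exist only for illustration.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;
// Alias avoids any ambiguity with other types named Message.
using NativeMessage = Amazon.SQS.Model.Message;

// Hypothetical message type, used only for illustration.
public class OrderAccepted : IMessage
{
    public string OrderId { get; set; }
}

public class OrderAcceptedHandler : IHandleMessages<OrderAccepted>
{
    public Task Handle(OrderAccepted message, IMessageHandlerContext context)
    {
        // TryGet returns false instead of throwing when the native message
        // is not present in the context.
        if (context.Extensions.TryGet<NativeMessage>(out var nativeMessage))
        {
            // MessageId here is the identifier assigned by Amazon SQS.
            Console.WriteLine($"Native SQS message id: {nativeMessage.MessageId}");
        }

        return Task.CompletedTask;
    }
}
```

A pipeline behavior such as the one shown earlier only runs once it has been registered, typically with `endpointConfiguration.Pipeline.Register(...)`; a handler is discovered through assembly scanning and needs no such registration.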

Message type detection

NServiceBus requires the message type to be available as part of the message metadata to process a message successfully.

The SQS transport can ingest messages of any type; no specific format or headers are required. The transport applies the following logic when receiving a message:

  • If the SQS message has a message attribute named NServiceBus.AmazonSQS.Headers, whose value is a serialized Dictionary<string, string>, the message payload is assumed to be the serialized body of an unwrapped NServiceBus message. The body is deserialized into the target type inferred from the payload. The message body is loaded from the configured S3 bucket when the NServiceBus.AmazonSQS.Headers attribute contains an entry with the key S3BodyKey and a value representing an S3 object key, including the necessary prefix as configured by the receiving endpoint.
  • If the SQS message has a message attribute named MessageTypeFullName, the message is treated as a native message (not generated by NServiceBus). The value of the MessageTypeFullName attribute is used as the type into which the message body is deserialized. The native message body is loaded from the configured S3 bucket instead of the payload when the message has an optional S3BodyKey attribute with a value representing an S3 object key, including the necessary prefix as configured by the receiving endpoint.
  • If none of the above apply, the transport tries to deserialize the message into a TransportMessage envelope, treating it like a regular message generated by NServiceBus. If that fails or there are no NServiceBus-specific headers (NServiceBus.MessageId and NServiceBus.EnclosedMessageTypes), the message is treated as a native message and the message body is passed as-is to the processing pipeline. In this case, there is no support for loading the message body from an S3 bucket. For deserialization to work, either the payload itself has to include the type metadata (e.g. a $type attribute in JSON) or there needs to be a pipeline behavior that sets the EnclosedMessageTypes header value.
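The second and third cases above can be sketched as follows, assuming the AWS SDK for .NET on the native sender side and a JSON payload. `OrderPlaced`, `NativeSender`, and `SetEnclosedMessageTypeBehavior` are hypothetical names introduced for illustration, and the queue URL is a placeholder supplied by the caller.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;
using NServiceBus;
using NServiceBus.Pipeline;

// Hypothetical contract assumed to be known to the receiving endpoint.
public class OrderPlaced : IMessage
{
    public string OrderId { get; set; }
}

public static class NativeSender
{
    // Second case: a native sender names the target type in the
    // MessageTypeFullName message attribute.
    public static Task<SendMessageResponse> SendOrderPlaced(
        IAmazonSQS client, string queueUrl, OrderPlaced order)
    {
        var request = new SendMessageRequest
        {
            QueueUrl = queueUrl,
            // Assumption: the receiving endpoint uses a JSON serializer.
            MessageBody = JsonSerializer.Serialize(order),
            MessageAttributes = new Dictionary<string, MessageAttributeValue>
            {
                ["MessageTypeFullName"] = new MessageAttributeValue
                {
                    DataType = "String",
                    StringValue = typeof(OrderPlaced).FullName
                }
            }
        };

        return client.SendMessageAsync(request);
    }
}

// Third case: the payload carries no type metadata, so a behavior
// supplies the EnclosedMessageTypes header before deserialization.
public class SetEnclosedMessageTypeBehavior : Behavior<IIncomingPhysicalMessageContext>
{
    public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        var headers = context.Message.Headers;

        // Only fill in the header when the sender did not provide one.
        if (!headers.ContainsKey(Headers.EnclosedMessageTypes))
        {
            headers[Headers.EnclosedMessageTypes] = typeof(OrderPlaced).FullName;
        }

        return next();
    }
}
```

This behavior assumes that every untyped message arriving on the queue is an `OrderPlaced`; an endpoint that receives several native message types would need its own rule for choosing the type, for example by inspecting the body or a custom message attribute.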

Whenever the native message needs to be copied, for example when moving messages to the error queue, auditing, or performing delayed retries, the native message is converted into the transport's internal structure. The collection from NServiceBus.AmazonSQS.Headers is moved from the native message attribute into the headers collection. All other available message attributes from the original native message are copied over into the newly formed native message.

Retrying failed messages

Native messages that failed processing can be retried using ServicePulse and ServiceControl, but any native message attributes that were present in the original message are lost when the message is retried.
