This document describes how to consume messages from and send messages to non-NServiceBus endpoints via Azure Storage Queues in integration scenarios.
Sending native messages
Sending native messages can be accomplished by sending a message with a JSON-serialized payload using the QueueClient
. Refer to the sample for more information.
Custom envelope unwrapper
Azure Storage Queues lacks native header support. NServiceBus solves this by wrapping headers and message body in a custom envelope structure. This envelope is serialized using the configured serializer for the endpoint before being sent.
Creating this envelope can cause unnecessary complexity if headers are not needed, as is the case in native integration scenarios. For this reason, NServiceBus.Transport.AzureStorageQueues 9.0 and above support configuring a custom envelope unwrapper.
In this scenario, NServiceBus may place messages in your queue in addition to the native messages that are expected, for example if a message results in a delayed retry. Any custom envelope unwrapper must verify if the incoming message is capable of being deserialized as a native message, and the resulting body must be serialized according to the configured endpoint serializer. If either of these conditions cannot be met then MessageUnwrapper
should return null
to allow the default unwrapper to handle the message.
The snippet below shows custom unwrapping logic that enables both NServiceBus formatted and plain serialized messages to be consumed.
var transport = new AzureStorageQueueTransport("connection string")
{
MessageUnwrapper = queueMessage =>
{
using (var stream = new MemoryStream(Convert.FromBase64String(queueMessage.MessageText)))
using (var streamReader = new StreamReader(stream))
using (var textReader = new JsonTextReader(streamReader))
{
//try deserialize to a NServiceBus envelope first
var wrapper = jsonSerializer.Deserialize<MessageWrapper>(textReader);
if (wrapper.Id != null)
{
//this was a envelope message
return wrapper;
}
//this was a native message just return the body as is with no headers
return new MessageWrapper
{
Id = queueMessage.MessageId,
Headers = new Dictionary<string, string>(),
Body = Convert.FromBase64String(queueMessage.MessageText)
};
}
}
};
endpointConfiguration.UseTransport(transport);
This feature is currently NOT compatible with ServiceControl. A ServiceControl transport adapter is required to leverage both.