Getting Started
Architecture
NServiceBus
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

MSMQ native integration

Component: MSMQ Transport
NuGet Package: NServiceBus.Transport.Msmq (2.x)
Target Version: NServiceBus 8.x

This document describes how to consume messages from and send messages to non-NServiceBus endpoints via MSMQ in integration scenarios.

These examples use the System.Messaging and System.Transactions assemblies.

Native send

The native send helper methods

Sending involves the following actions:

  • Creating and serializing headers.
  • Writing a message body directly to MSMQ.

In C#

/// <summary>
/// Sends a raw message to an MSMQ queue inside a transaction scope.
/// </summary>
/// <param name="queuePath">Path of the destination queue.</param>
/// <param name="messageBody">Message payload; written to the body stream as UTF-8 bytes.</param>
/// <param name="headers">Headers to serialize into the message's Extension property.</param>
public static void SendMessage(string queuePath, string messageBody, List<HeaderInfo> headers)
{
    using (var transaction = new TransactionScope())
    {
        using (var destination = new MessageQueue(queuePath))
        {
            using (var outgoing = new Message())
            {
                // The body is written verbatim; the headers travel in the Extension bytes.
                outgoing.BodyStream = new MemoryStream(Encoding.UTF8.GetBytes(messageBody));
                outgoing.Extension = CreateHeaders(headers);
                destination.Send(outgoing, MessageQueueTransactionType.Automatic);
            }
        }

        // Mark the scope as complete only after the send succeeded.
        transaction.Complete();
    }
}

/// <summary>
/// Serializes the given headers to XML and returns the resulting bytes.
/// </summary>
/// <param name="headerInfos">The header entries to serialize.</param>
/// <returns>The XML-serialized form of <paramref name="headerInfos"/>.</returns>
public static byte[] CreateHeaders(List<HeaderInfo> headerInfos)
{
    var headerSerializer = new XmlSerializer(typeof(List<HeaderInfo>));
    using (var buffer = new MemoryStream())
    {
        headerSerializer.Serialize(buffer, headerInfos);
        var serialized = buffer.ToArray();
        return serialized;
    }
}

/// <summary>
/// A single message header as a key/value pair. A list of these is
/// XML-serialized by CreateHeaders, so the member names and their order
/// determine the serialized shape.
/// </summary>
public class HeaderInfo
{
    /// <summary>The header name, e.g. "NServiceBus.EnclosedMessageTypes".</summary>
    public string Key { get; set; }
    /// <summary>The header value.</summary>
    public string Value { get; set; }
}

In PowerShell

# Enable the version 2.0 strict-mode rule set for this script.
Set-StrictMode -Version 2.0

# Load the assemblies that provide the System.Messaging and
# System.Transactions types used by the functions in this script.
Add-Type -AssemblyName System.Messaging
Add-Type -AssemblyName System.Transactions

# Compile the HeaderInfo key/value type so that header entries can be
# collected in a List[HeaderInfo] and XML-serialized by CreateHeaders.
Add-Type @"
    public class HeaderInfo
    {
        public string Key { get; set; }
        public string Value { get; set; }
    }
"@

# Converts a hashtable of header names/values into a List[HeaderInfo],
# XML-serializes that list and returns the serialized bytes.
Function CreateHeaders {
    param(
        [Parameter(Mandatory=$true)]
        [ValidateNotNull()]
        [Hashtable] $entries
    )

    # One HeaderInfo per hashtable entry.
    $headerList = New-Object System.Collections.Generic.List[HeaderInfo]
    foreach ($headerName in $entries.Keys) {
        $item = New-Object HeaderInfo
        $item.Key = $headerName
        $item.Value = $entries[$headerName]
        $headerList.Add($item)
    }

    $xmlSerializer = New-Object System.Xml.Serialization.XmlSerializer( $headerList.GetType() )
    $buffer = New-Object System.IO.MemoryStream
    try
    {
        $xmlSerializer.Serialize($buffer, $headerList)
        return $buffer.ToArray()
    }
    finally
    {
        # Release the stream whether or not serialization succeeded.
        $buffer.Dispose()
    }
}

# Sends a raw message to an MSMQ queue inside a transaction scope.
#   -QueuePath    path of the destination queue
#   -MessageBody  payload; written to the message body stream as UTF-8 bytes
#   -Headers      hashtable of header names/values, serialized by CreateHeaders
#                 into the message's Extension property
Function SendMessage {
    param(
        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $QueuePath,

        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $MessageBody,

        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [Hashtable] $Headers
    )

    # Initialise to $null first so the finally block can test each variable
    # even when construction fails part-way (required under Set-StrictMode).
    $scope = $null
    $queue = $null
    $message = $null

    try
    {
        # Create the disposables inside the try: if the queue constructor throws
        # (e.g. on a malformed path) the transaction scope is still disposed
        # instead of being left behind as the thread's ambient transaction.
        $scope = New-Object System.Transactions.TransactionScope
        $queue = New-Object System.Messaging.MessageQueue($QueuePath)
        $message = New-Object System.Messaging.Message

        $msgStream = New-Object System.IO.MemoryStream
        $msgBytes = [System.Text.Encoding]::UTF8.GetBytes($MessageBody)
        $msgStream.Write($msgBytes, 0, $msgBytes.Length)
        $message.BodyStream = $msgStream
        $message.Extension =  CreateHeaders $Headers
        $queue.Send($message, [System.Messaging.MessageQueueTransactionType]::Automatic)

        # Only reached when the send succeeded.
        $scope.Complete()
    }
    finally {
        if ($null -ne $scope) { $scope.Dispose() }
        if ($null -ne $message) { $message.Dispose() }
        if ($null -ne $queue) { $queue.Dispose() }
    }
}

Using the native send helper methods

In C#

// Header that tells the receiving endpoint which message type the body contains.
var enclosedMessageTypes = new HeaderInfo
{
    Key = "NServiceBus.EnclosedMessageTypes",
    Value = "MyNamespace.MyMessage"
};

SendMessage(
    queuePath: @"MachineName\private$\QueueName",
    messageBody: "{\"Property\":\"PropertyValue\"}",
    headers: new List<HeaderInfo> { enclosedMessageTypes });

In PowerShell

# The body is a single-quoted string, which PowerShell passes through verbatim,
# so the JSON double quotes must NOT be backslash-escaped (the backslashes
# would otherwise become part of the message body).
SendMessage -QueuePath 'MachineName\private$\QueueName' `
            -MessageBody '{"Property":"PropertyValue"}' `
            -Headers @{ 'NServiceBus.EnclosedMessageTypes' = 'MyNamespace.MyMessage' }

Return message to source queue

The retry helper methods

Retrying a message involves the following actions:

  • Reading a message from the error queue.
  • Extracting the address of the queue where the message failed from the NServiceBus.FailedQ header.
  • Forwarding the message to that queue so it can be retried.

In C#

/// <summary>
/// Moves a message from an error queue back to the queue named in its
/// "NServiceBus.FailedQ" header so that it can be processed again.
/// The receive and the send share one transaction scope.
/// </summary>
/// <param name="errorQueueMachine">Machine that hosts the error queue.</param>
/// <param name="errorQueueName">Name of the private error queue on that machine.</param>
/// <param name="msmqMessageId">MSMQ id of the message to return.</param>
public static void ReturnMessageToSourceQueue(string errorQueueMachine, string errorQueueName, string msmqMessageId)
{
    var path = $@"{errorQueueMachine}\private$\{errorQueueName}";

    // The queue is disposable; wrap it in a using so its handle is released
    // even when the receive or the send throws.
    using (var errorQueue = new MessageQueue(path))
    {
        // Read every property that has to survive the round trip to the source queue.
        var messageReadPropertyFilter = new MessagePropertyFilter
        {
            Body = true,
            TimeToBeReceived = true,
            Recoverable = true,
            Id = true,
            ResponseQueue = true,
            CorrelationId = true,
            Extension = true,
            AppSpecific = true,
            LookupId = true,
        };
        errorQueue.MessageReadPropertyFilter = messageReadPropertyFilter;

        using (var scope = new TransactionScope())
        {
            var transactionType = MessageQueueTransactionType.Automatic;

            // Waits up to five seconds for the message with the given id.
            using (var message = errorQueue.ReceiveById(msmqMessageId, TimeSpan.FromSeconds(5), transactionType))
            {
                var fullPath = ReadFailedQueueHeader(message);
                using (var failedQueue = new MessageQueue(fullPath))
                {
                    failedQueue.Send(message, transactionType);
                }
            }

            // Only reached when both the receive and the send succeeded.
            scope.Complete();
        }
    }
}

/// <summary>
/// Reads the "NServiceBus.FailedQ" header of the message and converts its
/// "queue@machine" value into a private-queue path of the form
/// machine\private$\queue.
/// </summary>
static string ReadFailedQueueHeader(Message message)
{
    var failedQueueAddress = ExtractHeaders(message)
        .Single(entry => entry.Key == "NServiceBus.FailedQ")
        .Value;

    // Address layout: the part before '@' is the queue, the part after is the machine.
    var addressParts = failedQueueAddress.Split('@');
    var queueName = addressParts[0];
    var machineName = addressParts[1];
    return $@"{machineName}\private$\{queueName}";
}

/// <summary>
/// Decodes the message's Extension bytes as UTF-8 text and XML-deserializes
/// that text into a list of headers.
/// </summary>
/// <param name="message">The MSMQ message whose Extension holds the serialized headers.</param>
/// <returns>The deserialized header entries.</returns>
public static List<HeaderInfo> ExtractHeaders(Message message)
{
    var headerSerializer = new XmlSerializer(typeof(List<HeaderInfo>));
    var headersXml = Encoding.UTF8.GetString(message.Extension);
    using (var xmlReader = new StringReader(headersXml))
    {
        var deserialized = headerSerializer.Deserialize(xmlReader);
        return (List<HeaderInfo>)deserialized;
    }
}

/// <summary>
/// A single message header as a key/value pair. ExtractHeaders deserializes
/// a list of these from XML, so the member names must match the serialized
/// element names.
/// </summary>
public class HeaderInfo
{
    /// <summary>The header name, e.g. "NServiceBus.FailedQ".</summary>
    public string Key { get; set; }
    /// <summary>The header value.</summary>
    public string Value { get; set; }
}

In PowerShell

# Enable the version 2.0 strict-mode rule set for this script.
Set-StrictMode -Version 2.0

# Load the assemblies that provide the System.Messaging and
# System.Transactions types used by the functions in this script.
Add-Type -AssemblyName System.Messaging
Add-Type -AssemblyName System.Transactions

# Moves a message from an error queue back to the queue named in its
# NServiceBus.FailedQ header so that it can be processed again.
#   -ErrorQueueMachine  machine that hosts the error queue
#   -ErrorQueueName     name of the private error queue on that machine
#   -MessageId          MSMQ id of the message to return
# The receive and the send share one transaction scope.
Function ReturnMessageToSourceQueue
{
    param(
        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $ErrorQueueMachine,

        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $ErrorQueueName,

        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $MessageId
    )

    $queuePath = '{0}\private$\{1}' -f $ErrorQueueMachine, $ErrorQueueName

    # Read every property that has to survive the round trip to the source queue.
    $propertyFilter = New-Object System.Messaging.MessagePropertyFilter
    $propertyFilter.Body = $true
    $propertyFilter.TimeToBeReceived = $true
    $propertyFilter.Recoverable = $true
    $propertyFilter.Id = $true
    $propertyFilter.ResponseQueue = $true
    $propertyFilter.CorrelationId = $true
    $propertyFilter.Extension = $true
    $propertyFilter.AppSpecific = $true
    $propertyFilter.LookupId = $true

    # Initialise to $null first so the finally block can test each variable
    # even when a step fails part-way (required under Set-StrictMode).
    $errorQueue = $null
    $failedQueue = $null
    $message = $null
    $scope = $null

    try
    {
        $errorQueue = New-Object System.Messaging.MessageQueue($queuePath)
        $errorQueue.MessageReadPropertyFilter = $propertyFilter

        $scope = New-Object System.Transactions.TransactionScope
        $transactionType = [System.Messaging.MessageQueueTransactionType]::Automatic

        # Waits up to five seconds for the message with the given id.
        $message = $errorQueue.ReceiveById($MessageId, [System.TimeSpan]::FromSeconds(5), $transactionType)
        $failedQueuePath = ReadFailedQueueFromHeaders -Message $message
        $failedQueue = New-Object System.Messaging.MessageQueue($failedQueuePath)
        $failedQueue.Send($message, $transactionType)

        # Only reached when both the receive and the send succeeded.
        $scope.Complete()
    }
    finally {
        # Finish the transaction first, then release the message and both queues,
        # which the original version never disposed.
        if ($null -ne $scope) { $scope.Dispose() }
        if ($null -ne $message) { $message.Dispose() }
        if ($null -ne $failedQueue) { $failedQueue.Dispose() }
        if ($null -ne $errorQueue) { $errorQueue.Dispose() }
    }
}

# Reads the NServiceBus.FailedQ header from the message's Extension bytes and
# converts its "queue@machine" value into a private-queue path of the form
# machine\private$\queue.
Function ReadFailedQueueFromHeaders
{
    param(
        [Parameter(Mandatory=$true)]
        [System.Messaging.Message] $message
    )

    # The Extension bytes are decoded as UTF-8 text and parsed as XML.
    $headersText = [System.Text.Encoding]::UTF8.GetString($message.Extension)
    $headersXml = [xml] $headersText

    $failedQ = $headersXml.ArrayOfHeaderInfo.HeaderInfo |
        Where-Object Key -eq "NServiceBus.FailedQ" |
        Select-Object -ExpandProperty Value

    # Address layout: the part before '@' is the queue, the part after is the machine.
    $addressParts = $failedQ.Split('@')
    return ('{0}\private$\{1}' -f $addressParts[1], $addressParts[0])
}

Using the retry helper methods

// MSMQ id of the failed message that should be returned to its source queue.
var failedMessageId = @"c390a6fb-4fb5-46da-927d-a156f75739eb\15386";

ReturnMessageToSourceQueue(
    errorQueueMachine: Environment.MachineName,
    errorQueueName: "error",
    msmqMessageId: failedMessageId);