MSMQ Native Integration

Component: MSMQ Transport
NuGet Package NServiceBus.Transport.Msmq (2-pre)
This page targets a pre-release version. Pre-releases are subject to change and samples are not guaranteed to be fully functional.

This document describes how to consume messages from and send messages to non-NServiceBus endpoints via MSMQ in integration scenarios.

These examples use the System.Messaging and System.Transactions assemblies.

The Systems.Messaging namespace is not available in .NET Core.
When using the C# code samples, be sure to add the proper includes for both the System.Messaging and System.Transactions assemblies in the program that's using these functions. When using the PowerShell scripts, include these assemblies by calling Add-Type in the script.

Native send

The native send helper methods

Sending involves the following actions:

  • Creating and serializing headers.
  • Writing a message body directly to MSMQ.

In C#

public static void SendMessage(string queuePath, string messageBody, List<HeaderInfo> headers)
{
    using (var scope = new TransactionScope())
    {
        using (var queue = new MessageQueue(queuePath))
        using (var message = new Message())
        {
            var bytes = Encoding.UTF8.GetBytes(messageBody);
            message.BodyStream = new MemoryStream(bytes);
            message.Extension = CreateHeaders(headers);
            queue.Send(message, MessageQueueTransactionType.Automatic);
        }
        scope.Complete();
    }
}

public static byte[] CreateHeaders(List<HeaderInfo> headerInfos)
{
    var serializer = new XmlSerializer(typeof(List<HeaderInfo>));
    using (var stream = new MemoryStream())
    {
        serializer.Serialize(stream, headerInfos);
        return stream.ToArray();
    }
}

public class HeaderInfo
{
    public string Key { get; set; }
    public string Value { get; set; }
}

In PowerShell

Set-StrictMode -Version 2.0

Add-Type -AssemblyName System.Messaging
Add-Type -AssemblyName System.Transactions

Add-Type @"
    public class HeaderInfo
    {
        public string Key { get; set; }
        public string Value { get; set; }
    }
"@

Function CreateHeaders {
    param(
        [Parameter(Mandatory=$true)]
        [ValidateNotNull()]
        [Hashtable] $entries
    )

    $headerinfos =  New-Object System.Collections.Generic.List[HeaderInfo]
    $entries.Keys | % {
        $headerinfo = New-Object HeaderInfo
        $headerinfo.Key = $_
        $headerinfo.Value = $entries[$_]
        $headerinfos.Add($headerinfo)
    }

    $serializer = New-Object System.Xml.Serialization.XmlSerializer( $headerinfos.GetType() )
    $stream = New-Object System.IO.MemoryStream
    try
    {
        $serializer.Serialize($stream, $headerInfos)
        return $stream.ToArray()
    }
    finally
    {
        $stream.Dispose()
    }
}

Function SendMessage {
    param(
        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $QueuePath,

        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $MessageBody,

        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]

        [Hashtable] $Headers
    )

    $scope = New-Object System.Transactions.TransactionScope
    $queue = New-Object System.Messaging.MessageQueue($QueuePath)
    $message = New-Object System.Messaging.Message

    try
    {
        $msgStream = New-Object System.IO.MemoryStream
        $msgBytes = [System.Text.Encoding]::UTF8.GetBytes($MessageBody)
        $msgStream.Write($msgBytes, 0, $msgBytes.Length)
        $message.BodyStream = $msgStream
        $message.Extension =  CreateHeaders $Headers
        $queue.Send($message, [System.Messaging.MessageQueueTransactionType]::Automatic)
        $scope.Complete()
    }
    finally {
        $scope.Dispose()
        $message.Dispose()
        $queue.Dispose()
    }
}

Using the native send helper methods

In C#

SendMessage(
    queuePath: @"MachineName\private$\QueueName",
    messageBody: "{\"Property\":\"PropertyValue\"}",
    headers: new List<HeaderInfo>
    {
        new HeaderInfo
        {
            Key = "NServiceBus.EnclosedMessageTypes",
            Value = "MyNamespace.MyMessage"
        }
    });

In PowerShell

SendMessage -QueuePath 'MachineName\private$\QueueName' `
            -MessageBody:  '{\"Property\":\"PropertyValue\"}' `
            -Headers  @{ 'NServiceBus.EnclosedMessageTypes' = 'MyNamespace.MyMessage'; }

Return message to source queue

The retry helper methods

Retrying a message involves the following actions:

  • Reading a message from the error queue.
  • Extracting the failed queue from the headers.
  • Forwarding that message to the failed queue name so it can be retried.

In C#

public static void ReturnMessageToSourceQueue(string errorQueueMachine, string errorQueueName, string msmqMessageId)
{
    var path = $@"{errorQueueMachine}\private$\{errorQueueName}";
    var errorQueue = new MessageQueue(path);
    {
        var messageReadPropertyFilter = new MessagePropertyFilter
        {
            Body = true,
            TimeToBeReceived = true,
            Recoverable = true,
            Id = true,
            ResponseQueue = true,
            CorrelationId = true,
            Extension = true,
            AppSpecific = true,
            LookupId = true,
        };
        errorQueue.MessageReadPropertyFilter = messageReadPropertyFilter;
        using (var scope = new TransactionScope())
        {
            var transactionType = MessageQueueTransactionType.Automatic;
            var message = errorQueue.ReceiveById(msmqMessageId, TimeSpan.FromSeconds(5), transactionType);
            var fullPath = ReadFailedQueueHeader(message);
            using (var failedQueue = new MessageQueue(fullPath))
            {
                failedQueue.Send(message, transactionType);
            }
            scope.Complete();
        }
    }
}

static string ReadFailedQueueHeader(Message message)
{
    var headers = ExtractHeaders(message);
    var header = headers.Single(x => x.Key == "NServiceBus.FailedQ").Value;
    var queueName = header.Split('@')[0];
    var machineName = header.Split('@')[1];
    return $@"{machineName}\private$\{queueName}";
}

public static List<HeaderInfo> ExtractHeaders(Message message)
{
    var serializer = new XmlSerializer(typeof(List<HeaderInfo>));
    var extension = Encoding.UTF8.GetString(message.Extension);
    using (var stringReader = new StringReader(extension))
    {
        return (List<HeaderInfo>) serializer.Deserialize(stringReader);
    }
}

public class HeaderInfo
{
    public string Key { get; set; }
    public string Value { get; set; }
}

In PowerShell

Set-StrictMode -Version 2.0

Add-Type -AssemblyName System.Messaging
Add-Type -AssemblyName System.Transactions

Function ReturnMessageToSourceQueue
{
    param(
        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $ErrorQueueMachine,

        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $ErrorQueueName,

        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $MessageId
    )

    $queuePath = '{0}\private$\{1}'-f $ErrorQueueMachine, $ErrorQueueName

    $propertyFilter = New-Object System.Messaging.MessagePropertyFilter
    $propertyFilter.Body = $true
    $propertyFilter.TimeToBeReceived =$true
    $propertyFilter.Recoverable = $true
    $propertyFilter.Id = $true
    $propertyFilter.ResponseQueue = $true
    $propertyFilter.CorrelationId = $true
    $propertyFilter.Extension = $true
    $propertyFilter.AppSpecific = $true
    $propertyFilter.LookupId = $true

    $errorQueue = New-Object System.Messaging.MessageQueue($queuePath)
    $errorQueue.MessageReadPropertyFilter = $propertyFilter

    $scope = New-Object System.Transactions.TransactionScope
    try
    {
        $transactionType = [System.Messaging.MessageQueueTransactionType]::Automatic
        $message = $errorQueue.ReceiveById($MessageId, [System.TimeSpan]::FromSeconds(5), $transactionType)
        $failedQueuePath = ReadFailedQueueFromHeaders -Message $message
        $failedQueue = New-Object System.Messaging.MessageQueue($failedQueuePath)
        $failedQueue.Send($message, $transactionType)
        $scope.Complete()
    }
    finally {
        $scope.Dispose()
    }
}

Function ReadFailedQueueFromHeaders
{
    param(
        [Parameter(Mandatory=$true)]
        [System.Messaging.Message] $message
    )

    $rawheaders = [System.Text.Encoding]::UTF8.GetString($message.Extension)
    $reader = New-Object System.IO.StringReader($rawheaders)
    $xml = [xml] $reader.ReadToEnd()
    $header =  $xml.ArrayOfHeaderInfo.HeaderInfo | ? Key -eq "NServiceBus.FailedQ" | Select -ExpandProperty Value
    return ('{0}\private$\{1}' -f $header.Split('@')[1], $header.Split('@')[0])
}

Using the retry helper methods

ReturnMessageToSourceQueue(
    errorQueueMachine: Environment.MachineName,
    errorQueueName: "error",
    msmqMessageId: @"c390a6fb-4fb5-46da-927d-a156f75739eb\15386");

Last modified