This document describes how to consume messages from and send messages to non-NServiceBus endpoints via MSMQ in integration scenarios.
These examples use the System.Messaging and System.Transactions assemblies.
The
Systems.Messaging
namespace is not available in .NET Core.When using the C# code samples, be sure to add the proper includes for both the
System.Messaging
and System.Transactions
assemblies in the program that's using these functions. When using the PowerShell scripts, include these assemblies by calling Add-Type
in the script.Native send
The native send helper methods
Sending involves the following actions:
- Creating and serializing headers.
- Writing a message body directly to MSMQ.
In C#
public static void SendMessage(string queuePath, string messageBody, List<HeaderInfo> headers)
{
using (var scope = new TransactionScope())
{
using (var queue = new MessageQueue(queuePath))
using (var message = new Message())
{
var bytes = Encoding.UTF8.GetBytes(messageBody);
message.BodyStream = new MemoryStream(bytes);
message.Extension = CreateHeaders(headers);
queue.Send(message, MessageQueueTransactionType.Automatic);
}
scope.Complete();
}
}
public static byte[] CreateHeaders(List<HeaderInfo> headerInfos)
{
var serializer = new XmlSerializer(typeof(List<HeaderInfo>));
using (var stream = new MemoryStream())
{
serializer.Serialize(stream, headerInfos);
return stream.ToArray();
}
}
public class HeaderInfo
{
public string Key { get; set; }
public string Value { get; set; }
}
In PowerShell
Set-StrictMode -Version 2.0
Add-Type -AssemblyName System.Messaging
Add-Type -AssemblyName System.Transactions
Add-Type @"
public class HeaderInfo
{
public string Key { get; set; }
public string Value { get; set; }
}
"@
Function CreateHeaders {
param(
[Parameter(Mandatory=$true)]
[ValidateNotNull()]
[Hashtable] $entries
)
$headerinfos = New-Object System.Collections.Generic.List[HeaderInfo]
$entries.Keys | % {
$headerinfo = New-Object HeaderInfo
$headerinfo.Key = $_
$headerinfo.Value = $entries[$_]
$headerinfos.Add($headerinfo)
}
$serializer = New-Object System.Xml.Serialization.XmlSerializer( $headerinfos.GetType() )
$stream = New-Object System.IO.MemoryStream
try
{
$serializer.Serialize($stream, $headerInfos)
return $stream.ToArray()
}
finally
{
$stream.Dispose()
}
}
Function SendMessage {
param(
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $QueuePath,
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $MessageBody,
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[Hashtable] $Headers
)
$scope = New-Object System.Transactions.TransactionScope
$queue = New-Object System.Messaging.MessageQueue($QueuePath)
$message = New-Object System.Messaging.Message
try
{
$msgStream = New-Object System.IO.MemoryStream
$msgBytes = [System.Text.Encoding]::UTF8.GetBytes($MessageBody)
$msgStream.Write($msgBytes, 0, $msgBytes.Length)
$message.BodyStream = $msgStream
$message.Extension = CreateHeaders $Headers
$queue.Send($message, [System.Messaging.MessageQueueTransactionType]::Automatic)
$scope.Complete()
}
finally {
$scope.Dispose()
$message.Dispose()
$queue.Dispose()
}
}
Using the native send helper methods
In C#
SendMessage(
queuePath: @"MachineName\private$\QueueName",
messageBody: "{\"Property\":\"PropertyValue\"}",
headers: new List<HeaderInfo>
{
new HeaderInfo
{
Key = "NServiceBus.EnclosedMessageTypes",
Value = "MyNamespace.MyMessage"
}
});
In PowerShell
SendMessage -QueuePath 'MachineName\private$\QueueName' `
-MessageBody: '{\"Property\":\"PropertyValue\"}' `
-Headers @{ 'NServiceBus.EnclosedMessageTypes' = 'MyNamespace.MyMessage'; }
Return message to source queue
The retry helper methods
Retrying a message involves the following actions:
- Reading a message from the error queue.
- Extracting the failed queue from the headers.
- Forwarding that message to the failed queue name so it can be retried.
In C#
public static void ReturnMessageToSourceQueue(string errorQueueMachine, string errorQueueName, string msmqMessageId)
{
var path = $@"{errorQueueMachine}\private$\{errorQueueName}";
var errorQueue = new MessageQueue(path);
{
var messageReadPropertyFilter = new MessagePropertyFilter
{
Body = true,
TimeToBeReceived = true,
Recoverable = true,
Id = true,
ResponseQueue = true,
CorrelationId = true,
Extension = true,
AppSpecific = true,
LookupId = true,
};
errorQueue.MessageReadPropertyFilter = messageReadPropertyFilter;
using (var scope = new TransactionScope())
{
var transactionType = MessageQueueTransactionType.Automatic;
var message = errorQueue.ReceiveById(msmqMessageId, TimeSpan.FromSeconds(5), transactionType);
var fullPath = ReadFailedQueueHeader(message);
using (var failedQueue = new MessageQueue(fullPath))
{
failedQueue.Send(message, transactionType);
}
scope.Complete();
}
}
}
static string ReadFailedQueueHeader(Message message)
{
var headers = ExtractHeaders(message);
var header = headers.Single(x => x.Key == "NServiceBus.FailedQ").Value;
var queueName = header.Split('@')[0];
var machineName = header.Split('@')[1];
return $@"{machineName}\private$\{queueName}";
}
public static List<HeaderInfo> ExtractHeaders(Message message)
{
var serializer = new XmlSerializer(typeof(List<HeaderInfo>));
var extension = Encoding.UTF8.GetString(message.Extension);
using (var stringReader = new StringReader(extension))
{
return (List<HeaderInfo>) serializer.Deserialize(stringReader);
}
}
public class HeaderInfo
{
public string Key { get; set; }
public string Value { get; set; }
}
In PowerShell
Set-StrictMode -Version 2.0
Add-Type -AssemblyName System.Messaging
Add-Type -AssemblyName System.Transactions
Function ReturnMessageToSourceQueue
{
param(
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $ErrorQueueMachine,
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $ErrorQueueName,
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $MessageId
)
$queuePath = '{0}\private$\{1}'-f $ErrorQueueMachine, $ErrorQueueName
$propertyFilter = New-Object System.Messaging.MessagePropertyFilter
$propertyFilter.Body = $true
$propertyFilter.TimeToBeReceived =$true
$propertyFilter.Recoverable = $true
$propertyFilter.Id = $true
$propertyFilter.ResponseQueue = $true
$propertyFilter.CorrelationId = $true
$propertyFilter.Extension = $true
$propertyFilter.AppSpecific = $true
$propertyFilter.LookupId = $true
$errorQueue = New-Object System.Messaging.MessageQueue($queuePath)
$errorQueue.MessageReadPropertyFilter = $propertyFilter
$scope = New-Object System.Transactions.TransactionScope
try
{
$transactionType = [System.Messaging.MessageQueueTransactionType]::Automatic
$message = $errorQueue.ReceiveById($MessageId, [System.TimeSpan]::FromSeconds(5), $transactionType)
$failedQueuePath = ReadFailedQueueFromHeaders -Message $message
$failedQueue = New-Object System.Messaging.MessageQueue($failedQueuePath)
$failedQueue.Send($message, $transactionType)
$scope.Complete()
}
finally {
$scope.Dispose()
}
}
Function ReadFailedQueueFromHeaders
{
param(
[Parameter(Mandatory=$true)]
[System.Messaging.Message] $message
)
$rawheaders = [System.Text.Encoding]::UTF8.GetString($message.Extension)
$reader = New-Object System.IO.StringReader($rawheaders)
$xml = [xml] $reader.ReadToEnd()
$header = $xml.ArrayOfHeaderInfo.HeaderInfo | ? Key -eq "NServiceBus.FailedQ" | Select -ExpandProperty Value
return ('{0}\private$\{1}' -f $header.Split('@')[1], $header.Split('@')[0])
}
Using the retry helper methods
ReturnMessageToSourceQueue(
errorQueueMachine: Environment.MachineName,
errorQueueName: "error",
msmqMessageId: @"c390a6fb-4fb5-46da-927d-a156f75739eb\15386");