This document describes how to consume messages from and send messages to non-NServiceBus endpoints via MSMQ in integration scenarios.
These examples use the System.Messaging and System.Transactions assemblies.
The Systems.
namespace is unavailable in .NET Core.
When using the C# code samples, add the proper includes for the System.
and System.
assemblies in the program using these functions. When using the PowerShell scripts, include these assemblies by calling Add-Type
in the script.
Native send
The native send helper methods
Sending involves the following actions:
- Creating and serializing headers.
- Writing a message body directly to MSMQ.
In C#
public static void SendMessage(string queuePath, string messageBody, List<HeaderInfo> headers)
{
using (var scope = new TransactionScope())
{
using (var queue = new MessageQueue(queuePath))
using (var message = new Message())
{
var bytes = Encoding.UTF8.GetBytes(messageBody);
message.BodyStream = new MemoryStream(bytes);
message.Extension = CreateHeaders(headers);
queue.Send(message, MessageQueueTransactionType.Automatic);
}
scope.Complete();
}
}
public static byte[] CreateHeaders(List<HeaderInfo> headerInfos)
{
var serializer = new XmlSerializer(typeof(List<HeaderInfo>));
using (var stream = new MemoryStream())
{
serializer.Serialize(stream, headerInfos);
return stream.ToArray();
}
}
public class HeaderInfo
{
public string Key { get; set; }
public string Value { get; set; }
}
In PowerShell
Set-StrictMode -Version 2.0
Add-Type -AssemblyName System.Messaging
Add-Type -AssemblyName System.Transactions
Add-Type @"
public class HeaderInfo
{
public string Key { get; set; }
public string Value { get; set; }
}
"@
Function CreateHeaders {
param(
[Parameter(Mandatory=$true)]
[ValidateNotNull()]
[Hashtable] $entries
)
$headerinfos = New-Object System.Collections.Generic.List[HeaderInfo]
$entries.Keys | % {
$headerinfo = New-Object HeaderInfo
$headerinfo.Key = $_
$headerinfo.Value = $entries[$_]
$headerinfos.Add($headerinfo)
}
$serializer = New-Object System.Xml.Serialization.XmlSerializer( $headerinfos.GetType() )
$stream = New-Object System.IO.MemoryStream
try
{
$serializer.Serialize($stream, $headerInfos)
return $stream.ToArray()
}
finally
{
$stream.Dispose()
}
}
Function SendMessage {
param(
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $QueuePath,
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $MessageBody,
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[Hashtable] $Headers
)
$scope = New-Object System.Transactions.TransactionScope
$queue = New-Object System.Messaging.MessageQueue($QueuePath)
$message = New-Object System.Messaging.Message
try
{
$msgStream = New-Object System.IO.MemoryStream
$msgBytes = [System.Text.Encoding]::UTF8.GetBytes($MessageBody)
$msgStream.Write($msgBytes, 0, $msgBytes.Length)
$message.BodyStream = $msgStream
$message.Extension = CreateHeaders $Headers
$queue.Send($message, [System.Messaging.MessageQueueTransactionType]::Automatic)
$scope.Complete()
}
finally {
$scope.Dispose()
$message.Dispose()
$queue.Dispose()
}
}
Using the native send helper methods
In C#
SendMessage(
queuePath: @"MachineName\private$\QueueName",
messageBody: "{\"Property\":\"PropertyValue\"}",
headers: new List<HeaderInfo>
{
new HeaderInfo
{
Key = "NServiceBus.EnclosedMessageTypes",
Value = "MyNamespace.MyMessage"
}
});
In PowerShell
SendMessage -QueuePath 'MachineName\private$\QueueName' `
-MessageBody: '{\"Property\":\"PropertyValue\"}' `
-Headers @{ 'NServiceBus.EnclosedMessageTypes' = 'MyNamespace.MyMessage'; }
Return message to source queue
The retry helper methods
Retrying a message involves the following actions:
- Reading a message from the error queue.
- Extracting the failed queue from the headers.
- Forwarding that message to the failed queue name so it can be retried.
In C#
public static void ReturnMessageToSourceQueue(string errorQueueMachine, string errorQueueName, string msmqMessageId)
{
var path = $@"{errorQueueMachine}\private$\{errorQueueName}";
var errorQueue = new MessageQueue(path);
{
var messageReadPropertyFilter = new MessagePropertyFilter
{
Body = true,
TimeToBeReceived = true,
Recoverable = true,
Id = true,
ResponseQueue = true,
CorrelationId = true,
Extension = true,
AppSpecific = true,
LookupId = true,
};
errorQueue.MessageReadPropertyFilter = messageReadPropertyFilter;
using (var scope = new TransactionScope())
{
var transactionType = MessageQueueTransactionType.Automatic;
var message = errorQueue.ReceiveById(msmqMessageId, TimeSpan.FromSeconds(5), transactionType);
var fullPath = ReadFailedQueueHeader(message);
using (var failedQueue = new MessageQueue(fullPath))
{
failedQueue.Send(message, transactionType);
}
scope.Complete();
}
}
}
static string ReadFailedQueueHeader(Message message)
{
var headers = ExtractHeaders(message);
var header = headers.Single(x => x.Key == "NServiceBus.FailedQ").Value;
var queueName = header.Split('@')[0];
var machineName = header.Split('@')[1];
return $@"{machineName}\private$\{queueName}";
}
public static List<HeaderInfo> ExtractHeaders(Message message)
{
var serializer = new XmlSerializer(typeof(List<HeaderInfo>));
var extension = Encoding.UTF8.GetString(message.Extension);
using (var stringReader = new StringReader(extension))
{
return (List<HeaderInfo>)serializer.Deserialize(stringReader);
}
}
public class HeaderInfo
{
public string Key { get; set; }
public string Value { get; set; }
}
In PowerShell
Set-StrictMode -Version 2.0
Add-Type -AssemblyName System.Messaging
Add-Type -AssemblyName System.Transactions
Function ReturnMessageToSourceQueue
{
param(
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $ErrorQueueMachine,
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $ErrorQueueName,
[Parameter(Mandatory=$true)]
[ValidateNotNullOrEmpty()]
[string] $MessageId
)
$queuePath = '{0}\private$\{1}'-f $ErrorQueueMachine, $ErrorQueueName
$propertyFilter = New-Object System.Messaging.MessagePropertyFilter
$propertyFilter.Body = $true
$propertyFilter.TimeToBeReceived =$true
$propertyFilter.Recoverable = $true
$propertyFilter.Id = $true
$propertyFilter.ResponseQueue = $true
$propertyFilter.CorrelationId = $true
$propertyFilter.Extension = $true
$propertyFilter.AppSpecific = $true
$propertyFilter.LookupId = $true
$errorQueue = New-Object System.Messaging.MessageQueue($queuePath)
$errorQueue.MessageReadPropertyFilter = $propertyFilter
$scope = New-Object System.Transactions.TransactionScope
try
{
$transactionType = [System.Messaging.MessageQueueTransactionType]::Automatic
$message = $errorQueue.ReceiveById($MessageId, [System.TimeSpan]::FromSeconds(5), $transactionType)
$failedQueuePath = ReadFailedQueueFromHeaders -Message $message
$failedQueue = New-Object System.Messaging.MessageQueue($failedQueuePath)
$failedQueue.Send($message, $transactionType)
$scope.Complete()
}
finally {
$scope.Dispose()
}
}
Function ReadFailedQueueFromHeaders
{
param(
[Parameter(Mandatory=$true)]
[System.Messaging.Message] $message
)
$rawheaders = [System.Text.Encoding]::UTF8.GetString($message.Extension)
$reader = New-Object System.IO.StringReader($rawheaders)
$xml = [xml] $reader.ReadToEnd()
$header = $xml.ArrayOfHeaderInfo.HeaderInfo | ? Key -eq "NServiceBus.FailedQ" | Select -ExpandProperty Value
return ('{0}\private$\{1}' -f $header.Split('@')[1], $header.Split('@')[0])
}
Using the retry helper methods
ReturnMessageToSourceQueue(
errorQueueMachine: Environment.MachineName,
errorQueueName: "error",
msmqMessageId: @"c390a6fb-4fb5-46da-927d-a156f75739eb\15386");