Getting Started
Architecture
NServiceBus
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

RabbitMQ Transport Scripting

Target Version: NServiceBus 7.x

Example code and scripts to facilitate deployment and operational actions against RabbitMQ.

These samples use the RabbitMQ.Client NuGet Package.

Since the RabbitMQ.Client library is not CLS-compliant, it is not possible to run this code within PowerShell.

Native Send

The native send helper methods

A send involves the following actions:

  • Create and serialize headers.
  • Write a message body directly to RabbitMQ.

In C#

/// <summary>
/// Publishes a UTF-8 encoded message body, together with the supplied headers, to a RabbitMQ queue.
/// </summary>
/// <param name="machineName">Host name of the RabbitMQ broker to connect to.</param>
/// <param name="queueName">Name of the destination queue; used as the routing key of the publish.</param>
/// <param name="userName">User name used to open the connection.</param>
/// <param name="password">Password used to open the connection.</param>
/// <param name="messageBody">Message payload as text; it is encoded as UTF-8 before being published.</param>
/// <param name="headers">Headers attached to the published message.</param>
public static void SendMessage(string machineName, string queueName, string userName, string password, string messageBody, Dictionary<string, object> headers)
{
    using (var brokerConnection = OpenConnection(machineName, userName, password))
    {
        using (var model = brokerConnection.CreateModel())
        {
            // Every native send gets a freshly generated message id.
            var messageProperties = model.CreateBasicProperties();
            messageProperties.Headers = headers;
            messageProperties.MessageId = Guid.NewGuid().ToString();

            // The exchange name is left empty and the queue name is passed as the routing key.
            var payload = Encoding.UTF8.GetBytes(messageBody);
            model.BasicPublish(string.Empty, queueName, false, messageProperties, payload);
        }
    }
}

/// <summary>
/// Opens a connection to the RabbitMQ broker on the given machine using the default AMQP port.
/// </summary>
/// <param name="machine">Host name of the broker.</param>
/// <param name="userName">User name to authenticate with.</param>
/// <param name="password">Password to authenticate with.</param>
/// <returns>The newly created connection; the caller is responsible for disposing it.</returns>
static IConnection OpenConnection(string machine, string userName, string password)
{
    var factory = new ConnectionFactory();
    factory.HostName = machine;
    factory.Port = AmqpTcpEndpoint.UseDefaultPort;
    factory.UserName = userName;
    factory.Password = password;

    return factory.CreateConnection();
}

Using the native send helper methods

// The enclosed message types header carries the .NET type name of the message being sent.
var messageHeaders = new Dictionary<string, object>
{
    ["NServiceBus.EnclosedMessageTypes"] = "MyNamespace.MyMessage"
};

SendMessage(
    machineName: "MachineName",
    queueName: "QueueName",
    userName: "admin",
    password: "password",
    messageBody: "{\"Property\":\"PropertyValue\"}",
    headers: messageHeaders);

In this example, the value MyNamespace.MyMessage represents the .NET type of the message. See the headers documentation for more information on the EnclosedMessageTypes header.

Return message to source queue

The retry helper methods

This code shows an example of how to perform the following actions:

  • Read a message from the error queue.
  • Extract the failed queue from the headers.
  • Forward that message to the failed queue so it can be retried.
/// <summary>
/// Finds the message with the given id in the error queue and forwards it to the queue named in its
/// failed queue header so that it can be retried.
/// </summary>
/// <param name="connection">An open connection to the broker; it is not disposed by this method.</param>
/// <param name="errorQueueName">Name of the error queue to search.</param>
/// <param name="messageId">Message id (as stored in the message properties) of the message to return.</param>
/// <exception cref="Exception">
/// Thrown when the error queue has been read to the end without finding a message with
/// <paramref name="messageId"/>.
/// </exception>
public static void ReturnMessageToSourceQueue(IConnection connection, string errorQueueName, string messageId)
{
    using (var channel = connection.CreateModel())
    {
        // Enable publisher confirms so that messages aren't removed from the error queue until the broker confirms it has accepted the new message
        channel.ConfirmSelect();

        BasicGetResult result;

        // Messages are fetched without auto-ack. Non-matching messages are never acknowledged here,
        // so the broker is expected to make them available again once this channel is closed.
        while ((result = channel.BasicGet(errorQueueName, false)) != null)
        {
            if (result.BasicProperties.MessageId != messageId)
            {
                continue;
            }

            ReadFailedQueueHeader(out var failedQueueName, result);

            // A non-mandatory publish is confirmed by the broker even when no queue matches the
            // routing key, in which case the message is dropped. Verify the destination queue exists
            // first; the passive declare throws if it does not, and the message stays in the error queue.
            channel.QueueDeclarePassive(failedQueueName);

            channel.BasicPublish(string.Empty, failedQueueName, false, result.BasicProperties, result.Body);

            // Wait for confirmation that message is sent back to source queue
            channel.WaitForConfirmsOrDie();

            // Acknowledge and consume the incoming message
            channel.BasicAck(result.DeliveryTag, false);

            return;
        }

        throw new Exception($"Could not find message with id '{messageId}'");
    }
}

/// <summary>
/// Extracts the name of the queue a message originally failed in from its
/// <c>NServiceBus.FailedQ</c> header.
/// </summary>
/// <param name="queueName">Receives the queue name, with any <c>@machine</c> suffix removed.</param>
/// <param name="getResult">The message fetched from the error queue.</param>
/// <exception cref="InvalidOperationException">
/// Thrown when the message has no headers or no <c>NServiceBus.FailedQ</c> header.
/// </exception>
static void ReadFailedQueueHeader(out string queueName, BasicGetResult getResult)
{
    const string failedQueueHeader = "NServiceBus.FailedQ";

    var headers = getResult.BasicProperties.Headers;
    if (headers == null || !headers.TryGetValue(failedQueueHeader, out var rawValue) || rawValue == null)
    {
        // Fail with a descriptive error instead of an opaque NullReferenceException/KeyNotFoundException.
        throw new InvalidOperationException(
            $"Message '{getResult.BasicProperties.MessageId}' has no '{failedQueueHeader}' header, so its source queue cannot be determined.");
    }

    // The original code expects the value as UTF-8 bytes; a value that is already a string is accepted as-is.
    var header = rawValue as string ?? Encoding.UTF8.GetString((byte[])rawValue);

    // In Versions 3.3.1 and below the machine name will be included after the @
    // In Versions 3.3.2 and above it will only be the queue name
    queueName = header.Split('@').First();
}

Using the retry helper methods

// Identify the error queue and the failed message, then return that message to its source queue.
const string errorQueue = "error";
const string failedMessageId = "6698f196-bd50-4f3c-8819-a49e0163d57b";

using (var brokerConnection = connectionFactory.CreateConnection())
{
    ReturnMessageToSourceQueue(
        connection: brokerConnection,
        errorQueueName: errorQueue,
        messageId: failedMessageId);
}

Create queues

Queue creation can be done for a specific endpoint or for queues shared between multiple endpoints.

The create queue helper methods

/// <summary>
/// Helper methods for declaring a RabbitMQ queue and, optionally, a fanout exchange bound to it.
/// </summary>
public static class QueueCreationUtils
{
    /// <summary>
    /// Declares a non-exclusive, non-auto-delete queue on the broker identified by <paramref name="uri"/>.
    /// </summary>
    /// <param name="uri">AMQP connection URI of the broker.</param>
    /// <param name="queueName">Name of the queue to declare.</param>
    /// <param name="durableMessages">Whether the queue (and the exchange, if created) is declared durable.</param>
    /// <param name="createExchange">
    /// When <c>true</c>, a fanout exchange with the same name as the queue is also declared and bound to it.
    /// </param>
    public static void CreateQueue(string uri, string queueName, bool durableMessages, bool createExchange)
    {
        var factory = new ConnectionFactory();
        factory.Uri = new Uri(uri);

        using (var brokerConnection = factory.CreateConnection())
        {
            using (var model = brokerConnection.CreateModel())
            {
                model.QueueDeclare(
                    queue: queueName,
                    durable: durableMessages,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);

                if (!createExchange)
                {
                    return;
                }

                CreateExchange(model, queueName, durableMessages);
            }
        }
    }

    /// <summary>
    /// Declares a fanout exchange named after the queue and binds the queue to it with an empty routing key.
    /// </summary>
    static void CreateExchange(IModel channel, string queueName, bool durableMessages)
    {
        // The exchange shares its name with the queue it feeds.
        var exchangeName = queueName;

        channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, durableMessages);
        channel.QueueBind(queueName, exchangeName, string.Empty);
    }
}

Creating queues for an endpoint

To create all queues for a given endpoint name:

/// <summary>
/// Creates the main queue of an endpoint together with the additional queues derived from its name.
/// </summary>
/// <param name="uri">AMQP connection URI of the broker.</param>
/// <param name="endpointName">Name of the endpoint; also the name of its main queue.</param>
/// <param name="durableMessages">Passed through to each queue creation call.</param>
/// <param name="createExchanges">Whether an exchange is created alongside each queue.</param>
public static void CreateQueuesForEndpoint(string uri, string endpointName, bool durableMessages, bool createExchanges)
{
    // Queues are created in the order listed here.
    var queueNames = new[]
    {
        // main queue
        endpointName,

        // callback queue
        $"{endpointName}.{Environment.MachineName}",

        // timeout queue - not used if using native timeouts
        $"{endpointName}.Timeouts",

        // timeout dispatcher queue - not used if using native timeouts
        $"{endpointName}.TimeoutsDispatcher",

        // retries queue
        // TODO: Only required in Versions 3 and below
        $"{endpointName}.Retries",
    };

    foreach (var queueName in queueNames)
    {
        QueueCreationUtils.CreateQueue(uri, queueName, durableMessages, createExchanges);
    }
}

Using the create endpoint queues helper method

// Create durable queues, each with an exchange, for one endpoint on a broker running on localhost.
var endpointToCreate = "myendpoint";

CreateQueuesForEndpoint(
    uri: "amqp://guest:guest@localhost:5672",
    endpointName: endpointToCreate,
    durableMessages: true,
    createExchanges: true);

To create shared queues

// The shared queues are created one after another with identical settings.
foreach (var sharedQueue in new[] { "error", "audit" })
{
    QueueCreationUtils.CreateQueue(
        uri: "amqp://guest:guest@localhost:5672",
        queueName: sharedQueue,
        durableMessages: true,
        createExchange: true);
}

Create HA policy

To configure HA policy, refer to the RabbitMQ HA documentation.

Delete queues

The delete queue helper methods

/// <summary>
/// Helper methods for removing a RabbitMQ queue together with the exchange of the same name.
/// </summary>
public static class QueueDeletionUtils
{
    /// <summary>
    /// Unbinds the queue from the exchange of the same name, then deletes the exchange and the queue.
    /// </summary>
    /// <param name="uri">AMQP connection URI of the broker.</param>
    /// <param name="queueName">Name of the queue (and of its exchange) to delete.</param>
    public static void DeleteQueue(string uri, string queueName)
    {
        var factory = new ConnectionFactory();
        factory.Uri = new Uri(uri);

        using (var brokerConnection = factory.CreateConnection())
        {
            using (var model = brokerConnection.CreateModel())
            {
                // The exchange shares its name with the queue.
                var exchangeName = queueName;

                model.QueueUnbind(queueName, exchangeName, string.Empty, null);
                model.ExchangeDelete(exchangeName);
                model.QueueDelete(queueName);
            }
        }
    }
}

To delete all queues for a given endpoint

/// <summary>
/// Deletes the main queue of an endpoint together with the additional queues derived from its name.
/// </summary>
/// <param name="uri">AMQP connection URI of the broker.</param>
/// <param name="endpointName">Name of the endpoint; also the name of its main queue.</param>
public static void DeleteQueuesForEndpoint(string uri, string endpointName)
{
    // Queues are deleted in the order listed here.
    var queueNames = new[]
    {
        // main queue
        endpointName,

        // callback queue
        $"{endpointName}.{Environment.MachineName}",

        // timeout queue - not used if using native timeouts
        $"{endpointName}.Timeouts",

        // timeout dispatcher queue - not used if using native timeouts
        $"{endpointName}.TimeoutsDispatcher",

        // retries queue
        // TODO: Only required in Versions 3 and below
        $"{endpointName}.Retries",
    };

    foreach (var queueName in queueNames)
    {
        QueueDeletionUtils.DeleteQueue(uri, queueName);
    }
}
// Delete every queue belonging to one endpoint on a broker running on localhost.
var endpointToDelete = "myendpoint";

DeleteQueuesForEndpoint(
    uri: "amqp://guest:guest@localhost:5672",
    endpointName: endpointToDelete);

To delete shared queues

// The shared queues are deleted one after another.
foreach (var sharedQueue in new[] { "error", "audit" })
{
    QueueDeletionUtils.DeleteQueue(
        uri: "amqp://guest:guest@localhost:5672",
        queueName: sharedQueue);
}

Related Articles