The RabbitMQ transport requires a connection string to connect to the RabbitMQ broker, and there are two different styles to choose from. It can accept the standard amqp URI connection strings, or a custom format documented below.
Specifying the connection string via code
To specify the connection string in code:
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.ConnectionString("My custom connection string");
Connection string options
Below is the list of connection string options. When constructing a connection string, these options should be separated by a semicolon.
Host
The host name of the broker.
The host name is required.
Port
The port where the broker listens.
Default: 5671
if the UseTls
setting is set to true
, otherwise the default value is 5672
VirtualHost
The virtual host to use.
Default: /
UserName
The user name to use to connect to the broker.
Default: guest
Password
The password to use to connect to the broker.
Default: guest
UseTls
Indicates if the connection to the broker should be secured with TLS.
Default: false
Transport Layer Security support
Secure connections to the broker using Transport Layer Security (TLS) are supported. To enable TLS support, set the UseTls
setting to true
in the connection string:
host=broker1;UseTls=true
Client authentication
If the broker has been configured to require client authentication, a client certificate must be specified:
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.SetClientCertificate("path", "password");
This can also be done by passing a certificate in directly:
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.SetClientCertificate(new X509Certificate2("/path/to/certificate"));
Remote certificate validation
By default, the RabbitMQ client will refuse to connect to the broker if the remote server certificate is invalid. This validation can be disabled with the following setting:
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.DisableRemoteCertificateValidation();
External authentication
By default, the broker requires a username and password to authenticate the client, but it can be configured to use other external authentication mechanisms. If the broker requires an external authentication mechanism, the client can be configured to use it with the following setting:
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.UseExternalAuthMechanism();
Connecting to multiple cluster nodes
When connecting to a RabbitMQ cluster, it is beneficial if endpoints are able to connect to any of the nodes in the cluster. For example, if a node goes down, the endpoint can attempt to reconnect to a different node and continue operation.
Since endpoint connection strings are limited to specifying a single hostname, the AddClusterNode
API can be used to tell the endpoint about additional cluster nodes:
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.AddClusterNode("node2", useTls: false);
There is another overload to specify a non-default port:
transport.AddClusterNode("node2", 5675, useTls: true);
Configuring RabbitMQ management API access
The transport uses the RabbitMQ management API to verify broker requirements and enable delivery limit validation.
The RabbitMQ management plugin must be enabled, and the plugin's statistics and metrics collection must not be disabled. The port that the management API is using needs to be accessible by the transport. The default port is 15672
for HTTP and 15671
for HTTPS.
By default, the transport will infer the settings to use to access the management API from the connection string. If the broker configuration requires different settings to access the management API, custom settings can be provided:
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.ManagementApiConfiguration(
url: "{scheme}://{host}:{port}",
userName: "{username}",
password: "{password}"
);
There are also overloads to specify just the URL or just the credentials and continue to rely on the connection string for the unspecified settings.
Delivery limit validation
Quorum queues have a delivery limit setting that controls how many times the broker will redeliver a message before it considers the message poison and deletes the message. While the transport does use the x-delivery-count
header that is part of this feature, it requires the delivery limit to be set to unlimited
in order to ensure that a message is not unexpectedly deleted by the broker while retrying the message based on the endpoint's recoverability settings.
In RabbitMQ version 4.0 and above, the default delivery limit has been changed from unlimited
to 20
. Because of this change, the transport now validates that queue being consumed by the endpoint has been properly configured to have an unlimited
delivery limit.
As part of this validation process, the transport will attempt to use the management API to create a policy that applies to the queue to set the delivery limit to unlimited
. This policy will only be created if the transport does not detect another policy already applying to a queue.
If the transport cannot validate that the queue has an unlimited
delivery limit and cannot create a policy to change the setting, the validation will fail and prevent the endpoint from starting.
The credentials used to access the RabbitMQ management API require policymaker permissions in order for the transport to be be able to create unlimited
delivery limit policies.
If the endpoint will not start because delivery limit validation is failing, manual intervention is required. If there is already a policy applied to the queue, it either needs to be updated to include setting the delivery limit on the queue, or the policy needs to be removed from the queue entirely. Removing the policy will let the transport successfully create its policy the next time the endpoint is started.
If it not possible to allow the validation to pass, the DoNotValidateDeliveryLimits
configuration method can be used to skip the validation entirely. This is not recommended because it leaves the endpoint open to the possibility of message loss in the case where the number of immediate retries is greater than the delivery limit of the queue.
The queue validate-delivery-limit
command can also be used to validate the delivery limit of a queue.
Controlling the prefetch count
When consuming messages from the broker, throughput can be improved by having the consumer prefetch additional messages. The prefetch count is calculated by multiplying the maximum concurrency by the prefetch multiplier. The default value of the multiplier is 3, but it can be changed as follows:
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.PrefetchMultiplier(4);
Alternatively, the whole calculation can be overridden by setting the prefetch count directly as follows:
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.PrefetchCount(100);
If the configured value is less than the maximum concurrency, the prefetch count will be set to the maximum concurrency value instead.
Controlling behavior when the broker connection is lost
The RabbitMQ transport monitors the connection to the broker and will trigger the critical error action if the connection fails and stays disconnected for the configured amount of time.
Heartbeat interval
Controls how frequently AMQP heartbeat messages are sent between the endpoint and the broker.
Type: System.
Default: 00:01:00
(1 minute)
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.SetHeartbeatInterval(TimeSpan.FromSeconds(30));
Network recovery interval
Controls the time to wait between attempts to reconnect to the broker if the connection is lost.
Type: System.
Default: 00:00:10
(10 seconds)
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.SetNetworkRecoveryInterval(TimeSpan.FromSeconds(30));
TimeToWaitBeforeTriggering
Controls the amount of time the transport waits after a failure is detected before triggering the critical error action.
Type: System.
Default: 00:02:00
(2 minutes)
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.TimeToWaitBeforeTriggeringCircuitBreaker(TimeSpan.FromMinutes(2));
Debugging recommendations
It can be helpful to increase the heartbeat interval to avoid connection timeouts while debugging:
var transport = endpointConfiguration.UseTransport<RabbitMQTransport>();
transport.SetHeartbeatInterval(TimeSpan.FromMinutes(10));