The MSMQ Subscription storage can be used to enable publish/subscribe with MSMQ without the need for any additional persister.
This can only be used to store subscriptions. To use sagas, timeouts, deferred messages, or the Outbox, a different persister is required.
Persistence at a glance
For a description of each feature, see the persistence at a glance legend.
Feature | |
---|---|
Supported storage types | Subscriptions only |
Transactions | Does not apply to subscriptions. |
Concurrency control | Does not apply to subscriptions. |
Scripted deployment | Not supported |
Installers | Subscription queues are created by installers. |
Configuration
MSMQ persistence does not support scaled-out publishers. The subscription storage is local to each machine, and a subscription message is delivered to only one instance of a given logical endpoint. Only that instance updates its subscription information; the other instances remain unaware of the new subscriber.
The subscription queue can contain duplicate entries. This is by design and does not result in events being published multiple times. Subscription entries are added for each subscription request received. After a publisher restarts, the subscription queue state will be rewritten and deduplicated.
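The effect of duplicate entries can be illustrated with a small in-memory model. This is only a sketch of the idea, not the NServiceBus implementation: the `SubscriptionLookup` class, its members, and the example addresses are hypothetical. It assumes subscribers are tracked as a set per message type, so reading the same entry twice does not create a second recipient.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of a subscription lookup.
// It is NOT the NServiceBus implementation; it only illustrates why
// duplicate queue entries need not lead to duplicate publishes.
public class SubscriptionLookup
{
    // Message type name -> distinct subscriber addresses.
    readonly Dictionary<string, HashSet<string>> subscribersByType =
        new Dictionary<string, HashSet<string>>();

    // Called once per entry read from the subscription queue.
    public void Add(string messageType, string subscriberAddress)
    {
        if (!subscribersByType.TryGetValue(messageType, out var subscribers))
        {
            subscribers = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
            subscribersByType[messageType] = subscribers;
        }

        // Adding an address that is already present is a no-op.
        subscribers.Add(subscriberAddress);
    }

    // Returns the number of distinct subscribers for a message type.
    public int CountSubscribers(string messageType)
    {
        return subscribersByType.TryGetValue(messageType, out var subscribers)
            ? subscribers.Count
            : 0;
    }
}

public static class SubscriptionLookupDemo
{
    public static void Run()
    {
        var lookup = new SubscriptionLookup();
        lookup.Add("Sales.OrderPlaced", "Billing@machine1");
        lookup.Add("Sales.OrderPlaced", "Billing@machine1"); // duplicate entry
        lookup.Add("Sales.OrderPlaced", "Shipping@machine1");

        // Prints 2: the duplicate entry did not add a second recipient.
        Console.WriteLine(lookup.CountSubscribers("Sales.OrderPlaced"));
    }
}
```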
To configure MSMQ as the subscription persistence:
endpointConfiguration.UsePersistence<MsmqPersistence>();
If the subscription storage is not specified, NServiceBus uses a queue called [EndpointName].Subscriptions. This prevents endpoints deployed on the same server from having to share the same queue to store subscriptions. When a subscription queue is not specified and the default queue is being used, the following message will get logged:
The queue used to store subscriptions has not been configured, the default 'NServiceBus.Subscriptions' will be used.
When using MSMQ subscription persistence on multiple endpoints running on the same machine, every endpoint must have a dedicated subscription storage queue.
Versions 5.x and 6.x of NServiceBus used a default queue called NServiceBus.Subscriptions. An exception is thrown on startup if this queue is detected. Either specify the subscription queue explicitly or move the subscription messages to the new default queue to avoid message loss.
To specify a different queue, use the code API or a configuration section.
Via code
Call the following code API passing the subscription queue to use:
var persistence = endpointConfiguration.UsePersistence<MsmqPersistence>();
persistence.SubscriptionQueue("YourEndpointName.Subscriptions");
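Because every endpoint on a machine needs its own subscription queue, it can help to derive the queue name from the endpoint name in one place. The following is a minimal sketch under that assumption: the `MsmqSubscriptionStorage` helper and the `Sales` endpoint name are hypothetical, while `UsePersistence<MsmqPersistence>()`, `SubscriptionQueue(...)`, and `EnableInstallers()` are the NServiceBus calls. Installers are enabled so that the subscription queue is created at startup.

```csharp
using NServiceBus;

// Hypothetical helper (not part of NServiceBus) that gives each endpoint
// a dedicated subscription queue named after the endpoint.
public static class MsmqSubscriptionStorage
{
    public static void Configure(EndpointConfiguration endpointConfiguration, string endpointName)
    {
        var persistence = endpointConfiguration.UsePersistence<MsmqPersistence>();

        // Assumed naming convention: "<EndpointName>.Subscriptions".
        persistence.SubscriptionQueue(endpointName + ".Subscriptions");

        // Subscription queues are created by installers.
        endpointConfiguration.EnableInstallers();
    }
}
```

Usage might look like `MsmqSubscriptionStorage.Configure(endpointConfiguration, "Sales");` where `endpointConfiguration` was created with `new EndpointConfiguration("Sales")`.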
Via app.config
Add the following configSections and subsequent config entry:
<configuration>
  <configSections>
    <section name="MsmqSubscriptionStorageConfig"
             type="NServiceBus.Config.MsmqSubscriptionStorageConfig, NServiceBus.Core" />
  </configSections>
  <MsmqSubscriptionStorageConfig Queue="YourEndpointName.Subscriptions" />
</configuration>
Timeouts persistence
MsmqPersistence provides persistence only for storing event subscriptions. By default, NServiceBus also requires a timeout persistence, which is used by delayed retries, saga timeouts, and delayed delivery.
Version 2 of MSMQ Transport supports delayed delivery of messages by persisting them in a delayed message store. There is a built-in SQL Server-based store and an extension point for plugging in a custom store.
var messageStore = new SqlServerDelayedMessageStore(
    connectionString: "server=(local); initial catalog=my_catalog; integrated security=true",
    schema: "my_schema", // optional, defaults to dbo
    tableName: "my_delayed_messages"); // optional, defaults to endpoint name with '.delayed' suffix
var transport = new MsmqTransport
{
    DelayedDelivery = new DelayedDeliverySettings(messageStore)
    {
        NumberOfRetries = 7,
        MaximumRecoveryFailuresPerSecond = 2,
        TimeToTriggerStoreCircuitBreaker = TimeSpan.FromSeconds(20),
        TimeToTriggerDispatchCircuitBreaker = TimeSpan.FromSeconds(15),
        TimeToTriggerFetchCircuitBreaker = TimeSpan.FromSeconds(45)
    }
};
endpointConfiguration.UseTransport(transport);
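With a delayed message store configured, a handler can request delayed delivery through `SendOptions`. The sketch below is illustrative only: the `PlaceOrder` and `CheckPayment` message types, their handlers, and the 30-minute delay are hypothetical, while `SendOptions`, `DelayDeliveryWith`, `RouteToThisEndpoint`, and `IHandleMessages<T>` are part of the NServiceBus API.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical message types used only for this illustration.
public class PlaceOrder : ICommand
{
    public string OrderId { get; set; }
}

public class CheckPayment : ICommand
{
    public string OrderId { get; set; }
}

public class PlaceOrderHandler : IHandleMessages<PlaceOrder>
{
    public Task Handle(PlaceOrder message, IMessageHandlerContext context)
    {
        var options = new SendOptions();

        // Ask the transport to hold the message for 30 minutes.
        // The message waits in the delayed message store until it is due.
        options.DelayDeliveryWith(TimeSpan.FromMinutes(30));

        // Deliver the delayed message back to this endpoint.
        options.RouteToThisEndpoint();

        return context.Send(new CheckPayment { OrderId = message.OrderId }, options);
    }
}

public class CheckPaymentHandler : IHandleMessages<CheckPayment>
{
    public Task Handle(CheckPayment message, IMessageHandlerContext context)
    {
        // Runs once the delay has elapsed.
        Console.WriteLine($"Checking payment for order {message.OrderId}");
        return Task.CompletedTask;
    }
}
```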