MSMQ Subscription Persistence

NuGet Package: NServiceBus.Transport.Msmq (2.x)
Target Version: NServiceBus 8.x

The MSMQ Subscription storage can be used to enable publish/subscribe with MSMQ without the need for any additional persister.

This can only be used to store subscriptions. To use sagas, timeouts, deferred messages, or the Outbox, a different persister is required.
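
As a rough sketch of how such a split might look, NServiceBus allows a persister to be registered for one specific storage type. The example below assigns MsmqPersistence to subscriptions and a second persister to sagas. LearningPersistence appears only as a stand-in (an assumption for illustration, not intended for production use); substitute whichever saga-capable persister the endpoint actually uses. The class and method names are hypothetical.

```csharp
using NServiceBus;

public static class PersistenceSetup
{
    // Hypothetical helper: applies persistence settings to an existing endpoint configuration.
    public static void Configure(EndpointConfiguration endpointConfiguration)
    {
        // Subscriptions are stored in the local MSMQ subscription queue.
        endpointConfiguration.UsePersistence<MsmqPersistence, StorageType.Subscriptions>();

        // Assumption: a placeholder persister for sagas. Replace LearningPersistence
        // with a production-grade persister and add its own configuration.
        endpointConfiguration.UsePersistence<LearningPersistence, StorageType.Sagas>();
    }
}
```

Registering each persister against an explicit storage type keeps it clear which store is responsible for which feature.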

Persistence at a glance

For a description of each feature, see the persistence at a glance legend.

Supported storage types: Subscriptions only
Transactions: Does not apply to subscriptions.
Concurrency control: Does not apply to subscriptions.
Scripted deployment: Not supported
Installers: Subscription queues are created by installers.

Configuration

MSMQ persistence does not support scaled-out publishers. The subscription storage is local to each machine, so a subscription message is delivered to only one instance of a given logical endpoint. Only that instance updates its subscription information; the other instances remain unaware of the new subscriber.
The subscription queue can contain duplicate entries. This is by design and does not result in events being published multiple times. Subscription entries are added for each subscription request received. After a publisher restarts, the subscription queue state will be rewritten and deduplicated.

To configure MSMQ as the subscription persistence:

endpointConfiguration.UsePersistence<MsmqPersistence>();

If the subscription storage is not specified, NServiceBus uses a queue called [EndpointName].Subscriptions. This prevents endpoints deployed on the same server from having to share a single queue to store subscriptions. When a subscription queue is not specified and the default queue is used, the following message is logged:

The queue used to store subscriptions has not been configured, the default 'NServiceBus.Subscriptions' will be used.

When using MSMQ subscription persistence on multiple endpoints running on the same machine, every endpoint must have a dedicated subscription storage queue.

NServiceBus versions 5.x and 6.x used a default queue called NServiceBus.Subscriptions. An exception is thrown on startup if this queue is detected. To avoid message loss, either specify the subscription queue explicitly or move the subscription messages to the new default queue.

To specify a different queue, use the code API or a configuration section.

Via code

Call the following code API passing the subscription queue to use:

var persistence = endpointConfiguration.UsePersistence<MsmqPersistence>();
persistence.SubscriptionQueue("YourEndpointName.Subscriptions");

Via app.config

Add the following configSections and subsequent config entry:

<configuration>
  <configSections>
    <section name="MsmqSubscriptionStorageConfig" 
             type="NServiceBus.Config.MsmqSubscriptionStorageConfig, NServiceBus.Core" />
  </configSections>
  <MsmqSubscriptionStorageConfig Queue="YourEndpointName.Subscriptions" />
</configuration>

Timeouts persistence

MsmqPersistence provides persistence only for storing event subscriptions. By default, NServiceBus also requires a timeout persistence, which is used by delayed retries, saga timeouts and for delayed delivery.

Version 2 of the MSMQ transport supports delayed delivery of messages by persisting them in a delayed message store. There is a built-in SQL Server-based store and an extension point for plugging in custom storage.

var messageStore = new SqlServerDelayedMessageStore(
    // "server" names the SQL Server instance; "initial catalog" names the database
    connectionString: "server=(local); initial catalog=my_catalog; integrated security=true",
    schema: "my_schema", //optional, defaults to dbo
    tableName: "my_delayed_messages"); //optional, defaults to endpoint name with '.delayed' suffix

var transport = new MsmqTransport
{
    DelayedDelivery = new DelayedDeliverySettings(messageStore)
    {
        NumberOfRetries = 7,
        MaximumRecoveryFailuresPerSecond = 2,
        TimeToTriggerStoreCircuitBreaker = TimeSpan.FromSeconds(20),
        TimeToTriggerDispatchCircuitBreaker = TimeSpan.FromSeconds(15),
        TimeToTriggerFetchCircuitBreaker = TimeSpan.FromSeconds(45)
    }
};
endpointConfiguration.UseTransport(transport);
