Getting Started
Architecture
NServiceBus
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

MSMQ transport delayed delivery

Component: MSMQ Transport
NuGet Package: NServiceBus.Transport.Msmq (2.x)
Target Version: NServiceBus 8.x

Because MSMQ lacks a mechanism for sending delayed messages, the MSMQ transport uses an external store for delayed messages. Messages that are to be delivered later (e.g. saga timeouts or delayed retries) are persisted in the delayed message store until they are due. When a message is due, it is retreived from the store and dispatched to its destination.

The MSMQ transport requires explicit configuration to enable delayed message delivery. For example:

var messageStore = new SqlServerDelayedMessageStore(
    connectionString: "database=(local); initial catalog=my_catalog; integrated security=true",
    schema: "my_schema", //optional, defaults to dbo
    tableName: "my_delayed_messages"); //optional, defaults to endpoint name with '.delayed' suffix

var transport = new MsmqTransport
{
    DelayedDelivery = new DelayedDeliverySettings(messageStore)
    {
        NumberOfRetries = 7,
        MaximumRecoveryFailuresPerSecond = 2,
        TimeToTriggerStoreCircuitBreaker = TimeSpan.FromSeconds(20),
        TimeToTriggerDispatchCircuitBreaker = TimeSpan.FromSeconds(15),
        TimeToTriggerFetchCircuitBreaker = TimeSpan.FromSeconds(45)
    }
};
endpointConfiguration.UseTransport(transport);

The SQL Server delayed message store (SqlServerDelayedMessageStore) is the only delayed message store that ships with the MSMQ transport.

How it works

A delayed message store implements the IDelayedMessageStore interface. Delayed message delivery has two parts:

Storing of delayed messages

A delayed message is stored using the Store method.

Polling and dispatching of delayed messages

The message store is polled for due delayed messages in a background task which periodically calls FetchNextDueTimeout. If the method returns a message, the message is sent (see next paragraph), and the method is immediately called again. If the method returns null, Next is called, which returns either a DateTimeOffset indicating when the next message will be due, or null if there are no delayed messages. If another delayed message is persisted in the meantime, the Store method wakes up the polling thread.

When a due delayed message is returned by FetchNextDueTimeout, the message is sent to the destination queue and then removed from the store using the Remove method. In case of an unexpected exception during forwarding the failure is registered using IncrementFailureCount. If the configured number of retries is exhausted the message is forwarded to the configured error queue.

Configuration

The settings described in this section allow changing the default behavior of the built-in delayed delivery store.

NumberOfRetries

Number of retries when trying to forward due delayed messages.

Defaults to 0.

TimeToTriggerStoreCircuitBreaker

Time to wait before triggering the circuit breaker that monitors the storing of delayed messages in the database.

Defaults to 30 seconds.

TimeToTriggerFetchCircuitBreaker

Time to wait before triggering the circuit breaker that monitors the fetching of due delayed messages from the database.

Defaults to 30 seconds.

TimeToTriggerDispatchCircuitBreaker

Time to wait before triggering the circuit breaker that monitors the dispatching of due delayed messages to the destination.

Defaults to 30 seconds.

MaximumRecoveryFailuresPerSecond

Maximum number of failed attempts per second to increment the per-message failure counter that triggers the recovery circuit breaker.

Defaults to 1 per sec.

Using a custom delayed message store

Create a class which implements the IDelayedMessageStore interface and pass an instance to the DelayedDeliverySettings constructor.

If the custom store needs to set up some infrastructure (create tables, etc.) then it must implement IDelayedMessageStoreWithInfrastructure. This interface extends IDelayedMessageStore with a SetupInfrastructure() method. SetupInfrastructure() is called before Initialize().

Consistency

In TransactionScope transaction mode, the delayed message store is expected to enlist in the TransactionScope to ensure exactly once behavior. FetchNextDueTimeout, Remove, and sending messages to their destination queues are all executed in a single distributed transaction. The built-in SQL Server store supports this mode of operation.

In lower transaction modes the dispatch behavior is at least once. FetchNextDueTimeout and Remove are executed in the same TransactionScope but sending messages to their destination queues is executed in a separate (inner) transport scope. If Remove fails, the message will be sent to the destination queue multiple times and the destination endpoint must handle the duplicates, using either the outbox feature or a custom de-duplication mechanism.

The built-in SQL Server delayed message store takes a pessimistic lock on the delayed message row in the FetchNextDueTimeout operation to prevent other physical instances of the same logical endpoint from delivering the same delayed message. A custom delayed message store must also take some kind of lock to prevent this from happening. For example, a delayed message store using Azure Blog Storage may take a lease lock.

IDelayedMessageStore

When creating a custom message store, the class can either implement IDelayedMessageStore

public class DelayedMessageStore : IDelayedMessageStore

or IDelayedMessageStoreWithInfrastructure

public class DelayedMessageStoreWithInfrastructure : IDelayedMessageStoreWithInfrastructure

The only difference between the two interfaces is the SetupInfrastructure method, which must be implemented in IDelayedMessageStoreWithInfrastructure to create the required storage tables if they don't exist yet. With IDelayedMessageStore the storage tables are expected to already exist.

public async Task SetupInfrastructure(CancellationToken cancellationToken = default)
{
    await TimeoutTableCreator.CreateIfNecessary(createSqlConnection, quotedFullName, cancellationToken);
}

In the above example, TimeoutTableCreator is responsible for executing the script against the database. For SQL Server, the script would be

public const string SqlCreateTable = @"
if not exists (
select * from sys.objects
where
    object_id = object_id('{0}')
    and type in ('U')
)
begin
create table {0} (
	    Id nvarchar(250) not null primary key,
    Destination nvarchar(200),
    State varbinary(max),
    Time datetime,
    Headers varbinary(max) not null,
    RetryCount INT NOT NULL default(0)
    )
end

if not exists
(
select *
from sys.indexes
where
    name = 'Index_Time' and
    object_id = object_id('{0}')
)
begin
create index Index_Time on {0} (Time);
end
";

With both interfaces, the Initialize method will be called with the name of the endpoint being initialized. The storage implementation should throw an exception if it can't support specified transaction mode, e.g. TransactionScope mode requires the storage to enlist in a distributed transaction managed by the DTC.

public Task Initialize(string endpointName, TransportTransactionMode transactionMode, CancellationToken cancellationToken = default)
{
    if (tableName == null)
    {
        tableName = $"{endpointName}.timeouts";
    }

    quotedFullName = $"{SqlNameHelper.Quote(schema)}.{SqlNameHelper.Quote(tableName)}";

    insertCommand = string.Format(SqlConstants.SqlInsert, quotedFullName);
    removeCommand = string.Format(SqlConstants.SqlDelete, quotedFullName);
    bumpFailureCountCommand = string.Format(SqlConstants.SqlUpdate, quotedFullName);
    nextCommand = string.Format(SqlConstants.SqlGetNext, quotedFullName);
    fetchCommand = string.Format(SqlConstants.SqlFetch, quotedFullName);

    return Task.CompletedTask;
}
public const string SqlInsert = "INSERT INTO {0} (Id, Destination, Time, Headers, State) VALUES (@id, @destination, @time, @headers, @state);";
public const string SqlFetch = "SELECT TOP 1 Id, Destination, Time, Headers, State, RetryCount FROM {0} WITH (READPAST, UPDLOCK, ROWLOCK) WHERE Time < @time ORDER BY Time";
public const string SqlDelete = "DELETE {0} WHERE Id = @id";
public const string SqlUpdate = "UPDATE {0} SET RetryCount = RetryCount + 1 WHERE Id = @id";
public const string SqlGetNext = "SELECT TOP 1 Time FROM {0} ORDER BY Time";

The remaining methods implement the logic required for the message store:

Store Stores a delayed message.

public async Task Store(DelayedMessage timeout, CancellationToken cancellationToken = default)
{
    using (var cn = await createSqlConnection(cancellationToken))
    using (var cmd = new SqlCommand(insertCommand, cn))
    {
        cmd.Parameters.AddWithValue("@id", timeout.MessageId);
        cmd.Parameters.AddWithValue("@destination", timeout.Destination);
        cmd.Parameters.AddWithValue("@time", timeout.Time);
        cmd.Parameters.AddWithValue("@headers", timeout.Headers);
        cmd.Parameters.AddWithValue("@state", timeout.Body);
        await cn.OpenAsync(cancellationToken);
        _ = await cmd.ExecuteNonQueryAsync(cancellationToken);
    }
}

Remove Removes a due delayed message that has been dispatched to its destination from the store. It must return true if the removal succeeded or false if there was nothing to remove because the delayed message was already gone.

public async Task<bool> Remove(DelayedMessage timeout, CancellationToken cancellationToken = default)
{
    using (var cn = await createSqlConnection(cancellationToken))
    using (var cmd = new SqlCommand(removeCommand, cn))
    {
        cmd.Parameters.AddWithValue("@id", timeout.MessageId);
        await cn.OpenAsync(cancellationToken);
        var affected = await cmd.ExecuteNonQueryAsync(cancellationToken);
        return affected == 1;
    }
}

IncrementFailureCount Increments the counter of failures for a given due delayed message. It must return true if the increment succeeded or false if the delayed message was already gone.

public async Task<bool> IncrementFailureCount(DelayedMessage timeout, CancellationToken cancellationToken = default)
{
    using (var cn = await createSqlConnection(cancellationToken))
    using (var cmd = new SqlCommand(bumpFailureCountCommand, cn))
    {
        cmd.Parameters.AddWithValue("@id", timeout.MessageId);
        await cn.OpenAsync(cancellationToken);
        var affected = await cmd.ExecuteNonQueryAsync(cancellationToken);
        return affected == 1;
    }
}

Next Returns the date and time set for the next delayed message to become due or null if there are no delayed messages stored.

public async Task<DateTimeOffset?> Next(CancellationToken cancellationToken = default)
{
    using (var cn = await createSqlConnection(cancellationToken))
    using (var cmd = new SqlCommand(nextCommand, cn))
    {
        await cn.OpenAsync(cancellationToken);
        var result = (DateTime?)await cmd.ExecuteScalarAsync(cancellationToken);
        return result.HasValue ? (DateTimeOffset?)new DateTimeOffset(result.Value, TimeSpan.Zero) : null;
    }
}

FetchNextDueTimeout Retrieves the oldest due delayed message from the store at a specified date and time, or returns null if there are no due delayed messages.

public async Task<DelayedMessage> FetchNextDueTimeout(DateTimeOffset at, CancellationToken cancellationToken = default)
{
    DelayedMessage result = null;
    using (var cn = await createSqlConnection(cancellationToken))
    using (var cmd = new SqlCommand(fetchCommand, cn))
    {
        cmd.Parameters.AddWithValue("@time", at.UtcDateTime);

        await cn.OpenAsync(cancellationToken);
        using (var reader = await cmd.ExecuteReaderAsync(CommandBehavior.SingleRow, cancellationToken))
        {
            if (await reader.ReadAsync(cancellationToken))
            {
                result = new DelayedMessage
                {
                    MessageId = (string)reader[0],
                    Destination = (string)reader[1],
                    Time = (DateTime)reader[2],
                    Headers = (byte[])reader[3],
                    Body = (byte[])reader[4],
                    NumberOfRetries = (int)reader[5]
                };
            }
        }
    }

    return result;
}