Getting Started
Architecture
NServiceBus
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

SQL Server transport SQL statements

NuGet Package: NServiceBus.Transport.SqlServer (8.x)
Target Version: NServiceBus 9.x

Installation

At installation time the queue creation script is executed

IF EXISTS (
    SELECT *
    FROM {1}.sys.objects
    WHERE object_id = OBJECT_ID(N'{0}')
        AND type in (N'U'))
RETURN

EXEC sp_getapplock @Resource = '{0}_lock', @LockMode = 'Exclusive'

IF EXISTS (
    SELECT *
    FROM {1}.sys.objects
    WHERE object_id = OBJECT_ID(N'{0}')
        AND type in (N'U'))
BEGIN
    EXEC sp_releaseapplock @Resource = '{0}_lock'
    RETURN
END

BEGIN TRY
    CREATE TABLE {0} (
        Id uniqueidentifier NOT NULL,
        CorrelationId varchar(255),
        ReplyToAddress varchar(255),
        Recoverable bit NOT NULL,
        Expires datetime,
        Headers nvarchar(max) NOT NULL,
        Body varbinary(max),
        RowVersion bigint IDENTITY(1,1) NOT NULL
    );

    CREATE NONCLUSTERED INDEX Index_RowVersion ON {0}
    (
	    [RowVersion] ASC
    )

    CREATE NONCLUSTERED INDEX Index_Expires ON {0}
    (
        Expires
    )
    INCLUDE
    (
        Id,
        RowVersion
    )
    WHERE
        Expires IS NOT NULL
END TRY
BEGIN CATCH
    EXEC sp_releaseapplock @Resource = '{0}_lock';
    THROW;
END CATCH;

EXEC sp_releaseapplock @Resource = '{0}_lock'

The subscriptions table is also created:

IF EXISTS (
    SELECT *
    FROM {1}.sys.objects
    WHERE object_id = OBJECT_ID(N'{0}')
        AND type in (N'U'))
RETURN

EXEC sp_getapplock @Resource = '{0}_lock', @LockMode = 'Exclusive'

IF EXISTS (
    SELECT *
    FROM {1}.sys.objects
    WHERE object_id = OBJECT_ID(N'{0}')
        AND type in (N'U'))
BEGIN
    EXEC sp_releaseapplock @Resource = '{0}_lock'
    RETURN
END

BEGIN TRY
    CREATE TABLE {0} (
        QueueAddress NVARCHAR(200) NOT NULL,
        Endpoint NVARCHAR(200) NOT NULL,
        Topic NVARCHAR(200) NOT NULL,
        PRIMARY KEY CLUSTERED
        (
            Endpoint,
            Topic
        )
    )
END TRY
BEGIN CATCH
    EXEC sp_releaseapplock @Resource = '{0}_lock';
    THROW;
END CATCH;

EXEC sp_releaseapplock @Resource = '{0}_lock'

Creating table structure in production

There are some special considerations for creating the queue tables in production environments.

NServiceBus installers

When using NServiceBus installers the queue tables are created automatically before the endpoint is started.

the user account under which the installation of the host is performed must have CREATE TABLE and VIEW DEFINITION permissions on the database where the queues are to be created. The account under which the service runs does not have to have these permissions. Standard read/write/delete permissions (e.g. db_datawriter and db_datareader roles) are enough.

Scripted

Using NServiceBus installers does not allow review of the actual T-SQL statements that are going be executed. For that reason, some prefer to store the actual scripts in a version control system.

The script above is parametrized at execution time with the queue name so it cannot be used as-is. Alternatively, the scripts could be generated from the development or staging environments, then directly executed on a production environment by DBAs to replicate that table structure.

To capture the script for later execution use SQL Server Management Studio. Connect to the server (e.g. development or staging) and right-click the database with the queue tables. From "Tasks" menu choose "Generate Scripts..." and generate the scripts for relevant tables.

Store these scripts so they can be executed as part of the production deployment.

Runtime

The following are the T-SQL statements used by the transport at runtime.

Peek message

Checks if there are messages in the queue.

SELECT isnull(cast(max([RowVersion]) - min([RowVersion]) + 1 AS int), 0) Id FROM {0} WITH (READPAST, READCOMMITTEDLOCK)

Purge expired

Purges expired messages from the queue.

DELETE FROM {0}
WHERE RowVersion
    IN (SELECT TOP (@BatchSize) RowVersion
        FROM {0} WITH (READPAST)
        WHERE Expires < GETUTCDATE())

Purge at startup

Used by an endpoint to optionally purge all message on startup.

DELETE FROM {0}

Receive message

The T-SQL statements for sending and receiving messges execute with NOCOUNT ON option. However, this does not affect the original value of this setting. The original value is saved at the beginning and restored after executing the statement.

Retrieves a message from the queue.

DECLARE @NOCOUNT VARCHAR(3) = 'OFF';
IF ( (512 & @@OPTIONS) = 512 ) SET @NOCOUNT = 'ON';
SET NOCOUNT ON;

WITH message AS (
    SELECT TOP(1) *
    FROM {0} WITH (UPDLOCK, READPAST, ROWLOCK)
    ORDER BY RowVersion)
DELETE FROM message
OUTPUT
    deleted.Id,
    CASE WHEN deleted.Expires IS NULL
        THEN 0
        ELSE CASE WHEN deleted.Expires > GETUTCDATE()
            THEN 0
            ELSE 1
        END
    END,
    deleted.Headers,
    deleted.Body;

IF (@NOCOUNT = 'ON') SET NOCOUNT ON;
IF (@NOCOUNT = 'OFF') SET NOCOUNT OFF;
The CorrelationId, ReplyToAddress and Recoverable columns are required for backwards compatibility with version 1 of the NServiceBus.SqlServer package. When receiving messages sent by endpoints that use later versions, the values of correlation ID and reply-to address should be read from the headers (NServiceBus.CorrelationId and NServiceBus.ReplyToAddress) instead. The value Recoverable can be ignored as it is always true/1.

Send message

Places a message on the queue.

DECLARE @NOCOUNT VARCHAR(3) = 'OFF';
IF ( (512 & @@OPTIONS) = 512 ) SET @NOCOUNT = 'ON'
SET NOCOUNT ON;

INSERT INTO {0} (
    Id,
    Expires,
    Headers,
    Body)
VALUES (
    @Id,
    CASE WHEN @TimeToBeReceivedMs IS NOT NULL
        THEN DATEADD(ms, @TimeToBeReceivedMs, GETUTCDATE()) END,
    @Headers,
    @Body);

IF (@NOCOUNT = 'ON') SET NOCOUNT ON;
IF (@NOCOUNT = 'OFF') SET NOCOUNT OFF;
The CorrelationId, ReplyToAddress and Recoverable columns are required for backwards compatibility with version 1 of the NServiceBus.SqlServer package. When sending messages to endpoints that use later versions, the values of the correlation ID and reply-to address columns should be set to NULL and the actual values provided in the headers (NServiceBus.CorrelationId and NServiceBus.ReplyToAddress). The value Recoverable should always be true/1.

Subscribe to a topic

Subscribe and endpoint to a topic.

MERGE {0} WITH (HOLDLOCK, TABLOCK) AS target
USING(SELECT @Endpoint AS Endpoint, @QueueAddress AS QueueAddress, @Topic AS Topic) AS source
ON target.Endpoint = source.Endpoint
AND target.Topic = source.Topic
WHEN MATCHED AND target.QueueAddress <> source.QueueAddress THEN
UPDATE SET QueueAddress = @QueueAddress
WHEN NOT MATCHED THEN
INSERT
(
    QueueAddress,
    Topic,
    Endpoint
)
VALUES
(
    @QueueAddress,
    @Topic,
    @Endpoint
);

Unsubscribe from a topic

Unsubscribe an endpoint from a topic.

DELETE FROM {0}
WHERE
    Endpoint = @Endpoint and
    Topic = @Topic

Find all subscribers

Get a list of all endpoints that subscribe to a topic.

SELECT DISTINCT QueueAddress
FROM {0}
WHERE Topic IN ({1})

Missing index warning

Used to log a warning if a required index is missing. See also Upgrade from version 2 to 3.

SELECT COUNT(*)
FROM sys.indexes i
INNER JOIN sys.index_columns AS ic ON ic.index_id = i.index_id AND ic.object_id = i.object_id AND ic.key_ordinal = 1
INNER JOIN sys.columns AS c ON c.column_id = ic.column_id AND c.object_id = ic.object_id
WHERE i.object_id = OBJECT_ID('{0}')
AND c.name = 'Expires'

Check column type

Used to log a warning if the message headers data type is non-unicode. See also Supporting unicode characters in headers.

SELECT t.name
FROM sys.columns c
INNER JOIN sys.types t ON c.system_type_id = t.system_type_id
WHERE c.object_id = OBJECT_ID('{0}')
    AND c.name = 'Headers'

Create Delayed Queue Table

Performs delayed queue creation.

IF EXISTS (
    SELECT *
    FROM {1}.sys.objects
    WHERE object_id = OBJECT_ID(N'{0}')
        AND type in (N'U'))
RETURN

EXEC sp_getapplock @Resource = '{0}_lock', @LockMode = 'Exclusive'

IF EXISTS (
    SELECT *
    FROM {1}.sys.objects
    WHERE object_id = OBJECT_ID(N'{0}')
        AND type in (N'U'))
BEGIN
    EXEC sp_releaseapplock @Resource = '{0}_lock'
    RETURN
END

BEGIN TRY
    CREATE TABLE {0} (
        Headers nvarchar(max) NOT NULL,
        Body varbinary(max),
        Due datetime NOT NULL,
        RowVersion bigint IDENTITY(1,1) NOT NULL
    );

    CREATE NONCLUSTERED INDEX [Index_Due] ON {0}
    (
        [Due]
    )
END TRY
BEGIN CATCH
    EXEC sp_releaseapplock @Resource = '{0}_lock';
    THROW;
END CATCH;

EXEC sp_releaseapplock @Resource = '{0}_lock'

Move Due Delayed Messages

Moves due messages from the delayed queue table to the input queue table.

;WITH message AS (
    SELECT TOP(@BatchSize) *
    FROM {0} WITH (UPDLOCK, READPAST, ROWLOCK)
    WHERE Due < GETUTCDATE())
DELETE FROM message
OUTPUT
    NEWID(),
    NULL,
    NULL,
    1,
    NULL,
    deleted.Headers,
    deleted.Body
INTO {1} (Id, CorrelationId, ReplyToAddress, Recoverable, Expires, Headers, Body);;

SELECT TOP 1 GETUTCDATE() as UtcNow, Due as NextDue
FROM {0} WITH (READPAST)
ORDER BY Due

Store Delayed Message

Stores a message in the delayed queue table.

DECLARE @NOCOUNT VARCHAR(3) = 'OFF';
IF ( (512 & @@OPTIONS) = 512 ) SET @NOCOUNT = 'ON'
SET NOCOUNT ON;

DECLARE @DueAfter DATETIME = GETUTCDATE();
SET @DueAfter = DATEADD(ms, @DueAfterMilliseconds, @DueAfter);
SET @DueAfter = DATEADD(s, @DueAfterSeconds, @DueAfter);
SET @DueAfter = DATEADD(n, @DueAfterMinutes, @DueAfter);
SET @DueAfter = DATEADD(hh, @DueAfterHours, @DueAfter);
SET @DueAfter = DATEADD(d, @DueAfterDays, @DueAfter);

INSERT INTO {0} (
    Headers,
    Body,
    Due)
VALUES (
    @Headers,
    @Body,
    @DueAfter);

IF(@NOCOUNT = 'ON') SET NOCOUNT ON;
IF(@NOCOUNT = 'OFF') SET NOCOUNT OFF;