Getting Started
Architecture
NServiceBus
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

SQL Server Transport Scripting

NuGet Package: NServiceBus.Transport.SqlServer (8.x)
Target Version: NServiceBus 9.x

The following includes SQL scripts, C# code examples, and PowerShell scripts to facilitate deployment and operations against the SQL Server Transport.

Inspecting messages in the queue

The following script returns messages waiting in a given queue:

SELECT TOP (1000) 
    [Id],
    [Expires],
    [Headers],
    [Body],
    cast([Body] as varchar(max)) as [BodyString]
FROM {0} WITH (READPAST)

The BodyString column is a computed value that allows inspecting of the message body when a text-based serializer is used (e.g. Json or XML).

In case a column containing the message body in a human-readable format is enabled, the same result can be achieved using the following script:

SELECT TOP (1000) 
    [Id],
    [Expires],
    [Headers],
    [Body],
    [BodyString]
FROM {0} WITH (READPAST)

Create queues

Queue creation can be done for a specific endpoint or queues shared between multiple endpoints.

The create queue helper methods

In C#

public static class QueueCreationUtils
{
    public static void CreateQueue(SqlConnection connection, string schema, string queueName)
    {
        var sql = $@"
        if not  exists (select * from sys.objects where object_id = object_id(N'[{schema}].[{queueName}]') and type in (N'U'))
        begin
            create table [{schema}].[{queueName}](
                [Id] [uniqueidentifier] not null,
                [CorrelationId] [varchar](255),
                [ReplyToAddress] [varchar](255),
                [Recoverable] [bit] not null,
                [Expires] [datetime],
                [Headers] [nvarchar](max) not null,
                [Body] [varbinary](max),
                [RowVersion] [bigint] identity(1,1) not null
            );
            create nonclustered index [Index_RowVersion] on [{schema}].[{queueName}]
            (
                [RowVersion]
            )
            create nonclustered index [Index_Expires] on [{schema}].[{queueName}]
            (
                [Expires]
            )
            include
            (
                [Id],
                [RowVersion]
            )
            where [Expires] is not null
        end";
        using (var command = new SqlCommand(sql, connection))
        {
            command.ExecuteNonQuery();
        }
    }

    public static void CreateDelayedQueue(SqlConnection connection, string schema, string queueName)
    {
        var sql = $@"
        if not  exists (select * from sys.objects where object_id = object_id(N'[{schema}].[{queueName}]') and type in (N'U'))
        begin
            create table [{schema}].[{queueName}](
                [Headers] nvarchar(max) not null,
                [Body] varbinary(max),
                [Due] datetime not null,
                [RowVersion] bigint identity(1,1) not null
            );

            create nonclustered index [Index_Due] on [{schema}].[{queueName}]
            (
                [Due]
            )
        end";
        using (var command = new SqlCommand(sql, connection))
        {
            command.ExecuteNonQuery();
        }
    }

}

In PowerShell

function CreateQueue {
    param (
        [Parameter(Mandatory=$true)]
        [ValidateNotNull()]
        [System.Data.SqlClient.SqlConnection] $connection,

        [ValidateNotNullOrEmpty()]
        [string] $schema = "dbo",

        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $queueName
    )

    $sql = @"
    if not  exists (select * from sys.objects where object_id = object_id(N'[{0}].[{1}]') and type in (N'U'))
        begin
        create table [{0}].[{1}](
            [Id] [uniqueidentifier] not null,
            [CorrelationId] [varchar](255),
            [ReplyToAddress] [varchar](255),
            [Recoverable] [bit] not null,
            [Expires] [datetime],
            [Headers] [nvarchar](max) not null,
            [Body] [varbinary](max),
            [RowVersion] [bigint] identity(1,1) not null
        );
        create clustered index [Index_RowVersion] on [{0}].[{1}]
        (
            [RowVersion]
        )
        create nonclustered index [Index_Expires] on [{0}].[{1}]
        (
            [Expires]
        )
        include
        (
            [Id],
            [RowVersion]
        )
        where
            [Expires] is not null
    end
"@ -f $schema, $queueName

    $command = New-Object System.Data.SqlClient.SqlCommand($sql, $connection)
    $command.ExecuteNonQuery()
    $command.Dispose()
}

function CreateDelayedQueue {
    param (
        [Parameter(Mandatory=$true)]
        [ValidateNotNull()]
        [System.Data.SqlClient.SqlConnection] $connection,

        [ValidateNotNullOrEmpty()]
        [string] $schema = "dbo",

        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $queueName
    )

    $sql = @"
    if not  exists (select * from sys.objects where object_id = object_id(N'[{0}].[{1}]') and type in (N'U'))
        begin
        create table [{0}].[{1}](
            [Headers] nvarchar(max) not null,
            [Body] varbinary(max),
            [Due] datetime not null,
            [RowVersion] bigint identity(1,1) not null
        );

        create nonclustered index [Index_Due] on [{0}].[{1}]
        (
            [Due]
        )
    end
"@ -f $schema, $queueName

    $command = New-Object System.Data.SqlClient.SqlCommand($sql, $connection)
    $command.ExecuteNonQuery()
    $command.Dispose()
}

Creating queues for an endpoint

To create all queues for a given endpoint name.

In C#

public static void CreateQueuesForEndpoint(SqlConnection connection, string schema, string endpointName)
{
    // main queue
    QueueCreationUtils.CreateQueue(connection, schema, endpointName);

    // callback queue
    QueueCreationUtils.CreateQueue(connection, schema, $"{endpointName}.{Environment.MachineName}");

    // delayed messages queue
    // Only required in Version 3.1 and above when native delayed delivery is enabled
    QueueCreationUtils.CreateDelayedQueue(connection, schema, $"{endpointName}.Delayed");

    // timeout queue
    // only required in Versions 3.0 and below or when native delayed delivery is disabled or timeout manager compatibility mode is enabled
    QueueCreationUtils.CreateQueue(connection, schema, $"{endpointName}.Timeouts");

    // timeout dispatcher queue
    // only required in Versions 3.0 and below or when native delayed delivery is disabled or timeout manager compatibility mode is enabled
    QueueCreationUtils.CreateQueue(connection, schema, $"{endpointName}.TimeoutsDispatcher");

    // retries queue
    // TODO: Only required in Versions 2 and below
    QueueCreationUtils.CreateQueue(connection, schema, $"{endpointName}.Retries");            
}

In PowerShell

Function CreateQueuesForEndpoint
{
    param(
        [Parameter(Mandatory=$true)]
        [ValidateNotNull()]
        [string] $connection,

        [ValidateNotNullOrEmpty()]
        [string] $schema = "dbo",

        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $endpointName,

        [Parameter(HelpMessage="Only required for NSB Versions 5 and below")]
        [Switch] $includeRetries,

        [Parameter(HelpMessage="Only required for SQL Server Version 3.1 and above if native delayed delivery is enabled")]
        [Switch] $includeDelayed
    )

    $sqlConnection = New-Object System.Data.SqlClient.SqlConnection($connection)
    $sqlConnection.Open()

    try {
        # main queue
        CreateQueue -connection $sqlConnection -schema $schema -queuename $endpointName

        # timeout queue
        CreateQueue -connection $sqlConnection -schema $schema -queuename "$endpointName.timeouts"

        # timeout dispatcher queue
        CreateQueue -connection $sqlConnection -schema $schema -queuename "$endpointName.timeoutsdispatcher"

        # retries queue
        if ($includeRetries) {
            CreateQueue -connection $sqlConnection -schema $schema -queuename "$endpointName.retries"
        }

        # retries queue
        if ($includeDelayed) {
            CreateDelayedQueue -connection $sqlConnection -schema $schema -queuename "$endpointName.delayed"
        }
    }
    finally {
        $sqlConnection.Close()
        $sqlConnection.Dispose()
    }
}

Using the create endpoint queues

In C#

using (var connection = new SqlConnection(connectionString))
{
    connection.Open();
    CreateQueuesForEndpoint(
            connection: connection,
            schema: "dbo",
            endpointName: "myendpoint");
}

In PowerShell

# For NServiceBus 6 Endpoints
CreateQueuesForEndpoint -endpointName "myendpoint" -connection "TheConnectionString"

# For NServiceBus 5 and below Endpoints
CreateQueuesForEndpoint -endpointName "myendpoint" -connection "TheConnectionString" -IncludeRetries

Add message body string column

The following snippet adds a computed column containing the message body in a human-readable format.

In C#

public static void AddMessageBodyStringColumn(SqlConnection connection, string schema, string queueName)
{
    var sql = $@"if not exists (select * from sys.objects where object_id = object_id(N'[{schema}].[{queueName}]') and type in (N'U')) return
                 if NOT exists (select * from sys.columns where object_id = object_id(N'[{schema}].[{queueName}]') and name = 'BodyString')

                 alter table [{schema}].[{queueName}]
                 add BodyString as cast(Body as nvarchar(max));";

    using (var command = new SqlCommand(sql, connection))
    {
        command.ExecuteNonQuery();
    }
}

To create shared queues

In C#

using (var sqlConnection = new SqlConnection(connectionString))
{
    sqlConnection.Open();
    QueueCreationUtils.CreateQueue(
        connection: sqlConnection,
        schema: "dbo",
        queueName: "error");

    QueueCreationUtils.CreateQueue(
        connection: sqlConnection,
        schema: "dbo",
        queueName: "audit");
}

In PowerShell

$sqlConnection = New-Object System.Data.SqlClient.SqlConnection("TheConnectionString")
$sqlConnection.Open()

try {
    CreateQueue -connection $connection -schema $schema -queuename "error"
}
finally {
    $sqlConnection.Close()
    $sqlConnection.Dispose()
}

Delete queues

The delete helper queue methods

public static class QueueDeletionUtils
{
    public static void DeleteQueue(SqlConnection connection, string schema, string queueName)
    {
        var deleteScript = $@"
            if exists (select * from sys.objects where object_id = object_id(N'[{schema}].[{queueName}]') and type in (N'U'))
            drop table [{schema}].[{queueName}]";
        using (var command = new SqlCommand(deleteScript, connection))
        {
            command.ExecuteNonQuery();
        }
    }
}

To delete all queues for a given endpoint

public static void DeleteQueuesForEndpoint(SqlConnection connection, string schema, string endpointName)
{
    // main queue
    QueueDeletionUtils.DeleteQueue(connection, schema, endpointName);

    // callback queue
    QueueDeletionUtils.DeleteQueue(connection, schema, $"{endpointName}.{Environment.MachineName}");

    // timeout queue
    QueueDeletionUtils.DeleteQueue(connection, schema, $"{endpointName}.Timeouts");

    // timeout dispatcher queue
    QueueDeletionUtils.DeleteQueue(connection, schema, $"{endpointName}.TimeoutsDispatcher");

    // retries queue
    // TODO: Only required in Versions 2 and below
    QueueDeletionUtils.DeleteQueue(connection, schema, $"{endpointName}.Retries");
}
using (var sqlConnection = new SqlConnection(connectionString))
{
    sqlConnection.Open();
    DeleteQueuesForEndpoint(
        connection: sqlConnection,
        schema: "dbo",
        endpointName: "myendpoint");
}

To delete shared queues

using (var connection = new SqlConnection(connectionString))
{
    connection.Open();
    QueueDeletionUtils.DeleteQueue(
        connection: connection,
        schema: "dbo",
        queueName: "audit");
    QueueDeletionUtils.DeleteQueue(
        connection: connection,
        schema: "dbo",
        queueName: "error");
}

Return message to source queue

The retry helper methods

A retry involves the following actions:

  • Read a message from the error queue table.
  • Forward that message to another queue table to be retried.
public static void ReturnMessageToSourceQueue(
    string errorQueueConnection,
    string errorQueueName,
    string retryConnectionString,
    string retryQueueName,
    Guid messageId)
{
    using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
    {
        var messageToRetry = ReadAndDelete(errorQueueConnection, errorQueueName, messageId);
        RetryMessage(retryConnectionString, retryQueueName, messageToRetry);
        scope.Complete();
    }
}

class MessageToRetry
{
    public Guid Id;
    public string Headers;
    public byte[] Body;
}

static void RetryMessage(string connectionString, string queueName, MessageToRetry messageToRetry)
{
    var sql = $@"
        insert into [{queueName}] (
            Id,
            Recoverable,
            Headers,
            Body)
        values (
            @Id,
            @Recoverable,
            @Headers,
            @Body)";
    using (var connection = new SqlConnection(connectionString))
    {
        connection.Open();
        using (var command = new SqlCommand(sql, connection))
        {
            var parameters = command.Parameters;
            parameters.Add("Id", SqlDbType.UniqueIdentifier).Value = messageToRetry.Id;
            parameters.Add("Headers", SqlDbType.NVarChar).Value = messageToRetry.Headers;
            parameters.Add("Body", SqlDbType.VarBinary).Value = messageToRetry.Body;
            parameters.Add("Recoverable", SqlDbType.Bit).Value = true;
            command.ExecuteNonQuery();
        }
    }
}

static MessageToRetry ReadAndDelete(string connectionString, string queueName, Guid messageId)
{
    var sql = $@"
    delete from [{queueName}]
    output
        deleted.Headers,
        deleted.Body
    where Id = @Id";
    using (var connection = new SqlConnection(connectionString))
    {
        connection.Open();
        using (var command = new SqlCommand(sql, connection))
        {
            command.Parameters.AddWithValue("Id", messageId);
            using (var reader = command.ExecuteReader(CommandBehavior.SingleRow))
            {
                if (reader.Read())
                {
                    return new MessageToRetry
                    {
                        Id = messageId,
                        Headers = reader.GetString(0),
                        Body = reader.GetSqlBinary(1).Value
                    };
                }
                var message = $"Could not find error entry with messageId '{messageId}'";
                throw new Exception(message);
            }
        }
    }
}

Using the retry helper methods

ReturnMessageToSourceQueue(
    errorQueueConnection: @"Data Source=.\SqlExpress;Database=samples;Integrated Security=True",
    errorQueueName: "errors",
    retryConnectionString: @"Data Source=.\SqlExpress;Database=samples;Integrated Security=True",
    retryQueueName: "target",
    messageId: Guid.Parse("1667B60E-2948-4EF0-8BB1-8C851A9407D2")
);

Archiving SqlTransport audit log to long-term storage

There are several ways to achieve archiving of the audit log, including using techniques like Table Partitioning and Snapshot Replication. In this example, BCP utility will be used.

Create helper "archive" table

Create an audit_archive table with this SQL script.

create table [dbo].[audit_archive](
    [Id] [uniqueidentifier] not null,
    [CorrelationId] [varchar](255),
    [ReplyToAddress] [varchar](255),
    [Recoverable] [bit] not null,
    [Expires] [datetime],
    [Headers] [nvarchar](max) not null,
    [Body] [varbinary](max),
    [RowVersion] [bigint] not null
)

Move records to archive table

This script moves the contents of the audit table into audit_archive table.

delete from [dbo].[audit]
output [deleted].*
into [dbo].[audit_archive]

This can be run with a scheduled job to clear the archive regularly.

Execute BCP

Once that query completes, the records can be archived to disk. In a command prompt, use the BCP to create an archive on disk.

bcp samples.dbo.audit_archive out archive.csv -c -q -T -S .\SQLExpress

Truncate the archive table

The audit records will still have to clear using the following script:

truncate table [dbo].[audit_archive];

Related Articles