Scripting

Component: SQL Server Transport
NuGet Package NServiceBus.SqlServer (1.x)
Target NServiceBus Version: 4.x
Standard support for version 4.x of NServiceBus has expired. For more information see our Support Policy.

The following code samples and scripts facilitate deployment and operational tasks for the SQL Server Transport.

Native Send

The native send helper methods

A send involves the following actions:

  • Create and serialize headers.
  • Write a message body directly to SQL Server Transport.

In C#

public static void SendMessage(string connectionString, string queue, string messageBody, Dictionary<string, string> headers)
{
    var insertSql = $@"
    insert into [{queue}] (
        Id,
        Recoverable,
        Headers,
        Body)
    values (
        @Id,
        @Recoverable,
        @Headers,
        @Body)";
    var serializedHeaders = Newtonsoft.Json.JsonConvert.SerializeObject(headers);
    var bytes = Encoding.UTF8.GetBytes(messageBody);
    using (var connection = new SqlConnection(connectionString))
    {
        connection.Open();
        using (var command = new SqlCommand(insertSql, connection))
        {
            var parameters = command.Parameters;
            parameters.Add("Id", SqlDbType.UniqueIdentifier).Value = Guid.NewGuid();
            parameters.Add("Headers", SqlDbType.NVarChar).Value = serializedHeaders;
            parameters.Add("Body", SqlDbType.VarBinary).Value = bytes;
            parameters.Add("Recoverable", SqlDbType.Bit).Value = true;
            command.ExecuteNonQuery();
        }
    }
}

The NServiceBus.EnclosedMessageTypes header identifies the .NET type of the message, for example MyNamespace.MyMessage. See the headers documentation for more information on the EnclosedMessageTypes header.
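
To make the "create and serialize headers" step concrete, the following minimal sketch prints the JSON text that ends up in the Headers column. It assumes the same Newtonsoft.Json package that SendMessage uses; the class name HeaderSerializationExample and the message type value are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using Newtonsoft.Json;

public static class HeaderSerializationExample
{
    public static void Main()
    {
        var headers = new Dictionary<string, string>
        {
            // Illustrative message type; replace with the real .NET type name
            {"NServiceBus.EnclosedMessageTypes", "MyNamespace.MyMessage"}
        };

        // Same call SendMessage makes before writing the Headers column
        var serializedHeaders = JsonConvert.SerializeObject(headers);

        Console.WriteLine(serializedHeaders);
        // prints {"NServiceBus.EnclosedMessageTypes":"MyNamespace.MyMessage"}
    }
}
```

The headers are stored as a flat JSON object of string keys and string values, which is why the Headers column is written as NVarChar.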

In PowerShell

Set-StrictMode -Version 2.0

Add-Type -AssemblyName System.Data

Function SendMessage
{
    param(
        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $ConnectionString,

        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $Queue,

        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $MessageBody,

        [Parameter(Mandatory=$true)]
        [ValidateNotNull()]
        [HashTable] $Headers
    )

    $UTF8 = [System.Text.Encoding]::UTF8

    $sqlConnection = New-Object System.Data.SqlClient.SqlConnection($ConnectionString)
    $sqlConnection.Open()

    try {
        $sqlCommand = $sqlConnection.CreateCommand()
        $sqlCommand.CommandText =
            "insert into [$Queue] (Id, Recoverable, Headers, Body) values (@Id, @Recoverable, @Headers, @Body)"
        $parameters = $sqlCommand.Parameters
        $parameters.Add("Id", [System.Data.SqlDbType]::UniqueIdentifier).Value = [System.Guid]::NewGuid()
        $serializedHeaders = ConvertTo-Json $Headers
        $parameters.Add("Headers", [System.Data.SqlDbType]::NVarChar).Value = $serializedHeaders
        $parameters.Add("Body", [System.Data.SqlDbType]::VarBinary).Value = $UTF8.GetBytes($MessageBody)
        $parameters.Add("Recoverable", [System.Data.SqlDbType]::Bit).Value = 1
        $sqlCommand.ExecuteNonQuery()
    }
    finally {
        # Release the connection even if the insert fails
        $sqlConnection.Close()
        $sqlConnection.Dispose()
    }
}

Using the native send helper methods

SendMessage(
    connectionString: @"Data Source=.\SqlExpress;Database=samples;Integrated Security=True",
    queue: "Samples.SqlServer.NativeIntegration",
    messageBody: "{Property:'PropertyValue'}",
    headers: new Dictionary<string, string>
    {
        {"NServiceBus.EnclosedMessageTypes", "MessageTypeToSend"}
    }
);

Create queues

Queues can be created for a specific endpoint, or as queues shared between multiple endpoints.

The create queue helper methods

In C#

public static class QueueCreationUtils
{
    public static void CreateQueue(SqlConnection connection, string schema, string queueName)
    {
        var sql = $@"
        if not exists (select * from sys.objects where object_id = object_id(N'[{schema}].[{queueName}]') and type in (N'U'))
        begin
            create table [{schema}].[{queueName}](
                [Id] [uniqueidentifier] not null,
                [CorrelationId] [varchar](255),
                [ReplyToAddress] [varchar](255),
                [Recoverable] [bit] not null,
                [Expires] [datetime],
                [Headers] [nvarchar](max) not null,
                [Body] [varbinary](max),
                [RowVersion] [bigint] identity(1,1) not null
            );
            create clustered index [Index_RowVersion] on [{schema}].[{queueName}]
            (
                [RowVersion]
            )
            create nonclustered index [Index_Expires] on [{schema}].[{queueName}]
            (
                [Expires]
            )
            include
            (
                [Id],
                [RowVersion]
            )
            where [Expires] is not null
        end";
        using (var command = new SqlCommand(sql, connection))
        {
            command.ExecuteNonQuery();
        }
    }

    public static void CreateDelayedQueue(SqlConnection connection, string schema, string queueName)
    {
        var sql = $@"
        if not exists (select * from sys.objects where object_id = object_id(N'[{schema}].[{queueName}]') and type in (N'U'))
        begin
            create table [{schema}].[{queueName}](
                [Headers] nvarchar(max) not null,
                [Body] varbinary(max),
                [Due] datetime not null,
                [RowVersion] bigint identity(1,1) not null
            );

            create nonclustered index [Index_Due] on [{schema}].[{queueName}]
            (
                [Due]
            )
        end";
        using (var command = new SqlCommand(sql, connection))
        {
            command.ExecuteNonQuery();
        }
    }

}

In PowerShell

function CreateQueue {
    param (
        [Parameter(Mandatory=$true)]
        [ValidateNotNull()]
        [System.Data.SqlClient.SqlConnection] $connection,

        [ValidateNotNullOrEmpty()]
        [string] $schema = "dbo",

        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $queueName
    )

    $sql = @"
    if not exists (select * from sys.objects where object_id = object_id(N'[{0}].[{1}]') and type in (N'U'))
        begin
        create table [{0}].[{1}](
            [Id] [uniqueidentifier] not null,
            [CorrelationId] [varchar](255),
            [ReplyToAddress] [varchar](255),
            [Recoverable] [bit] not null,
            [Expires] [datetime],
            [Headers] [nvarchar](max) not null,
            [Body] [varbinary](max),
            [RowVersion] [bigint] identity(1,1) not null
        );
        create clustered index [Index_RowVersion] on [{0}].[{1}]
        (
            [RowVersion]
        )
        create nonclustered index [Index_Expires] on [{0}].[{1}]
        (
            [Expires]
        )
        include
        (
            [Id],
            [RowVersion]
        )
        where
            [Expires] is not null
    end
"@ -f $schema, $queueName

    $command = New-Object System.Data.SqlClient.SqlCommand($sql, $connection)
    $command.ExecuteNonQuery()
    $command.Dispose()
}

function CreateDelayedQueue {
    param (
        [Parameter(Mandatory=$true)]
        [ValidateNotNull()]
        [System.Data.SqlClient.SqlConnection] $connection,

        [ValidateNotNullOrEmpty()]
        [string] $schema = "dbo",

        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $queueName
    )

    $sql = @"
    if not exists (select * from sys.objects where object_id = object_id(N'[{0}].[{1}]') and type in (N'U'))
        begin
        create table [{0}].[{1}](
            [Headers] nvarchar(max) not null,
            [Body] varbinary(max),
            [Due] datetime not null,
            [RowVersion] bigint identity(1,1) not null
        );

        create nonclustered index [Index_Due] on [{0}].[{1}]
        (
            [Due]
        )
    end
"@ -f $schema, $queueName

    $command = New-Object System.Data.SqlClient.SqlCommand($sql, $connection)
    $command.ExecuteNonQuery()
    $command.Dispose()
}

Creating queues for an endpoint

The following helper creates all queues for a given endpoint name.

In C#

public static void CreateQueuesForEndpoint(SqlConnection connection, string schema, string endpointName)
{
    // main queue
    QueueCreationUtils.CreateQueue(connection, schema, endpointName);

    // callback queue
    QueueCreationUtils.CreateQueue(connection, schema, $"{endpointName}.{Environment.MachineName}");

    // timeout queue
    QueueCreationUtils.CreateQueue(connection, schema, $"{endpointName}.Timeouts");

    // timeout dispatcher queue
    QueueCreationUtils.CreateQueue(connection, schema, $"{endpointName}.TimeoutsDispatcher");

    // retries queue
    // TODO: Only required in Versions 2 and below
    QueueCreationUtils.CreateQueue(connection, schema, $"{endpointName}.Retries");

    // delayed messages queue
    // TODO: Only required in Version 3.1 and above when native delayed delivery is enabled
    QueueCreationUtils.CreateDelayedQueue(connection, schema, $"{endpointName}.Delayed");
}

In PowerShell

Function CreateQueuesForEndpoint
{
    param(
        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $connection,

        [ValidateNotNullOrEmpty()]
        [string] $schema = "dbo",

        [Parameter(Mandatory=$true)]
        [ValidateNotNullOrEmpty()]
        [string] $endpointName,

        [Parameter(HelpMessage="Only required for NSB Versions 5 and below")]
        [Switch] $includeRetries,

        [Parameter(HelpMessage="Only required for SQL Server Version 3.1 and above if native delayed delivery is enabled")]
        [Switch] $includeDelayed
    )

    $sqlConnection = New-Object System.Data.SqlClient.SqlConnection($connection)
    $sqlConnection.Open()

    try {
        # main queue
        CreateQueue -connection $sqlConnection -schema $schema -queuename $endpointName

        # timeout queue
        CreateQueue -connection $sqlConnection -schema $schema -queuename "$endpointName.timeouts"

        # timeout dispatcher queue
        CreateQueue -connection $sqlConnection -schema $schema -queuename "$endpointName.timeoutsdispatcher"

        # retries queue
        if ($includeRetries) {
            CreateQueue -connection $sqlConnection -schema $schema -queuename "$endpointName.retries"
        }

        # delayed messages queue
        if ($includeDelayed) {
            CreateDelayedQueue -connection $sqlConnection -schema $schema -queuename "$endpointName.delayed"
        }
    }
    finally {
        $sqlConnection.Close()
        $sqlConnection.Dispose()
    }
}

Using the create endpoint queues helper methods

In C#

using (var connection = new SqlConnection(connectionString))
{
    connection.Open();
    CreateQueuesForEndpoint(
            connection: connection,
            schema: "dbo",
            endpointName: "myendpoint");
}

In PowerShell

# For NServiceBus 6 Endpoints
CreateQueuesForEndpoint -endpointName "myendpoint" -connection "TheConnectionString"

# For NServiceBus 5 and below Endpoints
CreateQueuesForEndpoint -endpointName "myendpoint" -connection "TheConnectionString" -IncludeRetries

To create shared queues

In C#

using (var sqlConnection = new SqlConnection(connectionString))
{
    sqlConnection.Open();
    QueueCreationUtils.CreateQueue(
        connection: sqlConnection,
        schema: "dbo",
        queueName: "error");

    QueueCreationUtils.CreateQueue(
        connection: sqlConnection,
        schema: "dbo",
        queueName: "audit");
}

In PowerShell

$sqlConnection = New-Object System.Data.SqlClient.SqlConnection("TheConnectionString")
$sqlConnection.Open()

try {
    CreateQueue -connection $sqlConnection -schema "dbo" -queuename "error"
    CreateQueue -connection $sqlConnection -schema "dbo" -queuename "audit"
}
finally {
    $sqlConnection.Close()
    $sqlConnection.Dispose()
}

Delete queues

The delete queue helper methods

public static class QueueDeletionUtils
{
    public static void DeleteQueue(SqlConnection connection, string schema, string queueName)
    {
        var deleteScript = $@"
            if exists (select * from sys.objects where object_id = object_id(N'[{schema}].[{queueName}]') and type in (N'U'))
            drop table [{schema}].[{queueName}]";
        using (var command = new SqlCommand(deleteScript, connection))
        {
            command.ExecuteNonQuery();
        }
    }
}

To delete all queues for a given endpoint

public static void DeleteQueuesForEndpoint(SqlConnection connection, string schema, string endpointName)
{
    // main queue
    QueueDeletionUtils.DeleteQueue(connection, schema, endpointName);

    // callback queue
    QueueDeletionUtils.DeleteQueue(connection, schema, $"{endpointName}.{Environment.MachineName}");

    // timeout queue
    QueueDeletionUtils.DeleteQueue(connection, schema, $"{endpointName}.Timeouts");

    // timeout dispatcher queue
    QueueDeletionUtils.DeleteQueue(connection, schema, $"{endpointName}.TimeoutsDispatcher");

    // retries queue
    // TODO: Only required in Versions 2 and below
    QueueDeletionUtils.DeleteQueue(connection, schema, $"{endpointName}.Retries");
}

using (var sqlConnection = new SqlConnection(connectionString))
{
    sqlConnection.Open();
    DeleteQueuesForEndpoint(
        connection: sqlConnection,
        schema: "dbo",
        endpointName: "myendpoint");
}
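
CreateQueuesForEndpoint also creates an "{endpointName}.Delayed" table, which DeleteQueuesForEndpoint does not remove. The following sketch, which assumes that table was created by CreateDelayedQueue, drops it with the same conditional drop script used by DeleteQueue. The class and method names are hypothetical.

```csharp
using System.Data.SqlClient;

public static class DelayedQueueDeletion
{
    // Hypothetical helper mirroring DeleteQueue for the "<endpointName>.Delayed" table.
    // The schema and endpoint name are interpolated into the SQL text,
    // so pass only trusted values.
    public static void DeleteDelayedQueue(SqlConnection connection, string schema, string endpointName)
    {
        var queueName = $"{endpointName}.Delayed";
        var deleteScript = $@"
            if exists (select * from sys.objects where object_id = object_id(N'[{schema}].[{queueName}]') and type in (N'U'))
            drop table [{schema}].[{queueName}]";
        using (var command = new SqlCommand(deleteScript, connection))
        {
            command.ExecuteNonQuery();
        }
    }
}
```

It can be called on the same open connection, right after DeleteQueuesForEndpoint.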

To delete shared queues

using (var connection = new SqlConnection(connectionString))
{
    connection.Open();
    QueueDeletionUtils.DeleteQueue(
        connection: connection,
        schema: "dbo",
        queueName: "audit");
    QueueDeletionUtils.DeleteQueue(
        connection: connection,
        schema: "dbo",
        queueName: "error");
}

Return message to source queue

The retry helper methods

A retry involves the following actions:

  • Read a message from the error queue table.
  • Forward that message to another queue table to be retried.

Since the connection information for the endpoint that failed is not contained in the error queue table, that information is passed in explicitly.

public static void ReturnMessageToSourceQueue(
    string errorQueueConnection,
    string errorQueueName,
    string retryConnectionString,
    string retryQueueName,
    Guid messageId)
{
    using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
    {
        var messageToRetry = ReadAndDelete(errorQueueConnection, errorQueueName, messageId);
        RetryMessage(retryConnectionString, retryQueueName, messageToRetry);
        scope.Complete();
    }
}

class MessageToRetry
{
    public Guid Id;
    public string Headers;
    public byte[] Body;
}

static void RetryMessage(string connectionString, string queueName, MessageToRetry messageToRetry)
{
    var sql = $@"
        insert into [{queueName}] (
            Id,
            Recoverable,
            Headers,
            Body)
        values (
            @Id,
            @Recoverable,
            @Headers,
            @Body)";
    using (var connection = new SqlConnection(connectionString))
    {
        connection.Open();
        using (var command = new SqlCommand(sql, connection))
        {
            var parameters = command.Parameters;
            parameters.Add("Id", SqlDbType.UniqueIdentifier).Value = messageToRetry.Id;
            parameters.Add("Headers", SqlDbType.NVarChar).Value = messageToRetry.Headers;
            parameters.Add("Body", SqlDbType.VarBinary).Value = messageToRetry.Body;
            parameters.Add("Recoverable", SqlDbType.Bit).Value = true;
            command.ExecuteNonQuery();
        }
    }
}

static MessageToRetry ReadAndDelete(string connectionString, string queueName, Guid messageId)
{
    var sql = $@"
    delete from [{queueName}]
    output
        deleted.Headers,
        deleted.Body
    where Id = @Id";
    using (var connection = new SqlConnection(connectionString))
    {
        connection.Open();
        using (var command = new SqlCommand(sql, connection))
        {
            command.Parameters.AddWithValue("Id", messageId);
            using (var reader = command.ExecuteReader(CommandBehavior.SingleRow))
            {
                if (reader.Read())
                {
                    return new MessageToRetry
                    {
                        Id = messageId,
                        Headers = reader.GetString(0),
                        Body = reader.GetSqlBinary(1).Value
                    };
                }
                var message = $"Could not find error entry with messageId '{messageId}'";
                throw new Exception(message);
            }
        }
    }
}

Using the retry helper methods

ReturnMessageToSourceQueue(
    errorQueueConnection: @"Data Source=.\SqlExpress;Database=samples;Integrated Security=True",
    errorQueueName: "errors",
    retryConnectionString: @"Data Source=.\SqlExpress;Database=samples;Integrated Security=True",
    retryQueueName: "target",
    messageId: Guid.Parse("1667B60E-2948-4EF0-8BB1-8C851A9407D2")
);
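
ReturnMessageToSourceQueue needs the Id of the failed message. A minimal sketch for finding candidate Ids is shown below; it assumes the error queue table has the Id and RowVersion columns created by CreateQueue. The class and method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Data.SqlClient;

public static class ErrorQueueInspection
{
    // Hypothetical helper: returns the Id of every row in the error queue table,
    // oldest first. The queue name is interpolated into the SQL text,
    // so pass only trusted values.
    public static List<Guid> ListMessageIds(string connectionString, string errorQueueName)
    {
        var sql = $"select Id from [{errorQueueName}] order by RowVersion";
        var ids = new List<Guid>();
        using (var connection = new SqlConnection(connectionString))
        {
            connection.Open();
            using (var command = new SqlCommand(sql, connection))
            using (var reader = command.ExecuteReader())
            {
                while (reader.Read())
                {
                    ids.Add(reader.GetGuid(0));
                }
            }
        }
        return ids;
    }
}
```

Each returned value can be passed as the messageId argument of ReturnMessageToSourceQueue.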

Archiving SqlTransport audit log to long-term storage

There are several ways to achieve this, including techniques such as Table Partitioning and Snapshot Replication. This example uses the BCP utility.

Create helper "archive" table

Create an audit_archive table with this SQL script.

create table [dbo].[audit_archive](
    [Id] [uniqueidentifier] not null,
    [CorrelationId] [varchar](255),
    [ReplyToAddress] [varchar](255),
    [Recoverable] [bit] not null,
    [Expires] [datetime],
    [Headers] [nvarchar](max) not null,
    [Body] [varbinary](max),
    [RowVersion] [bigint] not null
)

Move records to archive table

This script moves the contents of the audit table into the audit_archive table.

delete from [dbo].[audit]
output [deleted].*
into [dbo].[audit_archive]

This can be run as a scheduled job to clear the audit table regularly.
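
When the audit table is large, a single delete can hold locks for a long time. As a variation on the script above (not part of the original sample), the following sketch moves rows in batches from C#. The class name, method name, and default batch size are assumptions.

```csharp
using System.Data;
using System.Data.SqlClient;

public static class AuditArchiver
{
    // Hypothetical helper: moves rows from audit to audit_archive in batches
    // until the audit table is empty.
    public static void MoveAuditToArchive(string connectionString, int batchSize = 1000)
    {
        const string sql = @"
            delete top (@BatchSize) from [dbo].[audit]
            output [deleted].*
            into [dbo].[audit_archive]";
        using (var connection = new SqlConnection(connectionString))
        {
            connection.Open();
            int moved;
            do
            {
                using (var command = new SqlCommand(sql, connection))
                {
                    command.Parameters.Add("@BatchSize", SqlDbType.Int).Value = batchSize;
                    // Each statement is atomic: a row is deleted and archived together
                    moved = command.ExecuteNonQuery();
                }
            } while (moved > 0); // stop once a batch affects no rows
        }
    }
}
```

Smaller batches shorten each transaction at the cost of more round trips to the server.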

Execute BCP

Once that query completes, the records can be archived to disk. From a command prompt, use the BCP utility to create an archive file on disk.

bcp samples.dbo.audit_archive out archive.csv -c -q -T -S .\SQLExpress

Truncate the archive table

After the export, the archived records still have to be cleared from the audit_archive table using the following script.

truncate table [dbo].[audit_archive];
