Scripting

Component: SQL Server Transport | Nuget: NServiceBus.SqlServer (Version: 2.x)
Target NServiceBus Version: 5.x

The followings are example codes and scripts to facilitate deployment and operations against the SQL Server Transport.

Native Send

The native send helper methods

A send involves the following actions:

  • Create and serialize headers.
  • Write a message body directly to SQL Server Transport.

In C#

Edit
public static async Task SendMessage(string connectionString, string queue, string messageBody, Dictionary<string, string> headers)
{
    var insertSql = $@"INSERT INTO [{queue}] (
        Id,
        Recoverable,
        Headers,
        Body)
    VALUES (
        @Id,
        @Recoverable,
        @Headers,
        @Body)";
    using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
    {
        using (var connection = new SqlConnection(connectionString))
        {
            await connection.OpenAsync()
                .ConfigureAwait(false);
            using (var command = new SqlCommand(insertSql, connection))
            {
                var parameters = command.Parameters;
                command.CommandType = CommandType.Text;
                parameters.Add("Id", SqlDbType.UniqueIdentifier).Value = Guid.NewGuid();
                var serializeHeaders = Newtonsoft.Json.JsonConvert.SerializeObject(headers);
                parameters.Add("Headers", SqlDbType.VarChar).Value = serializeHeaders;
                parameters.Add("Body", SqlDbType.VarBinary).Value = Encoding.UTF8.GetBytes(messageBody);
                parameters.Add("Recoverable", SqlDbType.Bit).Value = true;
                await command.ExecuteNonQueryAsync()
                    .ConfigureAwait(false);
            }
        }
        scope.Complete();
    }
}

In this example, the value MyNamespace.MyMessage represents the .NET type of the message. See the headers documentation for more information on the EnclosedMessageTypes header.

In PowerShell;

Edit
Set-StrictMode -Version 2.0

Add-Type -AssemblyName System.Data

Function SendMessage
{
	param(
		[Parameter(Mandatory=$true)]
		[ValidateNotNullOrEmpty()]
		[string] $ConnectionString,

		[Parameter(Mandatory=$true)]
		[ValidateNotNullOrEmpty()]
		[string] $Queue,

		[Parameter(Mandatory=$true)]
		[ValidateNotNullOrEmpty()]
		[string] $MessageBody,

		[Parameter(Mandatory=$true)]
		[ValidateNotNull()]
		[HashTable] $Headers
	)

	$UTF8 = [System.Text.Encoding]::UTF8


	$sqlConnection = New-Object System.Data.SqlClient.SqlConnection($ConnectionString)
	$sqlConnection.Open()

    $sqlCommand = $sqlConnection.CreateCommand()
	$sqlCommand.CommandText =
	    "INSERT INTO [$Queue] (Id, Recoverable, Headers, Body) VALUES (@Id, @Recoverable, @Headers, @Body)"
    $parameters = $sqlCommand.Parameters
	$parameters.Add("Id", [System.Data.SqlDbType]::UniqueIdentifier).Value = [System.Guid]::NewGuid()
	$serializedHeaders = ConvertTo-Json $Headers
	$parameters.Add("Headers", [System.Data.SqlDbType]::VarChar).Value = $serializedHeaders
	$parameters.Add("Body", [System.Data.SqlDbType]::VarBinary).Value = $UTF8.GetBytes($MessageBody)
	$parameters.Add("Recoverable", [System.Data.SqlDbType]::Bit).Value = 1
	$sqlCommand.ExecuteNonQuery()

	$sqlConnection.Close()
}

Using the native send helper methods

Edit
await SendMessage(
        connectionString: @"Data Source=.\SqlExpress;Database=samples;Integrated Security=True",
        queue: "Samples.SqlServer.NativeIntegration",
        messageBody: "{\"Property\":\"PropertyValue\"}",
        headers: new Dictionary<string, string>
        {
            {"NServiceBus.EnclosedMessageTypes", "MessageTypeToSend"}
        }
    )
    .ConfigureAwait(false);

Create queues

Queue creation can be done for a specific endpoint or queues shared between multiple endpoints.

The create queue helper methods

Edit
namespace SqlServer_All.Operations.QueueCreation
{
    public static class QueueCreationUtils
    {
        public static async Task CreateQueue(SqlConnection connection, string schema, string queueName)
        {
            var sql = $@"IF NOT  EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[{schema}].[{queueName}]') AND type in (N'U'))
                BEGIN
                CREATE TABLE [{schema}].[{queueName}](
                    [Id] [uniqueidentifier] NOT NULL,
                    [CorrelationId] [varchar](255),
                    [ReplyToAddress] [varchar](255),
                    [Recoverable] [bit] NOT NULL,
                    [Expires] [datetime],
                    [Headers] [varchar](max) NOT NULL,
                    [Body] [varbinary](max),
                    [RowVersion] [bigint] IDENTITY(1,1) NOT NULL
                ) ON [PRIMARY];
                CREATE CLUSTERED INDEX [Index_RowVersion] ON [{schema}].[{queueName}]
                (
                    [RowVersion] ASC
                ) ON [PRIMARY]
        CREATE NONCLUSTERED INDEX [Index_Expires] ON [{schema}].[{queueName}]
            (
            [Expires] ASC
            )
            INCLUDE
            (
            [Id],
            [RowVersion]
        )
                END";
            using (var command = new SqlCommand(sql, connection))
            {
                await command.ExecuteNonQueryAsync()
                    .ConfigureAwait(false);
            }
        }

    }
}

Creating queues for an endpoint

To create all queues for a given endpoint name:

Edit
public static async Task CreateQueuesForEndpoint(SqlConnection connection, string schema, string endpointName)
{
    // main queue
    await QueueCreationUtils.CreateQueue(connection, schema, endpointName)
        .ConfigureAwait(false);

    // callback queue
    await QueueCreationUtils.CreateQueue(connection, schema, $"{endpointName}.{Environment.MachineName}")
        .ConfigureAwait(false);

    // timeout queue
    await QueueCreationUtils.CreateQueue(connection, schema, $"{endpointName}.Timeouts")
        .ConfigureAwait(false);

    // timeout dispatcher queue
    await QueueCreationUtils.CreateQueue(connection, schema, $"{endpointName}.TimeoutsDispatcher")
        .ConfigureAwait(false);

    // retries queue
    // TODO: Only required in Versions 2 and below
    await QueueCreationUtils.CreateQueue(connection, schema, $"{endpointName}.Retries")
        .ConfigureAwait(false);
}
Edit
using (var sqlConnection = new SqlConnection(connectionString))
{
    await sqlConnection.OpenAsync()
        .ConfigureAwait(false);
    await CreateQueuesForEndpoint(
            connection: sqlConnection,
            schema: "dbo",
            endpointName: "myendpoint")
        .ConfigureAwait(false);
}

To create shared queues

Edit
using (var sqlConnection = new SqlConnection(connectionString))
{
    await sqlConnection.OpenAsync()
        .ConfigureAwait(false);
    await QueueCreationUtils.CreateQueue(
            connection: sqlConnection,
            schema: "dbo",
            queueName: "error")
        .ConfigureAwait(false);

    await QueueCreationUtils.CreateQueue(
            connection: sqlConnection,
            schema: "dbo",
            queueName: "audit")
        .ConfigureAwait(false);
}

Delete queues

The delete helper queue methods

Edit
public static class QueueDeletionUtils
{
    public static async Task DeleteQueue(SqlConnection connection, string schema, string queueName)
    {
        var deleteScript = $@"
            IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[{schema}].[{queueName}]') AND type in (N'U'))
            DROP TABLE [{schema}].[{queueName}]";
        using (var command = new SqlCommand(deleteScript, connection))
        {
            await command.ExecuteNonQueryAsync()
                .ConfigureAwait(false);
        }
    }
}

To delete all queues for a given endpoint

Edit
public static async Task DeleteQueuesForEndpoint(SqlConnection connection, string schema, string endpointName)
{
    // main queue
    await QueueDeletionUtils.DeleteQueue(connection, schema, endpointName)
        .ConfigureAwait(false);

    // callback queue
    await QueueDeletionUtils.DeleteQueue(connection, schema, $"{endpointName}.{Environment.MachineName}")
        .ConfigureAwait(false);

    // timeout queue
    await QueueDeletionUtils.DeleteQueue(connection, schema, $"{endpointName}.Timeouts")
        .ConfigureAwait(false);

    // timeout dispatcher queue
    await QueueDeletionUtils.DeleteQueue(connection, schema, $"{endpointName}.TimeoutsDispatcher")
        .ConfigureAwait(false);

    // retries queue
    // TODO: Only required in Versions 2 and below
    await QueueDeletionUtils.DeleteQueue(connection, schema, $"{endpointName}.Retries")
        .ConfigureAwait(false);
}
Edit
using (var sqlConnection = new SqlConnection(connectionString))
{
    await sqlConnection.OpenAsync()
        .ConfigureAwait(false);
    await DeleteQueuesForEndpoint(
            connection: sqlConnection,
            schema: "dbo",
            endpointName: "myendpoint")
        .ConfigureAwait(false);
}

To delete shared queues

Edit
using (var sqlConnection = new SqlConnection(connectionString))
{
    await sqlConnection.OpenAsync()
        .ConfigureAwait(false);
    await QueueDeletionUtils.DeleteQueue(
            connection: sqlConnection,
            schema: "dbo",
            queueName: "audit")
        .ConfigureAwait(false);
    await QueueDeletionUtils.DeleteQueue(
            connection: sqlConnection,
            schema: "dbo",
            queueName: "error")
        .ConfigureAwait(false);
}

Return message to source queue

The retry helper methods

A retry involves the following actions:

  • Read a message from the error queue table.
  • Forward that message to another queue table to be retried.
Since the connection information for the endpoint that failed is not contained in the error queue table that information is explicitly passed in.
Edit
public static async Task ReturnMessageToSourceQueue(
    string errorQueueConnectionString,
    string errorQueueName,
    string retryConnectionString,
    string retryQueueName,
    Guid messageId)
{
    using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
    {
        var messageToRetry = await ReadAndDelete(errorQueueConnectionString, errorQueueName, messageId)
            .ConfigureAwait(false);
        await RetryMessage(retryConnectionString, retryQueueName,  messageToRetry)
            .ConfigureAwait(false);
        scope.Complete();
    }
}

class MessageToRetry
{
    public Guid Id;
    public string Headers;
    public byte[] Body;
}

static async Task RetryMessage(string connectionString, string queueName, MessageToRetry messageToRetry)
{
    var sql = $@"INSERT INTO [{queueName}] (
            Id,
            Recoverable,
            Headers,
            Body)
        VALUES (
            @Id,
            @Recoverable,
            @Headers,
            @Body)";
    using (var connection = new SqlConnection(connectionString))
    {
        await connection.OpenAsync()
            .ConfigureAwait(false);
        using (var command = new SqlCommand(sql, connection))
        {
            var parameters = command.Parameters;
            command.CommandType = CommandType.Text;
            parameters.Add("Id", SqlDbType.UniqueIdentifier).Value = messageToRetry.Id;
            parameters.Add("Headers", SqlDbType.VarChar).Value = messageToRetry.Headers;
            parameters.Add("Body", SqlDbType.VarBinary).Value = messageToRetry.Body;
            parameters.Add("Recoverable", SqlDbType.Bit).Value = true;
            await command.ExecuteNonQueryAsync()
                .ConfigureAwait(false);
        }
    }
}

static async Task<MessageToRetry> ReadAndDelete(string connectionString, string queueName, Guid messageId)
{
    var sql = $@"DELETE FROM [{queueName}]
    OUTPUT
        DELETED.Headers,
        DELETED.Body
    WHERE Id = @Id";
    using (var connection = new SqlConnection(connectionString))
    {
        await connection.OpenAsync()
            .ConfigureAwait(false);
        using (var command = new SqlCommand(sql, connection))
        {
            command.Parameters.AddWithValue("Id", messageId);
            using (var reader = await command.ExecuteReaderAsync(CommandBehavior.SingleRow)
                .ConfigureAwait(false))
            {
                if (!await reader.ReadAsync()
                    .ConfigureAwait(false))
                {
                    var message = $"Could not find error entry with messageId '{messageId}'";
                    throw new Exception(message);
                }
                return new MessageToRetry
                {
                    Id = messageId,
                    Headers = reader.GetString(0),
                    Body = reader.GetSqlBinary(1).Value
                };
            }
        }
    }
}

Using the retry helper methods

Edit
await ReturnMessageToSourceQueue(
        errorQueueConnectionString: @"Data Source=.\SqlExpress;Database=samples;Integrated Security=True",
        errorQueueName: "errors",
        retryConnectionString: @"Data Source=.\SqlExpress;Database=samples;Integrated Security=True",
        retryQueueName: "target",
        messageId: Guid.Parse("1667B60E-2948-4EF0-8BB1-8C851A9407D2")
    )
    .ConfigureAwait(false);

Archiving SqlTransport audit log to long term storage

There are several ways to achieve this including using techniques like Table Partitioning and Snapshot Replication. In this example BCP utility will be used.

Create helper "archive" table

Create an audit_archive table with this SQL script.

CREATE TABLE [dbo].[audit_archive](
	[Id] [uniqueidentifier] NOT NULL,
	[CorrelationId] [varchar](255),
	[ReplyToAddress] [varchar](255),
	[Recoverable] [bit] NOT NULL,
	[Expires] [datetime],
	[Headers] [varchar](max) NOT NULL,
	[Body] [varbinary](max),
	[RowVersion] [bigint] NOT NULL
) 
ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]

Move records to archive table

This script moves the contents of the audit table into audit_archive table.

DELETE FROM [dbo].[audit]
OUTPUT [deleted].*
INTO [dbo].[audit_archive]

This can be run with a scheduled job to clear the archive regularly.

Execute BCP

Once that query completes the records can be archived to disk. In a command prompt use the BCP to create an archive on disk.

bcp samples.dbo.audit_archive out archive.csv -c -q -T -S .\SQLExpress

Truncate the archive table

The audit records will still have to clear using the following script.

TRUNCATE TABLE  [dbo].[audit_archive];

Last modified