SQL Server Transport - Native

Source
NuGet Package NServiceBus.SqlServer.Native (2.x) | License
This is a community maintained project. License and support are independent of Particular Software.
Target NServiceBus Version: 7.x

SQL Server Transport Native is a shim providing low-level access to the SQL Server Transport with no NServiceBus or SQL Server Transport reference required.

Usage scenarios

  • Error or audit queue handling: Allows consuming messages from error and audit queues, for example to move them to a long-term archive. NServiceBus expects to have a queue per message type, so NServiceBus endpoints are not suitable for processing error or audit queues. SQL Native allows manipulation or consumption of queues containing multiple types of messages.
  • Corrupted or malformed messages: Allows processing poison messages that can't be deserialized by NServiceBus. In SQL Native, message headers and body are treated as a raw string and byte array, so corrupted or malformed messages can be read and manipulated in code to correct any problems.
  • Deployment or decommission: Allows performing common operational activities, similar to operations scripts. Running installers requires starting a full endpoint, which is not always ideal during a deployment or decommission. SQL Native allows creating or deleting queues with no running endpoint, and with significantly less code. This also makes it a better candidate for use in deployment scripting languages like PowerShell.
  • Bulk operations: SQL Native supports sending and receiving multiple messages within a single SqlConnection and SqlTransaction.
  • Explicit connection and transaction management: NServiceBus abstracts the creation and management of the SqlConnection and SqlTransaction. SQL Native allows any consuming code to manage the scope and settings of both the SqlConnection and SqlTransaction.
  • Message pass through: SQL Native reduces the amount of boilerplate code and simplifies development. It provides functionality similar to that shown in the HTTP Message Pass Through Sample, with no custom pipeline or mutators required.

Some notes on the below snippets:

  • All methods that return a Task also accept an optional CancellationToken.
  • While a SqlConnection is used in all snippets for simplicity, an overload that takes a SqlTransaction also exists for each API.
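
The two notes above can be combined into a minimal sketch of sending a message inside an explicitly managed transaction. It assumes that the SqlTransaction overload of the QueueManager constructor takes its arguments in the same order as the SqlConnection overload, and that the optional CancellationToken is the trailing argument of Send; neither detail is confirmed here. TransactionalSend is a hypothetical wrapper name used only for illustration.

```csharp
using System.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus.Transport.SqlServerNative;

// Hypothetical wrapper class, not part of SQL Native.
public static class TransactionalSend
{
    public static async Task Send(
        string connectionString,
        OutgoingMessage message,
        CancellationToken cancellation)
    {
        using (var connection = new SqlConnection(connectionString))
        {
            await connection.OpenAsync(cancellation)
                .ConfigureAwait(false);
            using (var transaction = connection.BeginTransaction())
            {
                // Assumption: the SqlTransaction overload mirrors the
                // (table, SqlConnection) constructor used in the snippets.
                var queueManager = new QueueManager("endpointTable", transaction);

                // Assumption: the optional CancellationToken is the last argument.
                await queueManager.Send(message, cancellation)
                    .ConfigureAwait(false);

                // The message becomes visible to receivers only after the commit.
                transaction.Commit();
            }
        }
    }
}
```

Disposing the transaction without calling Commit rolls the send back, so a failure part way through leaves the queue unchanged.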

Main Queue

Queue management

Queue management for the main transport queue.

See also SQL Server Transport - SQL statements.

Create

The queue can be created using the following:

var queueManager = new QueueManager("endpointTable", sqlConnection);
await queueManager.Create().ConfigureAwait(false);

Delete

The queue can be deleted using the following:

var queueManager = new QueueManager("endpointTable", sqlConnection);
await queueManager.Drop().ConfigureAwait(false);

Sending messages

Sending to the main transport queue.

Single

Sending a single message.

var queueManager = new QueueManager("endpointTable", sqlConnection);
var message = new OutgoingMessage(
    id: Guid.NewGuid(),
    headers: headers,
    bodyBytes: body);
await queueManager.Send(message)
    .ConfigureAwait(false);

Batch

Sending a batch of messages.

var queueManager = new QueueManager("endpointTable", sqlConnection);
var messages = new List<OutgoingMessage>
{
    new OutgoingMessage(
        id: Guid.NewGuid(),
        headers: headers1,
        bodyBytes: body1),
    new OutgoingMessage(
        id: Guid.NewGuid(),
        headers: headers2,
        bodyBytes: body2),
};
await queueManager.Send(messages)
    .ConfigureAwait(false);

Reading messages

"Reading" a message returns the data from the database without deleting it.

Single

Reading a single message.

var queueManager = new QueueManager("endpointTable", sqlConnection);
var message = await queueManager.Read(rowVersion: 10)
    .ConfigureAwait(false);

Console.WriteLine(message.Headers);
using (var reader = new StreamReader(message.Body))
{
    var bodyText = await reader.ReadToEndAsync()
        .ConfigureAwait(false);
    Console.WriteLine(bodyText);
}

Batch

Reading a batch of messages.

var queueManager = new QueueManager("endpointTable", sqlConnection);
var result = await queueManager.Read(
        size: 5,
        startRowVersion: 10,
        action: async message =>
        {
            Console.WriteLine(message.Headers);
            using (var reader = new StreamReader(message.Body))
            {
                var bodyText = await reader.ReadToEndAsync()
                    .ConfigureAwait(false);
                Console.WriteLine(bodyText);
            }
        })
    .ConfigureAwait(false);

Console.WriteLine(result.Count);
Console.WriteLine(result.LastRowVersion);

RowVersion tracking

In many scenarios it is necessary to keep track of the RowVersion of the last message that was read. A lightweight implementation of this functionality is provided by RowVersionTracker, which stores the current RowVersion in a table containing a single column and a single row.

var versionTracker = new RowVersionTracker();

// create table
await versionTracker.CreateTable(sqlConnection)
    .ConfigureAwait(false);

// save row version
await versionTracker.Save(sqlConnection, newRowVersion)
    .ConfigureAwait(false);

// get row version
var startingRow = await versionTracker.Get(sqlConnection)
    .ConfigureAwait(false);

Note that this is only one possible implementation of storing the current RowVersion.
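
As an illustration of an alternative, the following is a minimal sketch of a file-based store. FileRowVersionStore is a hypothetical class, not part of SQL Native. Its Save method is shaped to match the persistRowVersion delegate used by MessageProcessingLoop later in this document, and the starting value of 1 for a missing file is an assumption.

```csharp
using System.Data.SqlClient;
using System.Globalization;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical alternative to RowVersionTracker that keeps the
// last processed RowVersion in a text file instead of a table.
public class FileRowVersionStore
{
    readonly string path;

    public FileRowVersionStore(string path)
    {
        this.path = path;
    }

    // Returns the stored RowVersion, or 1 when nothing has been saved yet
    // (the starting value is an assumption).
    public long Get()
    {
        if (!File.Exists(path))
        {
            return 1;
        }
        return long.Parse(File.ReadAllText(path), CultureInfo.InvariantCulture);
    }

    // Same shape as the persistRowVersion delegate. The transaction is
    // ignored because the file write cannot take part in it.
    public Task Save(SqlTransaction transaction, long rowVersion, CancellationToken cancellation)
    {
        File.WriteAllText(path, rowVersion.ToString(CultureInfo.InvariantCulture));
        return Task.CompletedTask;
    }
}
```

Because the file write is not part of the SQL transaction, a crash between processing a message and saving the RowVersion can cause that message to be read again on restart. Storing the value in the same database, as RowVersionTracker does, avoids a second storage location.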

Processing loop

For scenarios where continual processing (reading and executing some code with the result) of incoming messages is required, MessageProcessingLoop can be used.

An example use case is monitoring an error queue. Some action should be taken when a message appears in the error queue, but it should remain in that queue in case it needs to be retried.

Note that in the below snippet, the above RowVersionTracker is used for tracking the current RowVersion.

var rowVersionTracker = new RowVersionTracker();

var startingRow = await rowVersionTracker.Get(sqlConnection)
    .ConfigureAwait(false);

async Task Callback(SqlTransaction transaction, IncomingMessage message, CancellationToken cancellation)
{
    using (var reader = new StreamReader(message.Body))
    {
        var bodyText = await reader.ReadToEndAsync()
            .ConfigureAwait(false);
        Console.WriteLine($"Message received in error queue:\r\n{bodyText}");
    }
}

void ErrorCallback(Exception exception)
{
    Environment.FailFast("Message processing loop failed", exception);
}

Task<SqlTransaction> TransactionBuilder(CancellationToken cancellation)
{
    return ConnectionHelpers.BeginTransaction(connectionString, cancellation);
}

Task PersistRowVersion(SqlTransaction transaction, long rowVersion, CancellationToken token)
{
    return rowVersionTracker.Save(sqlConnection, rowVersion, token);
}

var processingLoop = new MessageProcessingLoop(
    table: "error",
    delay: TimeSpan.FromSeconds(1),
    transactionBuilder: TransactionBuilder,
    callback: Callback,
    errorCallback: ErrorCallback,
    startingRow: startingRow,
    persistRowVersion: PersistRowVersion);
processingLoop.Start();

Console.ReadKey();

await processingLoop.Stop()
    .ConfigureAwait(false);

Consuming messages

"Consuming" a message returns the data from the database and also deletes that message.

Single

Consume a single message.

var queueManager = new QueueManager("endpointTable", sqlConnection);
var message = await queueManager.Consume()
    .ConfigureAwait(false);

Console.WriteLine(message.Headers);
using (var reader = new StreamReader(message.Body))
{
    var bodyText = await reader.ReadToEndAsync()
        .ConfigureAwait(false);
    Console.WriteLine(bodyText);
}

Batch

Consuming a batch of messages.

var queueManager = new QueueManager("endpointTable", sqlConnection);
var result = await queueManager.Consume(
        size: 5,
        action: async message =>
        {
            Console.WriteLine(message.Headers);
            using (var reader = new StreamReader(message.Body))
            {
                var bodyText = await reader.ReadToEndAsync()
                    .ConfigureAwait(false);
                Console.WriteLine(bodyText);
            }
        })
    .ConfigureAwait(false);

Console.WriteLine(result.Count);
Console.WriteLine(result.LastRowVersion);

Consuming loop

For scenarios where continual consumption (consuming and executing some code with the result) of incoming messages is required, MessageConsumingLoop can be used.

An example use case is monitoring an audit queue. Some action should be taken when a message appears in the audit queue, and it should be purged from the queue to free up the storage space.

async Task Callback(SqlTransaction transaction, IncomingMessage message, CancellationToken cancellation)
{
    using (var reader = new StreamReader(message.Body))
    {
        var bodyText = await reader.ReadToEndAsync()
            .ConfigureAwait(false);
        Console.WriteLine($"Reply received:\r\n{bodyText}");
    }
}

Task<SqlTransaction> TransactionBuilder(CancellationToken cancellation)
{
    return ConnectionHelpers.BeginTransaction(connectionString, cancellation);
}

void ErrorCallback(Exception exception)
{
    Environment.FailFast("Message consuming loop failed", exception);
}

// start consuming
var consumingLoop = new MessageConsumingLoop(
    table: "endpointTable",
    delay: TimeSpan.FromSeconds(1),
    transactionBuilder: TransactionBuilder,
    callback: Callback,
    errorCallback: ErrorCallback);
consumingLoop.Start();

// stop consuming
await consumingLoop.Stop()
    .ConfigureAwait(false);

Delayed Queue

Queue management

Queue management for the native delayed delivery functionality.

See also SQL Server Transport - SQL statements.

Create

The queue can be created using the following:

var queueManager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
await queueManager.Create().ConfigureAwait(false);

Delete

The queue can be deleted using the following:

var queueManager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
await queueManager.Drop()
    .ConfigureAwait(false);

Sending messages

Single

Sending a single message.

var queueManager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
var message = new OutgoingDelayedMessage(
    due: DateTime.UtcNow.AddDays(1),
    headers: headers,
    bodyBytes: body);
await queueManager.Send(message)
    .ConfigureAwait(false);

Batch

Sending a batch of messages.

var queueManager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
var messages = new List<OutgoingDelayedMessage>
{
    new OutgoingDelayedMessage(
        due: DateTime.UtcNow.AddDays(1),
        headers: headers1,
        bodyBytes: body1),
    new OutgoingDelayedMessage(
        due: DateTime.UtcNow.AddDays(1),
        headers: headers2,
        bodyBytes: body2),
};
await queueManager.Send(messages)
    .ConfigureAwait(false);

Reading messages

"Reading" a message returns the data from the database without deleting it.

Single

Reading a single message.

var queueManager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
var message = await queueManager.Read(rowVersion: 10)
    .ConfigureAwait(false);

Console.WriteLine(message.Headers);
using (var reader = new StreamReader(message.Body))
{
    var bodyText = await reader.ReadToEndAsync()
        .ConfigureAwait(false);
    Console.WriteLine(bodyText);
}

Batch

Reading a batch of messages.

var queueManager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
var result = await queueManager.Read(
        size: 5,
        startRowVersion: 10,
        action: async message =>
        {
            Console.WriteLine(message.Headers);
            using (var reader = new StreamReader(message.Body))
            {
                var bodyText = await reader.ReadToEndAsync()
                    .ConfigureAwait(false);
                Console.WriteLine(bodyText);
            }
        })
    .ConfigureAwait(false);

Console.WriteLine(result.Count);
Console.WriteLine(result.LastRowVersion);

Consuming messages

"Consuming" a message returns the data from the database and also deletes that message.

Single

Consume a single message.

var queueManager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
var message = await queueManager.Consume()
    .ConfigureAwait(false);

Console.WriteLine(message.Headers);
using (var reader = new StreamReader(message.Body))
{
    var bodyText = await reader.ReadToEndAsync()
        .ConfigureAwait(false);
    Console.WriteLine(bodyText);
}

Batch

Consuming a batch of messages.

var queueManager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
var result = await queueManager.Consume(
        size: 5,
        action: async message =>
        {
            Console.WriteLine(message.Headers);
            using (var reader = new StreamReader(message.Body))
            {
                var bodyText = await reader.ReadToEndAsync()
                    .ConfigureAwait(false);
                Console.WriteLine(bodyText);
            }
        })
    .ConfigureAwait(false);

Console.WriteLine(result.Count);
Console.WriteLine(result.LastRowVersion);

Headers

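A headers helper class, NServiceBus.Transport.SqlServerNative.Headers, is provided.

It contains several header-related utilities, described in the JSON headers, Copied header constants, and Duplicated timestamp functionality sections.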

Deduplication

Some scenarios, such as HTTP message pass through, require message deduplication.

Table management

Create

The table can be created using the following:

var queueManager = new DeduplicationManager(sqlConnection, "DeduplicationTable");
await queueManager.Create().ConfigureAwait(false);

Delete

The table can be deleted using the following:

var queueManager = new DeduplicationManager(sqlConnection, "DeduplicationTable");
await queueManager.Drop()
    .ConfigureAwait(false);

Sending messages

Sending to the main transport queue with deduplication.

Single

Sending a single message with deduplication.

var queueManager = new QueueManager("endpointTable", sqlConnection, "DeduplicationTable");
var message = new OutgoingMessage(
    id: Guid.NewGuid(),
    headers: headers,
    bodyBytes: body);
await queueManager.Send(message)
    .ConfigureAwait(false);

Batch

Sending a batch of messages with deduplication.

var queueManager = new QueueManager("endpointTable", sqlConnection, "DeduplicationTable");
var messages = new List<OutgoingMessage>
{
    new OutgoingMessage(
        id: Guid.NewGuid(),
        headers: headers1,
        bodyBytes: body1),
    new OutgoingMessage(
        id: Guid.NewGuid(),
        headers: headers2,
        bodyBytes: body2),
};
await queueManager.Send(messages)
    .ConfigureAwait(false);

Deduplication cleanup

Deduplication records need to live for a period of time after the initial corresponding message has been sent. In this way a subsequent message with the same message id can be ignored. This necessitates a periodic cleanup of deduplication records, which is achieved by using DeduplicationCleanerJob.

At application startup, start an instance of DeduplicationCleanerJob:

var cleaner = new DeduplicationCleanerJob(
    table: "Deduplication",
    connectionBuilder: cancellation =>
    {
        return ConnectionHelpers.OpenConnection(connectionString, cancellation);
    },
    criticalError: exception => { },
    expireWindow: TimeSpan.FromHours(1),
    frequencyToRunCleanup: TimeSpan.FromMinutes(10));
cleaner.Start();

Then at application shutdown stop the instance.

await cleaner.Stop().ConfigureAwait(false);

JSON headers

Serialization

Serialize a Dictionary<string, string> to a JSON string.

var headers = new Dictionary<string, string>
{
    {Headers.EnclosedMessageTypes, "SendMessage"}
};
var serialized = Headers.Serialize(headers);

Deserialization

Deserialize a JSON string to a Dictionary<string, string>.

var headers = Headers.DeSerialize(headersString);

Copied header constants

Contains all the string constants copied from NServiceBus.Headers.

Duplicated timestamp functionality

A copy of the timestamp format methods ToWireFormattedString and ToUtcDateTime.
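
For reference, a minimal sketch of what these two methods do, written against plain .NET so it can be read without the package. It assumes the wire format is the one NServiceBus uses for timestamp headers, "yyyy-MM-dd HH:mm:ss:ffffff Z"; WireFormat is a hypothetical class name, and the real methods should be used in production code.

```csharp
using System;
using System.Globalization;

// Hypothetical stand-in that mirrors the behavior of the copied methods.
public static class WireFormat
{
    // Assumed NServiceBus timestamp header format.
    const string Format = "yyyy-MM-dd HH:mm:ss:ffffff Z";

    // Converts to UTC and formats with microsecond precision.
    public static string ToWireFormattedString(DateTime dateTime)
    {
        return dateTime.ToUniversalTime()
            .ToString(Format, CultureInfo.InvariantCulture);
    }

    // Parses a wire-formatted string back into a UTC DateTime.
    public static DateTime ToUtcDateTime(string wireFormattedString)
    {
        return DateTime.ParseExact(wireFormattedString, Format, CultureInfo.InvariantCulture)
            .ToUniversalTime();
    }
}
```

Under this assumption, a UTC time of 1 March 2019 12:30:45.123456 is written as "2019-03-01 12:30:45:123456 Z". Note the colon, rather than a period, before the fractional seconds.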

ConnectionHelpers

The APIs of this extension target either a SqlConnection or a SqlTransaction. Given that in configuration those values are often expressed as a connection string, ConnectionHelpers supports converting that string to a SqlConnection or SqlTransaction. It provides two methods, OpenConnection and BeginTransaction, with the effective implementation of those methods being:

public static async Task<SqlConnection> OpenConnection(string connectionString, CancellationToken cancellation = default)
{
    var connection = new SqlConnection(connectionString);
    try
    {
        await connection.OpenAsync(cancellation)
            .ConfigureAwait(false);
        return connection;
    }
    catch
    {
        // Do not leak the connection if it fails to open
        connection.Dispose();
        throw;
    }
}

public static async Task<SqlTransaction> BeginTransaction(string connectionString, CancellationToken cancellation = default)
{
    // The returned transaction is bound to this newly opened connection
    var connection = await OpenConnection(connectionString, cancellation)
        .ConfigureAwait(false);
    return connection.BeginTransaction();
}

MARS

All SqlConnections must have Multiple Active Result Sets (MARS) enabled, as multiple concurrent async requests can be performed on the same connection.
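
One way to make sure MARS is enabled, regardless of how the connection string was configured, is to set it through SqlConnectionStringBuilder. The following is a minimal sketch; MarsConnectionString is a hypothetical helper name.

```csharp
using System.Data.SqlClient;

// Hypothetical helper, not part of SQL Native.
public static class MarsConnectionString
{
    // Returns a copy of the connection string with
    // MultipleActiveResultSets=True set.
    public static string Enable(string connectionString)
    {
        var builder = new SqlConnectionStringBuilder(connectionString)
        {
            MultipleActiveResultSets = true
        };
        return builder.ConnectionString;
    }
}
```

The result can be passed to ConnectionHelpers.OpenConnection or ConnectionHelpers.BeginTransaction in place of the original connection string. Alternatively, MultipleActiveResultSets=True can be added directly to the configured connection string.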
