SQL Server Transport - Native

Project Hosting
NuGet Package NServiceBus.SqlServer.Native (1-pre) | License
This is a community run project. License and support are independent of Particular Software.
Target NServiceBus Version: 7.x
This page targets a pre-release version and is subject to change prior to the final release.

SQL Server Transport Native is a shim providing low-level access to the SQL Server Transport with no NServiceBus or SQL Server Transport reference required.

Usage scenarios

  • Error or Audit queue handling: Allows consuming messages from error and audit queues, for example to move them to a long-term archive. NServiceBus expects to have a queue per message type, so NServiceBus endpoints are not suitable for processing error or audit queues. SQL Native allows manipulation or consumption of queues containing multiple types of messages.
  • Corrupted or malformed messages: Allows processing poison messages that can't be deserialized by NServiceBus. In SQL Native, message headers and body are treated as a raw string and byte array, so corrupted or malformed messages can be read and manipulated in code to correct any problems.
  • Deployment or decommission: Allows performing common operational activities, similar to operations scripts. Running installers requires starting a full endpoint, which is not always ideal during a deployment or decommission. SQL Native allows creating or deleting queues with no running endpoint, and with significantly less code. This also makes it a better candidate for usage in deployment scripting languages like PowerShell.
  • Bulk operations: SQL Native supports sending and receiving multiple messages within a single SqlConnection and SqlTransaction.
  • Explicit connection and transaction management: NServiceBus abstracts the SqlConnection and SqlTransaction creation and management. SQL Native allows any consuming code to manage the scope and settings of both the SqlConnection and SqlTransaction.
  • Message pass through: SQL Native reduces the amount of boilerplate code and simplifies development. It provides functionality similar to that shown in the HTTP Message Pass Through Sample, with no custom pipeline or mutators required.

Some notes on the below snippets:

  • All methods that return a Task also accept an optional CancellationToken.
  • While a SqlConnection is used in all snippets for simplicity, an overload that takes a SqlTransaction also exists.
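As a minimal sketch of the two notes above, the following sends a batch inside a caller-managed transaction and passes a CancellationToken. The QueueManager constructor taking a SqlTransaction and the trailing CancellationToken argument on Send are assumed from those notes, not confirmed signatures; the namespace is assumed to be the one used by the Headers class, and connectionString, headers and body are placeholders supplied by the caller.

```csharp
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus.Transport.SqlServerNative;

static class TransactionalSendSketch
{
    public static async Task SendBatch(
        string connectionString,
        string headers,
        byte[] body,
        CancellationToken cancellation)
    {
        using (var connection = new SqlConnection(connectionString))
        {
            await connection.OpenAsync(cancellation)
                .ConfigureAwait(false);
            using (var transaction = connection.BeginTransaction())
            {
                // Assumed overload: a SqlTransaction in place of the SqlConnection.
                var queueManager = new QueueManager("endpointTable", transaction);
                var messages = new List<OutgoingMessage>
                {
                    new OutgoingMessage(
                        id: Guid.NewGuid(),
                        headers: headers,
                        bodyBytes: body),
                    new OutgoingMessage(
                        id: Guid.NewGuid(),
                        headers: headers,
                        bodyBytes: body),
                };
                // Assumed: the optional CancellationToken is the last argument.
                await queueManager.Send(messages, cancellation)
                    .ConfigureAwait(false);
                // Both rows are committed together; disposing the
                // transaction without calling Commit rolls them back.
                transaction.Commit();
            }
        }
    }
}
```

Because the caller owns the transaction, the same SqlTransaction can also cover other work on that connection, so the sends and that work succeed or fail as a unit.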

Queue management

Main queue

Queue management for the main transport queue.

See also SQL Server Transport - SQL statements.

Create

The queue can be created using the following:

var queueManager = new QueueManager("endpointTable", sqlConnection);
await queueManager.Create().ConfigureAwait(false);

Delete

The queue can be deleted using the following:

var queueManager = new QueueManager("endpointTable", sqlConnection);
await queueManager.Drop().ConfigureAwait(false);

Delayed queue

Queue management for the native delayed delivery functionality.

See also SQL Server Transport - SQL statements.

Create

The queue can be created using the following:

var queueManager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
await queueManager.Create().ConfigureAwait(false);

Delete

The queue can be deleted using the following:

var queueManager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
await queueManager.Drop()
    .ConfigureAwait(false);

Sending a message

A variety of message sending functionality is provided.

Main queue

Sending to the main transport queue.

Single

Sending a single message.

var queueManager = new QueueManager("endpointTable", sqlConnection);
var message = new OutgoingMessage(
    id: Guid.NewGuid(),
    headers: headers,
    bodyBytes: body);
await queueManager.Send(message)
    .ConfigureAwait(false);

Batch

Sending a batch of messages.

var queueManager = new QueueManager("endpointTable", sqlConnection);
var messages = new List<OutgoingMessage>
{
    new OutgoingMessage(
        id: Guid.NewGuid(),
        headers: headers1,
        bodyBytes: body1),
    new OutgoingMessage(
        id: Guid.NewGuid(),
        headers: headers2,
        bodyBytes: body2),
};
await queueManager.Send(messages)
    .ConfigureAwait(false);

Delayed queue

Sending to the native delayed delivery queue.

Single

Sending a single message.

var queueManager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
var message = new OutgoingDelayedMessage(
    due: DateTime.UtcNow.AddDays(1),
    headers: headers,
    bodyBytes: body);
await queueManager.Send(message)
    .ConfigureAwait(false);

Batch

Sending a batch of messages.

var queueManager = new DelayedQueueManager("endpointTable.Delayed", sqlConnection);
var messages = new List<OutgoingDelayedMessage>
{
    new OutgoingDelayedMessage(
        due: DateTime.UtcNow.AddDays(1),
        headers: headers1,
        bodyBytes: body1),
    new OutgoingDelayedMessage(
        due: DateTime.UtcNow.AddDays(1),
        headers: headers2,
        bodyBytes: body2),
};
await queueManager.Send(messages)
    .ConfigureAwait(false);

Reading messages

"Reading" a message returns the data from the database without deleting it.

Single

Reading a single message.

var queueManager = new QueueManager("endpointTable", sqlConnection);
var message = await queueManager.Read(rowVersion: 10)
    .ConfigureAwait(false);

Console.WriteLine(message.Headers);
using (var reader = new StreamReader(message.Body))
{
    var bodyText = await reader.ReadToEndAsync()
        .ConfigureAwait(false);
    Console.WriteLine(bodyText);
}

Batch

Reading a batch of messages.

var queueManager = new QueueManager("endpointTable", sqlConnection);
var result = await queueManager.Read(
        size: 5,
        startRowVersion: 10,
        action: async message =>
        {
            Console.WriteLine(message.Headers);
            using (var reader = new StreamReader(message.Body))
            {
                var bodyText = await reader.ReadToEndAsync()
                    .ConfigureAwait(false);
                Console.WriteLine(bodyText);
            }
        })
    .ConfigureAwait(false);

Console.WriteLine(result.Count);
Console.WriteLine(result.LastRowVersion);

RowVersion tracking

In many scenarios it is necessary to keep track of the last message RowVersion that was read. A lightweight implementation of this functionality is provided by RowVersionTracker. RowVersionTracker stores the current RowVersion in a table containing a single column and row.

var versionTracker = new RowVersionTracker();

// create table
await versionTracker.CreateTable(sqlConnection)
    .ConfigureAwait(false);

// save row version
await versionTracker.Save(sqlConnection, newRowVersion)
    .ConfigureAwait(false);

// get row version
var startingRow = await versionTracker.Get(sqlConnection)
    .ConfigureAwait(false);

Note that this is only one possible implementation of storing the current RowVersion.
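To illustrate that the storage mechanism is interchangeable, here is a minimal sketch of an alternative that keeps the RowVersion in a text file. FileRowVersionStore is a hypothetical class written for this example and is not part of the library; the default of 0 when nothing has been saved is an arbitrary choice for the sketch.

```csharp
using System.Globalization;
using System.IO;

// Hypothetical alternative to RowVersionTracker: keeps the last
// RowVersion in a small text file instead of a SQL table.
public class FileRowVersionStore
{
    readonly string path;

    public FileRowVersionStore(string path)
    {
        this.path = path;
    }

    // Overwrites the file with the new value.
    public void Save(long rowVersion)
    {
        File.WriteAllText(path, rowVersion.ToString(CultureInfo.InvariantCulture));
    }

    // Returns the stored value, or 0 when nothing has been saved yet.
    public long Get()
    {
        if (!File.Exists(path))
        {
            return 0;
        }
        return long.Parse(File.ReadAllText(path), CultureInfo.InvariantCulture);
    }
}
```

A file-based store does not take part in the SQL transaction, so a crash between handling a message and saving the RowVersion can cause that message to be read again. Storing the value in the same database, as RowVersionTracker does, avoids introducing a second resource.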

Processing loop

For scenarios where continual processing (reading and executing some code with the result) of incoming messages is required, MessageProcessingLoop can be used.

An example use case is monitoring an error queue. Some action should be taken when a message appears in the error queue, but it should remain in that queue in case it needs to be retried.

Note that in the below snippet, the above RowVersionTracker is used for tracking the current RowVersion.

var rowVersionTracker = new RowVersionTracker();

var startingRow = await rowVersionTracker.Get(sqlConnection)
    .ConfigureAwait(false);

async Task Callback(SqlTransaction transaction, IncomingMessage message, CancellationToken cancellation)
{
    using (var reader = new StreamReader(message.Body))
    {
        var bodyText = await reader.ReadToEndAsync()
            .ConfigureAwait(false);
        Console.WriteLine($"Message received in error queue:\r\n{bodyText}");
    }
}

void ErrorCallback(Exception exception)
{
    Environment.FailFast("Message processing loop failed", exception);
}

Task<SqlTransaction> TransactionBuilder(CancellationToken cancellation)
{
    return ConnectionHelpers.BeginTransaction(connectionString, cancellation);
}

Task PersistRowVersion(SqlTransaction transaction, long rowVersion, CancellationToken token)
{
    return rowVersionTracker.Save(sqlConnection, rowVersion, token);
}

var processingLoop = new MessageProcessingLoop(
    table: "error",
    delay: TimeSpan.FromSeconds(1),
    transactionBuilder: TransactionBuilder,
    callback: Callback,
    errorCallback: ErrorCallback,
    startingRow: startingRow,
    persistRowVersion: PersistRowVersion);
processingLoop.Start();

Console.ReadKey();

await processingLoop.Stop()
    .ConfigureAwait(false);

Consuming messages

"Consuming" a message returns the data from the database and also deletes that message.

Single

Consume a single message.

var queueManager = new QueueManager("endpointTable", sqlConnection);
var message = await queueManager.Consume()
    .ConfigureAwait(false);

Console.WriteLine(message.Headers);
using (var reader = new StreamReader(message.Body))
{
    var bodyText = await reader.ReadToEndAsync()
        .ConfigureAwait(false);
    Console.WriteLine(bodyText);
}

Batch

Consuming a batch of messages.

var queueManager = new QueueManager("endpointTable", sqlConnection);
var result = await queueManager.Consume(
        size: 5,
        action: async message =>
        {
            Console.WriteLine(message.Headers);
            using (var reader = new StreamReader(message.Body))
            {
                var bodyText = await reader.ReadToEndAsync()
                    .ConfigureAwait(false);
                Console.WriteLine(bodyText);
            }
        })
    .ConfigureAwait(false);

Console.WriteLine(result.Count);
Console.WriteLine(result.LastRowVersion);

Consuming loop

For scenarios where continual consumption (consuming and executing some code with the result) of incoming messages is required, MessageConsumingLoop can be used.

An example use case is monitoring an audit queue. Some action should be taken when a message appears in the audit queue, and it should be purged from the queue so as to free up the storage space.

async Task Callback(SqlTransaction transaction, IncomingMessage message, CancellationToken cancellation)
{
    using (var reader = new StreamReader(message.Body))
    {
        var bodyText = await reader.ReadToEndAsync()
            .ConfigureAwait(false);
        Console.WriteLine($"Reply received:\r\n{bodyText}");
    }
}

Task<SqlTransaction> TransactionBuilder(CancellationToken cancellation)
{
    return ConnectionHelpers.BeginTransaction(connectionString, cancellation);
}

void ErrorCallback(Exception exception)
{
    Environment.FailFast("Message consuming loop failed", exception);
}

// start consuming
var consumingLoop = new MessageConsumingLoop(
    table: "endpointTable",
    delay: TimeSpan.FromSeconds(1),
    transactionBuilder: TransactionBuilder,
    callback: Callback,
    errorCallback: ErrorCallback);
consumingLoop.Start();

// stop consuming
await consumingLoop.Stop()
    .ConfigureAwait(false);

Headers

A headers helper class, NServiceBus.Transport.SqlServerNative.Headers, is provided.

It contains several header-related utilities:

JSON headers

Serialization

Serialize a Dictionary<string, string> to a JSON string.

var headers = new Dictionary<string, string>
{
    {Headers.EnclosedMessageTypes, "SendMessage"}
};
var serialized = Headers.Serialize(headers);

Deserialization

Deserialize a JSON string to a Dictionary<string, string>.

var headers = Headers.DeSerialize(headersString);

Copied header constants

Contains all the string constants copied from NServiceBus.Headers.

Duplicated timestamp functionality

A copy of the timestamp format methods ToWireFormattedString and ToUtcDateTime.
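For orientation, the following sketch shows what such a pair of methods can look like. It is an illustrative stand-in and not the library's code: the class name WireTimestampSketch is hypothetical, and the format string is assumed to match the one NServiceBus 7 uses for timestamp headers.

```csharp
using System;
using System.Globalization;

// Illustrative stand-in, not the library's implementation.
public static class WireTimestampSketch
{
    // Assumed to match the NServiceBus 7 timestamp header format.
    const string Format = "yyyy-MM-dd HH:mm:ss:ffffff Z";

    // Converts to UTC, then formats with microsecond precision.
    public static string ToWireFormattedString(DateTime dateTime)
    {
        return dateTime.ToUniversalTime()
            .ToString(Format, CultureInfo.InvariantCulture);
    }

    // Parses the wire format back into a DateTime with Kind set to Utc.
    public static DateTime ToUtcDateTime(string wireFormattedString)
    {
        return DateTime.ParseExact(
            wireFormattedString,
            Format,
            CultureInfo.InvariantCulture,
            DateTimeStyles.AdjustToUniversal);
    }
}
```

Under this assumed format, a UTC value of 2 January 2020 03:04:05 is written as "2020-01-02 03:04:05:000000 Z", and parsing that string returns the same instant.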

ConnectionHelpers

The APIs of this extension target either a SqlConnection or a SqlTransaction. Given that in configuration those values are often expressed as a connection string, ConnectionHelpers supports converting that string to a SqlConnection or SqlTransaction. It provides two methods, OpenConnection and BeginTransaction, with the effective implementation of those methods being:

public static async Task<SqlConnection> OpenConnection(string connectionString)
{
    var connection = new SqlConnection(connectionString);
    try
    {
        await connection.OpenAsync()
            .ConfigureAwait(false);
        return connection;
    }
    catch
    {
        connection.Dispose();
        throw;
    }
}

public static async Task<SqlTransaction> BeginTransaction(string connectionString)
{
    var connection = await OpenConnection(connectionString)
        .ConfigureAwait(false);
    return connection.BeginTransaction();
}
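Since both helpers hand back objects that the caller owns, disposal is the caller's responsibility. The following is a minimal usage sketch under some assumptions: ConnectionHelpers is assumed to live in the same namespace as the Headers class, and the QueueManager constructor taking a SqlTransaction is assumed from the earlier note on overloads.

```csharp
using System.Data.SqlClient;
using System.Threading.Tasks;
using NServiceBus.Transport.SqlServerNative;

static class ConnectionHelpersUsageSketch
{
    public static async Task CreateQueue(string connectionString)
    {
        // OpenConnection returns an open connection owned by the caller.
        using (var connection = await ConnectionHelpers.OpenConnection(connectionString)
            .ConfigureAwait(false))
        {
            var queueManager = new QueueManager("endpointTable", connection);
            await queueManager.Create()
                .ConfigureAwait(false);
        }
    }

    public static async Task CreateQueueInTransaction(string connectionString)
    {
        var transaction = await ConnectionHelpers.BeginTransaction(connectionString)
            .ConfigureAwait(false);
        // Capture the connection up front: SqlTransaction.Connection is
        // null once the transaction has been committed or rolled back.
        using (var connection = transaction.Connection)
        using (transaction)
        {
            // Assumed overload taking a SqlTransaction.
            var queueManager = new QueueManager("endpointTable", transaction);
            await queueManager.Create()
                .ConfigureAwait(false);
            transaction.Commit();
        }
    }
}
```

Disposing a SqlTransaction does not close its connection, which is why the second method disposes both explicitly.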
