SqlTransportNative

Component: SQL Server Transport - Native
NuGet Package NServiceBus.SqlServer.Native (2.x)
This is a community maintained project
Target NServiceBus Version: 7.x

This sample demonstrates using SQL Transport - Native to send and manage messages when running on the SqlServer Transport.

Prerequisites

An instance of SQL Server Express is installed and accessible as .\SqlExpress.

At startup each endpoint will create its required SQL assets including databases, tables and schemas.

The database created by this sample is NsbSamplesSqlNative.

Running the sample

When the solution is started four endpoints will start up:

  • NsbEndpoint
  • NativeEndpoint
  • AuditConsumer
  • ErrorProcessor

On NativeEndpoint press:

  • s to send a message that will succeed or
  • f to send a message that will fail

The successful message will be received by NsbEndpoint and AuditConsumer. The failure message will be received by NsbEndpoint and ErrorProcessor.

Code walk-through

NsbEndpoint

This is a standard NServiceBus endpoint.

EndpointConfiguration

The endpoint is configured as follows:

  • Use the SQL Server Transport.
  • Use the Newtonsoft JSON serializer. The choice of serializer is important since that format will need to be consistent when sending/receiving in the native context.
  • Forward failed messages to a queue named error.
  • Forward successful messages to a queue named audit.
var endpointConfiguration = new EndpointConfiguration("NsbEndpoint");
endpointConfiguration.UseSerialization<NewtonsoftSerializer>();
endpointConfiguration.AuditProcessedMessagesTo("audit");
endpointConfiguration.SendFailedMessagesTo("error");
var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
transport.Transactions(TransportTransactionMode.SendsAtomicWithReceive);
transport.ConnectionString(SqlHelper.ConnectionString);

Handler

The handler receives a message and replies with another.

public class MessageHandler :
    IHandleMessages<SendMessage>
{
    static ILog log = LogManager.GetLogger<SendMessage>();

    public Task Handle(SendMessage message, IMessageHandlerContext context)
    {
        log.Info($"Message received. Property={message.Property}");

        var replyMessage = new ReplyMessage
        {
            Property = "Hello from NsbEndpoint"
        };
        return context.Reply(replyMessage);
    }
}

Message contract

There are two message contracts. One for sending, and one for replying.

public class SendMessage :
    IMessage
{
    public string Property { get; set; }
}

public class ReplyMessage :
    IMessage
{
    public string Property { get; set; }
}

Note that the messages exist only in this endpoint and do not need to be used, via a reference, in any of the other projects.

NativeEndpoint

This project uses the NServiceBus.SqlServer.Native NuGet Package to send and receive messages.

Sending Messages

Messages are sent to NsbEndpoint using NServiceBus.Transport.SqlServerNative.Sender.

static async Task SendMessage(string messageBody)
{
    var headers = new Dictionary<string, string>
    {
        {Headers.EnclosedMessageTypes, "SendMessage"},
        {Headers.ReplyToAddress, "NativeEndpoint"},
    };
    var message = new OutgoingMessage(
        id: Guid.NewGuid(),
        expires: null,
        headers: Headers.Serialize(headers),
        bodyBytes: Encoding.UTF8.GetBytes(messageBody));
    using (var connection = await ConnectionHelpers.OpenConnection(SqlHelper.ConnectionString)
        .ConfigureAwait(false))
    {
        var queueManager = new QueueManager("NsbEndpoint", connection);
        await queueManager.Send(message)
            .ConfigureAwait(false);
    }
}
Message that will succeed

This message will be successfully processed by NsbEndpoint. NsbEndpoint will then send a reply back to NativeEndpoint and a copy of the sent message will be forwarded to the audit queue and hence AuditConsumer.

var message = @"{ Property: 'Hello from NativeEndpoint' }";
Message that will fail

This message will fail to be processed by NsbEndpoint and be sent to the error queue and hence ErrorProcessor.

var message = @"{ invalid json }";

Receiving Messages

NativeEndpoint received the reply message from NsbEndpoint using NServiceBus.Transport.SqlServerNative.MessageProcessingLoop:

async Task Callback(SqlTransaction sqlTransaction, IncomingMessage message, CancellationToken cancellation)
{
    using (var reader = new StreamReader(message.Body))
    {
        var bodyText = await reader.ReadToEndAsync().ConfigureAwait(false);
        Console.WriteLine($"Reply received:\r\n{bodyText}");
    }
}

void ErrorCallback(Exception exception)
{
    Environment.FailFast("Message consuming loop failed", exception);
}

Task<SqlTransaction> TransactionBuilder(CancellationToken cancellation)
{
    return ConnectionHelpers.BeginTransaction(SqlHelper.ConnectionString, cancellation);
}

var messageConsumingLoop = new MessageConsumingLoop(
    table: "NativeEndpoint",
    delay: TimeSpan.FromSeconds(1),
    transactionBuilder: TransactionBuilder,
    callback: Callback,
    errorCallback: ErrorCallback);
messageConsumingLoop.Start();

AuditConsumer

The audit queue is consumed using NServiceBus.Transport.SqlServerNative.MessageConsumingLoop

async Task Callback(SqlTransaction transaction, IncomingMessage message, CancellationToken cancellation)
{
    using (var reader = new StreamReader(message.Body))
    {
        var bodyText = await reader.ReadToEndAsync().ConfigureAwait(false);
        Console.WriteLine($"Message received in audit queue:\r\n{bodyText}");
    }
}

void ErrorCallback(Exception exception)
{
    Environment.FailFast("Message consuming loop failed", exception);
}

Task<SqlTransaction> TransactionBuilder(CancellationToken cancellation)
{
    return ConnectionHelpers.BeginTransaction(SqlHelper.ConnectionString, cancellation);
}

var consumingLoop = new MessageConsumingLoop(
    table: "audit",
    delay: TimeSpan.FromSeconds(1),
    transactionBuilder: TransactionBuilder,
    callback: Callback,
    errorCallback: ErrorCallback);
consumingLoop.Start();

Console.ReadKey();

await consumingLoop.Stop()
    .ConfigureAwait(false);

ErrorProcessor

The error queue is processed using NServiceBus.Transport.SqlServerNative.MessageProcessingLoop:

long startingRow;

var rowVersionTracker = new RowVersionTracker();
using (var connection = await ConnectionHelpers.OpenConnection(SqlHelper.ConnectionString)
    .ConfigureAwait(false))
{
    await rowVersionTracker.CreateTable(connection).ConfigureAwait(false);
    startingRow = await rowVersionTracker.Get(connection).ConfigureAwait(false);
}

async Task Callback(SqlTransaction sqlTransaction, IncomingMessage message, CancellationToken cancellation)
{
    using (var reader = new StreamReader(message.Body))
    {
        var bodyText = await reader.ReadToEndAsync().ConfigureAwait(false);
        Console.WriteLine($"Message received in error queue:\r\n{bodyText}");
    }
}

void ErrorCallback(Exception exception)
{
    Environment.FailFast("Message processing loop failed", exception);
}

Task PersistRowVersion(SqlTransaction transaction, long rowVersion, CancellationToken cancellation)
{
    return rowVersionTracker.Save(transaction, rowVersion, cancellation);
}

Task<SqlTransaction> TransactionBuilder(CancellationToken cancellation)
{
    return ConnectionHelpers.BeginTransaction(SqlHelper.ConnectionString, cancellation);
}

var processingLoop = new MessageProcessingLoop(
    table: "error",
    delay: TimeSpan.FromSeconds(1),
    transactionBuilder: TransactionBuilder,
    callback: Callback,
    errorCallback: ErrorCallback,
    startingRow: startingRow,
    persistRowVersion: PersistRowVersion);
processingLoop.Start();

Console.ReadKey();

await processingLoop.Stop()
    .ConfigureAwait(false);

Since in this scenario, messages are not deleted as they are being processed, it is necessary to keep track of the last processed RowVersion. This is done using NServiceBus.Transport.SqlServerNative.RowVersionTracker, which stores the RowVersion in a single row table named RowVersionTracker.

MARS

All SqlConnections must have Multiple Active Result Sets (MARS) as multiple concurrent async request can be performed.

Related Articles


Last modified