SQL Server Transport and SQL Persistence

Component: SQL Server Transport | NuGet: NServiceBus.SqlServer (Version: 3.x)
Target NServiceBus Version: 6.x

In this sample, the SQL Server Transport is used in conjunction with SQL Persistence. The sample shows how to use the same database connection for both transport and persistence operations, and how to access the SQL connection from within a message handler to persist business objects to the database.

Prerequisites

  1. Make sure an instance of SQL Server Express is installed and accessible as .\SQLEXPRESS.
  2. Create a database named shared and add two schemas to it: sender and receiver (schemas are stored under the Security directory in the SQL Server Management Studio database tree).
  3. Execute the Receiver.Orders.sql script, found in the solution root directory, against the database to create the [receiver].[Orders] table:
create table receiver.Orders (
    OrderId varchar(5) not null primary key clustered,
    Value decimal(19, 5) null
)
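
Step 2 can also be scripted instead of being done through the Management Studio UI. The following is a minimal T-SQL sketch, assuming it is run from SSMS or sqlcmd (where GO acts as the batch separator) by a login that is allowed to create databases. The existence checks are an addition to make the script re-runnable; they are not part of the sample. Run it before the Receiver.Orders.sql script.

```sql
-- Create the database shared by both endpoints if it does not exist yet.
IF DB_ID(N'shared') IS NULL
    CREATE DATABASE [shared];
GO

USE [shared];
GO

-- CREATE SCHEMA must be the first statement in its batch, so it is wrapped
-- in EXEC to allow an existence check in front of it.
IF SCHEMA_ID(N'sender') IS NULL
    EXEC(N'CREATE SCHEMA [sender]');
IF SCHEMA_ID(N'receiver') IS NULL
    EXEC(N'CREATE SCHEMA [receiver]');
GO
```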

Procedure

  1. Start the Sender and Receiver projects.
  2. In the Sender's console, wait for the text "Press <enter> to send a message", which appears when the app is ready.
  3. Press <enter>.
  4. On the Receiver console, notice that the order was submitted.
  5. On the Sender console notice that the order was accepted.
  6. Finally, after a couple of seconds, on the Receiver console notice that the timeout message has been received.
  7. Open SQL Server Management Studio and go to the shared database. Verify that there is a row in the saga state table (receiver.OrderLifecycleSaga) and a row in the orders table (receiver.Orders).
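
The check in step 7 can be done with two queries. This is a small sketch that uses only the table names mentioned above; the saga table's columns are generated by SQL persistence, so the first query selects all of them rather than assuming their names.

```sql
USE [shared];

-- Saga state written by SQL persistence for OrderLifecycleSaga.
SELECT * FROM [receiver].[OrderLifecycleSaga];

-- Business data inserted by the Receiver's message handler.
SELECT [OrderId], [Value] FROM [receiver].[Orders];
```

Each query should return one row per message sent from the Sender.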

Code walk-through

This sample contains three projects:

  • Shared - A class library containing common code, including the message definitions.
  • Sender - A console application responsible for sending the initial OrderSubmitted message and processing the follow-up OrderAccepted message.
  • Receiver - A console application responsible for processing the order message.

Sender and Receiver use different schemas within one database. This separates them at the logical level (schemas can be secured independently) while retaining the benefits of a single physical database. Apart from business data, each schema contains the queues for its NServiceBus endpoint and the tables for NServiceBus persistence. If no schema is specified, the transport defaults to the dbo schema.
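
To illustrate what securing the schemas independently could look like, permissions can be granted per schema. The sketch below assumes two existing database users, SenderUser and ReceiverUser; both names are hypothetical and not part of the sample, which uses a single connection string with integrated security. The exact permissions an endpoint needs (for example, to write to another endpoint's queue or to run installers) are not covered here.

```sql
USE [shared];

-- Hypothetical users: each endpoint's account gets data access to its own schema only.
GRANT SELECT, INSERT, UPDATE, DELETE ON SCHEMA::[sender] TO [SenderUser];
GRANT SELECT, INSERT, UPDATE, DELETE ON SCHEMA::[receiver] TO [ReceiverUser];
```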

Sender project

The Sender does not store any data. It mimics a front-end system where users submit orders that are passed as messages to the back-end. It is configured to use the SQL Server transport with SQL persistence. The transport is configured to use a non-default schema, sender, and to send messages addressed to the receiver endpoint to a different schema.

var connectionString = @"Data Source=.\SqlExpress;Database=shared;Integrated Security=True";
var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
transport.ConnectionString(connectionString);
transport.DefaultSchema("sender");
transport.UseSchemaForQueue("error", "dbo");
transport.UseSchemaForQueue("audit", "dbo");

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlVariant(SqlVariant.MsSqlServer);
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new SqlConnection(connectionString);
    });
persistence.Schema("sender");
persistence.TablePrefix("");
var subscriptions = persistence.SubscriptionSettings();
subscriptions.CacheFor(TimeSpan.FromMinutes(1));

The connection strings for both persistence and transport are the same.

Receiver project

The Receiver mimics a back-end system. It is also configured to use the SQL Server transport with SQL persistence, but with a different schema.

var endpointConfiguration = new EndpointConfiguration("Samples.SqlTransportSqlPersistence.Receiver");
endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.AuditProcessedMessagesTo("audit");
endpointConfiguration.EnableInstallers();
var connectionString = @"Data Source=.\SqlExpress;Database=shared;Integrated Security=True";
var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
transport.ConnectionString(connectionString);
transport.DefaultSchema("receiver");
transport.UseSchemaForQueue("error", "dbo");
transport.UseSchemaForQueue("audit", "dbo");
transport.UseSchemaForQueue("Samples.SqlTransportSqlPersistence.Sender", "sender");

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlVariant(SqlVariant.MsSqlServer);
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new SqlConnection(connectionString);
    });
persistence.Schema("receiver");
persistence.TablePrefix("");
var subscriptions = persistence.SubscriptionSettings();
subscriptions.CacheFor(TimeSpan.FromMinutes(1));
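
Because the Receiver configuration calls EnableInstallers(), the queue and persistence tables are expected to be created when the endpoint starts. A query along the following lines, run against the shared database, lists the tables found in each schema. It is a sketch that relies only on the SQL Server catalog views; the exact table names depend on the transport and persistence versions, so none are assumed here.

```sql
USE [shared];

-- List every table in the schemas used by the sample, grouped by schema.
SELECT s.name AS SchemaName, t.name AS TableName
FROM sys.tables AS t
JOIN sys.schemas AS s ON s.schema_id = t.schema_id
WHERE s.name IN (N'sender', N'receiver', N'dbo')
ORDER BY s.name, t.name;
```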

When a message arrives at the Receiver, a TransactionScope is created that encompasses:

  • Dequeuing the message
  • Accessing the shared connection and persisting business data using ADO.NET (SqlCommand)
  • Persisting saga data of OrderLifecycleSaga
  • Sending the reply message and the timeout request

The reply message:

var orderAccepted = new OrderAccepted
{
    OrderId = message.OrderId,
};
return context.Reply(orderAccepted);

The timeout request:

return RequestTimeout<OrderTimeout>(context, TimeSpan.FromSeconds(5));

The shared session is managed by NServiceBus, so there is no need to explicitly commit a transaction or dispose the connection.

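// The session is managed by NServiceBus: do not commit or dispose it here.
var session = context.SynchronizedStorageSession.SqlPersistenceSession();
// A direct cast fails fast if the connection is not a SqlConnection,
// instead of yielding null as the "as" operator would.
var connection = (SqlConnection) session.Connection;

const string sqlCommand = "INSERT INTO [receiver].[Orders] (OrderId, Value) VALUES (@OrderId, @Value)";

// Insert the order on the same connection that is used for saga persistence.
using (var dbCommand = new SqlCommand(sqlCommand, connection))
{
    dbCommand.Parameters.AddWithValue("OrderId", message.OrderId);
    dbCommand.Parameters.AddWithValue("Value", message.Value);
    dbCommand.ExecuteNonQuery();
}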

With SQL persistence, the OrderLifecycleSaga inherits from SqlSaga.

public class OrderLifecycleSaga :
    SqlSaga<OrderLifecycleSaga.SagaData>,
    IAmStartedByMessages<OrderSubmitted>,
    IHandleTimeouts<OrderTimeout>
{
    static ILog log = LogManager.GetLogger<OrderLifecycleSaga>();

    protected override void ConfigureMapping(IMessagePropertyMapper mapper)
    {
        mapper.ConfigureMapping<OrderSubmitted>(_ => _.OrderId);
    }

    protected override string CorrelationPropertyName => nameof(SagaData.OrderId);

    public Task Handle(OrderSubmitted message, IMessageHandlerContext context)
    {
        Data.OrderId = message.OrderId;
        return RequestTimeout<OrderTimeout>(context, TimeSpan.FromSeconds(5));
    }

    public Task Timeout(OrderTimeout state, IMessageHandlerContext context)
    {
        log.Info("Got timeout");
        return Task.CompletedTask;
    }

    public class SagaData :
        ContainSagaData
    {
        public string OrderId { get; set; }
    }
}

Script Output

Both the Sender and Receiver projects contain an attribute in the AssemblyInfo.cs file that instructs the SQL persister to generate table creation scripts only for Microsoft SQL Server.

[assembly: SqlPersistenceSettings(
    MsSqlServerScripts = true)]

After opening the Sender or Receiver project directory in Windows File Explorer, the SQL scripts generated by the SQL persister at build time can be viewed in the bin\Debug\NServiceBus.Persistence.Sql directory.
