SQL Server Transport and SQL Persistence

Component: NServiceBus
NuGet Package NServiceBus (6.x)

In this sample, the SQL Server Transport is used in conjunction with SQL Persistence. The sample shows how to use the same database connection for both transport and persistence operations, and how to access (using multiple ORMs) the current SQL connection and transaction from within a message handler to persist business objects to the database.

Because the persistence reuses the connection and transaction managed by the SQL Server transport, the endpoints can run in TransportTransactionMode.SendsAtomicWithReceive while still ensuring exactly-once message processing, as long as the persistence session APIs are used to access the connection and transaction.

Prerequisites

An instance of SQL Server Express is installed and accessible as .\SqlExpress.

At startup each endpoint will create its required SQL assets including databases, tables and schemas.

The database created by this sample is NsbSamplesSql.

Procedure

  1. Start the Sender and Receiver projects.
  2. In the Sender's console, notice the "Press <enter> to send a message" text when the app is ready.
  3. Press Enter.
  4. On the Receiver console, notice that the order was submitted.
  5. On the Sender console, notice that the order was accepted.
  6. Finally, after a couple of seconds, on the Receiver console, notice that the timeout message has been received.
  7. Open SQL Server Management Studio and go to the NsbSamplesSql database. Verify that there is a row in the saga state table (receiver.OrderLifecycleSaga) and in the orders table (receiver.SubmittedOrder).
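
The check in step 7 can also be scripted. The following is a minimal sketch, assuming the connection string and table names used in this sample; the VerifySample class and its CountQuery and CountRows helpers are hypothetical names that are not part of the sample.

```csharp
using System;
using System.Data.SqlClient;

public static class VerifySample
{
    // Assumed to match the database the endpoints in this sample use.
    const string Connection =
        @"Data Source=.\SqlExpress;Database=NsbSamplesSql;Integrated Security=True";

    // Builds a row-count query for a schema-qualified table.
    public static string CountQuery(string schema, string table)
    {
        return $"select count(*) from [{schema}].[{table}]";
    }

    // Opens its own connection; this runs outside of any endpoint,
    // so there is no NServiceBus-managed session to reuse.
    public static int CountRows(string schema, string table)
    {
        using (var connection = new SqlConnection(Connection))
        using (var command = new SqlCommand(CountQuery(schema, table), connection))
        {
            connection.Open();
            return (int) command.ExecuteScalar();
        }
    }
}
```

Calling VerifySample.CountRows("receiver", "OrderLifecycleSaga") and VerifySample.CountRows("receiver", "SubmittedOrder") after running the procedure should each return at least one row.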

Code walk-through

This sample contains three projects:

  • Shared - A class library containing common code including the message definitions.
  • Sender - A console application responsible for sending the initial OrderSubmitted message and processing the follow-up OrderAccepted message.
  • Receiver - A console application responsible for processing the order message.
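
The message definitions in Shared are not listed in this article. The sketch below is inferred from how the messages are used by the handlers and saga in this sample (OrderId is assigned to a Guid, Value to an int). The marker interfaces and the exact property types are assumptions, not a copy of the sample's code.

```csharp
using System;
using NServiceBus;

// Published by the Sender; the Receiver registers the Sender as its publisher.
// IEvent is an assumption based on that publisher registration.
public class OrderSubmitted : IEvent
{
    public Guid OrderId { get; set; }

    // Assumed to be an int because it is stored in an int column model.
    public int Value { get; set; }
}

// Reply sent by the Receiver back to the Sender.
public class OrderAccepted : IMessage
{
    public Guid OrderId { get; set; }
}

// Timeout state requested by the saga; marked as IMessage here as a safe default.
public class OrderTimeout : IMessage
{
}
```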

Sender and Receiver use different schemas within one database. This creates a separation at the logical level (schemas can be secured independently) while retaining the benefits of having a single physical database. Each schema contains, apart from business data, the queues for the NServiceBus endpoint and the tables for NServiceBus persistence.

Sender project

The Sender mimics a front-end system where users submit orders, which are passed as messages to the back-end. It is configured to use the SQL Server Transport and the In Memory Persistence. In Memory Persistence is used because, for the purposes of this sample, the Sender does not need to persist any data. The transport is configured to use sender for the schema.

var connection = @"Data Source=.\SqlExpress;Database=NsbSamplesSql;Integrated Security=True;Max Pool Size=100";
var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
transport.ConnectionString(connection);
transport.DefaultSchema("sender");
transport.UseSchemaForQueue("error", "dbo");
transport.UseSchemaForQueue("audit", "dbo");

endpointConfiguration.UsePersistence<InMemoryPersistence>();

The connection strings for both persistence and transport are the same.
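
Once configured, the Sender starts the endpoint and publishes an order each time Enter is pressed. The following is a rough sketch of such a loop, not the sample's actual code: the SenderLoop class name and the Value assigned to the order are hypothetical, and OrderSubmitted is assumed to come from the Shared project with a Guid OrderId and an int Value.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

static class SenderLoop
{
    // endpointConfiguration is assumed to be configured as shown above.
    public static async Task Run(EndpointConfiguration endpointConfiguration)
    {
        var endpointInstance = await Endpoint.Start(endpointConfiguration)
            .ConfigureAwait(false);

        Console.WriteLine("Press <enter> to send a message");
        Console.WriteLine("Press any other key to exit");

        // Publish one order per Enter key press; any other key ends the loop.
        while (Console.ReadKey().Key == ConsoleKey.Enter)
        {
            var orderSubmitted = new OrderSubmitted
            {
                OrderId = Guid.NewGuid(),
                Value = 100 // arbitrary illustrative value
            };
            await endpointInstance.Publish(orderSubmitted)
                .ConfigureAwait(false);
        }

        await endpointInstance.Stop()
            .ConfigureAwait(false);
    }
}
```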

Receiver project

The Receiver mimics a back-end system. It is configured to use the SQL Server Transport and the SQL Persistence. The transport is configured to use receiver for the schema and to send messages addressed to the Sender endpoint to the sender schema.

var endpointConfiguration = new EndpointConfiguration("Samples.Sql.Receiver");
endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.AuditProcessedMessagesTo("audit");
endpointConfiguration.EnableInstallers();
var connection = @"Data Source=.\SqlExpress;Database=NsbSamplesSql;Integrated Security=True;Max Pool Size=100";


var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
transport.ConnectionString(connection);
transport.DefaultSchema("receiver");
transport.UseSchemaForQueue("error", "dbo");
transport.UseSchemaForQueue("audit", "dbo");
transport.UseSchemaForQueue("Samples.Sql.Sender", "sender");

transport.Transactions(TransportTransactionMode.SendsAtomicWithReceive);

var routing = transport.Routing();
routing.RouteToEndpoint(typeof(OrderAccepted), "Samples.Sql.Sender");
routing.RegisterPublisher(typeof(OrderSubmitted).Assembly, "Samples.Sql.Sender");

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
var dialect = persistence.SqlDialect<SqlDialect.MsSqlServer>();
dialect.Schema("receiver");
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new SqlConnection(connection);
    });
persistence.TablePrefix("");
var subscriptions = persistence.SubscriptionSettings();
subscriptions.CacheFor(TimeSpan.FromMinutes(1));
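
The effect of the DefaultSchema and UseSchemaForQueue calls above can be pictured as a lookup with a fallback. The following sketch only illustrates that behavior under the Receiver's settings; it is not the transport's implementation, and the SchemaLookup class is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;

public static class SchemaLookup
{
    // Per-queue overrides, mirroring the UseSchemaForQueue calls.
    static readonly Dictionary<string, string> overrides =
        new Dictionary<string, string>
        {
            { "error", "dbo" },
            { "audit", "dbo" },
            { "Samples.Sql.Sender", "sender" }
        };

    // Mirrors DefaultSchema("receiver").
    const string DefaultSchema = "receiver";

    // Returns the override for the queue if one exists, otherwise the default schema.
    public static string SchemaFor(string queue)
    {
        string schema;
        return overrides.TryGetValue(queue, out schema) ? schema : DefaultSchema;
    }
}
```

With these settings, the Receiver's own queue resolves to the receiver schema, while messages for Samples.Sql.Sender and the shared error and audit queues go to the sender and dbo schemas respectively.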

When the message arrives at the Receiver, a transaction is created that encompasses:

  • Dequeuing the message.
  • Opening a DbConnection and starting a DbTransaction, and supplying both to all handlers and sagas.
  • Persisting saga data of OrderLifecycleSaga.
  • Sending the reply message and the timeout request.

The reply is sent from a message handler:

var orderAccepted = new OrderAccepted
{
    OrderId = message.OrderId
};
return context.Reply(orderAccepted);

The timeout is requested from the saga:

return RequestTimeout<OrderTimeout>(context, TimeSpan.FromSeconds(5));

With SQL persistence, the OrderLifecycleSaga inherits from SqlSaga.

public class OrderLifecycleSaga :
    SqlSaga<OrderLifecycleSaga.SagaData>,
    IAmStartedByMessages<OrderSubmitted>,
    IHandleTimeouts<OrderTimeout>
{
    static ILog log = LogManager.GetLogger<OrderLifecycleSaga>();

    protected override void ConfigureMapping(IMessagePropertyMapper mapper)
    {
        mapper.ConfigureMapping<OrderSubmitted>(_ => _.OrderId);
    }

    protected override string CorrelationPropertyName => nameof(SagaData.OrderId);

    public Task Handle(OrderSubmitted message, IMessageHandlerContext context)
    {
        Data.OrderId = message.OrderId;
        return RequestTimeout<OrderTimeout>(context, TimeSpan.FromSeconds(5));
    }

    public Task Timeout(OrderTimeout state, IMessageHandlerContext context)
    {
        log.Info("Got timeout");
        return Task.CompletedTask;
    }

    public class SagaData :
        ContainSagaData
    {
        public Guid OrderId { get; set; }
    }
}

Script Output

The Receiver project contains an attribute in the SqlPersistenceSettings.cs file that instructs the SQL persister to generate table creation scripts only for Microsoft SQL Server.

[assembly: SqlPersistenceSettings(
    MsSqlServerScripts = true)]

The SQL scripts, generated by the SQL persister at build time, will be created in the following directory:

Receiver\bin\Debug\net462\NServiceBus.Persistence.Sql

Accessing the ambient database details

When persisting business data to the same database, it is recommended to use the same DbConnection and DbTransaction that are used by the underlying persistence and transport. How to do this depends on the data access technology. Several approaches are listed below, including raw ADO.NET and several popular ORMs.

The connection and transaction are managed by NServiceBus, so there is no need to explicitly commit a transaction or dispose the connection. Using the database state managed by NServiceBus ensures that database interactions, both in handlers and sagas, execute in the same connection and transaction context.

Raw ADO.NET

Persisting Data

var session = context.SynchronizedStorageSession.SqlPersistenceSession();

var sql = @"insert into receiver.SubmittedOrder
                        (Id, Value)
            values      (@Id, @Value)";
using (var command = new SqlCommand(
    cmdText: sql,
    connection: (SqlConnection) session.Connection,
    transaction: (SqlTransaction) session.Transaction))
{
    var parameters = command.Parameters;
    parameters.AddWithValue("Id", $"Raw-{message.OrderId}");
    parameters.AddWithValue("Value", message.Value);
    await command.ExecuteNonQueryAsync()
        .ConfigureAwait(false);
}

Dapper

Model

public class SubmittedOrder
{
    public string Id { get; set; }
    public int Value { get; set; }
}

Persisting Data

var session = context.SynchronizedStorageSession.SqlPersistenceSession();

var order = new SubmittedOrder
{
    Id = $"Dapper-{message.OrderId}",
    Value = message.Value,
};

var sql = @"insert into receiver.SubmittedOrder
                        (Id, Value)
            values      (@Id, @Value)";
return SqlMapper.ExecuteAsync(
    cnn: session.Connection,
    sql: sql,
    param: order,
    transaction: session.Transaction);

EntityFramework

Model

The schema is defined using an attribute. The table name is set explicitly to the class name because it is a required parameter of the attribute.

[Table("SubmittedOrder", Schema = "receiver")]
public class SubmittedOrder
{
    public string Id { get; set; }
    public int Value { get; set; }
}

DbContext

Entity Framework requires an implementation of DbContext to persist data.

public class SubmittedOrderDbContext : DbContext
{
    DbConnection connection;

    public DbSet<SubmittedOrder> SubmittedOrder { get; set; }

    public SubmittedOrderDbContext(DbConnection connection)
    {
        this.connection = connection;
    }

    protected override void OnConfiguring(DbContextOptionsBuilder options)
    {
        options.UseSqlServer(connection);
    }
}

Persisting Data

var session = context.SynchronizedStorageSession.SqlPersistenceSession();

var order = new SubmittedOrder
{
    Id = $"EF-{message.OrderId}",
    Value = message.Value,
};

using (var dbContext = new SubmittedOrderDbContext(session.Connection))
{
    dbContext.Database.UseTransaction(session.Transaction);
    await dbContext.SubmittedOrder.AddAsync(order)
        .ConfigureAwait(false);
    await dbContext.SaveChangesAsync()
        .ConfigureAwait(false);
}

ServiceStack OrmLite

Model

The schema is defined using an attribute.

[Schema("receiver")]
public class SubmittedOrder
{
    public string Id { get; set; }
    public int Value { get; set; }
}

Persisting Data

var session = context.SynchronizedStorageSession.SqlPersistenceSession();

var order = new SubmittedOrder
{
    Id = $"OrmLite-{message.OrderId}",
    Value = message.Value,
};
return OrmLiteWriteApiAsync.UpdateAsync(
    dbConn: session.Connection,
    obj: order,
    commandFilter: command =>
    {
        command.Transaction = session.Transaction;
    });
