SQL Server Transport and NHibernate Persistence

Component: NServiceBus
NuGet Package NServiceBus (6.x)

In this sample, the SQL Server transport is used in conjunction with the NHibernate persister. The sample shows how to use the same database connection for both transport and persistence operations, and how to access (using multiple ORMs) the current SQL connection and transaction from within a message handler to persist business objects to the database.

Because the persistence can reuse the connection and transaction managed by the SQL Server transport, the endpoints can run in TransportTransactionMode.SendsAtomicWithReceive while still ensuring exactly-once message processing, as long as the persistence session APIs are used to access the connection and transaction.

Prerequisites

An instance of SQL Server Express installed and accessible as .\SqlExpress.

At startup each endpoint will create its required SQL assets including databases, tables, and schemas.

The database created by this sample is NsbSamplesSqlNHibernate.

Procedure

  1. Start the Sender and Receiver projects.
  2. In the Sender's console, press Enter to send a message when the app is ready.
  3. On the Receiver console notice that the order was submitted.
  4. On the Sender console notice that the order was accepted.
  5. After a few seconds, on the Receiver console notice that the timeout message has been received.
  6. Open SQL Server Management Studio and go to the database created by this sample (NsbSamplesSqlNHibernate, as stated in the Prerequisites). Verify that there is a row in the saga state table (receiver.OrderLifecycleSagaData) and in the orders table (receiver.Orders).
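
The check in the last step can also be done from code. The following is a minimal sketch, not part of the sample: the VerifySample class is hypothetical, the instance and database names are taken from the Prerequisites, and Integrated Security is an assumption about how the local instance is accessed.

```csharp
using System;
using System.Data.SqlClient;

// Hypothetical helper, not part of the sample projects.
class VerifySample
{
    static void Main()
    {
        // Assumed connection string; adjust to the local environment.
        var connection = @"Data Source=.\SqlExpress;Database=NsbSamplesSqlNHibernate;Integrated Security=True";

        // Table names as given in the procedure above.
        var tables = new[] { "receiver.Orders", "receiver.OrderLifecycleSagaData" };

        using (var sql = new SqlConnection(connection))
        {
            sql.Open();
            foreach (var table in tables)
            {
                // The table names are fixed constants, so concatenation is safe here.
                using (var command = new SqlCommand("SELECT COUNT(*) FROM " + table, sql))
                {
                    var count = (int)command.ExecuteScalar();
                    Console.WriteLine(table + ": " + count + " row(s)");
                }
            }
        }
    }
}
```

After one message has been sent, each table would be expected to contain at least one row.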

Code walk-through

This sample contains three projects:

  • Shared - A class library containing common code including the message definitions.
  • Sender - A console application responsible for sending the initial OrderSubmitted message and processing the follow-up OrderAccepted message.
  • Receiver - A console application responsible for processing the order message.
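
The message definitions in Shared are not listed in this walk-through. As a rough sketch of what they might look like: the property names OrderId and Value come from the snippets later in this document, while the property types and the choice of the IEvent and IMessage marker interfaces are assumptions.

```csharp
using NServiceBus;

// Sent out by the Sender; the Receiver registers the Sender as its publisher.
public class OrderSubmitted : IEvent
{
    public string OrderId { get; set; }   // type is an assumption
    public decimal Value { get; set; }    // type is an assumption
}

// Reply sent by the Receiver back to the Sender.
public class OrderAccepted : IMessage
{
    public string OrderId { get; set; }   // type is an assumption
}

// Timeout message requested by OrderLifecycleSaga.
public class OrderTimeout : IMessage
{
}
```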

Sender and Receiver use different schemas within one database. This creates a logical separation (since schemas can be secured independently) while retaining the benefits of having a single physical database. Apart from business data, each schema contains queues for the NServiceBus endpoint and tables for the NServiceBus persister. If no schema is specified, the transport will default to the dbo schema.

Sender project

The Sender does not store any data. It mimics the front-end system where orders are submitted by customers and passed to the back-end via NServiceBus. It is configured to use the SQL Server transport with NHibernate persistence. The transport is configured to use a non-default schema, sender, and to send messages addressed to the receiver endpoint with a different schema.

var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
transport.ConnectionString(connection);
transport.DefaultSchema("sender");
transport.UseSchemaForQueue("error", "dbo");
transport.UseSchemaForQueue("audit", "dbo");
transport.UseNativeDelayedDelivery().DisableTimeoutManagerCompatibility();

var persistence = endpointConfiguration.UsePersistence<NHibernatePersistence>();
persistence.UseConfiguration(hibernateConfig);

The connection strings for both persistence and transport must be exactly the same.
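
One way to keep the two connection strings identical is to build both configurations from a single variable. The following is a sketch under assumptions: the snippets in this document do not show how connection and hibernateConfig are created, so the connection string value (based on the Prerequisites) and the dialect are illustrative only.

```csharp
using NHibernate.Cfg;
using NHibernate.Dialect;

// Assumed value: instance and database names come from the Prerequisites,
// Integrated Security is an assumption.
var connection = @"Data Source=.\SqlExpress;Database=NsbSamplesSqlNHibernate;Integrated Security=True";

var hibernateConfig = new Configuration();
hibernateConfig.DataBaseIntegration(x =>
{
    // The same variable that is passed to transport.ConnectionString(connection).
    x.ConnectionString = connection;
    // Dialect choice is an assumption; pick the one matching the SQL Server version.
    x.Dialect<MsSql2012Dialect>();
});
```

Using one variable for both avoids accidental differences (for example in casing or parameter order) that would make the strings unequal.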

Receiver project

The Receiver mimics a back-end system. It is also configured to use the SQL Server transport with NHibernate persistence. Its own queues live in the receiver schema, and the Sender's queue is mapped to the sender schema by its queue name.

var endpointConfiguration = new EndpointConfiguration("Samples.SqlNHibernate.Receiver");
endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.AuditProcessedMessagesTo("audit");
endpointConfiguration.EnableInstallers();
var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
transport.ConnectionString(connection);
transport.DefaultSchema("receiver");
transport.UseSchemaForQueue("error", "dbo");
transport.UseSchemaForQueue("audit", "dbo");
transport.UseSchemaForQueue("Samples.SqlNHibernate.Sender", "sender");
transport.Transactions(TransportTransactionMode.SendsAtomicWithReceive);
transport.UseNativeDelayedDelivery().DisableTimeoutManagerCompatibility();

var routing = transport.Routing();
routing.RouteToEndpoint(typeof(OrderAccepted), "Samples.SqlNHibernate.Sender");
routing.RegisterPublisher(typeof(OrderSubmitted).Assembly, "Samples.SqlNHibernate.Sender");

var persistence = endpointConfiguration.UsePersistence<NHibernatePersistence>();
persistence.UseConfiguration(hibernateConfig);
hibernateConfig.SetProperty("default_schema", "receiver");

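When the message arrives at the Receiver, the transport starts a single transaction (in SendsAtomicWithReceive mode this is a native SQL Server transaction rather than a distributed one) that: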

  • dequeues the message,
  • persists business data using the shared session,
  • persists saga data for OrderLifecycleSaga,
  • sends the reply message and the timeout request.

// Reply to the Sender.
var orderAccepted = new OrderAccepted
{
    OrderId = message.OrderId
};
return context.Reply(orderAccepted);

// In OrderLifecycleSaga (a separate method; each snippet ends with its own return):
// request the timeout.
return RequestTimeout<OrderTimeout>(context, TimeSpan.FromSeconds(5));
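
For context, the timeout request lives inside the saga. The following is a minimal sketch of how OrderLifecycleSaga could be shaped with the NServiceBus 6 saga API. The class and message names come from this document; the correlation on OrderId, the property type, and the body of the timeout handler are assumptions.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

public class OrderLifecycleSaga :
    Saga<OrderLifecycleSagaData>,
    IAmStartedByMessages<OrderSubmitted>,
    IHandleTimeouts<OrderTimeout>
{
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderLifecycleSagaData> mapper)
    {
        // Assumed correlation: one saga instance per order.
        mapper.ConfigureMapping<OrderSubmitted>(message => message.OrderId)
            .ToSaga(saga => saga.OrderId);
    }

    public Task Handle(OrderSubmitted message, IMessageHandlerContext context)
    {
        // The saga data is stored as part of the same transaction as the receive.
        return RequestTimeout<OrderTimeout>(context, TimeSpan.FromSeconds(5));
    }

    public Task Timeout(OrderTimeout state, IMessageHandlerContext context)
    {
        // Assumed behavior: only report that the timeout arrived.
        Console.WriteLine("Order timeout received");
        return Task.FromResult(0);
    }
}

public class OrderLifecycleSagaData : ContainSagaData
{
    // Properties must be virtual so that NHibernate can proxy the class.
    public virtual string OrderId { get; set; }
}
```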

The shared session is managed by NServiceBus so there's no need to explicitly begin a transaction or Flush() the session.

var nhibernateSession = context.SynchronizedStorageSession.Session();
var order = new Order
{
    OrderId = message.OrderId,
    Value = message.Value
};
nhibernateSession.Save(order);
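
Putting the pieces together, a handler that uses the shared session might look like the sketch below. The handler class name, the property types, and the mapping-by-code class are assumptions made for illustration; the table name Orders follows the procedure above, and registering the mapping with the NHibernate configuration is omitted.

```csharp
using System.Threading.Tasks;
using NHibernate.Mapping.ByCode;
using NHibernate.Mapping.ByCode.Conformist;
using NServiceBus;

// Hypothetical handler name.
public class OrderSubmittedHandler : IHandleMessages<OrderSubmitted>
{
    public Task Handle(OrderSubmitted message, IMessageHandlerContext context)
    {
        // Session bound to the transport's connection and transaction.
        var nhibernateSession = context.SynchronizedStorageSession.Session();
        var order = new Order
        {
            OrderId = message.OrderId,
            Value = message.Value
        };
        // No explicit transaction or Flush(): NServiceBus manages the session.
        nhibernateSession.Save(order);

        return context.Reply(new OrderAccepted { OrderId = message.OrderId });
    }
}

// Business entity; members are virtual so that NHibernate can proxy it.
public class Order
{
    public virtual string OrderId { get; set; }   // type is an assumption
    public virtual decimal Value { get; set; }    // type is an assumption
}

// Assumed mapping; the schema comes from the default_schema setting.
public class OrderMap : ClassMapping<Order>
{
    public OrderMap()
    {
        Table("Orders");
        Id(x => x.OrderId, m => m.Generator(Generators.Assigned));
        Property(x => x.Value);
    }
}
```

If the handler throws after Save, the transaction is rolled back, so neither the order row nor the reply is committed.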

One disadvantage of this approach is that it is impossible to use NHibernate's second-level cache feature since it requires that NHibernate manages the transactions and database connections, both of which are disabled when operating in shared connection mode.
