SQL Server Transport and NHibernate Persistence

Component: NServiceBus
NuGet Package: NServiceBus (8.x)

In this sample, the SQL Server transport is used in conjunction with the NHibernate persister. The sample shows how to use the same database connection for both transport and persistence operations, and how to access (using multiple ORMs) the current SQL connection and transaction from within a message handler to persist business objects to the database.

The persister can reuse the connection and transaction managed by the SQL Server transport, so the endpoints can run in TransportTransactionMode.SendsAtomicWithReceive while still providing exactly-once message processing guarantees. This holds only as long as the persistence session APIs are used to access the connection and transaction.

Prerequisites

Ensure an instance of SQL Server (version 2016 or above for the custom saga finders sample, or version 2012 or above for other samples) is installed and accessible on localhost, port 1433. A Docker image can be used to accomplish this by running the following command in a terminal:

docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_SA_PASSWORD=yourStrong(!)Password' -p 1433:1433 -d mcr.microsoft.com/mssql/server:latest

Alternatively, change the connection string to point to a different SQL Server instance.

At startup each endpoint will create its required SQL assets including databases, tables, and schemas.

The database created by this sample is NsbSamplesSqlNHibernate.
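
For reference, a connection string matching the Docker command above could be declared as shown below. This is a sketch rather than the sample's actual configuration: the SA login, the password, and the Encrypt=false setting are assumptions derived from the Docker command and from a typical local-development setup.

```csharp
// Assumed values: the server, login, and password mirror the Docker command above,
// and the database name is the one this sample creates. Adjust them for any other instance.
var connectionString =
    "Server=localhost,1433;" +
    "Initial Catalog=NsbSamplesSqlNHibernate;" +
    "User Id=SA;" +
    "Password=yourStrong(!)Password;" +
    // Assumption for local development only: the container presents a self-signed
    // certificate, so encryption is switched off rather than trusting that certificate.
    "Encrypt=false";
```

The same value would then be used by both endpoints, since they share one physical database and differ only in schema.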

Procedure

  1. Start the Sender and Receiver projects.
  2. In the Sender's console, press <enter> to send a message when the app is ready.
  3. On the Receiver console, notice that the order was submitted.
  4. On the Sender console, notice that the order was accepted.
  5. After a few seconds, on the Receiver console, notice that the timeout message has been received.
  6. Open SQL Server Management Studio and go to the NsbSamplesSqlNHibernate database. Verify that there is a row in the saga state table (receiver.OrderLifecycleSagaData) and in the orders table (receiver.Orders).

Code walk-through

This sample contains three projects:

  • Shared - A class library containing common code, including the message definitions.
  • Sender - A console application responsible for sending the initial OrderSubmitted message and processing the follow-up OrderAccepted message.
  • Receiver - A console application responsible for processing the order message.

Sender and Receiver use different schemas within one database. This creates a logical separation (since schemas can be secured independently) while retaining the benefits of having a single physical database. Apart from business data, each schema contains queues for the NServiceBus endpoint and tables for the NServiceBus persister. If no schema is specified, the transport will default to the dbo schema.

Sender project

The Sender does not store any data. It mimics a front-end system where orders are submitted by customers and passed to the back-end via NServiceBus. It is configured to use the SQL Server transport with NHibernate persistence. The transport uses a non-default schema, sender, for its own queues and sends messages addressed to the Receiver endpoint to queues in a different schema.

var transport = new SqlServerTransport(connectionString)
{
    DefaultSchema = "sender",
    TransportTransactionMode = TransportTransactionMode.SendsAtomicWithReceive
};
transport.SchemaAndCatalog.UseSchemaForQueue("error", "dbo");
transport.SchemaAndCatalog.UseSchemaForQueue("audit", "dbo");
transport.Subscriptions.SubscriptionTableName = new SubscriptionTableName("Subscriptions", "dbo");

var persistence = endpointConfiguration.UsePersistence<NHibernatePersistence>();
persistence.UseConfiguration(hibernateConfig);

The connection strings for both persistence and transport must be exactly the same.
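
One way to guarantee that both components receive an identical connection string is to build them from a single variable. The sketch below assumes a hypothetical helper class, SharedConnectionSketch, and assumes the MsSql2012Dialect dialect; neither is taken from the sample itself. It uses NHibernate's DataBaseIntegration configuration method and the SqlServerTransport constructor shown above.

```csharp
using NHibernate.Cfg;
using NHibernate.Dialect;
using NServiceBus;

// Hypothetical helper (not part of the sample): creates the transport and the
// NHibernate configuration from one connection string so the two cannot drift apart.
static class SharedConnectionSketch
{
    public static (SqlServerTransport Transport, NHibernate.Cfg.Configuration HibernateConfig)
        Create(string connectionString)
    {
        var hibernateConfig = new NHibernate.Cfg.Configuration();
        hibernateConfig.DataBaseIntegration(db =>
        {
            // Same string instance that is handed to the transport below.
            db.ConnectionString = connectionString;
            // Assumed dialect; choose the one that matches the SQL Server version in use.
            db.Dialect<MsSql2012Dialect>();
        });

        var transport = new SqlServerTransport(connectionString)
        {
            DefaultSchema = "sender",
            TransportTransactionMode = TransportTransactionMode.SendsAtomicWithReceive
        };

        return (transport, hibernateConfig);
    }
}
```

The returned configuration would then be passed to persistence.UseConfiguration(...) and the transport to endpointConfiguration.UseTransport(...), as in the snippets in this section.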

Receiver project

The Receiver mimics a back-end system. It is also configured to use the SQL Server transport with NHibernate persistence, but instead of hard-coding the other endpoint's schema, it uses a convention based on the endpoint's queue name.

var endpointConfiguration = new EndpointConfiguration("Samples.SqlNHibernate.Receiver");
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.AuditProcessedMessagesTo("audit");
endpointConfiguration.EnableInstallers();
var transport = new SqlServerTransport(connectionString)
{
    DefaultSchema = "receiver"
};
transport.SchemaAndCatalog.UseSchemaForQueue("error", "dbo");
transport.SchemaAndCatalog.UseSchemaForQueue("audit", "dbo");
transport.SchemaAndCatalog.UseSchemaForQueue("Samples.SqlNHibernate.Sender", "sender");
transport.Subscriptions.SubscriptionTableName = new SubscriptionTableName("Subscriptions", "dbo");
transport.TransportTransactionMode = TransportTransactionMode.SendsAtomicWithReceive;

var routing = endpointConfiguration.UseTransport(transport);
routing.RouteToEndpoint(typeof(OrderAccepted), "Samples.SqlNHibernate.Sender");

var persistence = endpointConfiguration.UsePersistence<NHibernatePersistence>();
persistence.UseConfiguration(hibernateConfig);
hibernateConfig.SetProperty("default_schema", "receiver");

When the message arrives at the Receiver, a TransactionScope is created that:

  • Dequeues the message.
  • Persists business data using the shared session.
  • Persists saga data for OrderLifecycleSaga.
  • Sends the reply message and the timeout request.

// Reply to the Sender with the acceptance message.
var orderAccepted = new OrderAccepted
{
    OrderId = message.OrderId,
};
await context.Reply(orderAccepted);

// Request the saga timeout.
await RequestTimeout<OrderTimeout>(context, TimeSpan.FromSeconds(5));
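
To show where the timeout request might live, here is a minimal sketch of a saga with the names used in this sample (OrderLifecycleSaga, OrderLifecycleSagaData, OrderTimeout). Everything else is an assumption made for illustration: the property types, the correlation on OrderId, the message class definitions, and the console output. The saga is deliberately not marked as complete, so the saga state row remains in the table as described in the procedure.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

// Assumed message shapes; the real definitions live in the Shared project.
public class OrderSubmitted : IMessage
{
    public string OrderId { get; set; }
    public decimal Value { get; set; }
}

public class OrderTimeout
{
}

// NHibernate requires virtual members on persisted classes.
public class OrderLifecycleSagaData : ContainSagaData
{
    public virtual string OrderId { get; set; }
}

public class OrderLifecycleSaga : Saga<OrderLifecycleSagaData>,
    IAmStartedByMessages<OrderSubmitted>,
    IHandleTimeouts<OrderTimeout>
{
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderLifecycleSagaData> mapper)
    {
        // Assumed correlation: one saga instance per OrderId.
        mapper.MapSaga(saga => saga.OrderId)
            .ToMessage<OrderSubmitted>(message => message.OrderId);
    }

    public Task Handle(OrderSubmitted message, IMessageHandlerContext context)
    {
        // The timeout request is stored and sent as part of the same transaction
        // as the saga data.
        return RequestTimeout<OrderTimeout>(context, TimeSpan.FromSeconds(5));
    }

    public Task Timeout(OrderTimeout state, IMessageHandlerContext context)
    {
        // Hypothetical output; the saga is not completed here, so its row stays in the table.
        Console.WriteLine($"Timeout received for order {Data.OrderId}");
        return Task.CompletedTask;
    }
}
```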

The shared session is managed by NServiceBus, so there is no need to explicitly begin a transaction or Flush() the session.

var nhibernateSession = context.SynchronizedStorageSession.Session();
var order = new Order
{
    OrderId = message.OrderId,
    Value = message.Value
};
nhibernateSession.Save(order);
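
Placed in context, the fragment above might sit in a message handler like the following sketch. The handler name, the property types, and the message and entity definitions are assumptions for illustration, and the NHibernate mapping for Order is not shown. The Session() extension is the one used in the fragment above and is assumed to be available through the NServiceBus namespace.

```csharp
using System.Threading.Tasks;
using NServiceBus;

// Assumed message shapes; the real definitions live in the Shared project.
public class OrderSubmitted : IMessage
{
    public string OrderId { get; set; }
    public decimal Value { get; set; }
}

public class OrderAccepted : IMessage
{
    public string OrderId { get; set; }
}

// Business entity; members are virtual because NHibernate requires it.
public class Order
{
    public virtual string OrderId { get; set; }
    public virtual decimal Value { get; set; }
}

// Hypothetical handler name.
public class OrderSubmittedHandler : IHandleMessages<OrderSubmitted>
{
    public Task Handle(OrderSubmitted message, IMessageHandlerContext context)
    {
        // The session is bound to the connection and transaction of the incoming
        // message, so no explicit transaction or Flush() call is needed here.
        var nhibernateSession = context.SynchronizedStorageSession.Session();

        var order = new Order
        {
            OrderId = message.OrderId,
            Value = message.Value
        };
        nhibernateSession.Save(order);

        // The reply is sent atomically with the receive and the database write.
        return context.Reply(new OrderAccepted { OrderId = message.OrderId });
    }
}
```

Opening a separate NHibernate session or SQL connection inside the handler would bypass the shared transaction and lose the exactly-once guarantee described earlier.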

One disadvantage of this approach is that it is impossible to use NHibernate's second-level cache feature since it requires that NHibernate manages the transactions and database connections, both of which are disabled when operating in shared connection mode.
