Outbox - SQL Transport and NHibernate

Component: SQL Server Transport | NuGet: NServiceBus.SqlServer (Version: 3.x)
Target NServiceBus Version: 6.x

Prerequisites

  1. Make sure SQL Server Express is installed and accessible as .\SQLEXPRESS.
  2. Create a database called nservicebus.
  3. The Outbox feature is designed to provide exactly-once delivery guarantees without the Distributed Transaction Coordinator (DTC) running. If the DTC service is left running, the sample displays a DtcRunningWarning message in the console window on startup. Disable the DTC service to avoid this warning.

Running the project

  1. Start the Sender project (right-click on the project, select the Debug > Start new instance option).
  2. The text Press <enter> to send a message should be displayed in the Sender's console window.
  3. Start the Receiver project (right-click on the project, select the Debug > Start new instance option).
  4. The Sender should display the subscription confirmation Subscribe from Receiver on message type OrderSubmitted.
  5. Hit <enter> to send a new message.

Verifying that the sample works correctly

  1. The Receiver displays information that an order was submitted.
  2. The Sender displays information that the order was accepted.
  3. Finally, after a couple of seconds, the Receiver displays confirmation that the timeout message has been received.
  4. Open SQL Server Management Studio and go to the nservicebus database. Verify that there is a row in the saga state table (dbo.OrderLifecycleSagaData) and in the orders table (dbo.Orders).
  5. Verify that there are messages in the dbo.audit table and, if any message failed processing, messages in the dbo.error table.

The handling code has built-in chaotic behavior: there is a 50% chance that a given message fails processing. This demonstrates how recoverability works. Since retries are disabled, a message that cannot be processed is immediately moved to the configured error queue.

The retries are disabled using the following settings:

endpointConfiguration.Recoverability()
    .Immediate(immediate => immediate.NumberOfRetries(0))
    .Delayed(delayed => delayed.NumberOfRetries(0));
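
The chaotic behavior mentioned above is not shown on this page. As a rough illustration only, a handler that fails about half of the time could look like the sketch below. The handler name, the message shape, and the use of System.Random are assumptions made for this example and are not taken from the sample.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical stand-in for the OrderSubmitted message defined in the Shared project.
public class OrderSubmitted : IEvent
{
    public string OrderId { get; set; }
    public decimal Value { get; set; }
}

// Hypothetical handler name; not taken from the sample.
public class ChaoticOrderSubmittedHandler : IHandleMessages<OrderSubmitted>
{
    // System.Random is not thread-safe, so access is guarded by a lock
    // because handlers may be invoked concurrently.
    static readonly Random random = new Random();

    public Task Handle(OrderSubmitted message, IMessageHandlerContext context)
    {
        bool shouldFail;
        lock (random)
        {
            // Next(2) returns 0 or 1, which gives roughly a 50% failure rate.
            shouldFail = random.Next(2) == 0;
        }

        if (shouldFail)
        {
            // An unhandled exception hands the message over to recoverability.
            throw new Exception("Simulated processing failure.");
        }

        Console.WriteLine($"Order {message.OrderId} processed.");
        return Task.FromResult(0);
    }
}
```

With both immediate and delayed retries set to zero, the first exception thrown by such a handler is enough to send the message to the error queue.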

Code walk-through

This sample contains three projects:

  • Shared - A class library containing common code, including message definitions.
  • Sender - A console application responsible for sending the initial OrderSubmitted message and processing the follow-up OrderAccepted message.
  • Receiver - A console application responsible for processing the OrderSubmitted message, sending the OrderAccepted message, and randomly generating exceptions.

Sender project

The Sender does not store any data. It mimics a front-end system where users submit orders that are passed via the bus to the back-end. It is configured to use the SQL Server transport with NHibernate persistence and the Outbox.

var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
transport.ConnectionString(@"Data Source=.\SqlExpress;Database=nservicebus;Integrated Security=True;Max Pool Size=100; Min Pool Size=100");

var persistence = endpointConfiguration.UsePersistence<NHibernatePersistence>();
persistence.ConnectionString(@"Data Source=.\SqlExpress;Database=nservicebus;Integrated Security=True;Max Pool Size=100; Min Pool Size=100");

endpointConfiguration.EnableOutbox();

Receiver project

The Receiver mimics a back-end system. It is also configured to use the SQL Server transport with NHibernate persistence and the Outbox.

var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
transport.ConnectionString(@"Data Source=.\SqlExpress;Database=nservicebus;Integrated Security=True;Max Pool Size=100; Min Pool Size=100");

var persistence = endpointConfiguration.UsePersistence<NHibernatePersistence>();
persistence.UseConfiguration(hibernateConfig);

endpointConfiguration.EnableOutbox();

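For the Outbox to work, the business data has to use the same connection string as the NServiceBus persistence. The hibernateConfig object passed to UseConfiguration above is built with that connection string and also maps the business entity: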

var hibernateConfig = new Configuration();
hibernateConfig.DataBaseIntegration(x =>
{
    x.ConnectionString = @"Data Source=.\SqlExpress;Database=nservicebus;Integrated Security=True;Max Pool Size=100; Min Pool Size=100";
    x.Dialect<MsSql2012Dialect>();
});
var mapper = new ModelMapper();
mapper.AddMapping<OrderMap>();
hibernateConfig.AddMapping(mapper.CompileMappingForAllExplicitlyAddedEntities());

When the message arrives at the Receiver, it is dequeued using a native SQL Server transaction. Then a TransactionScope is created that encompasses:

  • persisting business data:
var nhibernateSession = context.SynchronizedStorageSession.Session();
var order = new Order
{
    OrderId = message.OrderId,
    Value = message.Value
};
nhibernateSession.Save(order);
  • persisting saga data of OrderLifecycleSaga,
  • storing the reply message and the timeout request in the outbox:
var orderAccepted = new OrderAccepted
{
    OrderId = message.OrderId,
};
await context.Reply(orderAccepted)
    .ConfigureAwait(false);
await RequestTimeout(context, TimeSpan.FromSeconds(5), new OrderTimeout())
    .ConfigureAwait(false);

Finally, the messages in the Outbox are pushed to their destinations. The timeout message is stored in the NServiceBus timeout store and is sent back to the saga after the requested delay of 5 seconds.
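
To make the round trip of the timeout more concrete, here is a minimal sketch of how a saga might request the timeout and later receive it. It only illustrates the shape of the NServiceBus 6 saga API. The message and saga data definitions, the correlation on OrderId, and the console output are assumptions for this example and may differ from the sample's actual OrderLifecycleSaga.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical message and timeout definitions; the sample keeps its own in the Shared project.
public class OrderSubmitted : IEvent
{
    public string OrderId { get; set; }
}

public class OrderTimeout
{
}

// Properties are virtual so that NHibernate can map the saga data.
public class OrderLifecycleSagaData : ContainSagaData
{
    public virtual string OrderId { get; set; }
}

public class OrderLifecycleSaga :
    Saga<OrderLifecycleSagaData>,
    IAmStartedByMessages<OrderSubmitted>,
    IHandleTimeouts<OrderTimeout>
{
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderLifecycleSagaData> mapper)
    {
        // Assumed correlation: saga instances are looked up by OrderId.
        mapper.ConfigureMapping<OrderSubmitted>(message => message.OrderId)
            .ToSaga(sagaData => sagaData.OrderId);
    }

    public Task Handle(OrderSubmitted message, IMessageHandlerContext context)
    {
        Data.OrderId = message.OrderId;
        // The timeout request is captured by the Outbox together with the saga state.
        return RequestTimeout(context, TimeSpan.FromSeconds(5), new OrderTimeout());
    }

    public Task Timeout(OrderTimeout state, IMessageHandlerContext context)
    {
        // Invoked when the timeout message is delivered back to the saga.
        Console.WriteLine($"Timeout received for order {Data.OrderId}.");
        return Task.FromResult(0);
    }
}
```

The sketch deliberately does not complete the saga in the timeout handler, since the verification steps expect a row to remain in the saga state table.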

How it works

All the data manipulations happen atomically because SQL Server 2008 and later allow multiple (but not overlapping) instances of SqlConnection to enlist in a single TransactionScope without the need to escalate to DTC. SQL Server manages these transactions as if they were a single SqlTransaction.
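
The non-overlapping requirement can be illustrated with a small sketch that is independent of NServiceBus: two SqlConnection instances are opened one after the other inside the same TransactionScope, and the first is closed before the second is opened. The class name and the trivial SELECT statements are placeholders, and the sketch assumes the nservicebus database from the prerequisites and the same connection string for both connections.

```csharp
using System.Data.SqlClient;
using System.Transactions;

// Hypothetical helper used only for illustration.
public static class NonOverlappingConnections
{
    const string ConnectionString =
        @"Data Source=.\SqlExpress;Database=nservicebus;Integrated Security=True";

    public static void Run()
    {
        using (var scope = new TransactionScope())
        {
            // The first connection enlists in the ambient transaction when opened.
            using (var first = new SqlConnection(ConnectionString))
            {
                first.Open();
                using (var command = new SqlCommand("SELECT 1", first))
                {
                    command.ExecuteScalar();
                }
            } // The first connection is closed here, before the second one opens.

            // The second connection enlists in the same transaction. Because the
            // two connections never overlap, no escalation to DTC is expected.
            using (var second = new SqlConnection(ConnectionString))
            {
                second.Open();
                using (var command = new SqlCommand("SELECT 2", second))
                {
                    command.ExecuteScalar();
                }
            }

            // Commit the work done on both connections as one unit.
            scope.Complete();
        }
    }
}
```

If the second connection were opened while the first was still open, the transaction would be expected to escalate to a distributed transaction, which fails when the DTC service is disabled.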
