Outbox - SQL Transport, SQL Persistence and EF

Component: SQL Server Transport
NuGet NServiceBus.SqlServer (3-pre)
Target NServiceBus Version: 6.x
This page targets a pre-release version and is subject to change prior to the final release.

Prerequisites

  1. Make sure SQL Server Express is installed and accessible as .\SQLEXPRESS.
  2. Create a database called nservicebus.
  3. In that database, create the schemas sender and receiver.
  4. The Outbox feature is designed to provide exactly-once delivery guarantees without the Distributed Transaction Coordinator (DTC) running. Disable the DTC service; if it is left running, the sample displays a DtcRunningWarning message in the console window when it starts.
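
Steps 2 and 3 can also be scripted instead of done by hand. The following is a minimal sketch, not part of the sample itself: the DatabaseSetup class and its Run and Execute methods are hypothetical names, and it assumes the current Windows user is allowed to create databases on .\SQLEXPRESS.

```csharp
using System.Data.SqlClient;

// Hypothetical helper, not part of the sample: creates the database and
// the two schemas listed in the prerequisites if they do not exist yet.
static class DatabaseSetup
{
    public static void Run()
    {
        // Assumption: connect to master first, because nservicebus may not exist yet.
        var master = @"Data Source=.\SqlExpress;Database=master;Integrated Security=True";
        using (var connection = new SqlConnection(master))
        {
            connection.Open();
            Execute(connection, "IF DB_ID('nservicebus') IS NULL CREATE DATABASE nservicebus");
        }

        var target = @"Data Source=.\SqlExpress;Database=nservicebus;Integrated Security=True";
        using (var connection = new SqlConnection(target))
        {
            connection.Open();
            // CREATE SCHEMA must be the only statement in its batch,
            // so it is wrapped in EXEC when guarded by an IF.
            Execute(connection, "IF SCHEMA_ID('sender') IS NULL EXEC('CREATE SCHEMA sender')");
            Execute(connection, "IF SCHEMA_ID('receiver') IS NULL EXEC('CREATE SCHEMA receiver')");
        }
    }

    static void Execute(SqlConnection connection, string sql)
    {
        using (var command = new SqlCommand(sql, connection))
        {
            command.ExecuteNonQuery();
        }
    }
}
```

Calling DatabaseSetup.Run() once before starting the solution should be enough; the existence checks make it safe to run again.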

Running the project

  1. Start the solution.
  2. The text Press <enter> to send a message should be displayed in the Sender's console window.
  3. Press <enter> to send a new message.

Verifying that the sample works correctly

  1. The Receiver displays information that an order was submitted.
  2. The Sender displays information that the order was accepted.
  3. Finally, after a couple of seconds, the Receiver displays confirmation that the timeout message has been received.
  4. Open SQL Server Management Studio and go to the nservicebus database. Verify that there is a row in the saga state table (Samples.SQLOutboxEF.Receiver.OrderLifecycleSaga) and in the orders table (dbo.Orders).

Code walk-through

This sample contains three projects:

  • Shared - A class library containing common code including the message definitions.
  • Sender - A console application responsible for sending the initial OrderSubmitted message and processing the follow-up OrderAccepted message.
  • Receiver - A console application responsible for processing the order message.

Sender and Receiver use different schemas in the same database. Apart from business data, the database also contains the tables that represent the queues of the NServiceBus endpoints and the tables used by NServiceBus persistence.

Sender project

The Sender does not store any data. It mimics a front-end system where users submit orders that are passed via the bus to the back-end. It is configured to use the SQL Server transport with SQL persistence and the Outbox.

var connectionString = @"Data Source=.\SqlExpress;Database=nservicebus;Integrated Security=True";

var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
transport.ConnectionString(connectionString);
transport.DefaultSchema("sender");
transport.UseSchemaForQueue("error", "dbo");
transport.UseSchemaForQueue("audit", "dbo");

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new SqlConnection(connectionString);
    });
persistence.Schema("sender");
persistence.TablePrefix("");

var subscriptions = persistence.SubscriptionSettings();
subscriptions.DisableCache();

endpointConfiguration.EnableOutbox();

Receiver project

The Receiver mimics a back-end system. It is also configured to use the SQL Server transport with SQL persistence and the Outbox. It uses Entity Framework to store business data (orders).

var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
transport.ConnectionString(connectionString);
transport.DefaultSchema("receiver");
transport.UseSchemaForEndpoint("Samples.SQLOutboxEF.Sender", "sender");
transport.UseSchemaForQueue("error", "dbo");
transport.UseSchemaForQueue("audit", "dbo");

var routing = transport.Routing();
routing.RouteToEndpoint(typeof(OrderAccepted).Assembly, "Samples.SQLOutboxEF.Sender");
routing.RegisterPublisher(typeof(OrderAccepted).Assembly, "Samples.SQLOutboxEF.Sender");

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new SqlConnection(connectionString);
    });
persistence.Schema("receiver");
persistence.TablePrefix("");

var subscriptions = persistence.SubscriptionSettings();
subscriptions.DisableCache();

endpointConfiguration.EnableOutbox();

In order for the Outbox to work, the business data has to reuse the same connection string as NServiceBus persistence:

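// Creates the context from a connection string; the context opens its own connection.
public ReceiverDataContext(string connection)
    : base(connection)
{
}

// Wraps an existing connection. The context does not own it, so disposing
// the context leaves the connection open for its original owner.
public ReceiverDataContext(IDbConnection connection)
    : base((DbConnection) connection, contextOwnsConnection: false)
{
}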

When the message arrives at the Receiver, it is dequeued using a native SQL Server transaction. Then a TransactionScope is created that encompasses:

  • persisting business data:
var storageContext = context.SynchronizedStorageSession.SqlPersistenceSession();

var dbConnection = storageContext.Connection;
using (var receiverDataContext = new ReceiverDataContext(dbConnection))
{
    receiverDataContext.Database.UseTransaction(storageContext.Transaction);
    var order = new Order
    {
        OrderId = message.OrderId,
        Value = message.Value
    };
    receiverDataContext.Orders.Add(order);
    await receiverDataContext.SaveChangesAsync()
        .ConfigureAwait(false);
}
  • persisting saga data of OrderLifecycleSaga,
  • storing the reply message and the timeout request in the Outbox:
var orderAccepted = new OrderAccepted
{
    OrderId = message.OrderId,
};
await context.Reply(orderAccepted)
    .ConfigureAwait(false);

public Task Handle(OrderSubmitted message, IMessageHandlerContext context)
{
    var orderTimeout = new OrderTimeout();
    return RequestTimeout(context, TimeSpan.FromSeconds(5), orderTimeout);
}

Finally, the messages in the Outbox are pushed to their destinations. The timeout message is stored in the NServiceBus timeout store and is sent back to the saga after the requested delay of 5 seconds.

How it works

All the data manipulations happen atomically because SQL Server 2008 and later allow multiple (but not overlapping) instances of SqlConnection to enlist in a single TransactionScope without escalating to DTC. SQL Server manages these transactions as if they were a single SqlTransaction.
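
The behavior can be illustrated outside of NServiceBus. The following is a minimal sketch under stated assumptions, not code from the sample: SingleScopeSketch, Run and Execute are hypothetical names, and firstSql and secondSql stand for any two data-modifying statements against the same database. Each connection is closed before the next one is opened, so the two never overlap.

```csharp
using System.Data.SqlClient;
using System.Transactions;

// Hypothetical illustration: two sequential SqlConnection instances
// enlist in one TransactionScope and commit or roll back together.
static class SingleScopeSketch
{
    public static void Run(string connectionString, string firstSql, string secondSql)
    {
        using (var scope = new TransactionScope())
        {
            Execute(connectionString, firstSql);
            Execute(connectionString, secondSql);

            // Without this call, disposing the scope rolls back both statements.
            scope.Complete();
        }
    }

    static void Execute(string connectionString, string sql)
    {
        using (var connection = new SqlConnection(connectionString))
        using (var command = new SqlCommand(sql, connection))
        {
            // Open() enlists the connection in the ambient transaction
            // (Transaction.Current) created by the TransactionScope.
            connection.Open();
            command.ExecuteNonQuery();
        }
    }
}
```

If the two connections were open at the same time, or used different connection strings, the transaction would be expected to escalate to a distributed one, which fails when the DTC service is disabled.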

Related Articles

  • Outbox
    Reliable messaging without distributed transactions.
  • SQL Server Transport
    High-level description of NServiceBus SQL Server Transport.
