Entity Framework Core integration with SQL Persistence

Component: Sql Persistence
NuGet Package: NServiceBus.Persistence.Sql (5.x)
Target Version: NServiceBus 7.x


This sample requires an instance of SQL Server Express, accessible as .\SqlExpress.

At startup, each endpoint creates its required SQL assets, including databases, tables, and schemas.

The database created by this sample is NsbSamplesEfCoreUowSql.

Running the project

  1. Start the solution
  2. The text "Press <enter> to send a message" will appear in both console windows
  3. Press enter in both console windows

If exceptions occur when running the sample, delete the tables from the database used by the code. By default, Entity Framework does not update table schemas, so the code won't execute properly against tables that use an old schema.

Verifying that the sample works

  1. CreateOrderHandler displays information that an order was submitted.
  2. OrderLifecycleSaga displays information that the order process has been started.
  3. CreateShipmentHandler displays information that the shipment has been created.
  4. After a few seconds, CompleteOrderHandler displays information that the order is going to be completed.

Open SQL Server Management Studio and go to the database. Verify that there is a row in the saga state table (dbo.OrderLifecycleSagaData), in the orders table (dbo.Orders), and in the shipments table (dbo.Shipments).

Code walk-through

This sample contains the following projects:

  • Messages: A class library containing the message definitions.
  • Endpoint.SqlPersistence: A console application running the endpoint with SQL persistence.

Endpoint projects

The endpoint mimics a back-end system. It is configured to use the SQL Server transport. It uses Entity Framework to store business data (orders and shipments).

When the message arrives at the receiver, a single transactional data access context is created to ensure consistency of the entire message-handling process:

  • the message is removed from the input queue by the SQL Server transport
  • a new saga instance is created and stored by the SQL persistence
  • a new Order entity is created
    var order = new Order
    {
        OrderId = message.OrderId,
        Value = message.Value
    };
  • a new Shipment entity is created
    var order = context.DataContext().Orders.Local.Single(x => x.OrderId == message.OrderId);

    var shipment = new Shipment
    {
        Id = Guid.NewGuid(),
        Order = order,
        Location = message.ShipTo
    };
  • a reply message is inserted to the queue
  • a timeout request is inserted to the queue

Notice that the handler storing the shipment retrieves the Order from Entity Framework's local cache (Orders.Local) rather than from the database. At that point the Order has not yet been persisted to the database.
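The difference between the local cache and the database can be illustrated in isolation. The following is a minimal sketch, not part of the sample: it assumes the Microsoft.EntityFrameworkCore.InMemory package is referenced, and the DemoDataContext and LocalCacheDemo names, as well as the simplified Order class, are hypothetical stand-ins introduced only for this illustration.

```csharp
using System;
using System.Linq;
using Microsoft.EntityFrameworkCore;

// Hypothetical, simplified stand-in for the sample's Order entity.
public class Order
{
    public string OrderId { get; set; }
    public decimal Value { get; set; }
}

// Hypothetical context; the sample's own context is ReceiverDataContext.
public class DemoDataContext : DbContext
{
    public DemoDataContext(DbContextOptions<DemoDataContext> options)
        : base(options)
    {
    }

    public DbSet<Order> Orders { get; set; }
}

public static class LocalCacheDemo
{
    // Returns { foundLocally, foundInStore } for an order that was added but not saved.
    public static bool[] Run()
    {
        // Assumption: the in-memory provider is used here only to keep the sketch
        // free of a real database; the sample itself uses SQL Server.
        var options = new DbContextOptionsBuilder<DemoDataContext>()
            .UseInMemoryDatabase("LocalCacheDemo")
            .Options;

        using (var context = new DemoDataContext(options))
        {
            // Track a new order without calling SaveChanges.
            context.Orders.Add(new Order { OrderId = "order-1", Value = 10m });

            // Local only inspects entities tracked by this context instance.
            var foundLocally = context.Orders.Local.Any(x => x.OrderId == "order-1");

            // A regular query is sent to the store, which has not received the order.
            var foundInStore = context.Orders.Any(x => x.OrderId == "order-1");

            return new[] { foundLocally, foundInStore };
        }
    }
}
```

Because the lookup through Local never reaches the store, it only works when both handlers share the same context instance, which is what the unit of work described in the next section provides.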

Unit of work

The integration with Entity Framework allows users to take advantage of the Unit of Work semantics of Entity Framework's DbContext. A single instance of the context is shared among all handlers, and the SaveChanges method is called after all handlers have done their work.

Setting up

The setup behavior makes sure that there is an instance of the unit of work wrapper class before the handlers are called.

public class UnitOfWorkSetupBehavior
    : Behavior<IIncomingLogicalMessageContext>
{
    Func<SynchronizedStorageSession, ReceiverDataContext> contextFactory;

    public UnitOfWorkSetupBehavior(Func<SynchronizedStorageSession, ReceiverDataContext> contextFactory)
    {
        this.contextFactory = contextFactory;
    }

    public override async Task Invoke(
        IIncomingLogicalMessageContext context, Func<Task> next)
    {
        var uow = new EntityFrameworkUnitOfWork(contextFactory);
        // Store the unit of work in the context bag so that handlers can retrieve it (assumed step)
        context.Extensions.Set(uow);
        await next().ConfigureAwait(false);
    }
}
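
The unit of work wrapper class itself is not listed in this article. As a rough, dependency-free sketch of the idea (not the sample's actual EntityFrameworkUnitOfWork class), a wrapper could create the context lazily and hand the same instance to every caller. LazyUnitOfWork, LazyUnitOfWorkDemo, and the generic parameters are hypothetical names: TSession stands in for SynchronizedStorageSession and TContext for ReceiverDataContext.

```csharp
using System;

// Hypothetical stand-in for the sample's unit of work wrapper.
// TSession plays the role of SynchronizedStorageSession,
// TContext the role of ReceiverDataContext.
public class LazyUnitOfWork<TSession, TContext>
    where TContext : class
{
    readonly Func<TSession, TContext> contextFactory;
    TContext context;

    public LazyUnitOfWork(Func<TSession, TContext> contextFactory)
    {
        this.contextFactory = contextFactory;
    }

    // Creates the context on first access and returns the same instance afterwards.
    // Not thread-safe: the sketch assumes callers run one after another.
    public TContext GetDataContext(TSession session)
    {
        if (context == null)
        {
            context = contextFactory(session);
        }
        return context;
    }
}

public static class LazyUnitOfWorkDemo
{
    // Simulates two handlers requesting the context while processing one message
    // and returns how many times the factory was invoked.
    public static int CountFactoryCalls()
    {
        var factoryCalls = 0;
        var uow = new LazyUnitOfWork<string, object>(session =>
        {
            factoryCalls++;
            return new object();
        });

        var first = uow.GetDataContext("session");
        var second = uow.GetDataContext("session");

        if (!ReferenceEquals(first, second))
        {
            throw new InvalidOperationException("Expected a single shared context.");
        }

        return factoryCalls;
    }
}
```

Deferring creation until the first access means that no context is built for messages whose handlers never touch business data.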

Creating data context

The data context is created only once, before it is first accessed from a handler. To maintain consistency, the business data has to reuse the same connection context as NServiceBus persistence. With SQL persistence, this is achieved by using the same ADO.NET connection and transaction objects in both NServiceBus and Entity Framework.

var pipeline = endpointConfiguration.Pipeline;
pipeline.Register(new UnitOfWorkSetupBehavior(storageSession =>
{
    var dbConnection = storageSession.SqlPersistenceSession().Connection;

    //Use the same underlying ADO.NET connection
    var context = new ReceiverDataContext(new DbContextOptionsBuilder<ReceiverDataContext>()
        .UseSqlServer(dbConnection)
        .Options);

    //Use the same underlying ADO.NET transaction
    context.Database.UseTransaction(storageSession.SqlPersistenceSession().Transaction);

    //Call SaveChanges before completing storage session
    storageSession.SqlPersistenceSession().OnSaveChanges(x => context.SaveChangesAsync());

    return context;
}), "Sets up unit of work for the message");

The callback registered through OnSaveChanges makes sure that SaveChangesAsync is called when the storage session completes successfully.

Related Articles

  • SQL Persistence
    A persister that targets relational databases, including SQL Server, Oracle, MySQL, and PostgreSQL.
