Entity Framework integration

Component: NServiceBus
NuGet Package NServiceBus (6.x)

Prerequisites

An instance of SQL Server Express is installed and accessible as .\SqlExpress.

At startup each endpoint will create its required SQL assets including databases, tables and schemas.

The databases created by this sample are NsbSamplesEfUowNh and NsbSamplesEfUowSql.

Running the project

  1. Start the Solution
  2. The text Press <enter> to send a message should be displayed in both console windows.
  3. Hit <enter> in both console windows.

If the sample throws exceptions when it runs, delete the tables from the databases it uses. By default, Entity Framework cannot update existing table schemas, so tables left over from an older schema prevent the code from executing properly.

Verifying that the sample works

The result in both windows (SQL persistence and NHibernate persistence) should be the same.

  1. CreateOrderHandler displays information that an order was submitted.
  2. OrderLifecycleSaga displays information that the order process has been started.
  3. CreateShipmentHandler displays information that the shipment has been created.
  4. Finally, after a couple of seconds, CompleteOrderHandler displays information that the order is going to be completed.

Open SQL Server Management Studio and go to the databases created by the sample. Verify that each one contains a row in the saga state table (dbo.OrderLifecycleSagaData), in the orders table (dbo.Orders), and in the shipments table (dbo.Shipments).

Code walk-through

This sample contains four projects:

  • Messages - A class library containing the message definitions.
  • Shared - A class library containing the common logic code including data context and handler definitions.
  • Endpoint.NHibernate - A console application running the endpoint with NHibernate persistence.
  • Endpoint.SqlPersistence - A console application running the endpoint with SQL persistence.

Endpoint projects

Each endpoint mimics a back-end system. It is configured to use the SQL Server transport and uses Entity Framework to store business data (orders and shipments).

When a message arrives at the receiver, a single transactional data access context is created to keep the whole message handling process consistent. The following steps all happen within it:

  • message is removed from the input queue by the SQL Server transport
  • a new saga instance is created and stored by the configured persistence (NHibernate or SQL)
  • a new Order entity is created
var order = new Order
{
    OrderId = message.OrderId,
    Value = message.Value
};
context.DataContext().Orders.Add(order);
  • a new Shipment entity is created
var order = context.DataContext().Orders.Local.Single(x => x.OrderId == message.OrderId);

var shipment = new Shipment
{
    Id = Guid.NewGuid(),
    Order = order,
    Location = message.ShipTo
};
context.DataContext().Shipments.Add(shipment);
  • a reply message is inserted into the queue
  • a timeout request is inserted into the queue

Notice that the code storing the shipment retrieves the Order from Entity Framework's local cache (Orders.Local) rather than from the database. At that point the Order has not yet been persisted.

Unit of work

The integration with Entity Framework allows users to take advantage of the Unit of Work semantics of Entity Framework's DbContext. A single instance of the context is shared between all the handlers of a message, and the SaveChanges method is called once, after all handlers have done their work.

Setting up

The setup behavior makes sure that an instance of the unit of work wrapper class (EntityFrameworkUnitOfWork) is available before the handlers are called.

public class UnitOfWorkSetupBehaviorBehavior
    : Behavior<IIncomingLogicalMessageContext>
{
    Func<SynchronizedStorageSession, ReceiverDataContext> contextFactory;

    public UnitOfWorkSetupBehaviorBehavior(Func<SynchronizedStorageSession, ReceiverDataContext> contextFactory)
    {
        this.contextFactory = contextFactory;
    }

    public override async Task Invoke(
        IIncomingLogicalMessageContext context, Func<Task> next)
    {
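        // Make the unit of work available to every handler of this message
        var uow = new EntityFrameworkUnitOfWork(contextFactory);
        context.Extensions.Set(uow);
        // Run the rest of the pipeline, including the handlers
        await next().ConfigureAwait(false);
        // The handlers are done, so the wrapper is no longer needed
        context.Extensions.Remove<EntityFrameworkUnitOfWork>();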
    }
}
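
The EntityFrameworkUnitOfWork class itself is not shown in this walk-through. As a rough sketch of how such a wrapper could create the context lazily and hand the same instance to every handler, the following uses stand-in types only. FakeStorageSession, FakeDataContext and LazyUnitOfWork are hypothetical names for illustration and are not part of the sample or of NServiceBus.

```csharp
using System;

// Stand-in types so the sketch compiles without NServiceBus or Entity Framework.
public sealed class FakeStorageSession { }
public sealed class FakeDataContext { }

// Hypothetical wrapper: creates the context on first access and then
// returns the same instance for the rest of the message.
public sealed class LazyUnitOfWork
{
    readonly Func<FakeStorageSession, FakeDataContext> contextFactory;
    FakeDataContext context;

    public LazyUnitOfWork(Func<FakeStorageSession, FakeDataContext> contextFactory)
    {
        this.contextFactory = contextFactory;
    }

    public FakeDataContext GetDataContext(FakeStorageSession storageSession)
    {
        if (context == null)
        {
            // First access: build the context from the storage session
            context = contextFactory(storageSession);
        }
        return context;
    }
}

public static class Program
{
    public static void Main()
    {
        var factoryCalls = 0;
        var uow = new LazyUnitOfWork(storageSession =>
        {
            factoryCalls++;
            return new FakeDataContext();
        });
        var session = new FakeStorageSession();

        // Two handlers asking for the context get the same instance
        var first = uow.GetDataContext(session);
        var second = uow.GetDataContext(session);

        Console.WriteLine(ReferenceEquals(first, second)); // True
        Console.WriteLine(factoryCalls);                   // 1
    }
}
```

Because the factory only runs on first access, a message whose handlers never touch business data would not create a context at all under this design.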

Creating data context

The data context is created lazily, at most once per message, the first time a handler accesses it. For the outbox to work, the business data has to reuse the same connection context as the NServiceBus persistence. With NHibernate persistence, this is achieved by passing the NHibernate session's connection, which is already enlisted in the ambient TransactionScope, to the data context.

var pipeline = endpointConfiguration.Pipeline;
pipeline.Register(new UnitOfWorkSetupBehaviorBehavior(storageSession =>
{
    var dbConnection = storageSession.Session().Connection;
    var context = new ReceiverDataContext(dbConnection);

    //Don't use transaction because connection is enlisted in the TransactionScope
    context.Database.UseTransaction(null);

    //Call SaveChanges before completing storage session
    storageSession.OnSaveChanges(x => context.SaveChangesAsync());

    return context;
}), "Sets up unit of work for the message");

With SQL persistence the same goal is achieved by using the same ADO.NET connection and transaction objects in both NServiceBus and Entity Framework.

var pipeline = endpointConfiguration.Pipeline;
pipeline.Register(new UnitOfWorkSetupBehaviorBehavior(storageSession =>
{
    var dbConnection = storageSession.SqlPersistenceSession().Connection;
    var context = new ReceiverDataContext(dbConnection);

    //Use the same underlying ADO.NET transaction
    context.Database.UseTransaction(storageSession.SqlPersistenceSession().Transaction);

    //Call SaveChanges before completing storage session
    storageSession.SqlPersistenceSession().OnSaveChanges(x => context.SaveChangesAsync());

    return context;
}), "Sets up unit of work for the message");

The OnSaveChanges callback makes sure SaveChangesAsync is called just before the storage session completes, so business data is saved only when message handling succeeds.
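
The ordering this relies on can be illustrated with a simplified model. This is a sketch under stated assumptions, not the persistence's actual implementation: CallbackStorageSession and ProcessMessage are hypothetical stand-ins, and the callback signature is reduced to Func<Task> for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a storage session that supports save callbacks.
public sealed class CallbackStorageSession
{
    readonly List<Func<Task>> saveCallbacks = new List<Func<Task>>();

    public List<string> Log { get; } = new List<string>();

    public void OnSaveChanges(Func<Task> callback) => saveCallbacks.Add(callback);

    // Runs the handlers, then the save callbacks, then commits.
    // Any exception skips the remaining steps and rolls back instead.
    public async Task ProcessMessage(Func<Task> handlers)
    {
        try
        {
            await handlers();
            foreach (var callback in saveCallbacks)
            {
                await callback();
            }
            Log.Add("commit");
        }
        catch (Exception)
        {
            Log.Add("rollback");
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        var ok = new CallbackStorageSession();
        ok.OnSaveChanges(() => { ok.Log.Add("SaveChanges"); return Task.CompletedTask; });
        await ok.ProcessMessage(() => { ok.Log.Add("handler"); return Task.CompletedTask; });
        Console.WriteLine(string.Join(", ", ok.Log)); // handler, SaveChanges, commit

        var failed = new CallbackStorageSession();
        failed.OnSaveChanges(() => { failed.Log.Add("SaveChanges"); return Task.CompletedTask; });
        await failed.ProcessMessage(() => throw new InvalidOperationException("handler failed"));
        Console.WriteLine(string.Join(", ", failed.Log)); // rollback
    }
}
```

In this model a failing handler means the save callback never runs, which mirrors the intent described above: business data is flushed only as part of a successful completion.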
