Getting Started
Architecture
NServiceBus
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring

Entity Framework Core integration with SQL Persistence

Component: Sql Persistence
NuGet Package: NServiceBus.Persistence.Sql (6.x)
Target Version: NServiceBus 7.x

Prerequisites

Ensure an instance of SQL Server (Version 2016 or above for custom saga finders sample, or Version 2012 or above for other samples) is installed and accessible on localhost and port 1433. A Docker image can be used to accomplish this by running docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_SA_PASSWORD=yourStrong(!)Password' -p 1433:1433 -d mcr.microsoft.com/mssql/server:latest in a terminal.

Alternatively, change the connection string to point to different SQL Server instance.

At startup each endpoint will create its required SQL assets including databases, tables, and schemas.

The database created by this sample is NsbSamplesEfCoreUowSql.

Running the project

  1. Start the solution
  2. The text Press <enter> to send a message will appear in both console windows
  3. Press enter in both console windows

Verifying that the sample works

  1. CreateOrderHandler displays information that an order was submitted.
  2. OrderLifecycleSaga displays information that the order process has been started.
  3. CreateShipmentHandler displays information that the shipment has been created.
  4. After a few seconds, CompleteOrderHandler displays information that the order is going to be completed.

Open SQL Server Management Studio and go to the database. Verify that there is a row in the saga state table (dbo.OrderLifecycleSagaData), in the orders table (dbo.Orders), and in the shipments table (dbo.Shipments).

Code walk-through

This sample contains the following projects:

  • Messages: A class library containing the message definitions.
  • Endpoint.SqlPersistence: A console application running the endpoint with SQL persistence.

Endpoint projects

The endpoint mimics a back-end system. It is configured to use the SQL Server transport. It uses Entity Framework to store business data (orders and shipments).

When the message arrives at the receiver, a single transactional data access context is created to ensure consistency of the entire message-handling process:

  • the message is removed from the input queue by the SQL Server transport
  • a new saga instance is created and stored by the SQL persistence
  • a new Order entity is created
var order = new Order
{
    OrderId = message.OrderId,
    Value = message.Value
};
dataContext.Orders.Add(order);
  • a new Shipment entity is created
var order = dataContext.Orders.Local.Single(x => x.OrderId == message.OrderId);

var shipment = new Shipment
{
    Id = Guid.NewGuid(),
    Order = order,
    Location = message.ShipTo
};
dataContext.Shipments.Add(shipment);
  • a reply message is inserted to the queue
  • a timeout request is inserted to the queue

Notice how storing the shipment retrieves the Order from the session cache of Entity Framework. The Order is not yet persisted to the database.

Unit of work

The integration with Entity Framework allows users to take advantage of Unit of Work semantics of Entity Framework's DataContext. A single instance of the context is shared among all handlers and the SaveChanges method is called after all handlers do their work.

Creating data context

The data context is created only once, before it is first accessed from a handler. To maintain consistency, the business data has to reuse the same connection context as NServiceBus persistence. With SQL persistence, this is achieved by using the same ADO.NET connection and transaction objects in both NServiceBus and Entity Framework.

endpointConfiguration.RegisterComponents(c =>
{
    c.ConfigureComponent(b =>
    {
        var session = b.Build<ISqlStorageSession>();

        var context = new ReceiverDataContext(new DbContextOptionsBuilder<ReceiverDataContext>()
            .UseSqlServer(session.Connection)
            .Options);

        //Use the same underlying ADO.NET transaction
        context.Database.UseTransaction(session.Transaction);

        //Ensure context is flushed before the transaction is committed
        session.OnSaveChanges(s => context.SaveChangesAsync());

        return context;
    }, DependencyLifecycle.InstancePerUnitOfWork);
});

The OnSaveChanges event is used to make sure SaveChangesAsync is called when the storage session completes successfully.

Related Articles

  • SQL Persistence
    A persister that targets relational databases, including SQL Server, Oracle, MySQL, PostgreSQL, AWS Aurora MySQL and AWS Aurora PostgreSQL.