Getting Started
NServiceBus
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Previews
Samples

Using TransactionalSession with Entity Framework and ASP.NET Core

NuGet Package: NServiceBus.Persistence.Sql.TransactionalSession (6.x)
Target Version: NServiceBus 7.x
NServiceBus integrates with ASP.NET Core 3.x using GenericHost. For older versions of ASP.NET Core use the community package Community.NServiceBus.WebHost. ASP.NET Core 2.x has a race condition and the community package implements a workaround. It's recommended to upgrade to ASP.NET Core 3.0 and use NServiceBus.Extensions.Hosting package.

This sample shows how to send messages and modify data in a database atomically within the scope of a web request using the NServiceBus.TransactionalSession package in conjunction with ASP.NET Core. The operations are triggered by an incoming HTTP request to ASP.NET Core which will manage the ITransactionalSession lifetime inside a request middleware.

Prerequisites

  • Visual Studio 2019
  • LocalDB support. Alternatively, a custom connection string can be provided

Running the solution

When the solution is run, a new browser window/tab opens, as well as a console application. The browser will navigate to http://localhost:58118/.

An async WebAPI controller handles the request. It stores a new document using Entity Framework and sends an NServiceBus message to the endpoint hosted in the console application.

The message will be processed by the NServiceBus message handler and result in "Message received at endpoint"-message printed to the console. In addition, the handler will update the previously created entity.

Configuration

The endpoint is configured using the UseNServiceBus extension method:

.UseNServiceBus(context =>
{
    var endpointConfiguration = new EndpointConfiguration("Samples.ASPNETCore.Sender");
    var transport = endpointConfiguration.UseTransport<LearningTransport>();
    transport.Transactions(TransportTransactionMode.ReceiveOnly);
    endpointConfiguration.EnableInstallers();

    var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
    persistence.SqlDialect<SqlDialect.MsSqlServer>();
    persistence.ConnectionBuilder(() => new SqlConnection(ConnectionString));

    persistence.EnableTransactionalSession();

    endpointConfiguration.EnableOutbox();

    return endpointConfiguration;
})

The transactional session is enabled via the endpointConfiguration.EnableTransactionalSession() method call. Note that the transactional session feature requires the outbox to be configured in order to ensure that operations across the storage and the message broker are atomic.

ASP.NET Core uses ConfigureWebHostDefaults for configuration and a custom middleware is registered for the ITransactionalSession lifetime management:

app.UseMiddleware<MessageSessionMiddleware>();

Entity Framework support is configured by registering the DbContext:

.ConfigureServices(c =>
{
    // Configure Entity Framework to attach to the synchronized storage session
    c.AddScoped(b =>
    {
        var session = b.GetRequiredService<ISqlStorageSession>();
        var context = new MyDataContext(new DbContextOptionsBuilder<MyDataContext>()
            .UseSqlServer(session.Connection)
            .Options);

        //Use the same underlying ADO.NET transaction
        context.Database.UseTransaction(session.Transaction);

        //Ensure context is flushed before the transaction is committed
        session.OnSaveChanges((s) => context.SaveChangesAsync());

        return context;
    });
})

The registration ensures that the MyDataContext type is built using the same session and transaction that is used by the ITransactionalSession. Once the transactional session is committed, it notifies the Entity Framework context to call SaveChangesAsync.

Using the session

The message session is injected into SendMessageController via constructor injection along with the Entity Framework database context. Message operations executed on the ITransactionalSession API are transactionally consistent with the database operations performed on the MyDataContext.

[HttpGet]
public async Task<string> Get()
{
    var id = Guid.NewGuid().ToString();

    await dataContext.MyEntities.AddAsync(new MyEntity { Id = id, Processed = false });

    var message = new MyMessage { EntityId = id };
    await messageSession.SendLocal(message)
        .ConfigureAwait(false);

    return $"Message with entity ID '{id}' sent to endpoint";
}

The lifecycle of the session is managed by the MessageSessionMiddleware. It opens the session when an HTTP request arrives and takes care of committing the session once the ASP.NET pipeline completes:

public async Task InvokeAsync(HttpContext httpContext, ITransactionalSession session)
{
    await session.Open(new SqlPersistenceOpenSessionOptions());

    await next(httpContext);

    await session.Commit();
}

This diagram visualizes the interaction between the middleware, ITransactionalSession, and the Web API controller:

sequenceDiagram autonumber User->>Middleware: Http Request activate Middleware Middleware->>TransactionalSession: Open activate TransactionalSession TransactionalSession-->>Middleware: Reply Middleware->>Controller: next() activate Controller Controller->>TransactionalSession: Send/Publish... Controller->>TransactionalSession: Use SynchronizedStorageSession deactivate Controller Middleware->>TransactionalSession: Commit deactivate TransactionalSession Middleware-->>User: Reply deactivate Middleware

Handling the message

The MyHandler handles the message sent by the ASP.NET controller and accesses the previously committed data stored by the controller:

public class MyHandler : IHandleMessages<MyMessage>
{
    static readonly ILog log = LogManager.GetLogger<MyHandler>();
    readonly MyDataContext dataContext;

    public MyHandler(MyDataContext dataContext)
    {
        this.dataContext = dataContext;
    }

    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        log.Info("Message received at endpoint");

        var entity = await dataContext.MyEntities.Where(e => e.Id == message.EntityId)
           .FirstAsync();
        entity.Processed = true;
    }
}

Related Articles


Last modified