Using TransactionalSession with Entity Framework and ASP.NET Core

This sample shows how to send messages and modify data in a database atomically within the scope of a web request using the NServiceBus.TransactionalSession package with ASP.NET Core. The operations are triggered by an incoming HTTP request to ASP.NET Core, which manages the ITransactionalSession lifetime using a resource filter.

Prerequisites

Ensure an instance of SQL Server (Version 2012 or above) is installed and accessible on localhost and port 1433.

Alternatively, change the connection string to point to a different SQL Server instance.
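
As a rough illustration of pointing the sample at another instance, the connection string could be changed along these lines. The class name, host, database name, and credentials below are placeholders chosen for this sketch, not values taken from the sample; only the ConnectionString identifier appears in the sample code.

```csharp
public static class Database
{
    // Hypothetical values: replace the host, port, database name, and credentials
    // with those of the SQL Server instance you want to use.
    public const string ConnectionString =
        "Server=my-sql-host,1433;Database=MyDatabase;User Id=my-user;Password=my-password;TrustServerCertificate=True";
}
```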

At startup, each endpoint will create the required SQL assets including databases, tables, and schemas.

Running the solution

When the solution is run, a new browser window/tab opens, as well as a console application. The browser will navigate to http://localhost:58118/.

An async WebAPI controller handles the request. It stores a new document using Entity Framework and sends an NServiceBus message to the endpoint hosted in the console application.

The message will be processed by the NServiceBus message handler and result in "Message received at endpoint" printed to the console. In addition, the handler will update the previously created entity.

To query all the stored entities, navigate to http://localhost:58118/all. To apply a complex object hierarchy using the transactional session on an endpoint, navigate to http://localhost:58118/service.

Configuration

The endpoint is configured using the UseNServiceBus extension method:

var endpointConfiguration = new EndpointConfiguration("Samples.ASPNETCore.Sender");

endpointConfiguration.UseSerialization<SystemJsonSerializer>();
endpointConfiguration.EnableInstallers();
endpointConfiguration.UseTransport(new LearningTransport { TransportTransactionMode = TransportTransactionMode.ReceiveOnly });

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.MsSqlServer>();
persistence.ConnectionBuilder(() => new SqlConnection(ConnectionString));

persistence.EnableTransactionalSession();

endpointConfiguration.EnableOutbox();

hostBuilder.UseNServiceBus(endpointConfiguration);

The transactional session is enabled via the persistence.EnableTransactionalSession() method call. Note that the transactional session feature requires the outbox to be enabled so that operations across the storage and the message broker are atomic.

ASP.NET Core uses ConfigureWebHostDefaults for configuration, and a custom resource filter is registered to manage the ITransactionalSession lifetime:

hostBuilder.Services.AddScoped<MessageSessionFilter>();
hostBuilder.Services.AddControllers(o => o.Filters.AddService<MessageSessionFilter>());

Entity Framework support is configured by registering the DbContext:

// Configure Entity Framework to attach to the synchronized storage session when required
hostBuilder.Services.AddScoped(b =>
{
    if (b.GetService<ISynchronizedStorageSession>() is ISqlStorageSession { Connection: not null } session)
    {
        var context = new MyDataContext(new DbContextOptionsBuilder<MyDataContext>()
            .UseSqlServer(session.Connection)
            .Options);

        //Use the same underlying ADO.NET transaction
        context.Database.UseTransaction(session.Transaction);

        //Ensure context is flushed before the transaction is committed
        session.OnSaveChanges((s, cancellationToken) => context.SaveChangesAsync(cancellationToken));

        return context;
    }
    else
    {
        var context = new MyDataContext(new DbContextOptionsBuilder<MyDataContext>()
            .UseSqlServer(ConnectionString)
            .Options);
        return context;
    }
});

The registration ensures that MyDataContext is built using the same connection and transaction as the ITransactionalSession. When the transactional session is committed, the OnSaveChanges callback calls SaveChangesAsync on the Entity Framework context before the underlying transaction is committed. When the transactional session is not used, a data context with a dedicated connection is returned.
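
The MyDataContext, MyEntity, and MyMessage types are referenced throughout but not listed here. A minimal sketch of what they might look like, inferred only from how they are used in the snippets on this page: the property types and the ICommand marker interface are assumptions, and the sample's actual definitions may differ.

```csharp
using Microsoft.EntityFrameworkCore;
using NServiceBus;

// Assumed shape: only Id and Processed are used in the snippets on this page.
public class MyEntity
{
    public string Id { get; set; }
    public bool Processed { get; set; }
}

// Assumption: the message is marked as a command; the sample may use a different marker or conventions.
public class MyMessage : ICommand
{
    public string EntityId { get; set; }
}

public class MyDataContext : DbContext
{
    // Accepts the options built in the registration, whether they wrap the shared
    // session connection or a dedicated connection string.
    public MyDataContext(DbContextOptions<MyDataContext> options) : base(options)
    {
    }

    public DbSet<MyEntity> MyEntities { get; set; }
}
```

This sketch depends on the Entity Framework Core and NServiceBus packages and is not runnable on its own.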

Using the session

The message session is injected into SendMessageController via method injection. Message operations executed on the ITransactionalSession API are transactionally consistent with the database operations performed on the MyDataContext.

[HttpGet]
public async Task<string> Get([FromServices] ITransactionalSession messageSession)
{
    var id = Guid.NewGuid().ToString();

    await dataContext.MyEntities.AddAsync(new MyEntity { Id = id, Processed = false });

    var message = new MyMessage { EntityId = id };
    await messageSession.SendLocal(message);

    return $"Message with entity ID '{id}' sent to endpoint";
}

The lifecycle of the session is managed by the MessageSessionFilter, which hooks into the resource filter stage of the ASP.NET Core pipeline. When a controller action with an ITransactionalSession parameter is called, the filter opens the session, invokes the rest of the pipeline, and then commits the session if no exception occurred:

public class MessageSessionFilter : IAsyncResourceFilter
{
    public async Task OnResourceExecutionAsync(ResourceExecutingContext context, ResourceExecutionDelegate next)
    {
        if (context.ActionDescriptor.Parameters.Any(p => p.ParameterType == typeof(ITransactionalSession)))
        {
            var session = context.HttpContext.RequestServices.GetRequiredService<ITransactionalSession>();
            await session.Open(new SqlPersistenceOpenSessionOptions());

            var result = await next();

            if (result.Exception is null)
            {
                await session.Commit();
            }
        }
        else
        {
            await next();
        }
    }
}
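
The ordering enforced by the filter (open, run the action, commit only on success) can be tried out without ASP.NET Core or NServiceBus. The following is a minimal sketch under that assumption: FakeSession and SessionPipeline are hypothetical stand-ins invented for this illustration and are not part of the sample or of any NServiceBus package.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for ITransactionalSession; it only records the calls made to it.
public sealed class FakeSession
{
    public List<string> Calls { get; } = new List<string>();

    public Task Open() { Calls.Add("Open"); return Task.CompletedTask; }
    public Task Commit() { Calls.Add("Commit"); return Task.CompletedTask; }
}

public static class SessionPipeline
{
    // Mirrors the filter: open the session, run the action, commit only if the action succeeded.
    // Returns true when the session was committed.
    public static async Task<bool> RunInSession(FakeSession session, Func<Task> action)
    {
        await session.Open();

        try
        {
            await action();
        }
        catch (Exception)
        {
            // In ASP.NET Core the failure is surfaced as ResourceExecutedContext.Exception
            // instead of being thrown; either way the commit is skipped.
            return false;
        }

        await session.Commit();
        return true;
    }
}

public static class Program
{
    public static async Task Main()
    {
        var succeeded = new FakeSession();
        await SessionPipeline.RunInSession(succeeded, () => Task.CompletedTask);
        Console.WriteLine(string.Join(",", succeeded.Calls)); // prints "Open,Commit"

        var failed = new FakeSession();
        await SessionPipeline.RunInSession(failed, () => throw new InvalidOperationException("boom"));
        Console.WriteLine(string.Join(",", failed.Calls)); // prints "Open"
    }
}
```

Skipping the commit on failure is what keeps the database changes and the outgoing messages consistent: neither becomes visible unless the whole action completed.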

Controller actions that do not have an ITransactionalSession parameter use a data context with a dedicated connection. For example, navigate to http://localhost:58118/all:

[HttpGet("/all")]
public async Task<List<MyEntity>> GetAll()
{
    return await dataContext.MyEntities.ToListAsync();
}

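The transactional session can also be used by a dependency of the controller instead of the controller itself. For example, navigate to http://localhost:58118/service, which injects a service that depends on ITransactionalSession: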

[HttpGet("/service")]
[RequiresTransactionalSession]
public async Task<string> Get([FromServices] ServiceUsingTransactionalSession service)
{
    var id = await service.Execute();

    return $"Message with entity ID '{id}' sent to endpoint";
}

The [RequiresTransactionalSession] attribute makes sure the session is opened and committed.

public sealed class RequiresTransactionalSessionAttribute : TypeFilterAttribute
{
    public RequiresTransactionalSessionAttribute() : base(typeof(TransactionalSessionFilter))
    {
    }

    private class TransactionalSessionFilter : IAsyncResourceFilter
    {
        public TransactionalSessionFilter(ITransactionalSession transactionalSession)
        {
            this.transactionalSession = transactionalSession;
        }

        public async Task OnResourceExecutionAsync(ResourceExecutingContext context, ResourceExecutionDelegate next)
        {
            await transactionalSession.Open(new SqlPersistenceOpenSessionOptions());

            var result = await next();

            if (result.Exception is null)
            {
                await transactionalSession.Commit();
            }
        }

        private readonly ITransactionalSession transactionalSession;
    }
}

This works as long as the service and the attribute are registered in the configuration:

hostBuilder.Services.AddScoped<ServiceUsingTransactionalSession>();
hostBuilder.Services.AddScoped<RequiresTransactionalSessionAttribute>();
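
ServiceUsingTransactionalSession itself is not listed on this page. A minimal sketch of what such a service might look like, assuming it mirrors the Get action of SendMessageController and returns the new entity ID. The constructor parameters and the body of Execute are assumptions; MyDataContext, MyEntity, and MyMessage are the sample's own types.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.TransactionalSession;

// Sketch only: the body is an assumption modeled on SendMessageController.Get.
public class ServiceUsingTransactionalSession
{
    readonly MyDataContext dataContext;
    readonly ITransactionalSession messageSession;

    public ServiceUsingTransactionalSession(MyDataContext dataContext, ITransactionalSession messageSession)
    {
        this.dataContext = dataContext;
        this.messageSession = messageSession;
    }

    public async Task<string> Execute()
    {
        var id = Guid.NewGuid().ToString();

        // Tracked by the data context that shares the session's connection and transaction.
        await dataContext.MyEntities.AddAsync(new MyEntity { Id = id, Processed = false });

        // Sent only when the filter commits the session.
        await messageSession.SendLocal(new MyMessage { EntityId = id });

        return id;
    }
}
```

Because the service never opens or commits the session, it relies on the [RequiresTransactionalSession] attribute on the controller action to do so.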

This diagram visualizes the interaction between the resource filter, ITransactionalSession, and the Web API controller:

sequenceDiagram
    autonumber
    User->>Filter: Http Request
    activate Filter
    Filter->>TransactionalSession: Open
    activate TransactionalSession
    TransactionalSession-->>Filter: Reply
    Filter->>Controller: next()
    activate Controller
    Controller->>TransactionalSession: Send/Publish...
    Controller->>TransactionalSession: Use SynchronizedStorageSession
    deactivate Controller
    Filter->>TransactionalSession: Commit
    deactivate TransactionalSession
    Filter-->>User: Reply
    deactivate Filter

Handling the message

MyHandler handles the message sent by the ASP.NET Core controller and accesses the data previously committed by the controller:

public class MyHandler : IHandleMessages<MyMessage>
{
    static readonly ILog log = LogManager.GetLogger<MyHandler>();
    readonly MyDataContext dataContext;

    public MyHandler(MyDataContext dataContext)
    {
        this.dataContext = dataContext;
    }

    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        log.Info("Message received at endpoint");

        var entity = await dataContext.MyEntities.Where(e => e.Id == message.EntityId)
           .FirstAsync(cancellationToken: context.CancellationToken);
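        // No explicit SaveChangesAsync call is needed here: the OnSaveChanges callback
        // registered with the data context flushes the change before the transaction commits.
        entity.Processed = true;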
    }
}
