NServiceBus integrates with ASP.NET Core 3.x using GenericHost. For older versions of ASP.NET Core, use the community package Community.NServiceBus.WebHost: ASP.NET Core 2.x has a race condition, and the community package implements a workaround. Upgrading to ASP.NET Core 3.0 and using the NServiceBus.Extensions.Hosting package is recommended.
This sample shows how to send messages and modify data in a database atomically within the scope of a web request, using the NServiceBus transactional session with ASP.NET Core. The operations are triggered by an incoming HTTP request to ASP.NET Core, which manages the ITransactionalSession lifetime using a request middleware.
Prerequisites
Ensure an instance of SQL Server (version 2012 or above) is installed and accessible on localhost and port 1433.
Alternatively, change the connection string to point to a different SQL Server instance.
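As a rough illustration of pointing the sample at a different instance, the connection string could be assembled with SqlConnectionStringBuilder. This is only a sketch: the host name, database name, and authentication settings are placeholders rather than values taken from the sample, and it assumes the Microsoft.Data.SqlClient package is referenced.

```csharp
using System;
using Microsoft.Data.SqlClient;

// Placeholder values: replace the host, port, and database name with those of
// the SQL Server instance to use. None of these values come from the sample.
var builder = new SqlConnectionStringBuilder
{
    DataSource = "my-sql-host,1433",      // hypothetical host and port
    InitialCatalog = "MySampleDatabase",  // hypothetical database name
    IntegratedSecurity = true,            // Windows authentication; set UserID/Password for SQL logins instead
    TrustServerCertificate = true         // often needed for local instances without a trusted certificate
};

// The resulting string would take the place of the sample's ConnectionString value.
Console.WriteLine(builder.ConnectionString);
```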
At startup, each endpoint will create the required SQL assets including databases, tables, and schemas.
Running the solution
When the solution is run, a new browser window/tab opens, as well as a console application. The browser will navigate to http:/
.
An async WebAPI controller handles the request. It stores a new document using Entity Framework and sends an NServiceBus message to the endpoint hosted in the console application.
The message will be processed by the NServiceBus message handler and result in "Message received at endpoint" printed to the console. In addition, the handler will update the previously created entity.
To query all the stored entities, navigate to http:/
. To apply a complex object hierarchy using the transactional session on an endpoint, navigate to http:/
.
Configuration
The endpoint is configured using the UseNServiceBus extension method:
.UseNServiceBus(context =>
{
    var endpointConfiguration = new EndpointConfiguration("Samples.ASPNETCore.Sender");
    var transport = endpointConfiguration.UseTransport<LearningTransport>();
    transport.Transactions(TransportTransactionMode.ReceiveOnly);
    endpointConfiguration.EnableInstallers();

    var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
    persistence.SqlDialect<SqlDialect.MsSqlServer>();
    persistence.ConnectionBuilder(() => new SqlConnection(ConnectionString));
    persistence.EnableTransactionalSession();

    endpointConfiguration.EnableOutbox();
    return endpointConfiguration;
})
The transactional session is enabled via the EnableTransactionalSession method call on the persistence configuration. Note that the transactional session feature requires the outbox to be configured to ensure that operations across the storage and the message broker are atomic.
ASP.NET Core uses ConfigureWebHostDefaults for configuration, and a custom resource filter is registered to manage the ITransactionalSession lifetime:
s.AddScoped<MessageSessionFilter>();
s.AddControllers(o => o.Filters.AddService<MessageSessionFilter>());
Entity Framework support is configured by registering the DbContext:
.ConfigureServices(c =>
{
    // Configure Entity Framework to attach to the synchronized storage session when required
    c.AddScoped(b =>
    {
        var synchronizedStorageSession = b.GetService<SynchronizedStorageSession>();
        if (synchronizedStorageSession != null)
        {
            var session = synchronizedStorageSession.SqlPersistenceSession();
            var context = new MyDataContext(new DbContextOptionsBuilder<MyDataContext>()
                .UseSqlServer(session.Connection)
                .Options);

            // Use the same underlying ADO.NET transaction
            context.Database.UseTransaction(session.Transaction);

            // Ensure context is flushed before the transaction is committed
            session.OnSaveChanges((s) => context.SaveChangesAsync());
            return context;
        }
        else
        {
            // No storage session available: use a dedicated connection
            var context = new MyDataContext(new DbContextOptionsBuilder<MyDataContext>()
                .UseSqlServer(ConnectionString)
                .Options);
            return context;
        }
    });
})
The registration ensures that the MyDataContext type is built using the same session and transaction that is used by the ITransactionalSession. Once the transactional session is committed, it notifies the Entity Framework context to call SaveChangesAsync. When the transactional session is not used, a data context with a dedicated connection is returned.
Using the session
The message session is injected into SendMessageController via method injection. Message operations executed on the ITransactionalSession API are transactionally consistent with the database operations performed on the MyDataContext.
[HttpGet]
public async Task<string> Get([FromServices] ITransactionalSession messageSession)
{
    var id = Guid.NewGuid().ToString();
    await dataContext.MyEntities.AddAsync(new MyEntity { Id = id, Processed = false });

    var message = new MyMessage { EntityId = id };
    await messageSession.SendLocal(message);

    return $"Message with entity ID '{id}' sent to endpoint";
}
The lifecycle of the session is managed by the MessageSessionFilter, which hooks into the ASP.NET pipeline as a resource filter. When a controller action with an ITransactionalSession parameter is called, the filter opens the session, performs the next action, and then commits the session if no exception occurred:
public class MessageSessionFilter : IAsyncResourceFilter
{
    public async Task OnResourceExecutionAsync(ResourceExecutingContext context, ResourceExecutionDelegate next)
    {
        if (context.ActionDescriptor.Parameters.Any(p => p.ParameterType == typeof(ITransactionalSession)))
        {
            var session = context.HttpContext.RequestServices.GetRequiredService<ITransactionalSession>();
            await session.Open(new SqlPersistenceOpenSessionOptions());

            var result = await next();

            // Commit only when the rest of the pipeline completed without an exception
            if (result.Exception is null)
            {
                await session.Commit();
            }
        }
        else
        {
            await next();
        }
    }
}
The resource filter could be extended to return problem details (for example, with context.
) in cases where the transactional session cannot be committed. This is omitted from the sample.
For controller actions that do not have an ITransactionalSession parameter (for example, when navigating to http:/
), a data context with a dedicated connection is used.
[HttpGet("/all")]
public async Task<List<MyEntity>> GetAll()
{
    return await dataContext.MyEntities.ToListAsync();
}
The sample uses method injection as an opinionated way of expressing that the transactional boundaries are managed by the infrastructure. Alternatively, the transactional boundaries can be expressed with an attribute, so that even complex dependency chains get access to the transactional session without injecting it into the controller action. In that case, an action attribute must be used to annotate controllers or actions.
For example, navigate to http:/
which injects a service that depends on ITransactionalSession.
[HttpGet("/service")]
[RequiresTransactionalSession]
public async Task<string> Get([FromServices] ServiceUsingTransactionalSession service)
{
    var id = await service.Execute();

    return $"Message with entity ID '{id}' sent to endpoint";
}
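The service injected into this action is not shown in this section. A minimal sketch of what such a service might look like, assuming it mirrors the SendMessageController action shown earlier: the constructor dependencies and the body of Execute are illustrative guesses rather than the sample's actual code, and the using directives are assumptions about where the NServiceBus types live. MyDataContext, MyEntity, and MyMessage are the sample's own types.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;                       // assumed namespace for the SendLocal extension
using NServiceBus.TransactionalSession;  // assumed namespace for ITransactionalSession

// Hypothetical implementation: only the type name and the Execute call appear in the sample text.
public class ServiceUsingTransactionalSession
{
    readonly ITransactionalSession transactionalSession;
    readonly MyDataContext dataContext;

    public ServiceUsingTransactionalSession(ITransactionalSession transactionalSession, MyDataContext dataContext)
    {
        this.transactionalSession = transactionalSession;
        this.dataContext = dataContext;
    }

    public async Task<string> Execute()
    {
        var id = Guid.NewGuid().ToString();

        // Stage the entity on the data context that shares the session's transaction
        await dataContext.MyEntities.AddAsync(new MyEntity { Id = id, Processed = false });

        // Send the message through the transactional session so both operations commit together
        await transactionalSession.SendLocal(new MyMessage { EntityId = id });

        return id;
    }
}
```

In this sketch the service never calls Open or Commit itself; it relies on the session being opened and committed around the action by the infrastructure.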
The [RequiresTransactionalSession] attribute makes sure the session is opened and committed.
public sealed class RequiresTransactionalSessionAttribute : TypeFilterAttribute
{
    public RequiresTransactionalSessionAttribute() : base(typeof(TransactionalSessionFilter))
    {
    }

    private class TransactionalSessionFilter : IAsyncResourceFilter
    {
        public TransactionalSessionFilter(ITransactionalSession transactionalSession)
        {
            this.transactionalSession = transactionalSession;
        }

        public async Task OnResourceExecutionAsync(ResourceExecutingContext context, ResourceExecutionDelegate next)
        {
            await transactionalSession.Open(new SqlPersistenceOpenSessionOptions());

            var result = await next();

            // Commit only when the rest of the pipeline completed without an exception
            if (result.Exception is null)
            {
                await transactionalSession.Commit();
            }
        }

        private readonly ITransactionalSession transactionalSession;
    }
}
This works as long as the attribute is registered in the configuration:
s.AddScoped<ServiceUsingTransactionalSession>();
s.AddScoped<RequiresTransactionalSessionAttribute>();
This diagram visualizes the interaction between the resource filter, ITransactionalSession, and the Web API controller:
Handling the message
The MyHandler handles the message sent by the ASP.NET controller and accesses the previously committed data stored by the controller:
public class MyHandler : IHandleMessages<MyMessage>
{
    static readonly ILog log = LogManager.GetLogger<MyHandler>();
    readonly MyDataContext dataContext;

    public MyHandler(MyDataContext dataContext)
    {
        this.dataContext = dataContext;
    }

    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        log.Info("Message received at endpoint");

        var entity = await dataContext.MyEntities.Where(e => e.Id == message.EntityId)
            .FirstAsync();
        entity.Processed = true;
    }
}