The recommended method of accessing business data in an NServiceBus message handler is to use the synchronized storage session to get access to the native database connection that is already being used by NServiceBus. That way, the same connection and transaction used, for example, by the outbox feature, is the same connection and transaction used to store business data, ensuring that all data stored by the message handler is done so atomically.
In message handlers, some developers prefer to use an injected data service (for example, a class implementing the repository pattern) to interact with business data. In this case it's important to ensure that the injected data service also shares the same connection and transaction with the message handler.
In this sample, which uses SQL Server transport and SQL persistence, the message handler accepts an IDataService
which is injected through the constructor. The sample will show how to use a message pipeline behavior to access the transport's ambient connection and transaction information, and then store that information within an intermediate ConnectionHolder
class that can be used when the DataService
implementation is constructed by the container.
Prerequisites
Ensure an instance of SQL Server (Version 2016 or above for custom saga finders sample, or Version 2012 or above for other samples) is installed and accessible on localhost
and port 1433
. A Docker image can be used to accomplish this by running docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_SA_PASSWORD=yourStrong(!)Password' -p 1433:1433 -d mcr.
in a terminal.
Alternatively, change the connection string to point to different SQL Server instance.
At startup each endpoint will create its required SQL assets including databases, tables, and schemas.
The database created by this sample is NsbSamplesInjectedServices
.
Dependency injection relationships
This diagram shows the relationships between components:
Because there may be many data services, a ConnectionHolder
class serves as a storage point for the database connection and transaction information. The SqlConnectionBehavior
executes earlier than the message handlers, so at that point, the behavior accesses the TransportTransaction
(containing the connection and transaction used to receive the message from the SQL Server transport) and populates the details of the ConnectionHolder
. By taking a dependency on the connection holder, each data service is able to access the connection and transaction for the message currently being processed.
Connection holder
The ConnectionHolder
class is just a bucket to store the connection and transaction in properties:
public class ConnectionHolder
{
public SqlConnection Connection { get; set; }
public SqlTransaction Transaction { get; set; }
}
Data service interface
The IDataService
interface represents a data service. The service specifies an operation to save business data, as well as an IsSame()
method which will be used later to show that the data service's connection and transaction are the same as would be used if accessing data via SQL persistence.
public interface IDataService
{
Task SaveBusinessDataAsync(Guid receivedId);
bool IsSame(SqlConnection conn, SqlTransaction tx);
}
Data service implementation
By taking a constructor dependency on ConnectionHolder
, the data service implementation is able to get access to the SqlConnection and SqlTransaction stored there.
public class DataService : IDataService
{
SqlConnection connection;
SqlTransaction transaction;
public DataService(ConnectionHolder connectionHolder)
{
this.connection = connectionHolder.Connection;
this.transaction = connectionHolder.Transaction;
}
public async Task SaveBusinessDataAsync(Guid receivedId)
{
var cmdText =
"insert into ReceivedMessageIds (MessageId) values (@MessageId)";
using (var cmd = new SqlCommand(cmdText, connection, transaction))
{
cmd.Parameters.AddWithValue("MessageId", receivedId);
await cmd.ExecuteNonQueryAsync();
}
}
public bool IsSame(SqlConnection conn, SqlTransaction tx)
{
return this.connection == conn && this.transaction == tx;
}
}
Dependency injection configuration
The sample uses NServiceBus.Extensions.DependencyInjection to integrate with the Microsoft dependency injection system. If the sample were using the .NET Generic Host, the NServiceBus.Extensions.Hosting package would be used instead, which would also integrate with Microsoft's dependency injection abstractions.
In the configuration, both ConnectionHolder
and IDataService
are registered as Scoped, which in NServiceBus means that each message being processed will get a new instance. DataService
is registered as the implementation for IDataService
.
endpointConfiguration.RegisterComponents(services =>
{
services.AddScoped<ConnectionHolder>();
services.AddScoped<IDataService, DataService>();
});
Behavior
In order for the ConnectionHolder
to be filled with the details from the current message processing pipeline, a pipeline behavior is used, which will run before the message handler.
public class SqlConnectionBehavior : Behavior<IIncomingLogicalMessageContext>
{
public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
{
// Get the SQL connection/transaction used by SQL Transport
var transportTx = context.Extensions.Get<TransportTransaction>();
// Get this message's ConnectionHolder from the DI container
var connectionHolder = context.Builder.GetRequiredService<ConnectionHolder>();
// Assign the connection/transaction for the data service to use later
connectionHolder.Connection = transportTx.Get<SqlConnection>(
"System.Data.SqlClient.SqlConnection");
connectionHolder.Transaction = transportTx.Get<SqlTransaction>(
"System.Data.SqlClient.SqlTransaction");
// Invoke the next stage of the pipeline,
// which includes the message handler
return next();
}
}
The hardest part is retrieving the current connection/transaction information, as this is different based on the transport and persistence in use. In this case, the SQL Server transport is managing the connection and transaction as part of receiving the message, so the necessary data resides in the TransportTransaction
which is accessible as an extension point on the behavior's context
variable.
Using Behavior
ensures that the behavior will be run in the stage just before the message handler is invoked. Calling the next()
delegate determines where the next step of the pipeline chain (i.e. the message handler) will be called. For more information on the ordering of pipeline stages see Steps, stages, and connectors.
Once the behavior class exists, it must be registered with the pipeline:
endpointConfiguration.Pipeline.Register(typeof(SqlConnectionBehavior),
"Forwards the NServiceBus SqlConnection/SqlTransaction to data services injected into message handlers.");
Message handler
With the connection holder, data service, and behavior established, message handlers can now take a dependency on IDataService
and have a data service injected that already knows about the ambient connection and transaction:
public class TestMessageHandler : IHandleMessages<TestMsg>
{
IDataService dataService;
public TestMessageHandler(IDataService dataService)
{
this.dataService = dataService;
}
public async Task Handle(TestMsg message, IMessageHandlerContext context)
{
// Not necessary-shows that dataService details are same as NServiceBus
var storageSession = context.SynchronizedStorageSession
.SqlPersistenceSession();
var currentConnection = storageSession.Connection as SqlConnection;
var currentTransaction = storageSession.Transaction as SqlTransaction;
var isSame = dataService.IsSame(currentConnection, currentTransaction);
Console.WriteLine($"DataService details same as NServiceBus: {isSame}");
// Use the DataService to write business data to the database
Console.WriteLine($"Saving business data: {message.Id}");
await dataService.SaveBusinessDataAsync(message.Id);
}
}
When executed, the sample generates output similar to the following:
Press S to send a message, or Enter to exit
DataService details same as NServiceBus: True
Saving business data: 76a9419b-bde0-470e-b1fa-83cefb544f84
DataService details same as NServiceBus: True
Saving business data: ebe1b32b-e1b9-4262-95ed-a29e9b68dc7a
This shows that the business data is persisted to the database, and that the connection and transaction used are the same that would be obtained by calling .
.