The recommended method of accessing business data in an NServiceBus message handler is to use the synchronized storage session to get access to the native database connection that is being used by NServiceBus. The synchronized storage session gives access to the same native connection and transaction being used by the NServiceBus persistence. This ensures that all business data changes in the message handler are stored atomically with any NServiceBus internal data, such as outbox records.
Some developers prefer to inject data services into message handlers (for example, a class implementing the repository pattern) to interact with business data. In this case, it's important to ensure that the injected data service also shares the same connection and transaction with the message handler.
In this sample, using SQL Server transport and SQL persistence, the message handler accepts an IDataService
instance injected through the constructor. A message pipeline behavior is used to access the transport's ambient connection and transaction information. Data using an intermediate ConnectionHolder
class that is used when the DataService
implementation is constructed by the dependency injection container.
Prerequisites
Ensure an instance of SQL Server (Version 2016 or above for custom saga finders sample, or Version 2012 or above for other samples) is installed and accessible on localhost
and port 1433
. A Docker image can be used to accomplish this by running docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_SA_PASSWORD=yourStrong(!)Password' -p 1433:1433 -d mcr.
in a terminal.
Alternatively, change the connection string to point to different SQL Server instance.
At startup each endpoint will create its required SQL assets including databases, tables, and schemas.
The database created by this sample is NsbSamplesInjectedServices
.
Dependency injection relationships
This diagram shows the relationships between components:
The ConnectionHolder
class is a central storage point for the database connection and transaction and can be shared by multiple data services. The SqlConnectionBehavior
executes before the message handlers, accesses the TransportTransaction
, and populates the details from the ConnectionHolder
. Each data service is able to access the connection and transaction for the message currently being processed by taking a dependency on the connection holder.
Connection holder
The ConnectionHolder
class stores the current connection and transaction in properties:
public class ConnectionHolder
{
public SqlConnection Connection { get; set; }
public SqlTransaction Transaction { get; set; }
}
Data service interface
The IDataService
interface represents a data service. The service specifies an operation to save business data. It also has an IsSame()
method which is used to verify that the data service's connection and transaction are the same ones that would be used if business data was accessed via SQL Persistence instead.
public interface IDataService
{
Task SaveBusinessDataAsync(Guid receivedId);
bool IsSame(SqlConnection conn, SqlTransaction tx);
}
Data service implementation
By taking a constructor dependency on ConnectionHolder
, the data service implementation is able to get access to the SqlConnection and SqlTransaction stored there.
public class DataService : IDataService
{
SqlConnection connection;
SqlTransaction transaction;
public DataService(ConnectionHolder connectionHolder)
{
this.connection = connectionHolder.Connection;
this.transaction = connectionHolder.Transaction;
}
public async Task SaveBusinessDataAsync(Guid receivedId)
{
var cmdText =
"insert into ReceivedMessageIds (MessageId) values (@MessageId)";
using (var cmd = new SqlCommand(cmdText, connection, transaction))
{
cmd.Parameters.AddWithValue("MessageId", receivedId);
await cmd.ExecuteNonQueryAsync();
}
}
public bool IsSame(SqlConnection conn, SqlTransaction tx)
{
return this.connection == conn && this.transaction == tx;
}
}
Dependency injection configuration
In the configuration, both ConnectionHolder
and IDataService
are registered as Scoped, which, in NServiceBus, means that each message being processed will get a new instance. DataService
is registered as the implementation for IDataService
.
var containerSettings = endpointConfiguration.UseContainer(new DefaultServiceProviderFactory());
var services = containerSettings.ServiceCollection;
services.AddScoped<ConnectionHolder>();
services.AddScoped<IDataService, DataService>();
Behavior
In order for the ConnectionHolder
to be filled with the details from the current message processing pipeline, a pipeline behavior is used, which will run before the message handler.
public class SqlConnectionBehavior : Behavior<IIncomingLogicalMessageContext>
{
public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
{
// Get the SQL connection/transaction used by SQL Transport
var transportTx = context.Extensions.Get<TransportTransaction>();
// Get this message's ConnectionHolder from the DI container
var connectionHolder = context.Builder.Build<ConnectionHolder>();
// Assign the connection/transaction for the data service to use later
connectionHolder.Connection = transportTx.Get<SqlConnection>(
"System.Data.SqlClient.SqlConnection");
connectionHolder.Transaction = transportTx.Get<SqlTransaction>(
"System.Data.SqlClient.SqlTransaction");
// Invoke the next stage of the pipeline,
// which includes the message handler
return next();
}
}
Retrieving the current connection/transaction information will depend on the transport and persistence in use. In this case, the SQL Server transport is managing the connection and transaction as part of receiving the message, so the necessary data resides in the TransportTransaction
which is accessible as an extension point on the behavior's context
variable.
For help finding the correct ambient connection/transaction information in a specific scenario, contact support.
Using Behavior
ensures that the behavior will be run before the message handler is invoked. For more information on the ordering of pipeline stages see Steps, stages, and connectors.
Once the behavior class exists, it must be registered with the pipeline:
endpointConfiguration.Pipeline.Register(typeof(SqlConnectionBehavior),
"Forwards the NServiceBus SqlConnection/SqlTransaction to data services injected into message handlers.");
Message handler
With the connection holder, data service, and behavior established, message handlers can now take a dependency on an IDataService
instance that will automatically use the ambient connection and transaction:
public class TestMessageHandler : IHandleMessages<TestMsg>
{
IDataService dataService;
public TestMessageHandler(IDataService dataService)
{
this.dataService = dataService;
}
public async Task Handle(TestMsg message, IMessageHandlerContext context)
{
// Not necessary-shows that dataService details are same as NServiceBus
var storageSession = context.SynchronizedStorageSession
.SqlPersistenceSession();
var currentConnection = storageSession.Connection as SqlConnection;
var currentTransaction = storageSession.Transaction as SqlTransaction;
var isSame = dataService.IsSame(currentConnection, currentTransaction);
Console.WriteLine($"DataService details same as NServiceBus: {isSame}");
// Use the DataService to write business data to the database
Console.WriteLine($"Saving business data: {message.Id}");
await dataService.SaveBusinessDataAsync(message.Id);
}
}
When executed, the sample generates output similar to the following:
Press S to send a message, or Enter to exit
DataService details same as NServiceBus: True
Saving business data: 76a9419b-bde0-470e-b1fa-83cefb544f84
DataService details same as NServiceBus: True
Saving business data: ebe1b32b-e1b9-4262-95ed-a29e9b68dc7a
This shows that the business data is persisted to the database using the same connection and transaction as the SQL Persistence session.