Getting Started
Architecture
NServiceBus
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

Injecting repository pattern services

Component: Sql Persistence
NuGet Package: NServiceBus.Persistence.Sql (7.x)
Target Version: NServiceBus 8.x

The recommended method of accessing business data in an NServiceBus message handler is to use the synchronized storage session to get access to the native database connection that is being used by NServiceBus. The synchronized storage session gives access to the same native connection and transaction being used by the NServiceBus persistence. This ensures that all business data changes in the message handler are stored atomically with any NServiceBus internal data, such as outbox records.

Some developers prefer to inject data services into message handlers (for example, a class implementing the repository pattern) to interact with business data. In this case, it's important to ensure that the injected data service also shares the same connection and transaction with the message handler.

In this sample, using SQL Server transport and SQL persistence, the message handler accepts an IDataService instance injected through the constructor. A message pipeline behavior is used to access the transport's ambient connection and transaction information. Data using an intermediate ConnectionHolder class that is used when the DataService implementation is constructed by the dependency injection container.

Prerequisites

Ensure an instance of SQL Server (Version 2016 or above for custom saga finders sample, or Version 2012 or above for other samples) is installed and accessible on localhost and port 1433. A Docker image can be used to accomplish this by running docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_SA_PASSWORD=yourStrong(!)Password' -p 1433:1433 -d mcr.microsoft.com/mssql/server:latest in a terminal.

Alternatively, change the connection string to point to different SQL Server instance.

At startup each endpoint will create its required SQL assets including databases, tables, and schemas.

The database created by this sample is NsbSamplesInjectedServices.

Dependency injection relationships

This diagram shows the relationships between components:

graph TD SqlConnectionBehavior -.accesses.-> TransportTransaction SqlConnectionBehavior -.populates.-> ConnectionHolder MessageHandler1 --dependency--> DataService1 DataService1 --dependency--> ConnectionHolder MessageHandler2 --dependency--> DataService2 DataService2 --dependency--> ConnectionHolder

The ConnectionHolder class is a central storage point for the database connection and transaction and can be shared by multiple data services. The SqlConnectionBehavior executes before the message handlers, accesses the TransportTransaction, and populates the details from the ConnectionHolder. Each data service is able to access the connection and transaction for the message currently being processed by taking a dependency on the connection holder.

Connection holder

The ConnectionHolder class stores the current connection and transaction in properties:

public class ConnectionHolder
{
    public SqlConnection Connection { get; set; }
    public SqlTransaction Transaction { get; set; }
}

Data service interface

The IDataService interface represents a data service. The service specifies an operation to save business data. It also has an IsSame() method which is used to verify that the data service's connection and transaction are the same ones that would be used if business data was accessed via SQL Persistence instead.

public interface IDataService
{
    Task SaveBusinessDataAsync(Guid receivedId);
    bool IsSame(SqlConnection conn, SqlTransaction tx);
}

Data service implementation

By taking a constructor dependency on ConnectionHolder, the data service implementation is able to get access to the SqlConnection and SqlTransaction stored there.

public class DataService : IDataService
{
    readonly SqlConnection connection;
    readonly SqlTransaction transaction;

    public DataService(ConnectionHolder connectionHolder)
    {
        connection = connectionHolder.Connection;
        transaction = connectionHolder.Transaction;
    }

    public async Task SaveBusinessDataAsync(Guid receivedId)
    {
        var cmdText =
            "insert into ReceivedMessageIds (MessageId) values (@MessageId)";

        using var cmd = new SqlCommand(cmdText, connection, transaction);

        cmd.Parameters.AddWithValue("MessageId", receivedId);
        await cmd.ExecuteNonQueryAsync();

    }

    public bool IsSame(SqlConnection conn, SqlTransaction tx)
    {
        return connection == conn && transaction == tx;
    }
}

Dependency injection configuration

In the configuration, both ConnectionHolder and IDataService are registered as Scoped, which, in NServiceBus, means that each message being processed will get a new instance. DataService is registered as the implementation for IDataService.

endpointConfiguration.RegisterComponents(services =>
{
    services.AddScoped<ConnectionHolder>();
    services.AddScoped<IDataService, DataService>();
});

Behavior

In order for the ConnectionHolder to be filled with the details from the current message processing pipeline, a pipeline behavior is used, which will run before the message handler.

public class SqlConnectionBehavior : Behavior<IIncomingLogicalMessageContext>
{
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // Get the SQL connection/transaction used by SQL Transport
        var transportTx = context.Extensions.Get<TransportTransaction>();

        // Get this message's ConnectionHolder from the DI container
        var connectionHolder = context.Builder.GetRequiredService<ConnectionHolder>();

        // Assign the connection/transaction for the data service to use later
        connectionHolder.Connection = transportTx.Get<SqlConnection>(
            "System.Data.SqlClient.SqlConnection");

        connectionHolder.Transaction = transportTx.Get<SqlTransaction>(
            "System.Data.SqlClient.SqlTransaction");

        // Invoke the next stage of the pipeline,
        // which includes the message handler
        return next();
    }
}

Retrieving the current connection/transaction information will depend on the transport and persistence in use. In this case, the SQL Server transport is managing the connection and transaction as part of receiving the message, so the necessary data resides in the TransportTransaction which is accessible as an extension point on the behavior's context variable.

Using Behavior<IIncomingLogicalMessageContext> ensures that the behavior will be run before the message handler is invoked. For more information on the ordering of pipeline stages see Steps, stages, and connectors.

Once the behavior class exists, it must be registered with the pipeline:

endpointConfiguration.Pipeline.Register(typeof(SqlConnectionBehavior),
    "Forwards the NServiceBus SqlConnection/SqlTransaction to data services injected into message handlers.");

Message handler

With the connection holder, data service, and behavior established, message handlers can now take a dependency on an IDataService instance that will automatically use the ambient connection and transaction:

public class TestMessageHandler : IHandleMessages<TestMsg>
{
    readonly IDataService dataService;

    public TestMessageHandler(IDataService dataService)
    {
        this.dataService = dataService;
    }

    public async Task Handle(TestMsg message, IMessageHandlerContext context)
    {
        // Not necessary-shows that dataService details are same as NServiceBus
        var storageSession = context.SynchronizedStorageSession
            .SqlPersistenceSession();

        var currentConnection = storageSession.Connection as SqlConnection;
        var currentTransaction = storageSession.Transaction as SqlTransaction;

        var isSame = dataService.IsSame(currentConnection, currentTransaction);

        Console.WriteLine($"DataService details same as NServiceBus: {isSame}");

        // Use the DataService to write business data to the database
        Console.WriteLine($"Saving business data: {message.Id}");

        await dataService.SaveBusinessDataAsync(message.Id);
    }
}

When executed, the sample generates output similar to the following:

Press S to send a message, or Enter to exit
DataService details same as NServiceBus: True
Saving business data: 76a9419b-bde0-470e-b1fa-83cefb544f84
DataService details same as NServiceBus: True
Saving business data: ebe1b32b-e1b9-4262-95ed-a29e9b68dc7a

This shows that the business data is persisted to the database using the same connection and transaction as the SQL Persistence session.

Related Articles

  • Accessing data via SQL persistence
    How to access business data using connections managed by NServiceBus SQL persistence.
  • SQL Persistence
    A persister that targets relational databases, including SQL Server, Oracle, MySQL, PostgreSQL, AWS Aurora MySQL and AWS Aurora PostgreSQL.
  • SQL Server transport
    An overview of the NServiceBus SQL Server transport.