Injecting repository pattern services

Component: Sql Persistence
NuGet Package: NServiceBus.Persistence.Sql (7.x)

The recommended method of accessing business data in an NServiceBus message handler is to use the synchronized storage session to get access to the native database connection that NServiceBus is already using. That way, business data is stored using the same connection and transaction that NServiceBus uses for its own data (for example, the outbox feature), so everything the message handler writes is committed atomically.
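For comparison, the direct approach described above might look like the following minimal sketch. It reuses the SqlPersistenceSession() call, the TestMsg message type, and the ReceivedMessageIds table that appear later in this sample. The DirectAccessHandler name is hypothetical, and the SqlClient namespace is an assumption that depends on which client package the project references.

```csharp
using System.Threading.Tasks;
// Assumption: swap for System.Data.SqlClient if the project references that package
using Microsoft.Data.SqlClient;
using NServiceBus;

// Hypothetical handler name, for illustration only.
// TestMsg is the message type defined by this sample.
public class DirectAccessHandler : IHandleMessages<TestMsg>
{
    public async Task Handle(TestMsg message, IMessageHandlerContext context)
    {
        // Connection and transaction that NServiceBus manages for this message
        var session = context.SynchronizedStorageSession.SqlPersistenceSession();

        var cmdText =
            "insert into ReceivedMessageIds (MessageId) values (@MessageId)";

        // Enlist the command in the NServiceBus-managed transaction
        using (var cmd = new SqlCommand(
            cmdText,
            (SqlConnection)session.Connection,
            (SqlTransaction)session.Transaction))
        {
            cmd.Parameters.AddWithValue("MessageId", message.Id);
            await cmd.ExecuteNonQueryAsync();
        }
    }
}
```

The rest of this sample moves the same data access into an injected service while keeping the same connection and transaction.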

In message handlers, some developers prefer to use an injected data service (for example, a class implementing the repository pattern) to interact with business data. In this case it's important to ensure that the injected data service also shares the same connection and transaction with the message handler.

In this sample, which uses SQL Server transport and SQL persistence, the message handler accepts an IDataService which is injected through the constructor. The sample will show how to use a message pipeline behavior to access the transport's ambient connection and transaction information, and then store that information within an intermediate ConnectionHolder class that can be used when the DataService implementation is constructed by the container.

Prerequisites

Ensure an instance of SQL Server (version 2016 or above for the custom saga finders sample, or version 2012 or above for other samples) is installed and accessible on localhost and port 1433. A Docker image can be used to accomplish this by running the following command in a terminal:

docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_SA_PASSWORD=yourStrong(!)Password' -p 1433:1433 -d mcr.microsoft.com/mssql/server:latest

Alternatively, change the connection string to point to a different SQL Server instance.

At startup each endpoint will create its required SQL assets including databases, tables, and schemas.

The database created by this sample is NsbSamplesInjectedServices.

Dependency injection relationships

This diagram shows the relationships between components:

graph TD
    SqlConnectionBehavior -.accesses.-> TransportTransaction
    SqlConnectionBehavior -.populates.-> ConnectionHolder
    MessageHandler1 --dependency--> DataService1
    DataService1 --dependency--> ConnectionHolder
    MessageHandler2 --dependency--> DataService2
    DataService2 --dependency--> ConnectionHolder

Because there may be many data services, a ConnectionHolder class serves as a storage point for the database connection and transaction information. The SqlConnectionBehavior executes earlier than the message handlers, so at that point, the behavior accesses the TransportTransaction (containing the connection and transaction used to receive the message from the SQL Server transport) and populates the details of the ConnectionHolder. By taking a dependency on the connection holder, each data service is able to access the connection and transaction for the message currently being processed.

Connection holder

The ConnectionHolder class is just a bucket to store the connection and transaction in properties:

public class ConnectionHolder
{
    public SqlConnection Connection { get; set; }
    public SqlTransaction Transaction { get; set; }
}

Data service interface

The IDataService interface represents a data service. The service specifies an operation to save business data, as well as an IsSame() method which will be used later to show that the data service's connection and transaction are the same as would be used if accessing data via SQL persistence.

public interface IDataService
{
    Task SaveBusinessDataAsync(Guid receivedId);
    bool IsSame(SqlConnection conn, SqlTransaction tx);
}

Data service implementation

By taking a constructor dependency on ConnectionHolder, the data service implementation is able to get access to the SqlConnection and SqlTransaction stored there.

public class DataService : IDataService
{
    SqlConnection connection;
    SqlTransaction transaction;

    public DataService(ConnectionHolder connectionHolder)
    {
        this.connection = connectionHolder.Connection;
        this.transaction = connectionHolder.Transaction;
    }

    public async Task SaveBusinessDataAsync(Guid receivedId)
    {
        var cmdText =
            "insert into ReceivedMessageIds (MessageId) values (@MessageId)";

        using (var cmd = new SqlCommand(cmdText, connection, transaction))
        {
            cmd.Parameters.AddWithValue("MessageId", receivedId);
            await cmd.ExecuteNonQueryAsync();
        }
    }

    public bool IsSame(SqlConnection conn, SqlTransaction tx)
    {
        return this.connection == conn && this.transaction == tx;
    }
}
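Because DataService copies the connection and transaction in its constructor, it depends on the ConnectionHolder being populated before the service is constructed. The following self-contained sketch illustrates that ordering concern using stand-in types: Holder, SnapshotService, and DeferredService are hypothetical names, and a string replaces SqlConnection so the code runs without a database.

```csharp
using System;

// Stand-in for the sample's ConnectionHolder (hypothetical);
// a string replaces SqlConnection to avoid needing a database.
public class Holder
{
    public string Connection { get; set; }
}

// Mirrors the sample's DataService: copies the value once, in the constructor.
public class SnapshotService
{
    readonly string connection;
    public SnapshotService(Holder holder) => connection = holder.Connection;
    public string Connection => connection;
}

// Hypothetical variant: keeps the holder and reads from it on each use.
public class DeferredService
{
    readonly Holder holder;
    public DeferredService(Holder holder) => this.holder = holder;
    public string Connection => holder.Connection;
}

public static class Program
{
    public static void Main()
    {
        var holder = new Holder();

        // Both services are built BEFORE the holder is populated
        var snapshot = new SnapshotService(holder);
        var deferred = new DeferredService(holder);

        // Stand-in for what the pipeline behavior does in the sample
        holder.Connection = "conn-1";

        Console.WriteLine(snapshot.Connection ?? "(null)"); // prints "(null)"
        Console.WriteLine(deferred.Connection);             // prints "conn-1"
    }
}
```

In this sample the constructor-copy approach works because the behavior populates the holder before the message handler and its dependencies are used. A service that could be constructed earlier would need the deferred style instead.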

Dependency injection configuration

The sample uses NServiceBus.Extensions.DependencyInjection to integrate with the Microsoft dependency injection system. If the sample were using the .NET Generic Host, the NServiceBus.Extensions.Hosting package would be used instead, which would also integrate with Microsoft's dependency injection abstractions.

In the configuration, both ConnectionHolder and IDataService are registered as Scoped, which in NServiceBus means that each message being processed will get a new instance. DataService is registered as the implementation for IDataService.

endpointConfiguration.RegisterComponents(services =>
{
    services.AddScoped<ConnectionHolder>();
    services.AddScoped<IDataService, DataService>();
});
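The scoped lifetime is what lets the pipeline behavior and the data service see the same ConnectionHolder while a message is processed. The following minimal sketch shows the underlying behavior of Microsoft's dependency injection container, assuming the Microsoft.Extensions.DependencyInjection package is referenced. The scopes are created by hand here as a stand-in for per-message scopes, and ScopeDemo and the empty ConnectionHolder are placeholders for illustration.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Empty placeholder for the sample's ConnectionHolder
public class ConnectionHolder { }

public static class ScopeDemo
{
    public static void Main()
    {
        var services = new ServiceCollection();
        services.AddScoped<ConnectionHolder>();

        using var provider = services.BuildServiceProvider();

        // Each scope stands in for the processing of one message
        using var messageA = provider.CreateScope();
        using var messageB = provider.CreateScope();

        var a1 = messageA.ServiceProvider.GetRequiredService<ConnectionHolder>();
        var a2 = messageA.ServiceProvider.GetRequiredService<ConnectionHolder>();
        var b1 = messageB.ServiceProvider.GetRequiredService<ConnectionHolder>();

        // Same scope: every resolve returns the same instance
        Console.WriteLine(ReferenceEquals(a1, a2)); // prints "True"

        // Different scope: a separate instance
        Console.WriteLine(ReferenceEquals(a1, b1)); // prints "False"
    }
}
```

Registering ConnectionHolder as a singleton would instead share one connection and transaction across all messages, and registering it as transient would give the data service a new, empty holder.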

Behavior

In order for the ConnectionHolder to be filled with the details from the current message processing pipeline, a pipeline behavior is used, which will run before the message handler.

public class SqlConnectionBehavior : Behavior<IIncomingLogicalMessageContext>
{
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // Get the SQL connection/transaction used by SQL Transport
        var transportTx = context.Extensions.Get<TransportTransaction>();

        // Get this message's ConnectionHolder from the DI container
        var connectionHolder = context.Builder.GetRequiredService<ConnectionHolder>();

        // Assign the connection/transaction for the data service to use later
        connectionHolder.Connection = transportTx.Get<SqlConnection>(
            "System.Data.SqlClient.SqlConnection");
        connectionHolder.Transaction = transportTx.Get<SqlTransaction>(
            "System.Data.SqlClient.SqlTransaction");

        // Invoke the next stage of the pipeline,
        // which includes the message handler
        return next();
    }
}

The hardest part is retrieving the current connection and transaction, because where they are stored depends on the transport and persistence in use. In this case, the SQL Server transport manages the connection and transaction as part of receiving the message, so the necessary data resides in the TransportTransaction, which is accessible through the Extensions property of the behavior's context.

For help finding the correct ambient connection/transaction information in a specific scenario, contact support.

Using Behavior<IIncomingLogicalMessageContext> ensures that the behavior runs in the stage just before the message handler is invoked. Calling the next() delegate invokes the remainder of the pipeline, which includes the message handler, so the ConnectionHolder must be populated before that call. For more information on the ordering of pipeline stages see Steps, stages, and connectors.

Once the behavior class exists, it must be registered with the pipeline:

endpointConfiguration.Pipeline.Register(typeof(SqlConnectionBehavior),
    "Forwards the NServiceBus SqlConnection/SqlTransaction to data services injected into message handlers.");

Message handler

With the connection holder, data service, and behavior established, message handlers can now take a dependency on IDataService and have a data service injected that already knows about the ambient connection and transaction:

public class TestMessageHandler : IHandleMessages<TestMsg>
{
    IDataService dataService;

    public TestMessageHandler(IDataService dataService)
    {
        this.dataService = dataService;
    }

    public async Task Handle(TestMsg message, IMessageHandlerContext context)
    {
        // Not necessary; shows that the dataService details are the same as NServiceBus
        var storageSession = context.SynchronizedStorageSession
            .SqlPersistenceSession();
        var currentConnection = storageSession.Connection as SqlConnection;
        var currentTransaction = storageSession.Transaction as SqlTransaction;
        var isSame = dataService.IsSame(currentConnection, currentTransaction);
        Console.WriteLine($"DataService details same as NServiceBus: {isSame}");

        // Use the DataService to write business data to the database
        Console.WriteLine($"Saving business data: {message.Id}");
        await dataService.SaveBusinessDataAsync(message.Id);
    }
}

When executed, the sample generates output similar to the following:

Press S to send a message, or Enter to exit
DataService details same as NServiceBus: True
Saving business data: 76a9419b-bde0-470e-b1fa-83cefb544f84
DataService details same as NServiceBus: True
Saving business data: ebe1b32b-e1b9-4262-95ed-a29e9b68dc7a

This shows that the business data is persisted to the database, and that the connection and transaction used are the same ones that would be obtained by calling .SqlPersistenceSession().
