Using NServiceBus.Router for atomic update-and-publish in WebAPI

NuGet Package: NServiceBus.Router (3.x)

This is a community-maintained project
Target Version: NServiceBus 7.x
Particular Software's NServiceBus.MessagingBridge package offers similar functionality to the NServiceBus.Router community package and should be considered for multi-transport operations.

This sample shows how to use the SQL Server transport in an ASP.NET Core WebAPI application to implement atomic update-and-publish while using a different transport for the backend. Basic concepts related to hosting NServiceBus endpoints in an MVC application are intentionally skipped here; they are covered in the basic MVC Core sample.

Very often a web controller needs to modify data in a database and publish a message as part of handling a single HTTP request. For consistency these two operations need to be done atomically: if either of them fails, nothing is visible to the outside world. If the transport used by the system does not support distributed transactions, the solution is to use a dedicated transport only for the frontend, and route messages between the two transports using NServiceBus.Router.

The most convenient transport for the frontend is SQL Server transport as it can easily share the connection and transaction with a data access library such as EntityFramework.

The NServiceBus.Connector.SqlServer community project takes this idea even further and exposes a familiar IMessageSession API for web controllers. Internally the connector hosts a SQL-to-target transport router.

Running the solution

When the solution is run, four console windows open. Press enter in the Client window to make an HTTP call to the Frontend. The HTTP request is handled by an async WebAPI controller. The controller creates an Order entity through EntityFramework and publishes an NServiceBus event. The event is routed, via the Router, to the Backend application, which logs the fact that it received the message.

Prerequisites

Ensure an instance of SQL Server (version 2016 or above for the custom saga finders sample, or version 2012 or above for other samples) is installed and accessible on localhost, port 1433. A Docker image can be used to accomplish this by running the following command in a terminal:

docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_SA_PASSWORD=yourStrong(!)Password' -p 1433:1433 -d mcr.microsoft.com/mssql/server:latest

Alternatively, change the connection string to point to a different SQL Server instance.

At startup each endpoint will create its required SQL assets including databases, tables, and schemas.

This sample automatically creates the update_and_publish database.

Code walk-through

The solution consists of five projects.

Shared

The Shared project contains the message contracts.
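
As a rough illustration of what such a contract could look like, the sketch below defines the OrderAccepted event with the single OrderId property that the controller sets. Marking it with the IEvent interface is an assumption; the actual sample may identify events through unobtrusive conventions instead.

```csharp
using NServiceBus;

// Event published by the Frontend and consumed by the Backend.
// Assumption: the contract uses the IEvent marker interface.
public class OrderAccepted : IEvent
{
    public string OrderId { get; set; }
}
```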

Client

The Client project calls the Frontend HTTP API to create orders. It does not use NServiceBus.
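
A minimal sketch of such a client is shown below. It is not the sample's actual code: the URL is a hypothetical placeholder that must match the address and route the Frontend really listens on, and the order ID format is chosen only for illustration. Because the controller binds a string from the request body, the ID is sent as a JSON string literal.

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

class Program
{
    // Hypothetical address; replace with the Frontend's real URL and route.
    const string FrontendUrl = "http://localhost:5000/api/orders";

    static async Task Main()
    {
        using (var httpClient = new HttpClient())
        {
            Console.WriteLine("Press <enter> to create an order, or type 'q' to quit.");
            while (true)
            {
                var line = Console.ReadLine();
                if (line == null || line == "q")
                {
                    break;
                }

                // Short hexadecimal ID, so it needs no JSON escaping.
                var orderId = Guid.NewGuid().ToString("N").Substring(0, 8);

                // [FromBody] string expects a JSON string, hence the quotes.
                var body = new StringContent("\"" + orderId + "\"", Encoding.UTF8, "application/json");

                var response = await httpClient.PostAsync(FrontendUrl, body).ConfigureAwait(false);
                response.EnsureSuccessStatusCode();

                // Print whatever confirmation text the controller returned.
                Console.WriteLine(await response.Content.ReadAsStringAsync().ConfigureAwait(false));
            }
        }
    }
}
```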

Frontend

The Frontend project contains an NServiceBus endpoint that runs the SQL Server transport.

var endpointConfiguration = new EndpointConfiguration("Samples.Router.UpdateAndPublish.Frontend");

var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
transport.ConnectionString(ConnectionString);
transport.Transactions(TransportTransactionMode.SendsAtomicWithReceive);

endpointConfiguration.UsePersistence<InMemoryPersistence>();
endpointConfiguration.EnableInstallers();

It contains an async controller for handling API requests that stores data in a database and publishes an event.

[HttpPost]
public async Task<string> Post([FromBody] string orderId)
{
    using (var connection = connectionFactory())
    {
        await connection.OpenAsync().ConfigureAwait(false);
        using (var transaction = connection.BeginTransaction())
        {
            var dataContext = new FrontendDataContext(connection);
            dataContext.Database.UseTransaction(transaction);

            dataContext.Orders.Add(new Order
            {
                OrderId = orderId
            });

            var message = new OrderAccepted
            {
                OrderId = orderId
            };
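            // Hand the open connection and transaction to the SQL Server transport
            // so that the outgoing event is written within the same transaction.
            var options = new PublishOptions();
            var transportTransaction = new TransportTransaction();
            transportTransaction.Set(connection);
            transportTransaction.Set(transaction);
            options.GetExtensions().Set(transportTransaction);
            await messageSession.Publish(message, options).ConfigureAwait(false);

            // Save the order, then commit the entity and the event together.
            await dataContext.SaveChangesAsync().ConfigureAwait(false);
            transaction.Commit();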
        }
    }

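    log.Info($"Order {orderId} accepted.");
    return $"Order {orderId} accepted.";
}

Both the connection and the transaction must be passed to the EntityFramework context so that the order is saved within that transaction. The same connection and transaction are passed to the NServiceBus transport through the TransportTransaction API, which makes the published message part of the same database transaction.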
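
For the controller to work this way, the data context has to accept an externally created connection. The sketch below shows one way that could look, assuming Entity Framework Core with the Microsoft.EntityFrameworkCore.SqlServer provider; the sample's real FrontendDataContext and Order classes may differ.

```csharp
using System.Data.Common;
using Microsoft.EntityFrameworkCore;

// Entity stored by the controller. By EF Core convention,
// a property named OrderId on Order is used as the primary key.
public class Order
{
    public string OrderId { get; set; }
}

public class FrontendDataContext : DbContext
{
    readonly DbConnection connection;

    // The caller owns the connection: it opens it, starts the
    // transaction, and disposes it when the request is done.
    public FrontendDataContext(DbConnection connection)
    {
        this.connection = connection;
    }

    public DbSet<Order> Orders { get; set; }

    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
    {
        // Reuse the existing connection instead of opening a new one.
        optionsBuilder.UseSqlServer(connection);
    }
}
```

Because the context reuses the caller's connection, the call to Database.UseTransaction in the controller enlists the context in the same transaction that the transport uses.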

Backend

The Backend project contains an NServiceBus endpoint that runs the Learning transport. The Backend processes OrderAccepted events. Because these events are published by an endpoint that is hosted behind a router, the NServiceBus.Router subscription API has to be used.

var routerConnector = transport.Routing().ConnectToRouter("Samples.Router.UpdateAndPublish.Router");
routerConnector.RegisterPublisher(
    eventType: typeof(OrderAccepted),
    publisherEndpointName: "Samples.Router.UpdateAndPublish.Frontend");

The publisher (Frontend endpoint) does not need any routing configuration.
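
On the receiving side, processing the event only requires an ordinary message handler. The following is a minimal sketch rather than the sample's actual handler; the class name and log text are illustrative, and OrderAccepted is the event contract from the Shared project.

```csharp
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Logging;

// Hypothetical handler name; logs each OrderAccepted event that arrives via the Router.
public class OrderAcceptedHandler : IHandleMessages<OrderAccepted>
{
    static readonly ILog log = LogManager.GetLogger<OrderAcceptedHandler>();

    public Task Handle(OrderAccepted message, IMessageHandlerContext context)
    {
        log.Info($"Received OrderAccepted for order {message.OrderId}.");
        return Task.CompletedTask;
    }
}
```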

In real-life systems the backend is unlikely to be monolithic (a single logical endpoint); more often it is a collection of logical endpoints communicating via NServiceBus. Since this sample assumes that the backend transport does not support distributed transactions, the Outbox feature should be used in the backend endpoints to provide exactly-once message processing semantics.
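
Enabling the Outbox is a configuration change on each backend endpoint. The fragment below is only a sketch: the endpoint name is assumed from the naming pattern used in this sample, and a production endpoint would pair the Outbox with a durable persistence that supports it rather than the one used here.

```csharp
using NServiceBus;

public static class BackendConfiguration
{
    public static EndpointConfiguration Create()
    {
        // Assumption: endpoint name follows the sample's naming pattern.
        var endpointConfiguration = new EndpointConfiguration("Samples.Router.UpdateAndPublish.Backend");
        endpointConfiguration.UseTransport<LearningTransport>();

        // The Outbox stores deduplication data and outgoing messages in the
        // endpoint's persistence, so a persistence with Outbox support is required.
        endpointConfiguration.UsePersistence<InMemoryPersistence>();
        endpointConfiguration.EnableOutbox();

        return endpointConfiguration;
    }
}
```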

Router

The Router project sets up a bi-directional connection between SQL Server and Learning transports.

var routerConfig = new RouterConfiguration("Samples.Router.UpdateAndPublish.Router");

var frontendInterface = routerConfig.AddInterface<SqlServerTransport>("SQL", t =>
{
    t.ConnectionString(ConnectionString);
    t.Transactions(TransportTransactionMode.SendsAtomicWithReceive);
});

var backendInterface = routerConfig.AddInterface<LearningTransport>("Learning", t => { });

var staticRouting = routerConfig.UseStaticRoutingProtocol();
staticRouting.AddForwardRoute("SQL", "Learning");
staticRouting.AddForwardRoute("Learning", "SQL");

routerConfig.AutoCreateQueues();
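
Once configured, the router has to be created and started by the host process. A minimal sketch of a console host is shown below, assuming the Router.Create, Start, and Stop members of NServiceBus.Router; the helper method name is hypothetical and the prompt text is illustrative.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus.Router;

static class RouterHost
{
    // Hypothetical helper; routerConfig is the configuration built above.
    public static async Task Run(RouterConfiguration routerConfig)
    {
        var router = Router.Create(routerConfig);
        await router.Start().ConfigureAwait(false);

        // Keep forwarding messages between the two transports until the user exits.
        Console.WriteLine("Press <enter> to exit");
        Console.ReadLine();

        await router.Stop().ConfigureAwait(false);
    }
}
```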
