Routing system extensibility points

Component: NServiceBus
NuGet Package NServiceBus (7-pre)
This page targets a pre-release version and is subject to change prior to the final release.

Extending the routing with a custom data source makes sense in the following scenarios:

  • When centralizing all routing information in a database.
  • When dynamically calculating routes based on an endpoint discovery protocol (similar to UDDI).
  • When using a convention based on message naming.
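
As an illustration of the last scenario, the following is a minimal sketch of a naming convention, not part of NServiceBus. It assumes that the first namespace segment of a message type is the name of the destination endpoint. The message types and the ConventionRoutes helper are hypothetical and exist only for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical message types, declared only for this illustration.
namespace Sales.Messages { public class PlaceOrder { } public class CancelOrder { } }
namespace Billing.Messages { public class ChargeCard { } }

// Hypothetical helper; not an NServiceBus type.
public static class ConventionRoutes
{
    // Assumed convention: the first namespace segment names the destination endpoint,
    // e.g. Sales.Messages.PlaceOrder is routed to the "Sales" endpoint.
    public static string EndpointFor(Type messageType)
    {
        var ns = messageType.Namespace
            ?? throw new ArgumentException($"{messageType.Name} has no namespace.");
        return ns.Split('.')[0];
    }

    // Builds a message type -> endpoint name map for the given message types.
    public static IReadOnlyDictionary<Type, string> Build(IEnumerable<Type> messageTypes)
        => messageTypes.ToDictionary(t => t, EndpointFor);
}
```

Each pair in the resulting map could then be turned into a RouteTableEntry and registered through the route table API described in the Command routing section.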

Routing APIs

The routing system can be extended by accessing the APIs via the settings bag.

To learn more about implementing a routing extension see the custom routing sample.

Command routing

Routing extensions can access the route table at the EndpointConfiguration level or at the feature level:

var settings = endpointConfiguration.GetSettings();
var routingTable = settings.Get<UnicastRoutingTable>();
routingTable.AddOrReplaceRoutes("MySource",
    new List<RouteTableEntry>
    {
        new RouteTableEntry(typeof(MyCommand),
            UnicastRoute.CreateFromEndpointName("MyEndpoint"))
    });

In the latter case, the route table can be modified during the feature setup phase, or it can be passed on, e.g. to a FeatureStartupTask, and updated periodically when the source of routing information changes.

protected override void Setup(FeatureConfigurationContext context)
{
    var routingTable = context.Settings.Get<UnicastRoutingTable>();
    var refresherTask = new Refresher(routingTable);
    context.RegisterStartupTask(refresherTask);
}

public Refresher(UnicastRoutingTable routeTable)
{
    this.routeTable = routeTable;
}

protected override Task OnStart(IMessageSession session)
{
    timer = new Timer(
        callback: _ =>
        {
            routeTable.AddOrReplaceRoutes("MySource", LoadRoutes());
        },
        state: null,
        dueTime: TimeSpan.FromSeconds(30),
        period: TimeSpan.FromSeconds(30));
    return Task.CompletedTask;
}

The source key (the first argument, "MySource" in the snippets above) is used as a unique key. When AddOrReplaceRoutes is called for the first time with a given source key, the routes are added to the table. On subsequent calls, the routes previously registered under that source key are replaced by the new routes.

The route table API is thread-safe and atomic: either all the changes from the passed collection are applied or none are.

The routing system prevents route ambiguity. If the new or replaced routes conflict with existing ones, an exception is thrown. The routing extension decides how to handle that exception, but the usual best practice is to trigger an endpoint shutdown so that messages are not routed incorrectly.

public RobustRefresher(UnicastRoutingTable routeTable, CriticalError criticalError)
{
    this.routeTable = routeTable;
    this.criticalError = criticalError;
}

protected override Task OnStart(IMessageSession session)
{
    timer = new Timer(
        callback: _ =>
        {
            try
            {
                routeTable.AddOrReplaceRoutes("MySource", LoadRoutes());
            }
            catch (Exception exception)
            {
                criticalError.Raise("Ambiguous route detected", exception);
            }
        },
        state: null,
        dueTime: TimeSpan.FromSeconds(30),
        period: TimeSpan.FromSeconds(30));
    return Task.CompletedTask;
}
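
The source-key, atomicity, and ambiguity rules described above can be modelled with a small in-memory table. This is a toy sketch for illustration only and makes no claim about how UnicastRoutingTable is implemented. ToyRouteTable, its Resolve method, and the exception type and message are all assumptions, and routes are reduced to plain endpoint names.

```csharp
using System;
using System.Collections.Generic;

// Toy stand-in for a route table; NOT the NServiceBus implementation.
public class ToyRouteTable
{
    readonly object gate = new object();

    // Routes grouped by the source key that registered them.
    Dictionary<string, Dictionary<Type, string>> bySource =
        new Dictionary<string, Dictionary<Type, string>>();

    public void AddOrReplaceRoutes(string sourceKey, IDictionary<Type, string> routes)
    {
        lock (gate)
        {
            // Work on a copy so that a failed update leaves the current table untouched.
            var candidate = new Dictionary<string, Dictionary<Type, string>>(bySource);
            candidate[sourceKey] = new Dictionary<Type, string>(routes);

            // Reject the whole update if any message type is routed by more than one source.
            var seen = new HashSet<Type>();
            foreach (var group in candidate.Values)
                foreach (var type in group.Keys)
                    if (!seen.Add(type))
                        throw new InvalidOperationException(
                            $"Route for {type.Name} already exists.");

            // Publish all changes at once.
            bySource = candidate;
        }
    }

    // Returns the endpoint name for a message type, or null when no route is known.
    public string Resolve(Type messageType)
    {
        lock (gate)
        {
            foreach (var group in bySource.Values)
                if (group.TryGetValue(messageType, out var endpoint))
                    return endpoint;
            return null;
        }
    }
}
```

In this model, calling AddOrReplaceRoutes twice with the same source key replaces the earlier routes, while registering an already-routed message type under a different source key throws and leaves the table unchanged.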

Event routing

Event routing differs depending on the transport capabilities. Multicast transports, which natively support the Publish-Subscribe pattern, implement event routing themselves. Refer to the documentation of the specific transport for details on its extensibility points.

Transports without that support rely on NServiceBus core routing for event delivery. The key concept is the collection of publishers. For each event, it contains information about the logical endpoint that publishes it. Routing extensions can access the publishers collection from EndpointConfiguration or from the feature setup code:

protected override void Setup(FeatureConfigurationContext context)
{
    var publishers = context.Settings.Get<Publishers>();
    var publisherAddress = PublisherAddress.CreateFromEndpointName("PublisherEndpoint");
    publishers.AddOrReplacePublishers(
        sourceKey: "MySource",
        entries: new List<PublisherTableEntry>
        {
            new PublisherTableEntry(typeof(MyEvent), publisherAddress)
        });
}

The sourceKey parameter has the same meaning and effect as the source key of the route table.

The publishers collection is thread-safe and all operations on that collection are atomic.

Physical routing

Physical routing is responsible for mapping the destination logical endpoint to the actual transport address (queue name).

When using a broker transport, the physical routing is entirely managed by NServiceBus and does not require any configuration.

When using a bus transport, physical routing matters because the transport address has to identify the node of the bus that the endpoint is using. In MSMQ, for example, each machine runs a single node of the MSMQ system.

Routing extensions can influence the physical routing by modifying the endpoint instances collection. This is especially important for bus transports in a dynamically changing environment such as the cloud. Endpoints can be elastically scaled out and in, so to stay in sync the routing needs to derive the physical information from the current state of the environment rather than from a static file.
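
The mapping from a logical endpoint to transport addresses can be sketched as follows. This is a simplified model, not the NServiceBus endpoint instances API. It assumes MSMQ-style addresses of the form queue@machine, a queue named after the endpoint, and a fallback to the bare queue name when no instance information is known. ToyPhysicalRouting and its parameters are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper; not an NServiceBus type.
public static class ToyPhysicalRouting
{
    // Maps a logical endpoint name to one address per known machine,
    // using the assumed "queue@machine" format.
    public static IReadOnlyList<string> Resolve(
        string endpoint,
        IReadOnlyDictionary<string, string[]> machinesByEndpoint)
    {
        // Assumption: without instance information, fall back to the bare queue name.
        if (!machinesByEndpoint.TryGetValue(endpoint, out var machines) || machines.Length == 0)
            return new[] { endpoint };

        return machines.Select(machine => $"{endpoint}@{machine}").ToArray();
    }
}
```

In a dynamic environment, the machinesByEndpoint map would be rebuilt from the current state of the environment each time the set of running instances changes.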

See the article dedicated to MSMQ Transport for details.
