Getting Started
Architecture
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

Routing system extensibility points

Component: NServiceBus
NuGet Package: NServiceBus (8.1)

Extending the NServiceBus routing subsystem with a custom data source makes sense in following scenarios:

  • When centralizing all routing information in a database.
  • When dynamically calculating routes based on endpoint discovery protocol (similar to UDDI).
  • When using a convention based on message naming.

Applying routing strategies

Routing a message involves the following steps:

  • Select the routing strategy based on the message intent
  • Determine the specific properties for the selected routing strategy based on the message type. e.g. as destination address in case of a command/unicast strategy

Command routing

To extend command routing, routing extensions can access the route table on the EndpointConfiguration level or from the feature level:

var settings = endpointConfiguration.GetSettings();
var routingTable = settings.GetOrCreate<UnicastRoutingTable>();
routingTable.AddOrReplaceRoutes("MySource",
    new List<RouteTableEntry>
    {
        new RouteTableEntry(typeof(MyCommand),
            UnicastRoute.CreateFromEndpointName("MyEndpoint"))
    });

The route table can be modified in the feature set up phase or can be passed further, e.g. to a FeatureStartupTask, and updated periodically when the source of the routing information changes.

protected override void Setup(FeatureConfigurationContext context)
{
    var routingTable = context.Settings.Get<UnicastRoutingTable>();
    var refresherTask = new Refresher(routingTable);
    context.RegisterStartupTask(refresherTask);
}
public Refresher(UnicastRoutingTable routeTable)
{
    this.routeTable = routeTable;
}

protected override Task OnStart(IMessageSession session, CancellationToken cancellationToken)
{
    timer = new Timer(
        callback: _ =>
        {
            routeTable.AddOrReplaceRoutes("MySource", LoadRoutes());
        },
        state: null,
        dueTime: TimeSpan.FromSeconds(30),
        period: TimeSpan.FromSeconds(30));
    return Task.CompletedTask;
}

The source parameter is used as a unique key. When AddOrReplaceRoutes is called the first time with a given source key, the routes are added to the table. When it is called subsequently, the routes previously registered under that source key are replaced by the new routes.

The route table API is thread-safe and atomic, meaning either all the changes from the passed collection are successfully applied or none are.

The routing system prevents route ambiguity. If new or replaced routes conflict with existing ones, an exception is thrown. It is up to the route extension to deal with the exception but usually it is a good practice to trigger the endpoint shutdown preventing the incorrect routing of messages.

public RobustRefresher(UnicastRoutingTable routeTable, CriticalError criticalError)
{
    this.routeTable = routeTable;
    this.criticalError = criticalError;
}

protected override Task OnStart(IMessageSession session, CancellationToken cancellationToken)
{
    timer = new Timer(
        callback: _ =>
        {
            try
            {
                routeTable.AddOrReplaceRoutes("MySource", LoadRoutes());
            }
            catch (Exception exception)
            {
                criticalError.Raise("Ambiguous route detected", exception, cancellationToken);
            }
        },
        state: null,
        dueTime: TimeSpan.FromSeconds(30),
        period: TimeSpan.FromSeconds(30));
    return Task.CompletedTask;
}

Event routing

Event routing differs depending on the transport capabilities. Multicast transports which support the Publish-Subscribe pattern natively implement the event routing themselves. Refer to specific transport documentation for details on extensibility points.

Transports without that support rely on NServiceBus core routing for event delivery. The key concept is the collection of publishers. For each event, it contains information on the logical endpoint that publishes it. Routing extensions can access the publishers collections from EndpointConfiguration or from the Feature setup code:

protected override void Setup(FeatureConfigurationContext context)
{
    var publishers = context.Settings.Get<Publishers>();
    var publisherAddress = PublisherAddress.CreateFromEndpointName("PublisherEndpoint");
    publishers.AddOrReplacePublishers(
        sourceKey: "MySource",
        entries: new List<PublisherTableEntry>
        {
            new PublisherTableEntry(typeof(MyEvent), publisherAddress)
        });
}

The source parameter has the same meaning and effect as in the routes collection.

The publishers collection is thread-safe and all operations on that collection are atomic.

Advanced routing customizations

If there's a need to adjust the routing based on criteria other than the message type, the routing pipeline stage allows routing customization for all messages emitted by the endpoint.

Related Articles