Routing system extensibility points

Component: NServiceBus | Nuget: NServiceBus (Version: 6.x)

Extending the routing with a custom data source makes sense in following scenarios:

  • When centralizing all routing information in a database.
  • When dynamically calculating routes based on endpoint discovery protocol (similar of UDDI).
  • When using a convention based on message naming.

Routing APIs

The routing system can be extended by accessing the APIs via the settings bag.

To learn more about implementing a routing extension see the custom routing sample.

Command routing

Routing extensions can access the route table from EndpointConfiguration level or from the feature level:

Edit
var routingTable = endpointConfiguration.GetSettings().Get<UnicastRoutingTable>();
routingTable.AddOrReplaceRoutes("MySource",
        new List<RouteTableEntry>
        {
            new RouteTableEntry(typeof(MyCommand),
                UnicastRoute.CreateFromEndpointName("MyEndpoint"))
        });

In the latter case the route table can be modified in the feature set up phase or can be passed further e.g. to a FeatureStartupTask and updated periodically when the source of routing information changes.

Edit
protected override void Setup(FeatureConfigurationContext context)
{
    var routingTable = context.Settings.Get<UnicastRoutingTable>();
    var refresherTask = new Refresher(routingTable);
    context.RegisterStartupTask(refresherTask);
}
Edit
public Refresher(UnicastRoutingTable routeTable)
{
    this.routeTable = routeTable;
}

protected override Task OnStart(IMessageSession session)
{
    timer = new Timer(_ =>
    {
        routeTable.AddOrReplaceRoutes("MySource", LoadRoutes());
    }, null, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30));
    return Task.CompletedTask;
}

The source parameter is used as a unique key. When AddOrReplaceRoutes is called the first time with a given source key, the routes are added to the table. When it is called subsequently, the routes previously registered under that source key are replaced by the new routes.

The route table API is thread-safe and atomic, meaning either all the changes from the passed collection are successfully applied or none is.

The routing system prevents route ambiguity. If the new or replaced routes conflict with existing ones, an exception is thrown. It is up to the route extension to deal with that exception but usually it is best practice to trigger the endpoint shutdown preventing the incorrect routing of messages.

Edit
public RobustRefresher(UnicastRoutingTable routeTable, CriticalError criticalError)
{
    this.routeTable = routeTable;
    this.criticalError = criticalError;
}

protected override Task OnStart(IMessageSession session)
{
    timer = new Timer(_ =>
    {
        try
        {
            routeTable.AddOrReplaceRoutes("MySource", LoadRoutes());
        }
        catch (Exception ex)
        {
            criticalError.Raise("Ambiguous route detected", ex);
        }
    }, null, TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(30));
    return Task.CompletedTask;
}

Event routing

Event routing differs depending on the transport capabilities. Multicast transports which support Publish-Subscribe pattern natively implement the event routing themselves. Refer to specific transport documentation for details on extensibility points.

Transports without that support rely on NServiceBus core routing for event delivery. The key concept is the collection of publishers. For each event it contains information on the logical endpoint that publishes it. Routing extensions can access the publishers collections from EndpointConfiguration or from the Feature set up code:

Edit
protected override void Setup(FeatureConfigurationContext context)
{
    var publishers = context.Settings.Get<Publishers>();
    var publisherAddress = PublisherAddress.CreateFromEndpointName("PublisherEndpoint");
    publishers.AddOrReplacePublishers("MySource",
        new List<PublisherTableEntry>
        {
            new PublisherTableEntry(typeof(MyEvent), publisherAddress)
        });
}

The source parameter has the same meaning and effect as in the routes collection.

The publishers collection is thread-safe and all operations on that collection are atomic.

Physical routing

Physical routing is responsible for mapping the destination logical endpoint to the actual transport address (queue name).

When using a broker transport, the physical routing is entirely managed by NServiceBus and does not require any configuration.

When using a bus transport, the physical routing is important because the transport address has to contain the information about the node of the bus that the endpoint is using. In MSMQ each machine runs a single node of the MSMQ system.

Routing extensions can influence the physical routing by modifying the endpoint instances collection. This is especially important for bus transports in a dynamically changing environment such as the cloud. Endpoints can be elastically scaled out and in and the routing, to be able to stay in sync, needs to derive the physical information from the current state of the environment, not from a static file.

Edit
protected override void Setup(FeatureConfigurationContext context)
{
    var endpointInstances = context.Settings.Get<EndpointInstances>();
    endpointInstances.AddOrReplaceInstances("MySource",
        new List<EndpointInstance>
        {
            new EndpointInstance("MyEndpoint").AtMachine("VM-1"),
            new EndpointInstance("MyEndpoint").AtMachine("VM-2")
        });
}

The source parameter has the same meaning and effect as in the routes collection.

The instances collection is thread-safe. It allows registering multiple instance of a given endpoint. In case there is more than one, message distribution is involved.

Message distribution

Every message is always delivered to a single physical instance of the logical endpoint. When scaling out an endpoint with a bus transport there are multiple instances of a single logical endpoint registered in the routing system. Each outgoing message has to undergo the distribution process to determine which instance is going to receive this particular message. By default a round-robin algorithm is used to determine the destination. Routing extensions can override this behavior by registering a custom DistributionStrategy for a given destination endpoint.

Edit
var transport = endpointConfiguration.UseTransport<MsmqTransport>();
var routing = transport.Routing();
routing.SetMessageDistributionStrategy(new RandomStrategy("Sales", DistributionStrategyScope.Send));
6.x NServiceBus
Edit
class RandomStrategy :
    DistributionStrategy
{
    static Random random = new Random();

    public RandomStrategy(string endpoint, DistributionStrategyScope scope) : base(endpoint, scope)
    {
    }

    public override string SelectReceiver(string[] receiverAddresses)
    {
        return receiverAddresses[random.Next(receiverAddresses.Length)];
    }
}
6.2 - N NServiceBus
Edit
class RandomStrategy :
    DistributionStrategy
{
    static Random random = new Random();

    public RandomStrategy(string endpoint, DistributionStrategyScope scope) : base(endpoint, scope)
    {
    }

    // Method will not be called since SelectDestination doesn't call base.SelectDestination
    public override string SelectReceiver(string[] receiverAddresses)
    {
        throw new NotImplementedException();
    }

    public override string SelectDestination(DistributionContext context)
    {
        // access to headers, payload...
        return context.ReceiverAddresses[random.Next(context.ReceiverAddresses.Length)];
    }
}

In Version 6.2 and above it is possible to override the virtual method SelectDestination. The method provides access to the DistributionStrategyContext that enables implementing more advanced distribution scenarios, such as distributing based on the headers of the message. When SelectDestination is overridden, do not call base.SelectDestination since the base method calls SelectReceiver for backward compatibility reasons. SelectReceiver can throw a NotImplementedException.

To learn more about creating custom distribution strategies see the fair distribution sample.

MessageEndpointMappings

The routing system can be extended in a static manner (once at startup) by providing custom sources of routing information to enrich or replace the standard routing configuration (UnicastBusConfig/MessageEndpointMappings configuration section in app.config file).

It can be done either by using a configuration source:

Edit
public class ConfigurationSource :
    IConfigurationSource
{
    public T GetConfiguration<T>() where T : class, new()
    {
        if (typeof(T) == typeof(UnicastBusConfig))
        {
            // read from existing config
            var config = (UnicastBusConfig)ConfigurationManager
                .GetSection(typeof(UnicastBusConfig).Name);
            if (config == null)
            {
                // create new config if it doesn't exist
                config = new UnicastBusConfig
                {
                    MessageEndpointMappings = new MessageEndpointMappingCollection()
                };
            }
            // append mapping to config
            var endpointMapping = new MessageEndpointMapping
            {
                AssemblyName = "assembly",
                Endpoint = "queue@machinename"
            };
            config.MessageEndpointMappings.Add(endpointMapping);
            return config as T;
        }

        return ConfigurationManager.GetSection(typeof(T).Name) as T;
    }
}
Edit
endpointConfiguration.CustomConfigurationSource(new ConfigurationSource());

or a configuration provider:

Edit
public class ProvideConfiguration :
    IProvideConfiguration<UnicastBusConfig>
{
    public UnicastBusConfig GetConfiguration()
    {
        // read from existing config
        var config = (UnicastBusConfig) ConfigurationManager
            .GetSection(typeof(UnicastBusConfig).Name);
        if (config == null)
        {
            // create new config if it doesn't exist
            config = new UnicastBusConfig
            {
                MessageEndpointMappings = new MessageEndpointMappingCollection()
            };
        }
        // append mapping to config
        var endpointMapping = new MessageEndpointMapping
        {
            AssemblyName = "assembly",
            Endpoint = "queue@machinename"
        };
        config.MessageEndpointMappings.Add(endpointMapping);
        return config;
    }
}

The MessageEndpointMappings collection can be populated based on any external source. It is read during the endpoint start-up, before any messages are sent.

The route table is not updated during run-time, even if the contents of the mappings collection change. In case the routing data changes frequently, consider implementing a mechanism that would restart the endpoint when the change is detected.

Related Articles


Last modified