Routing system extensibility points

Component: NServiceBus
NuGet Package NServiceBus (6.x)

Extending the routing with a custom data source makes sense in the following scenarios:

  • When centralizing all routing information in a database.
  • When dynamically calculating routes based on an endpoint discovery protocol (similar to UDDI).
  • When using a convention based on message naming.

Routing APIs

The routing system can be extended by accessing the APIs via the settings bag.

To learn more about implementing a routing extension see the custom routing sample.

Command routing

Routing extensions can access the route table from the EndpointConfiguration level or from the feature level:

var settings = endpointConfiguration.GetSettings();
var routingTable = settings.Get<UnicastRoutingTable>();
routingTable.AddOrReplaceRoutes("MySource",
    new List<RouteTableEntry>
    {
        new RouteTableEntry(typeof(MyCommand),
            UnicastRoute.CreateFromEndpointName("MyEndpoint"))
    });

In the latter case, the route table can be modified in the feature setup phase, or it can be passed further, e.g. to a FeatureStartupTask, and updated periodically when the source of routing information changes.

protected override void Setup(FeatureConfigurationContext context)
{
    var routingTable = context.Settings.Get<UnicastRoutingTable>();
    var refresherTask = new Refresher(routingTable);
    context.RegisterStartupTask(refresherTask);
}

// Constructor of the Refresher startup task, which keeps a reference to the route table
public Refresher(UnicastRoutingTable routeTable)
{
    this.routeTable = routeTable;
}

protected override Task OnStart(IMessageSession session)
{
    timer = new Timer(
        callback: _ =>
        {
            routeTable.AddOrReplaceRoutes("MySource", LoadRoutes());
        },
        state: null,
        dueTime: TimeSpan.FromSeconds(30),
        period: TimeSpan.FromSeconds(30));
    return Task.CompletedTask;
}

The source parameter is used as a unique key. When AddOrReplaceRoutes is called the first time with a given source key, the routes are added to the table. When it is called subsequently, the routes previously registered under that source key are replaced by the new routes.
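The add-then-replace behavior of the source key can be sketched as follows. This is a minimal illustration, assuming the NServiceBus 6.x `NServiceBus.Routing` namespace; the message types `MyCommand` and `OtherCommand`, the endpoint names and the `SourceKeyExample` class are hypothetical and exist only for this example.

```csharp
using System.Collections.Generic;
using NServiceBus.Routing;

// Hypothetical message types used only for illustration.
public class MyCommand { }
public class OtherCommand { }

public static class SourceKeyExample
{
    public static void Apply(UnicastRoutingTable routingTable)
    {
        // First call with the key "MySource": the route for MyCommand is added.
        routingTable.AddOrReplaceRoutes("MySource",
            new List<RouteTableEntry>
            {
                new RouteTableEntry(typeof(MyCommand),
                    UnicastRoute.CreateFromEndpointName("MyEndpoint"))
            });

        // Second call with the same key: the routes previously registered
        // under "MySource" are replaced, so only the OtherCommand route
        // remains for this source.
        routingTable.AddOrReplaceRoutes("MySource",
            new List<RouteTableEntry>
            {
                new RouteTableEntry(typeof(OtherCommand),
                    UnicastRoute.CreateFromEndpointName("OtherEndpoint"))
            });
    }
}
```

Routes registered under other source keys are not affected by either call, which is why each routing extension should use its own key.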

The route table API is thread-safe and atomic, meaning that either all the changes from the passed collection are applied or none are.

The routing system prevents route ambiguity. If the new or replaced routes conflict with existing ones, an exception is thrown. The routing extension is responsible for handling that exception. Usually the best practice is to trigger endpoint shutdown, which prevents messages from being routed incorrectly.

public RobustRefresher(UnicastRoutingTable routeTable, CriticalError criticalError)
{
    this.routeTable = routeTable;
    this.criticalError = criticalError;
}

protected override Task OnStart(IMessageSession session)
{
    timer = new Timer(
        callback: _ =>
        {
            try
            {
                routeTable.AddOrReplaceRoutes("MySource", LoadRoutes());
            }
            catch (Exception exception)
            {
                criticalError.Raise("Ambiguous route detected", exception);
            }
        },
        state: null,
        dueTime: TimeSpan.FromSeconds(30),
        period: TimeSpan.FromSeconds(30));
    return Task.CompletedTask;
}

Event routing

Event routing differs depending on the transport capabilities. Multicast transports, which support the Publish-Subscribe pattern natively, implement event routing themselves. Refer to the specific transport documentation for details on extensibility points.

Transports without that support rely on NServiceBus core routing for event delivery. The key concept is the collection of publishers. For each event, it contains information on the logical endpoint that publishes it. Routing extensions can access the publishers collection from the EndpointConfiguration level or from the feature setup code:

protected override void Setup(FeatureConfigurationContext context)
{
    var publishers = context.Settings.Get<Publishers>();
    var publisherAddress = PublisherAddress.CreateFromEndpointName("PublisherEndpoint");
    publishers.AddOrReplacePublishers("MySource",
        new List<PublisherTableEntry>
        {
            new PublisherTableEntry(typeof(MyEvent), publisherAddress)
        });
}

The source parameter has the same meaning and effect as in the routes collection.

The publishers collection is thread-safe and all operations on that collection are atomic.
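For completeness, the same registration at the EndpointConfiguration level might look like the sketch below. It mirrors the command routing example and assumes the NServiceBus 6.x namespaces `NServiceBus.Configuration.AdvanceExtensibility` (for `GetSettings`) and `NServiceBus.Routing.MessageDrivenSubscriptions` (for the publisher types). The `MyEvent` type and the `PublisherRegistration` helper are hypothetical.

```csharp
using System.Collections.Generic;
using NServiceBus;
using NServiceBus.Configuration.AdvanceExtensibility;
using NServiceBus.Routing.MessageDrivenSubscriptions;

// Hypothetical event type used only for illustration.
public class MyEvent { }

public static class PublisherRegistration
{
    public static void Register(EndpointConfiguration endpointConfiguration)
    {
        // Access the publishers collection through the settings bag.
        var settings = endpointConfiguration.GetSettings();
        var publishers = settings.Get<Publishers>();

        // Declare that MyEvent is published by the logical endpoint "PublisherEndpoint".
        var publisherAddress = PublisherAddress.CreateFromEndpointName("PublisherEndpoint");
        publishers.AddOrReplacePublishers("MySource",
            new List<PublisherTableEntry>
            {
                new PublisherTableEntry(typeof(MyEvent), publisherAddress)
            });
    }
}
```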

Physical routing

Physical routing is responsible for mapping the destination logical endpoint to the actual transport address (queue name).

When using a broker transport, the physical routing is entirely managed by NServiceBus and does not require any configuration.

When using a bus transport, the physical routing is important because the transport address has to contain information about the node of the bus that the endpoint is using. In MSMQ, each machine runs a single node of the MSMQ system.

Routing extensions can influence the physical routing by modifying the endpoint instances collection. This is especially important for bus transports in dynamically changing environments such as the cloud, where endpoints are elastically scaled out and in. To stay in sync, the routing needs to derive the physical information from the current state of the environment rather than from a static file.
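As a rough sketch of modifying the endpoint instances collection from a feature, the fragment below registers two instances of one logical endpoint. It assumes that the `EndpointInstances` type with an `AddOrReplaceInstances` method and the MSMQ-specific `AtMachine` extension are available as in NServiceBus 6.x. The feature name, endpoint name and machine names are hypothetical, and in a real extension the list would be derived from the current state of the environment.

```csharp
using System.Collections.Generic;
using NServiceBus;
using NServiceBus.Features;
using NServiceBus.Routing;

// Hypothetical feature that maps a logical endpoint to its physical instances.
public class InstanceMappingFeature : Feature
{
    protected override void Setup(FeatureConfigurationContext context)
    {
        var endpointInstances = context.Settings.Get<EndpointInstances>();

        // The source key works the same way as for routes and publishers:
        // a later call with "MySource" replaces these entries.
        endpointInstances.AddOrReplaceInstances("MySource",
            new List<EndpointInstance>
            {
                // Two instances of the "Sales" endpoint on different machines (assumed names).
                new EndpointInstance("Sales").AtMachine("VM-1"),
                new EndpointInstance("Sales").AtMachine("VM-2")
            });
    }
}
```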

See the article dedicated to MSMQ Transport for details.

MessageEndpointMappings

The routing system can be extended in a static manner (once at startup) by providing custom sources of routing information to enrich or replace the standard routing configuration (the UnicastBusConfig/MessageEndpointMappings configuration section in the app.config file).

It can be done either by using a configuration source:

public class ConfigurationSource :
    IConfigurationSource
{
    public T GetConfiguration<T>() where T : class, new()
    {
        if (typeof(T) == typeof(UnicastBusConfig))
        {
            // read from existing config
            var config = (UnicastBusConfig)ConfigurationManager
                .GetSection(typeof(UnicastBusConfig).Name);
            if (config == null)
            {
                // create new config if it doesn't exist
                config = new UnicastBusConfig
                {
                    MessageEndpointMappings = new MessageEndpointMappingCollection()
                };
            }
            // append mapping to config
            var endpointMapping = new MessageEndpointMapping
            {
                AssemblyName = "assembly",
                Endpoint = "queue@machinename"
            };
            config.MessageEndpointMappings.Add(endpointMapping);
            return config as T;
        }

        return ConfigurationManager.GetSection(typeof(T).Name) as T;
    }
}

endpointConfiguration.CustomConfigurationSource(new ConfigurationSource());

or a configuration provider:

public class ProvideConfiguration :
    IProvideConfiguration<UnicastBusConfig>
{
    public UnicastBusConfig GetConfiguration()
    {
        // read from existing config
        var config = (UnicastBusConfig) ConfigurationManager
            .GetSection(typeof(UnicastBusConfig).Name);
        if (config == null)
        {
            // create new config if it doesn't exist
            config = new UnicastBusConfig
            {
                MessageEndpointMappings = new MessageEndpointMappingCollection()
            };
        }
        // append mapping to config
        var endpointMapping = new MessageEndpointMapping
        {
            AssemblyName = "assembly",
            Endpoint = "queue@machinename"
        };
        config.MessageEndpointMappings.Add(endpointMapping);
        return config;
    }
}

or in the app.config file:

<configuration>
  <configSections>
    <section name="UnicastBusConfig"
             type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core"/>
  </configSections>
  <UnicastBusConfig>
    <MessageEndpointMappings>
      <add Assembly="MyMessages" 
           Endpoint="Sales" />

      <add Assembly="MyMessages"
           Namespace="PriorityMessages"
           Endpoint="Preferred" />

      <add Assembly="MyMessages"
           Type="MyMessages.SendOrder"
           Endpoint="Sending" />
    </MessageEndpointMappings>
  </UnicastBusConfig>
</configuration>

The MessageEndpointMappings collection can be populated based on any external source. It is read during the endpoint start-up, before any messages are sent.

The route table is not updated at run time, even if the contents of the mappings collection change. If the routing data changes frequently, consider implementing a mechanism that restarts the endpoint when a change is detected.

For backwards compatibility, a Messages attribute can be used instead of the Type and Namespace attributes.
