Forwarding address

Component: NServiceBus
NuGet Package NServiceBus (6.x)

When a handler is migrated to a new endpoint, some messages may still be in flight. When these messages arrive at the original endpoint, they need to be re-routed to the new endpoint.

NServiceBus can already be configured to forward a copy of every message that it processes to another address. This sample differs in two key ways:

  1. This sample only forwards messages which have a configured forwarding address. Each message type can have a different forwarding address. The built-in forwarding forwards every message to a single destination address.
  2. This sample forwards messages to a logical endpoint, rather than a physical address. This allows it to interact with sender-side distribution if required.
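
For comparison, the built-in forwarding mentioned above is enabled with a single call on the endpoint configuration. The fragment below is a minimal sketch assuming NServiceBus 6; the destination address is a hypothetical value chosen for illustration and is not part of this sample.

```csharp
// Built-in forwarding: a copy of every processed message goes to one physical address.
// "destinationQueue@machine" is a hypothetical address, not part of this sample.
var endpointConfiguration = new EndpointConfiguration("OriginalDestination");
endpointConfiguration.ForwardReceivedMessagesTo("destinationQueue@machine");
```

There is no per-message-type selection here, and the target is a physical address rather than a logical endpoint name.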

Running the project

  1. Start all the projects by hitting F5.
  2. The text "Endpoint Started. Press s to send a very important message. Any other key to exit" should be displayed in the Sender's console window.
  3. Press s to send a message.
  4. The message will be processed by OriginalDestination.
  5. The message will also be processed by NewDestination.

Sender is configured to send messages to OriginalDestination which is configured to forward a copy to NewDestination.

Remove the handler code from OriginalDestination and run the sample again. Sender is still configured to send the message to OriginalDestination, which forwards a copy to NewDestination even though OriginalDestination no longer contains a handler for that message type.

Code walk-through

This sample contains five projects.

Sender

Contains routing configuration to send ImportantMessage to the OriginalDestination endpoint.

var config = new EndpointConfiguration("Sender");
var transport = config.UseTransport<LearningTransport>();
var routing = transport.Routing();

routing.RouteToEndpoint(typeof(ImportantMessage), "OriginalDestination");

OriginalDestination

Configures the forwarding address for messages of the ImportantMessage type.

var endpointConfiguration = new EndpointConfiguration("OriginalDestination");
var transport = endpointConfiguration.UseTransport<LearningTransport>();
var routing = transport.Routing();

routing.ForwardToEndpoint(typeof(ImportantMessage), "NewDestination");
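
Because forwarding addresses are registered per message type, more than one type could be forwarded, each to its own endpoint. The fragment below is only a sketch: OtherMessage and AnotherDestination are hypothetical names that do not exist in this sample.

```csharp
// Forwarding registered by the sample.
routing.ForwardToEndpoint(typeof(ImportantMessage), "NewDestination");

// Hypothetical second message type and endpoint, for illustration only.
routing.ForwardToEndpoint(typeof(OtherMessage), "AnotherDestination");
```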

Original handler for ImportantMessage messages.

class ImportantMessageHandler :
    IHandleMessages<ImportantMessage>
{
    public Task Handle(ImportantMessage message, IMessageHandlerContext context)
    {
        Console.WriteLine("Got a very important message");
        return Task.CompletedTask;
    }
}

This handler will still be called but can be safely removed if no longer required.

NewDestination

Contains the new handler for ImportantMessage messages.

class ImportantMessageHandler :
    IHandleMessages<ImportantMessage>
{
    public Task Handle(ImportantMessage message, IMessageHandlerContext context)
    {
        Console.WriteLine("Got a very important message from the new handler");
        return Task.CompletedTask;
    }
}

NServiceBus.ForwardingAddress

Contains the logic for leaving a forwarding address, implemented as two pipeline behaviors.

The first behavior forks from the incoming physical context into the routing context after the message has been processed.

class ForwardMessagesWithForwardingAddress :
    ForkConnector<IIncomingPhysicalMessageContext, IRoutingContext>
{
    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next,
        Func<IRoutingContext, Task> fork)
    {
        var state = new MessageForwardingState();
        context.Extensions.Set(state);

        await next()
            .ConfigureAwait(false);

        var forwardingRoutes = state.GetRoutingStrategies();

        if (!forwardingRoutes.Any())
        {
            return;
        }
        await fork(context.CreateRoutingContext(forwardingRoutes))
            .ConfigureAwait(false);
    }
}

The second behavior is installed in the incoming logical context and matches incoming messages to a forwarding address.

class RerouteMessagesWithForwardingAddress :
    Behavior<IIncomingLogicalMessageContext>
{
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        var messageForwardingState = context.Extensions.Get<MessageForwardingState>();

        foreach (var forwardingRoute in forwardingRoutesLookup[context.Message.MessageType])
        {
            Log.InfoFormat("Message {0} has forwarding address to {1}", context.MessageId, forwardingRoute);

            var routingStrategy = ResolveForwardingRoute(context, forwardingRoute);

            messageForwardingState.AddRoutingStrategy(routingStrategy);

            context.MessageHandled = true;
        }

        return next();
    }

    UnicastRoutingStrategy ResolveForwardingRoute(IIncomingLogicalMessageContext context, UnicastRoute forwardingRoute)
    {
        var outgoingLogicalMessage = new OutgoingLogicalMessage(
            context.Message.MessageType,
            context.Message.Instance);

        return routeResolver.ResolveRoute(
            forwardingRoute,
            outgoingLogicalMessage,
            context.MessageId,
            context.Headers,
            context.Extensions);
    }

    public RerouteMessagesWithForwardingAddress(ILookup<Type, UnicastRoute> forwardingRoutesLookup, UnicastRouteResolver routeResolver)
    {
        this.forwardingRoutesLookup = forwardingRoutesLookup;
        this.routeResolver = routeResolver;
    }

    ILookup<Type, UnicastRoute> forwardingRoutesLookup;
    UnicastRouteResolver routeResolver;

    static ILog Log = LogManager.GetLogger<RerouteMessagesWithForwardingAddress>();
}

This behavior sets context.MessageHandled to true, so the message is treated as handled even when the endpoint has no handler for it. This allows the message handler to be removed from the endpoint containing the forwarding address.
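
Both behaviors share a MessageForwardingState instance through the context extensions, but its source is not shown in this walk-through. The following is a minimal sketch of what such a class might look like, assuming it only needs to collect routing strategies for the current message. The member names are taken from the calls above; the body and the return type are assumptions and may differ from the sample's implementation. It needs a reference to the NServiceBus 6 package to compile.

```csharp
using System.Collections.Generic;
using NServiceBus.Routing;

// Hypothetical sketch: holds the routing strategies resolved for one incoming message.
class MessageForwardingState
{
    readonly List<RoutingStrategy> routingStrategies = new List<RoutingStrategy>();

    // Called by the logical-stage behavior once per matching forwarding route.
    public void AddRoutingStrategy(RoutingStrategy routingStrategy)
    {
        routingStrategies.Add(routingStrategy);
    }

    // Read by the physical-stage connector after the rest of the pipeline has run.
    public IReadOnlyCollection<RoutingStrategy> GetRoutingStrategies()
    {
        return routingStrategies;
    }
}
```

A new instance is created for every incoming message, so strategies collected for one message are not carried over to the next.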

This project also contains a feature to wire up the two main behaviors.

class LeaveForwardingAddressFeature :
    Feature
{
    protected override void Setup(FeatureConfigurationContext context)
    {
        var settings = context.Settings;
        var forwardingAddressDirectory = settings.Get<ForwardingAddressDirectory>();
        var transportInfrastructure = settings.Get<TransportInfrastructure>();

        var routeResolver = new UnicastRouteResolver(
            i => transportInfrastructure.ToTransportAddress(LogicalAddress.CreateRemoteAddress(i)),
            settings.Get<EndpointInstances>(),
            settings.Get<DistributionPolicy>()
        );

        var rerouteBehavior = new RerouteMessagesWithForwardingAddress(
            forwardingAddressDirectory.ToLookup(),
            routeResolver
        );

        var invokeForwardingPipeline = new ForwardMessagesWithForwardingAddress();

        var pipeline = context.Pipeline;
        pipeline.Register(rerouteBehavior, "Finds forwarding addresses and resolves them");
        pipeline.Register(invokeForwardingPipeline, "Forwards messages to their matching forwarding addresses");
    }
}

The routing configuration extension enables the forwarding address feature and records the forwarding address.

public static class RoutingExtensions
{
    public static void ForwardToEndpoint(this RoutingSettings routing, Type messageTypeToForward, string destinationEndpointName)
    {
        var settings = routing.GetSettings();

        var endpointRoute = UnicastRoute.CreateFromEndpointName(destinationEndpointName);

        settings.GetOrCreate<ForwardingAddressDirectory>()
            .ForwardToRoute(messageTypeToForward, endpointRoute);

        settings.EnableFeatureByDefault<LeaveForwardingAddressFeature>();
    }
}
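
The ForwardingAddressDirectory used by the extension and the feature is not listed in this walk-through. A minimal sketch, assuming it simply records (message type, route) pairs and exposes them as a lookup, could look like the following. The method names come from the calls above; everything else is an assumption rather than the sample's actual code. It needs a reference to the NServiceBus 6 package to compile.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using NServiceBus.Routing;

// Hypothetical sketch: records which route each message type should be forwarded to.
class ForwardingAddressDirectory
{
    readonly List<KeyValuePair<Type, UnicastRoute>> routes =
        new List<KeyValuePair<Type, UnicastRoute>>();

    // Called from the ForwardToEndpoint routing extension.
    public void ForwardToRoute(Type messageType, UnicastRoute route)
    {
        routes.Add(new KeyValuePair<Type, UnicastRoute>(messageType, route));
    }

    // Groups the recorded routes by message type for use in the reroute behavior.
    public ILookup<Type, UnicastRoute> ToLookup()
    {
        return routes.ToLookup(pair => pair.Key, pair => pair.Value);
    }
}
```

An ILookup returns an empty sequence for a key it does not contain, which is why the reroute behavior can index the lookup with any incoming message type without first checking whether a forwarding address exists.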

Messages

A shared assembly containing the ImportantMessage message type.

