Forwarding address

Component: NServiceBus
NuGet Package NServiceBus (7.x)

When a handler is moved to a new endpoint, there may still be "in-flight" messages, bound for the original endpoint. When those messages arrive at the original endpoint, they need to be re-routed to the new endpoint.

An endpoint may be configured to forward a copy of every successfully processed message to another endpoint. This sample is different in a few key ways:

  1. This sample only forwards messages which have a configured forwarding address. Each message type can have a different forwarding address. The built-in forwarding forwards every message to a single destination address.
  2. This sample forwards messages to a logical endpoint, rather than a physical address. This allows it to interact with sender-side distribution if required.

Running the sample

  1. Open the solution in Visual Studio.
  2. Press F5.
  3. Follow the instructions in sender's console window to send a message to the OriginalDestination endpoint.
  4. The message will be processed by the OriginalDestination endpoint.
  5. The message will also be processed by the NewDestination endpoint.

The sender is configured to send messages to OriginalDestination, and OriginalDestination is configured to forward a copy to NewDestination.

Remove the handler code from OriginalDestination and run the sample again. Note that the sender is still configured to send the message to OriginalDestination, which will forward a copy to NewDestination even though it no longer contains a handler for the message.

Code walk-through

The sample contains four projects.

Sender

Contains routing configuration to send ImportantMessage messages to the OriginalDestination endpoint.

var config = new EndpointConfiguration("Sender");
var transport = config.UseTransport<LearningTransport>();
var routing = transport.Routing();

routing.RouteToEndpoint(typeof(ImportantMessage), "OriginalDestination");

OriginalDestination

This endpoint configures the forwarding address for ImportantMessage messages.

var config = new EndpointConfiguration("OriginalDestination");
var transport = config.UseTransport<LearningTransport>();
var routing = transport.Routing();

routing.ForwardToEndpoint(typeof(ImportantMessage), "NewDestination");

The endpoint also contains the original handler for ImportantMessage messages.

class ImportantMessageHandler :
    IHandleMessages<ImportantMessage>
{
    public Task Handle(ImportantMessage message, IMessageHandlerContext context)
    {
        Console.WriteLine("Got a very important message");
        return Task.CompletedTask;
    }
}
This handler will still be called, but can be safely removed, if no longer required.

NewDestination

This endpoint contains the new handler for ImportantMessage messages.

class ImportantMessageHandler :
    IHandleMessages<ImportantMessage>
{
    public Task Handle(ImportantMessage message, IMessageHandlerContext context)
    {
        Console.WriteLine("Got a very important message from the new handler!");
        return Task.CompletedTask;
    }
}

NServiceBus.ForwardingAddress

Contains the implementation logic for specifying a forwarding address. This is done by adding two behaviors to the pipeline.

The first behavior forks the incoming physical context into the forwarding context after the message has been processed.

class ForwardMessagesWithForwardingAddress :
    ForkConnector<IIncomingPhysicalMessageContext, IRoutingContext>
{
    public override async Task Invoke(
        IIncomingPhysicalMessageContext context,
        Func<Task> next,
        Func<IRoutingContext, Task> fork)
    {
        var state = new MessageForwardingState();
        context.Extensions.Set(state);

        await next()
            .ConfigureAwait(false);

        var forwardingRoutes = state.GetRoutingStrategies();

        if (!forwardingRoutes.Any())
        {
            return;
        }
        await fork(context.CreateRoutingContext(forwardingRoutes))
            .ConfigureAwait(false);
    }
}

The second behavior is installed in the incoming logical context and matches incoming messages to a forwarding address.

class RerouteMessagesWithForwardingAddress :
    Behavior<IIncomingLogicalMessageContext>
{
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        var messageForwardingState = context.Extensions.Get<MessageForwardingState>();

        foreach (var forwardingRoute in forwardingRoutesLookup[context.Message.MessageType])
        {
            Log.InfoFormat("Message {0} has forwarding address to {1}", context.MessageId, forwardingRoute);

            var routingStrategy = ResolveForwardingRoute(context, forwardingRoute);

            messageForwardingState.AddRoutingStrategy(routingStrategy);

            context.MessageHandled = true;
        }

        return next();
    }

    UnicastRoutingStrategy ResolveForwardingRoute(IIncomingLogicalMessageContext context, UnicastRoute forwardingRoute)
    {
        var outgoingLogicalMessage = new OutgoingLogicalMessage(
            context.Message.MessageType,
            context.Message.Instance);

        return routeResolver.ResolveRoute(
            forwardingRoute,
            outgoingLogicalMessage,
            context.MessageId,
            context.Headers,
            context.Extensions);
    }

    public RerouteMessagesWithForwardingAddress(ILookup<Type, UnicastRoute> forwardingRoutesLookup, UnicastRouteResolver routeResolver)
    {
        this.forwardingRoutesLookup = forwardingRoutesLookup;
        this.routeResolver = routeResolver;
    }

    ILookup<Type, UnicastRoute> forwardingRoutesLookup;
    UnicastRouteResolver routeResolver;

    static ILog Log = LogManager.GetLogger<RerouteMessagesWithForwardingAddress>();
}
This behavior sets context.MessageHandled to true. This allows the message handler to be removed from the endpoint containing the forwarding address.

This project also contains a feature to wire up the two main behaviors.

class LeaveForwardingAddressFeature :
    Feature
{
    protected override void Setup(FeatureConfigurationContext context)
    {
        var settings = context.Settings;
        var forwardingAddressDirectory = settings.Get<ForwardingAddressDirectory>();
        var transportInfrastructure = settings.Get<TransportInfrastructure>();

        var routeResolver = new UnicastRouteResolver(
            i => transportInfrastructure.ToTransportAddress(LogicalAddress.CreateRemoteAddress(i)),
            settings.Get<EndpointInstances>(),
            settings.Get<DistributionPolicy>()
        );

        var rerouteBehavior = new RerouteMessagesWithForwardingAddress(
            forwardingAddressDirectory.ToLookup(),
            routeResolver
        );

        var invokeForwardingPipeline = new ForwardMessagesWithForwardingAddress();

        var pipeline = context.Pipeline;
        pipeline.Register(rerouteBehavior, "Finds forwarding addresses and resolves them");
        pipeline.Register(invokeForwardingPipeline, "Forwards messages to their matching forwarding addresses");
    }
}

The routing configuration extension enables the forwarding address feature and records the forwarding address.

public static class RoutingExtensions
{
    public static void ForwardToEndpoint(this RoutingSettings routing, Type messageTypeToForward, string destinationEndpointName)
    {
        var settings = routing.GetSettings();

        var endpointRoute = UnicastRoute.CreateFromEndpointName(destinationEndpointName);

        settings.GetOrCreate<ForwardingAddressDirectory>()
            .ForwardToRoute(messageTypeToForward, endpointRoute);

        settings.EnableFeatureByDefault<LeaveForwardingAddressFeature>();
    }
}

Messages

A shared assembly containing the ImportantMessage message type.


Last modified