Message forwarding

Component: NServiceBus
NuGet Package NServiceBus (7.x)

In complex upgrade scenarios it can be useful to forward a copy of processed messages to another destination. This allows an old version of an endpoint to run side-by-side with a new version, both processing the message until the old version can be fully retired.

Running the sample

  1. Open the solution in Visual Studio
  2. Press F5
  3. Follow the instructions in the sender's console window to send a message to the OriginalDestination endpoint
  4. The message is processed by the OriginalDestination endpoint
  5. The message is also processed by the UpgradedDestination endpoint

The sender is configured to send messages to OriginalDestination, and OriginalDestination is configured to forward a copy of every message it processes to UpgradedDestination.

Code walk-through

The sample contains four projects.

Sender

The sender contains routing configuration to send ImportantMessage messages to the OriginalDestination endpoint.

var config = new EndpointConfiguration("Sender");
var transport = config.UseTransport<LearningTransport>();
var routing = transport.Routing();

routing.RouteToEndpoint(typeof(ImportantMessage), "OriginalDestination");

OriginalDestination

This endpoint configures the forwarding address for ImportantMessage messages.

var config = new EndpointConfiguration("OriginalDestination");
config.UseTransport<LearningTransport>();

config.ForwardMessagesAfterProcessingTo("UpgradedDestination");

The endpoint also contains the original handler for ImportantMessage messages.

class ImportantMessageHandler :
    IHandleMessages<ImportantMessage>
{
    public Task Handle(ImportantMessage message, IMessageHandlerContext context)
    {
        Console.WriteLine($"Got {message.Text}");
        return Task.CompletedTask;
    }
}

UpgradedDestination

This endpoint contains the new handler for ImportantMessage messages.

class ImportantMessageHandler :
    IHandleMessages<ImportantMessage>
{
    public Task Handle(ImportantMessage message, IMessageHandlerContext context)
    {
        Console.WriteLine($"Got {message.Text}");
        return Task.CompletedTask;
    }
}

NServiceBus.MessageForwarding

This project contains the implementation logic for the message forwarding behavior. This behavior forks the incoming physical context into the routing context after the message has been processed.

The first behavior forks the incoming physical context into the forwarding context after the message has been processed. Note that the behavior copies the headers and body before the message is processed so that an exact copy of the received message is forwarded.

class ForwardProcessedMessagesBehavior : ForkConnector<IIncomingPhysicalMessageContext, IRoutingContext>
{
    private string forwardingAddress;

    public ForwardProcessedMessagesBehavior(string forwardingAddress)
    {
        this.forwardingAddress = forwardingAddress;
    }

    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next, Func<IRoutingContext, Task> fork)
    {
        var messageToForward = new OutgoingMessage(
            context.Message.MessageId,
            new Dictionary<string, string>(context.Message.Headers),
            context.Message.Body);

        await next()
            .ConfigureAwait(false);

        var forwardingRoutingStrategy = new UnicastRoutingStrategy(forwardingAddress);

        var routingContext = new ForwardedRoutingContext(
            messageToForward,
            forwardingRoutingStrategy,
            context);

        await fork(routingContext)
            .ConfigureAwait(false);
    }
}

This project also contains a configuration extension to specify the forwarding address and wire up the behavior.

public static class MessageForwardingConfigurationExtensions
{
    public static void ForwardMessagesAfterProcessingTo(this EndpointConfiguration endpointConfiguration, string forwardingAddress)
    {
        endpointConfiguration.Pipeline.Register(
            new ForwardProcessedMessagesBehavior(forwardingAddress), 
            "Forwards a copy of each processed message to a forwarding address"
        );
    }
}

Messages

A shared assembly containing the ImportantMessage message type.


Last modified