In complex upgrade scenarios, it can be useful to forward a copy of processed messages to another destination. This allows an old version of an endpoint to run side by side with a new version, with both processing each message until the old version can be fully retired.
Running the sample
- Open the solution in Visual Studio
- Press F5
- Follow the instructions in the sender's console window to send a message to the OriginalDestination endpoint
- The message is processed by the OriginalDestination endpoint
- The message is also processed by the UpgradedDestination endpoint
The sender is configured to send messages to OriginalDestination, and OriginalDestination is configured to forward a copy of every message it processes to UpgradedDestination.
Code walk-through
The sample contains five projects.
Sender
The sender contains routing configuration to send ImportantMessage messages to the OriginalDestination endpoint.
var config = new EndpointConfiguration("Sender");
var transport = config.UseTransport<LearningTransport>();
var routing = transport.Routing();
routing.RouteToEndpoint(typeof(ImportantMessage), "OriginalDestination");
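For context, sending a message from the sender might look like the sketch below. It assumes an NServiceBus version in which `Endpoint.Start` returns an `IEndpointInstance`, and that `ImportantMessage` has a settable `Text` property, as the handlers suggest. The `SenderSketch` class, the `SendOne` method, and the message text are hypothetical names made up for illustration. Because of the routing configured above, no destination is passed to `Send`.

```csharp
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical helper, not part of the sample.
static class SenderSketch
{
    public static async Task SendOne(EndpointConfiguration config)
    {
        // Assumption: config already holds the routing shown above.
        var endpoint = await Endpoint.Start(config);

        // The destination is resolved from the routing configuration.
        await endpoint.Send(new ImportantMessage { Text = "Hello from the sender" });

        await endpoint.Stop();
    }
}
```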
OriginalDestination
This endpoint configures the forwarding address, so that a copy of every message it processes, including ImportantMessage messages, is sent to UpgradedDestination.
var config = new EndpointConfiguration("OriginalDestination");
config.UseTransport<LearningTransport>();
config.ForwardMessagesAfterProcessingTo("UpgradedDestination");
The endpoint also contains the original handler for ImportantMessage messages.
class ImportantMessageHandler :
    IHandleMessages<ImportantMessage>
{
    public Task Handle(ImportantMessage message, IMessageHandlerContext context)
    {
        Console.WriteLine($"Got {message.Text}");
        return Task.CompletedTask;
    }
}
UpgradedDestination
This endpoint contains the new handler for ImportantMessage messages.
class ImportantMessageHandler :
    IHandleMessages<ImportantMessage>
{
    public Task Handle(ImportantMessage message, IMessageHandlerContext context)
    {
        Console.WriteLine($"Got {message.Text}");
        return Task.CompletedTask;
    }
}
NServiceBus.MessageForwarding
This project contains the implementation logic for the message-forwarding behavior. The behavior forks the incoming physical context into the routing context after the message has been processed. Note that it copies the headers and body before the message is processed so that an exact copy of the received message is forwarded.
class ForwardProcessedMessagesBehavior : ForkConnector<IIncomingPhysicalMessageContext, IRoutingContext>
{
    private string forwardingAddress;

    public ForwardProcessedMessagesBehavior(string forwardingAddress)
    {
        this.forwardingAddress = forwardingAddress;
    }

    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next, Func<IRoutingContext, Task> fork)
    {
        // Capture the message as received, before the rest of the pipeline runs.
        var messageToForward = new OutgoingMessage(
            context.Message.MessageId,
            new Dictionary<string, string>(context.Message.Headers),
            context.Message.Body);

        // Process the message in this endpoint.
        await next();

        // Forward the captured copy only after processing has completed.
        var forwardingRoutingStrategy = new UnicastRoutingStrategy(forwardingAddress);
        var routingContext = new ForwardedRoutingContext(
            messageToForward,
            forwardingRoutingStrategy,
            context);
        await fork(routingContext);
    }
}
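To illustrate why the copy is taken before `next()` is called, consider the following self-contained sketch. It does not use NServiceBus; `SnapshotDemo`, `Run`, and the `Enriched` header are hypothetical names used only for illustration. A stand-in for the rest of the pipeline mutates the headers during processing, and the snapshot taken beforehand is unaffected.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class SnapshotDemo
{
    // Returns the header snapshot that would be forwarded.
    public static async Task<Dictionary<string, string>> Run(
        Dictionary<string, string> headers,
        Func<Task> next)
    {
        // Copy the headers before the rest of the pipeline runs.
        var snapshot = new Dictionary<string, string>(headers);

        // Stand-in for the rest of the pipeline (handlers, other behaviors).
        await next();

        return snapshot;
    }

    public static async Task Main()
    {
        var headers = new Dictionary<string, string> { ["MessageId"] = "123" };

        var forwarded = await Run(headers, () =>
        {
            // Processing adds a header to the live dictionary.
            headers["Enriched"] = "true";
            return Task.CompletedTask;
        });

        Console.WriteLine(headers.ContainsKey("Enriched"));   // True
        Console.WriteLine(forwarded.ContainsKey("Enriched")); // False
    }
}
```

Had the snapshot been taken after `next()` returned, the forwarded copy would carry the extra header and would no longer match the message as received.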
This project also contains a configuration extension to specify the forwarding address and wire up the behavior.
public static class MessageForwardingConfigurationExtensions
{
    public static void ForwardMessagesAfterProcessingTo(this EndpointConfiguration endpointConfiguration, string forwardingAddress)
    {
        endpointConfiguration.Pipeline.Register(
            new ForwardProcessedMessagesBehavior(forwardingAddress),
            "Forwards a copy of each processed message to a forwarding address"
        );
    }
}
Messages
A shared assembly containing the ImportantMessage message type.
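The message type itself is not shown here. Based on how the handlers use it, it might look like the following sketch. The `IMessage` marker interface is an assumption (the sample could equally use `ICommand` or unobtrusive conventions), and only the `Text` property is implied by the handler code.

```csharp
using NServiceBus;

// Hypothetical shape of the shared message type.
public class ImportantMessage : IMessage
{
    // The handlers print this property.
    public string Text { get; set; }
}
```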