This sample demonstrates how to extend the NServiceBus routing model with a custom distribution strategy. The default built-in distribution strategy uses a simple round-robin approach. This sample shows a more sophisticated distribution strategy that keeps the queue length of all load-balanced instances equal, allowing for the effective use of heterogeneous worker clusters (clusters whose machines differ in processing capacity).
Prerequisites
Make sure MSMQ is installed and configured as described in the MSMQ transport - MSMQ configuration section.
Running the project
- Start all the projects by pressing F5.
- The text "Press <enter> to send a message" should be displayed in the Client's console window.
- Hold down enter for a few seconds to send many messages.
Verifying that the sample works correctly
- Notice more messages are being sent to Server.1 than to Server.2
- Use an MSMQ viewing tool to inspect the queue contents.
- Keep pressing enter and observe the number of messages in the Server.1 and Server.2 queues.
- Notice that although Server.2 processes messages 50% slower than Server.1, the number of messages in both queues is almost equal.
Code walk-through
This sample contains four projects.
Client
The Client application submits the orders for processing by the server. Client routing is configured to send PlaceOrder commands to two instances of the Server endpoint:
routing.RouteToEndpoint(typeof(PlaceOrder), "Samples.FairDistribution.Server");
The following code enables fair load distribution (as opposed to the default round-robin algorithm):
endpointConfiguration.EnableFeature<FairDistribution>();
var routing = endpointConfiguration.UseTransport(new MsmqTransport());
var settings = endpointConfiguration.GetSettings();
var strategy = new FairDistributionStrategy(
    settings: settings,
    endpoint: "Samples.FairDistribution.Server",
    scope: DistributionStrategyScope.Send);
routing.SetMessageDistributionStrategy(strategy);
Server
The Server application processes the PlaceOrder commands. On the server side, there is no need to register the custom distribution strategy:
endpointConfiguration.EnableFeature<FairDistribution>();
In real-world scenarios, NServiceBus endpoints are scaled out by deploying multiple physical instances of a single logical endpoint to multiple machines. For simplicity, the scale-out in this sample is simulated by having two separate projects, Server and Server2.
Shared project
The shared project contains definitions for messages and the custom routing logic.
Marking messages
All outgoing messages are marked with sequence numbers to keep track of how many messages are in flight at any given point in time. Separate sequences are maintained for each destination queue. The number of in-flight messages is estimated as the difference between the last sequence number sent and the last sequence number acknowledged.
public override Task Invoke(IDispatchContext context, Func<Task> next)
{
    foreach (var operation in context.Operations)
    {
        var headers = operation.Message.Headers;
        if (!headers.ContainsKey("NServiceBus.FlowControl.ControlAddress") ||
            !headers.ContainsKey("NServiceBus.FlowControl.SessionId"))
        {
            continue;
        }
        if (headers.ContainsKey("NServiceBus.FlowControl.Marker"))
        {
            continue;
        }
        var addressTag = operation.AddressTag as UnicastAddressTag;
        if (addressTag == null)
        {
            continue;
        }
        var marker = flowManager.GetNextMarker(addressTag.Destination);
        headers["NServiceBus.FlowControl.Marker"] = marker.ToString();
        headers["NServiceBus.FlowControl.Key"] = addressTag.Destination;
    }
    return next();
}
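The bookkeeping described in this section can be pictured with a minimal sketch. `InFlightCounter` is a hypothetical class written for illustration only; it is not the sample's `flowManager` implementation, and its method names merely mirror the calls visible in the snippets. It keeps one sequence per destination and estimates in-flight messages as the last sent number minus the last acknowledged number.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration; not the sample's actual flow manager.
public class InFlightCounter
{
    readonly object sync = new object();
    readonly Dictionary<string, long> lastSent = new Dictionary<string, long>();
    readonly Dictionary<string, long> lastAcked = new Dictionary<string, long>();

    // Returns the next sequence number for the given destination queue.
    public long GetNextMarker(string destination)
    {
        lock (sync)
        {
            // A missing key leaves 'current' at 0, so the first marker is 1.
            lastSent.TryGetValue(destination, out var current);
            var next = current + 1;
            lastSent[destination] = next;
            return next;
        }
    }

    // Records the highest acknowledged sequence number; stale ACKs are ignored.
    public void Acknowledge(string destination, long marker)
    {
        lock (sync)
        {
            lastAcked.TryGetValue(destination, out var current);
            if (marker > current)
            {
                lastAcked[destination] = marker;
            }
        }
    }

    // Estimated in-flight count: last sent minus last acknowledged.
    public long InFlight(string destination)
    {
        lock (sync)
        {
            lastSent.TryGetValue(destination, out var sent);
            lastAcked.TryGetValue(destination, out var acked);
            return sent - acked;
        }
    }
}
```

Because each destination has its own sequence, an acknowledgment from one instance never changes the estimate for another.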
Acknowledging message delivery
After receiving every Nth message, the downstream endpoint instance sends back an acknowledgment (ACK) message containing the highest sequence number it has processed so far. The ACK messages are sent separately to each upstream endpoint instance.
public override async Task Invoke(ITransportReceiveContext context, Func<Task> next, Func<IRoutingContext, Task> fork)
{
    await next();
    var headers = context.Message.Headers;
    if (!headers.TryGetValue("NServiceBus.FlowControl.Marker", out var markerString) ||
        !headers.TryGetValue("NServiceBus.FlowControl.ControlAddress", out var controlAddress) ||
        !headers.TryGetValue("NServiceBus.FlowControl.SessionId", out var sessionId))
    {
        return;
    }
    var marker = long.Parse(markerString);
    var tracker = acknowledgedMarkers.AddOrUpdate(
        key: controlAddress,
        addValueFactory: k => new MarkerTracker(sessionId, marker),
        updateValueFactory: (k, v) => v.OnNewMarker(sessionId, marker));
    if (tracker.ShouldAcknowledge(maxAckBatchSize))
    {
        await SendAcknowledgement(context, fork, tracker.Marker, controlAddress, sessionId);
    }
}
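The "every Nth message" rule can be sketched as a small counter. `AckBatcher` below is a hypothetical class, not the sample's `MarkerTracker`, whose implementation is not shown here; it only illustrates the batching idea under the assumption that one batcher exists per upstream endpoint instance and is accessed by one thread at a time.

```csharp
using System;

// Hypothetical illustration; not the sample's MarkerTracker.
public class AckBatcher
{
    readonly int batchSize;
    int sinceLastAck;

    public AckBatcher(int batchSize)
    {
        if (batchSize < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(batchSize));
        }
        this.batchSize = batchSize;
    }

    // Highest sequence number processed so far; this is the value an ACK would carry.
    public long HighestMarker { get; private set; }

    // Records a processed message and returns true when an ACK is due.
    public bool OnProcessed(long marker)
    {
        HighestMarker = Math.Max(HighestMarker, marker);
        sinceLastAck++;
        if (sinceLastAck < batchSize)
        {
            return false;
        }
        sinceLastAck = 0;
        return true;
    }
}
```

With a batch size of 3, the first two processed messages return false and the third returns true, after which the count starts over.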
Processing acknowledgments
When the ACK message is received, the upstream endpoint calculates the number of messages that are currently in flight between itself and the downstream endpoint.
public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
{
    var headers = context.MessageHeaders;
    if (!headers.TryGetValue("NServiceBus.FlowControl.ACK", out var ackString)
        || !headers.TryGetValue("NServiceBus.FlowControl.Endpoint", out var endpoint)
        || !headers.TryGetValue("NServiceBus.FlowControl.Address", out var address)
        || !headers.TryGetValue("NServiceBus.FlowControl.SessionId", out var sessionId))
    {
        return next();
    }
    if (sessionId != currentSessionId)
    {
        return Task.CompletedTask;
    }
    var ack = long.Parse(ackString);
    flowManager.Acknowledge(address, ack);
    return Task.CompletedTask;
}
Smart routing
The calculated number of in-flight messages is then used to distribute messages in such a way that all instances of the downstream endpoint have roughly the same number of messages in their input queues. That way, the load is adjusted to the capacity of the given instance (e.g., instances running on weaker machines process fewer messages). As a result, no instance gets overwhelmed, and no instance is underutilized when work is available.
Determining an optimal value for N (i.e., the number of messages between ACKs) may involve some trial and error. The bigger the N value, the bigger the difference between input queue lengths. On the other hand, lower N values cause more traffic as more ACKs are being sent upstream.
public string FindShortestQueue(string[] receiverAddresses)
{
    FlowData best = null;
    foreach (var address in receiverAddresses)
    {
        // This instance is not yet tracked, so assume it has shortest queue.
        if (!data.TryGetValue(address, out var candidate))
        {
            return address;
        }
        best = Compare(best, candidate);
    }
    return best.Address;
}
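The selection step can also be illustrated in isolation. The following is a minimal sketch, assuming the in-flight estimates are available as a plain dictionary keyed by instance address; `ShortestQueue.Pick` is a hypothetical helper and does not reproduce the sample's `FlowData` or `Compare` members.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration of picking the instance with the fewest in-flight messages.
public static class ShortestQueue
{
    public static string Pick(
        IReadOnlyList<string> addresses,
        IReadOnlyDictionary<string, long> inFlight)
    {
        if (addresses.Count == 0)
        {
            throw new ArgumentException("At least one address is required.", nameof(addresses));
        }

        string best = null;
        var bestCount = long.MaxValue;
        foreach (var address in addresses)
        {
            // An untracked instance is assumed to have the shortest queue.
            if (!inFlight.TryGetValue(address, out var count))
            {
                return address;
            }
            if (best == null || count < bestCount)
            {
                best = address;
                bestCount = count;
            }
        }
        return best;
    }
}
```

For example, with estimates of 10 for Server.1 and 4 for Server.2, the helper picks Server.2. On a tie, the address listed first wins.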
Real-world deployment
For the sake of simplicity, all the endpoints in this sample run on a single machine. In real-world scenarios, it is usually best to run each instance on a separate virtual machine. In this case, the instance mapping file would contain machine attributes mapping instances to their machines' hostnames instead of queue attributes used to run more than one instance on a single box.
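As a rough illustration of that difference, a machine-based instance mapping file might look like the fragment below. The hostnames `VM-SERVER-1` and `VM-SERVER-2` are placeholders invented for this example, and the fragment is a sketch rather than the file shipped with the sample.

```xml
<endpoints>
  <endpoint name="Samples.FairDistribution.Server">
    <!-- Hypothetical hostnames; replace with the real machine names. -->
    <instance machine="VM-SERVER-1"/>
    <instance machine="VM-SERVER-2"/>
  </endpoint>
</endpoints>
```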