Multi-message conversations when scaling out

Component: Distributor
NuGet Package: NServiceBus (6.x)

This sample demonstrates how the behavior of some NServiceBus APIs differs depending on the method used to scale out an endpoint.

Prerequisites

Make sure MSMQ is set up as described in the MSMQ Transport - NServiceBus Configuration section.

Running the sample

  1. Start the solution.
  2. Press A in the Sender.V5 console.
  3. Wait until the Confirming order {orderId} message shows up in one of the worker consoles.
  4. Notice that the messages that took part in the order flow conversation were processed by both workers in a round-robin fashion, each time going through the Distributor.
  5. Press A in the Sender.V6 console.
  6. Wait until the Confirming order {orderId} message shows up in one of the worker consoles.
  7. Notice that all the messages that took part in the order flow conversation were processed by a single worker.
  8. Press A in the Sender.V6 console again.
  9. Wait until the Confirming order {orderId} message shows up in one of the worker consoles.
  10. Notice that all the messages that took part in the order flow conversation were again processed by a single worker, but a different one than before.
  11. Press B in the Sender.V5 console.
  12. Notice that the message fails twice before it ultimately succeeds. Each retry is routed to a different worker.
  13. Press B in the Sender.V6 console.
  14. Notice that the message fails twice before it ultimately succeeds. Each retry is routed to a different worker.

Code walk-through

This sample contains five NServiceBus console applications:

  • Sender.V5 sends commands to the scaled-out endpoint via the Distributor.
  • Sender.V6 sends commands to the scaled-out endpoint directly, using Sender-Side Distribution.
  • Distributor forwards the messages it receives to the first available worker.
  • Worker.1 and Worker.2 process commands that simulate the order flow. Together they represent the scaled-out endpoint.

APIs that depend on the scaling out approach

The behavior of the following APIs depends on the selected scaling out approach. It differs slightly depending on whether the message being processed was sent via the Distributor or directly by another endpoint. Delayed retries are the exception: their behavior does not depend on which endpoint sent the message, only on the configuration.
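As a rough mental model, the rules detailed in the subsections that follow can be condensed into a single decision function. The sketch below is not NServiceBus code: the `Operation` enum, the `RoutingModel` class, and the queue names are hypothetical and exist only for illustration. The `Defer` result for directly sent messages is inferred by analogy with `SendLocal` and should be treated as an assumption.

```csharp
using System;

// Hypothetical names throughout: this only models the rules described in
// this sample and is not part of the NServiceBus API.
public enum Operation { SendLocal, Defer, DelayedRetry }

public static class RoutingModel
{
    public static string Destination(
        Operation operation,
        bool incomingViaDistributor,
        bool enlistedWithDistributor,
        string distributorQueue,
        string localQueue)
    {
        if (operation == Operation.DelayedRetry)
        {
            // Delayed retries depend on configuration only.
            return enlistedWithDistributor ? distributorQueue : localQueue;
        }

        // SendLocal and Defer depend on how the incoming message arrived.
        return incomingViaDistributor ? distributorQueue : localQueue;
    }
}

public static class Program
{
    public static void Main()
    {
        // Placeholder queue names, not taken from the sample configuration.
        const string distributor = "Distributor";
        const string local = "Worker.1";

        // Incoming message sent via the Distributor (Sender.V5 scenario).
        Console.WriteLine(RoutingModel.Destination(
            Operation.SendLocal, true, true, distributor, local));

        // Incoming message sent directly (Sender.V6 scenario).
        Console.WriteLine(RoutingModel.Destination(
            Operation.SendLocal, false, true, distributor, local));

        // Delayed retry on an enlisted worker, message sent directly.
        Console.WriteLine(RoutingModel.Destination(
            Operation.DelayedRetry, false, true, distributor, local));
    }
}
```

The third call illustrates why retries triggered from Sender.V6 still move between workers: the worker is enlisted with the Distributor, so the retry goes back through it.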

SendLocal

Sending to the local endpoint while processing a message that was sent via a Distributor routes the message back to the Distributor. The Distributor then routes it to the first available worker.

public async Task Handle(PlaceOrder placeOrder, IMessageHandlerContext context)
{
    log.Info($"Sending order {placeOrder.OrderId} to validation.");

    await Task.Delay(1000)
        .ConfigureAwait(false);

    var validateOrder = new ValidateOrder
    {
        OrderId = placeOrder.OrderId,
        Sender = context.ReplyToAddress
    };

    await context.SendLocal(validateOrder)
        .ConfigureAwait(false);
}

When the message being processed was sent directly to the worker, or when SendLocal is called from outside of a message handler, the message is routed to the local queue of the processing instance.
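For contrast with the handler above, here is a minimal sketch of calling SendLocal outside of a message handler. It assumes a started endpoint is available as an `IMessageSession` (which `IEndpointInstance` implements in NServiceBus 6). The `LocalSendExample` class and `SendToOwnQueue` method are hypothetical names and are not part of the sample.

```csharp
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical helper, not part of the sample.
public static class LocalSendExample
{
    // 'session' is expected to be the started endpoint instance;
    // 'message' is any message object, for example a ValidateOrder.
    public static Task SendToOwnQueue(IMessageSession session, object message)
    {
        // No incoming message is being processed here, so nothing points
        // back to the Distributor and the message goes to this instance's
        // own queue.
        return session.SendLocal(message);
    }
}
```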

Defer

Deferring a message to the local endpoint while processing a message that was sent via a Distributor creates a pending timeout with its destination set to the Distributor's queue. When that timeout is due, the message is sent back to the Distributor. The Distributor then routes it to the first available worker.

public Task Handle(ValidateOrder message, IMessageHandlerContext context)
{
    var validated = new OrderValidated
    {
        OrderId = message.OrderId,
        Sender = message.Sender
    };

    log.Info($"Validating order {message.OrderId}. It will take 3 seconds.");

    var options = new SendOptions();
    options.RouteToThisEndpoint();
    options.DelayDeliveryWith(TimeSpan.FromSeconds(3));
    return context.Send(validated, options);
}

ReplyTo

Sending a message while processing a message that was sent via a Distributor sets the reply-to header to the Distributor's queue.

public Task Handle(PlaceOrderResponse placeOrderResponse, IMessageHandlerContext context)
{
    log.Info($"Received OrderPlaced. OrderId: {placeOrderResponse.OrderId}. Worker: {placeOrderResponse.WorkerName}");

    var confirmOrder = new ConfirmOrder
    {
        OrderId = placeOrderResponse.OrderId
    };

    return context.Reply(confirmOrder);
}

When another endpoint replies to such a message, the reply is routed to the Distributor. The Distributor then routes it to the first available worker.

Delayed retries

var attempt = GetProcessingAttempt(context);

if (attempt < 2)
{
    log.Info($"Processing order {placeOrder.OrderId} failed. It will be retried using delayed retries.");
    // Fail the early attempts so that delayed retries are triggered
    throw new Exception("Unexpected failure.");
}

Moving a message to delayed retries in MSMQ uses the same mechanism as the message deferrals described above. However, the behavior is identical regardless of whether the message was sent via the Distributor or directly. The message is always deferred to the Distributor's queue if the endpoint is configured to enlist with the Distributor.

var appSettings = ConfigurationManager.AppSettings;
endpointConfiguration.EnlistWithLegacyMSMQDistributor(
    masterNodeAddress: appSettings["DistributorAddress"],
    masterNodeControlAddress: appSettings["DistributorControlAddress"],
    capacity: 1);

Otherwise, the message is deferred to the local instance queue.
