Batch messages using Saga

Component: NServiceBus
NuGet Package: NServiceBus (8-pre)
This page targets a pre-release version. Pre-releases are subject to change and samples are not guaranteed to be fully functional.

Introduction

This sample shows how to control the generation of work by batching work items through a saga. Throttling message generation in this way is helpful when the rate of generation needs to be controlled:

  • When an incoming message generates many messages at once, the number of generated messages can delay other messages in the same queue.
  • When the work as a whole is complete only once all messages are processed. If all messages are generated upfront and the work is later considered 'cancelled' or 'failed', the messages already in the queue still have to be processed (e.g. as no-ops).

Alternatively, to avoid congesting the queue, messages can be generated and routed to a dedicated endpoint (one endpoint per work type), or the concurrency limit can be lowered so that the messages are processed sequentially. The routing slip pattern might also be a good alternative for processing messages sequentially.

Messages generated upfront can delay the processing of other messages that arrive in the queue later, whereas with a batching saga the processing can be interleaved and multiple batches can be processed in parallel.

gantt
    title Upfront messages generated vs. Batch generation using sagas
    axisFormat %d
    dateFormat YYYY-MM-DD
    section Upfront Gen
    Work Request A (100 msgs) : t1, 2022-01-01, 3d
    Work Request B (100 msgs) : t2, after t1, 3d
    Work Request C (100 msgs) : t3, after t2, 3d
    section Saga
    Work Request A (25 msgs) : s1, 2022-01-01, 1d
    Work Request B (25 msgs) : s2, after s1, 1d
    Work Request C (25 msgs) : s3, after s2, 1d
    Work Request A (25 msgs) : s4, after s3, 1d
    Work Request B (25 msgs) : s5, after s4, 1d
    Work Request C (25 msgs) : s6, after s5, 1d
    Work Request A (25 msgs) : s7, after s6, 1d
    Work Request B (25 msgs) : s8, after s7, 1d
    Work Request C (25 msgs) : s9, after s8, 1d

Once the projects are started, press S to send a StartProcessing message with a random WorkCount, which starts a new saga instance. The saga starts because it is configured with the IAmStartedByMessages interface and a mapping that correlates the StartProcessing.ProcessId property on the message with the OrderSagaData.ProcessId property on the saga data.

When processing the StartProcessing message, each saga instance creates ProcessWorkOrder messages in batches of 100 and sends them to the WorkProcessor endpoint to handle. Once all the messages of a batch are completed, the next batch of messages is generated.
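The batch boundaries described above can be illustrated with a small, library-free sketch. `BatchPlanner` and `BatchRanges` are hypothetical names introduced here for illustration; they are not part of the sample, which generates one batch at a time inside the saga.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the sample: computes the inclusive
// (first, last) work order numbers of every batch for a given work count.
public static class BatchPlanner
{
    public static IEnumerable<(int First, int Last)> BatchRanges(int workCount, int batchSize)
    {
        // Because this is an iterator, the check runs when enumeration starts.
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));

        // Work order numbers are 1-based; the last batch may be smaller.
        for (var first = 1; first <= workCount; first += batchSize)
        {
            var last = Math.Min(first + batchSize - 1, workCount);
            yield return (first, last);
        }
    }
}
```

For a work count of 473 and a batch size of 100, `BatchPlanner.BatchRanges(473, 100)` yields five ranges, from (1, 100) through (401, 473).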

The output to the console of the WorkGenerator will be:

Started.
Press 'S' to start a new process or [Enter] to exit.
SStarted process 'cc39af80-d33c-45e0-9991-5ece40cf7d6b' with '473' work orders.
Processing saga started: 'cc39af80-d33c-45e0-9991-5ece40cf7d6b'
Starting the process for '473' work orders.
Queueing next batch of work orders: (1 - 100).
Queueing next batch of work orders: (101 - 200).
Queueing next batch of work orders: (201 - 300).
Queueing next batch of work orders: (301 - 400).
Queueing next batch of work orders: (401 - 473).
All done. Took 33.737792

The output to the console of the WorkProcessor will be:

Started.
Press [Enter] to exit.
Processing work order '1'
Processing work order '4'
Processing work order '10'
Processing work order '2'
...
Processing work order '431'
Processing work order '427'
Processing work order '441'
Processing work order '420'

Notice that while the work requests are generated in sequence, on the processing side the work can happen out of order. The total time it took the process to finish is also logged.

Per-message metrics such as Critical Time may not be sufficient to monitor how long the end-to-end process takes, because the process spans many messages and several batches. The total duration is something to note and measure separately to ensure quality of service.
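One way to measure the end-to-end duration is to record the start time with the process state and compute the elapsed time when the last work order completes. The following is a minimal sketch under that assumption; `ProcessTimer` and its members are hypothetical names and do not appear in the sample.

```csharp
using System;

// Hypothetical helper, not part of the sample. The start time is kept as a
// plain UTC property so it could be persisted alongside other process state.
public class ProcessTimer
{
    public DateTime StartedAtUtc { get; set; }

    // Call when the process starts; the caller supplies the current UTC time.
    public void Start(DateTime utcNow) => StartedAtUtc = utcNow;

    // Call when the process finishes to get the total duration.
    public TimeSpan Elapsed(DateTime utcNow) => utcNow - StartedAtUtc;
}
```

Passing the current time in as an argument, rather than reading `DateTime.UtcNow` inside the helper, keeps the calculation easy to test; a caller would typically pass `DateTime.UtcNow` and log `Elapsed(...).TotalSeconds`.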

The Saga

The saga controls the generation of each batch of work and waits for the completion signal of every message in that batch before starting the next batch. Very long-running or failed work items could unintentionally stall the subsequent work items. Consider using saga timeouts and/or work item failure responses to mitigate this if required.

private async Task ImportNextBatch(IMessageHandlerContext context)
{
    if (Data.Progress.AllWorkCompleted(Data.WorkCount))
    {
        // Nothing left to generate or wait for.
        await FinishWork(context);
    }
    else if (Data.Progress.HasRemainingWork(Data.WorkCount))
    {
        // Work order numbers are 1-based; continue after the last imported one.
        var importedPages = Data.Progress.ImportedPages();
        var remainingPages = Data.WorkCount - importedPages;
        var range = Enumerable.Range(importedPages + 1, remainingPages);
        // Take only the first batch (at most 100 work orders) of the remaining range.
        var nextBatch = range.Batch(batchSize: 100).First().ToList();

        await SendWorkRequest(nextBatch, context);
    }
}
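The sample does not show the type behind `Data.Progress` on this page. The sketch below is one possible, hypothetical implementation of the bookkeeping that the saga code relies on. The method names `ImportedPages`, `HasRemainingWork`, `AllWorkCompleted`, `IsCurrentBatchCompleted`, and `MarkWorkComplete` mirror the calls in the saga; the class name `BatchProgress`, the `StartBatch` method, and the properties are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical progress tracker; the sample's real type may differ.
public class BatchProgress
{
    // Work order numbers of the current batch that have not completed yet.
    public HashSet<int> Pending { get; set; } = new HashSet<int>();

    // Number of work orders handed out so far, across all batches.
    public int Imported { get; set; }

    // Number of work orders reported as complete.
    public int Completed { get; set; }

    public int ImportedPages() => Imported;

    public bool HasRemainingWork(int workCount) => Imported < workCount;

    public bool AllWorkCompleted(int workCount) => Completed >= workCount;

    public bool IsCurrentBatchCompleted() => Pending.Count == 0;

    // Assumed to be called when a batch is sent (hypothetical method).
    public void StartBatch(IReadOnlyCollection<int> workOrderNos)
    {
        foreach (var workOrderNo in workOrderNos)
        {
            Pending.Add(workOrderNo);
        }
        Imported += workOrderNos.Count;
    }

    public void MarkWorkComplete(int workOrderNo)
    {
        // Remove returns false for unknown or already completed numbers,
        // so a duplicate completion message is not counted twice.
        if (Pending.Remove(workOrderNo))
        {
            Completed++;
        }
    }
}
```

Tracking the pending work order numbers in a set, rather than keeping only a counter, lets completions arrive in any order and makes a repeated completion for the same work order harmless.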

When a WorkOrderCompleted message comes back from the work-handling endpoint, the saga checks whether all the work has completed or whether the next batch should be generated.

public async Task Handle(WorkOrderCompleted message, IMessageHandlerContext context)
{
    if (message.Status == WorkStatus.Failed)
    {
        //NOTE: The work can be marked as completed (to continue with the next pages), or
        //as failed so the next set of pages are stalled until the issue is resolved.
        await Console.Error.WriteLineAsync($"Work {message.WorkOrderNo} finished with error: {message.Error}");
        Data.FailedJobs += 1;
    }
    else
    {
        Console.WriteLine($"Work {message.WorkOrderNo} finished.");
    }

    Data.Progress.MarkWorkComplete(message.WorkOrderNo);

    if (Data.Progress.AllWorkCompleted(Data.WorkCount))
    {
        await FinishWork(context);
    }
    else if (Data.Progress.IsCurrentBatchCompleted())
    {
        await ImportNextBatch(context);
    }
}

Related Articles

  • Sagas
    NServiceBus uses event-driven architecture to include fault-tolerance and scalability in long-term business processes.
