Asynchronous Handlers

Component: NServiceBus
NuGet Package: NServiceBus (6.x)
It is difficult to give generic advice on how asynchronous code should be structured. It is important to understand compute-bound vs. I/O-bound operations, and to avoid copying and pasting snippets without first analyzing whether they benefit the business scenarios involved. Don't assume; measure.

Handlers and sagas are invoked from a thread in the thread pool. Depending on the transport implementation, either a worker thread pool thread or an I/O thread pool thread might be used. Message handlers and sagas typically issue I/O-bound work, such as sending or publishing messages, storing information in databases, and calling web services. In other cases, message handlers are used to schedule compute-bound work. To write efficient message handlers and sagas, it is crucial to understand the difference between compute-bound and I/O-bound work.

Thread pool

A thread pool is associated with a process and manages the execution of asynchronous callbacks on behalf of the application. Its primary purpose is to reduce the number of application threads and to manage threads efficiently. The thread pool divides asynchronous callbacks into two categories, I/O-bound and compute-bound, and manages a pool of threads for each category.

Worker thread pool

Parallel and compute-bound blocking work runs on the worker thread pool. APIs such as Task.Run, Task.Factory.StartNew, and Parallel.For schedule tasks on the worker thread pool.

When compute-bound work is scheduled, the worker thread pool starts adding worker threads (ramp-up phase). Ramping up additional worker threads is expensive, and the thread injection rate of the worker thread pool is limited.
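
The following minimal sketch shows the worker thread pool from plain .NET code. It checks that a delegate passed to Task.Run runs on a thread pool thread and prints the pool's configured minimum and maximum thread counts. The class name WorkerPoolDemo is hypothetical; the printed numbers depend on the machine and runtime.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WorkerPoolDemo
{
    // Returns true when the delegate passed to Task.Run executes on a thread pool thread.
    public static bool RunsOnPoolThread()
    {
        return Task.Run(() => Thread.CurrentThread.IsThreadPoolThread)
            .GetAwaiter()
            .GetResult();
    }

    public static void Main()
    {
        // Up to the minimum, the pool creates threads on demand; beyond it,
        // the pool may wait for running work to finish before adding threads.
        ThreadPool.GetMinThreads(out int minWorker, out int minIo);
        ThreadPool.GetMaxThreads(out int maxWorker, out int maxIo);

        Console.WriteLine($"Task.Run on a pool thread: {RunsOnPoolThread()}");
        Console.WriteLine($"Worker threads: min {minWorker}, max {maxWorker}");
        Console.WriteLine($"I/O completion threads: min {minIo}, max {maxIo}");
    }
}
```

The minimum worker thread count is the point past which the ramp-up described above slows down, which is why bursts of compute-bound work can queue up before new threads appear.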

Compute-bound recommendations:

  • Offloading compute-bound work to the worker thread pool is a top-level concern only. Use Task.Run or Task.Factory.StartNew as high up in the call hierarchy as possible (e.g. in the Handle method of a handler or saga).
  • Avoid those operations deeper in the call hierarchy.
  • Group compute-bound operations together as much as possible.
  • Make compute-bound operations coarse-grained instead of fine-grained.
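
The recommendations above can be sketched in plain .NET code. In this minimal sketch, ComputeItem is a hypothetical stand-in for real compute-bound business logic. ProcessAllAsync offloads the whole loop with a single, coarse-grained Task.Run at the top level, while ProcessEachAsync shows the fine-grained alternative the recommendations advise against.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class CoarseGrainedOffload
{
    // Hypothetical compute-bound step standing in for real business logic.
    static long ComputeItem(int value)
    {
        long acc = 0;
        for (var i = 0; i < 1000; i++)
        {
            acc += (value * (long)i) % 7;
        }
        return acc;
    }

    // Coarse-grained: one Task.Run at the top level wraps the whole loop,
    // so the worker thread pool receives a single work item.
    public static Task<long> ProcessAllAsync(int[] items)
    {
        return Task.Run(() =>
        {
            long total = 0;
            foreach (var item in items)
            {
                total += ComputeItem(item);
            }
            return total;
        });
    }

    // Fine-grained (avoid): one Task.Run per item floods the pool with tiny work items.
    public static async Task<long> ProcessEachAsync(int[] items)
    {
        var tasks = items.Select(item => Task.Run(() => ComputeItem(item)));
        var results = await Task.WhenAll(tasks).ConfigureAwait(false);
        return results.Sum();
    }

    public static void Main()
    {
        var items = Enumerable.Range(1, 100).ToArray();
        Console.WriteLine(ProcessAllAsync(items).GetAwaiter().GetResult());
        Console.WriteLine(ProcessEachAsync(items).GetAwaiter().GetResult());
    }
}
```

Both methods compute the same result; the difference is how many work items are queued to the worker thread pool, which matters when the pool has to ramp up.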

I/O-thread pool

I/O-bound work is scheduled on the I/O-thread pool. The I/O-thread pool has a fixed number of threads (usually equal to the number of cores), which can work concurrently on thousands of I/O-bound tasks. On Windows, I/O-bound work uses I/O completion ports (IOCP) to get notified when an I/O-bound operation completes. IOCP enables efficient offloading of I/O-bound work from the user code to the kernel, driver, and hardware without blocking the user code until the I/O work is done. To achieve that, the user code registers a callback for the notification. The callback runs on an I/O thread, a pool thread managed by the I/O system and made available to the user code.

I/O-bound work typically takes a long time, while compute-bound work is comparatively cheap. The I/O system is optimized to keep the thread count low and to schedule all callbacks, and therefore the execution of interleaved user code, on that one thread. Due to those optimizations, all work gets serialized and there is minimal context switching, because the OS scheduler owns the threads. In general, asynchronous code handles bursts of traffic much better because of the "always-on" nature of IOCP.
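
As a minimal sketch of issuing I/O-bound work asynchronously, the example below writes and reads back a temporary file with FileOptions.Asynchronous, which requests overlapped I/O; on Windows such operations complete through an I/O completion port. The class name AsyncFileIo is hypothetical, and a temporary file is assumed to be an acceptable stand-in for the database or web service calls a real handler would make.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class AsyncFileIo
{
    // Writes text to a temporary file and reads it back without blocking
    // a thread while the I/O operations are in flight.
    public static async Task<string> RoundTripAsync(string text)
    {
        var path = Path.GetTempFileName();
        try
        {
            var bytes = Encoding.UTF8.GetBytes(text);
            using (var write = new FileStream(path, FileMode.Create, FileAccess.Write,
                       FileShare.None, 4096, FileOptions.Asynchronous))
            {
                await write.WriteAsync(bytes, 0, bytes.Length).ConfigureAwait(false);
            }

            var buffer = new byte[bytes.Length];
            using (var read = new FileStream(path, FileMode.Open, FileAccess.Read,
                       FileShare.Read, 4096, FileOptions.Asynchronous))
            {
                // A single read may return fewer bytes than requested, so loop.
                var offset = 0;
                while (offset < buffer.Length)
                {
                    var count = await read.ReadAsync(buffer, offset, buffer.Length - offset)
                        .ConfigureAwait(false);
                    if (count == 0)
                    {
                        break;
                    }
                    offset += count;
                }
            }
            return Encoding.UTF8.GetString(buffer);
        }
        finally
        {
            File.Delete(path);
        }
    }

    public static void Main()
    {
        Console.WriteLine(RoundTripAsync("hello").GetAwaiter().GetResult());
    }
}
```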

Memory and allocations

Asynchronous code tends to use much less memory, because the memory saved by freeing up a thread in the worker thread pool dwarfs the memory used by all the compiler-generated async structures combined.

Synchronous vs. asynchronous

If each request is examined in isolation, asynchronous code is slightly slower than the corresponding synchronous version. There might be extra kernel transitions, task scheduling, etc. involved, but the scalability more than makes up for it.

From a server perspective, if asynchronous code is compared to synchronous code by looking at one method or one request at a time, synchronous code might make more sense. But if asynchronous code is compared to parallelism, watching the server as a whole, asynchronous code wins. Every worker thread that can be freed up on a server is worth freeing up: doing so reduces the amount of memory needed and frees up the CPU for compute-bound work while fully saturating the I/O system.
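
The server-wide view can be illustrated with a minimal sketch that starts thousands of waits concurrently. Task.Delay stands in for I/O-bound work here, which is an assumption of the sketch: like a pending I/O operation, a pending delay does not occupy a thread while it waits. The class name ConcurrentWaits is hypothetical.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class ConcurrentWaits
{
    // Starts `count` simulated I/O waits concurrently and counts how many completed.
    public static async Task<int> WaitAllAsync(int count, TimeSpan delay)
    {
        var tasks = Enumerable.Range(0, count).Select(async _ =>
        {
            await Task.Delay(delay).ConfigureAwait(false);
            return 1;
        });
        var results = await Task.WhenAll(tasks).ConfigureAwait(false);
        return results.Sum();
    }

    public static void Main()
    {
        var stopwatch = Stopwatch.StartNew();
        var completed = WaitAllAsync(10_000, TimeSpan.FromMilliseconds(500))
            .GetAwaiter()
            .GetResult();
        stopwatch.Stop();

        // The process thread count stays far below the number of pending waits.
        Console.WriteLine($"{completed} waits in {stopwatch.ElapsedMilliseconds} ms, " +
                          $"{Process.GetCurrentProcess().Threads.Count} threads");
    }
}
```

A synchronous version that blocked one thread per wait (for example with Thread.Sleep) would need thousands of threads to achieve the same concurrency.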

Calling short-running, compute-bound code

Short-running, compute-bound code that is executed in the handler should be executed directly on the I/O-thread that is executing the handler code.

public class ShortComputeBoundHandler :
    IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        ComputeBoundComponent.BlocksForAShortTime();
        return Task.CompletedTask;
    }
}

Call the code directly and do not wrap it with a Task.Run or Task.Factory.StartNew.

For the majority of business scenarios, this approach is acceptable: many of the asynchronous base class library methods in the .NET Framework schedule continuations on the worker thread pool, so the likelihood that no I/O thread is blocked is high.

Calling long-running, compute-bound code

This approach should be used only after a thorough analysis of the runtime behavior and the code involved in the call hierarchy of a handler. Wrapping code inside the handler with Task.Run or Task.Factory.StartNew can seriously harm the throughput if applied incorrectly. It should be used when multiple long-running compute-bound tasks need to be executed in parallel.

Long-running compute-bound code that is executed in a handler could be offloaded to the worker thread pool.

public class LongComputeBoundHandler :
    IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var longRunning1 = Task.Run(() => ComputeBoundComponent.BlocksForALongTime());
        var longRunning2 = Task.Run(() => ComputeBoundComponent.BlocksForALongTime());
        return Task.WhenAll(longRunning1, longRunning2);
    }
}

Wrap the compute-bound code in Task.Run or Task.Factory.StartNew and await or return the resulting task, as the handler above does with Task.WhenAll.

Return or await

Await the task

For the majority of cases, it is sufficient to mark the handler's Handle method with the async keyword and await all asynchronous calls inside the method.

public class HandlerAwaitsTheTask :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        await SomeLibrary.SomeAsyncMethod(message);
    }
}

Return the task

For high-throughput scenarios, if the Handle method has only one or two asynchronous exit points, the async keyword can be omitted entirely by returning the task instead of awaiting it. This avoids creating the state machine that drives the async code and reduces the number of allocations on the given code path.

public class HandlerReturnsATask :
    IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var task = SomeLibrary.SomeAsyncMethod(message);
        return task;
    }
}

public class HandlerReturnsTwoTasks :
    IHandleMessages<MyMessage>
{
    bool someCondition = true;

    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        if (someCondition)
        {
            // Nothing to do asynchronously: return an already completed task
            return Task.CompletedTask;
        }

        return SomeLibrary.SomeAsyncMethod(message);
    }
}

Concurrency

Task-based APIs enable better composition of asynchronous code and allow conscious decisions on whether to execute the asynchronous code sequentially or concurrently.

Small amount of concurrent message operations

Batched

By default, all outgoing message operations on the message handler context are batched. Batching means messages are kept in memory and sent out once the handler has completed, so the I/O-bound work happens outside the execution scope of the handler (individual transports may apply optimizations). For a small number of outgoing message operations, it makes sense to reduce complexity by awaiting them sequentially, as shown below.

public class BatchedDispatchHandler :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        for (var i = 0; i < 100; i++)
        {
            var myMessage = new MyMessage();
            await context.Send(myMessage)
                .ConfigureAwait(false);
        }
    }
}
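
To make the batching behavior more concrete, here is a minimal sketch in plain .NET. BatchingContext, its Send and Dispatch methods, and BatchingDemo are hypothetical stand-ins written for illustration, not the NServiceBus API; the sketch assumes the pipeline dispatches pending messages only after the handler returns, as described above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a handler context that batches outgoing messages.
public class BatchingContext
{
    readonly List<string> pending = new List<string>();

    public List<string> Dispatched { get; } = new List<string>();

    // No I/O happens here: the message is only kept in memory.
    public Task Send(string message)
    {
        pending.Add(message);
        return Task.CompletedTask;
    }

    // Called by the (hypothetical) pipeline once the handler has returned.
    public Task Dispatch()
    {
        Dispatched.AddRange(pending);
        pending.Clear();
        return Task.CompletedTask;
    }
}

public static class BatchingDemo
{
    static async Task Handle(BatchingContext context)
    {
        for (var i = 0; i < 100; i++)
        {
            // Sequential awaits are cheap because Send completes synchronously.
            await context.Send($"message {i}").ConfigureAwait(false);
        }
    }

    public static async Task<(int DuringHandler, int AfterCompletion)> RunAsync()
    {
        var context = new BatchingContext();
        await Handle(context).ConfigureAwait(false);
        var duringHandler = context.Dispatched.Count;

        await context.Dispatch().ConfigureAwait(false);
        return (duringHandler, context.Dispatched.Count);
    }

    public static void Main()
    {
        var (during, after) = RunAsync().GetAwaiter().GetResult();
        Console.WriteLine($"Dispatched during handler: {during}, after completion: {after}");
    }
}
```

Because Send only appends to an in-memory list, awaiting it inside the loop costs almost nothing, which is why sequential awaits are acceptable for batched operations.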

Immediate dispatch

Immediate dispatch means outgoing message operations will be immediately dispatched to the underlying transport. For immediate dispatch operations, it might make sense to execute them concurrently as shown below.

public class ImmediateDispatchHandler :
    IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var tasks = new Task[100];
        for (var i = 0; i < 100; i++)
        {
            var options = new SendOptions();
            options.RequireImmediateDispatch();

            var myMessage = new MyMessage();
            tasks[i] = context.Send(myMessage, options);
        }
        return Task.WhenAll(tasks);
    }
}

Large amount of concurrent message operations

Unbounded concurrency can be problematic. For large numbers of concurrent message operations, it might be preferable to package multiple outgoing operations together into batches, limiting the concurrency to the size of an individual batch (divide & conquer).

public class PacketsImmediateDispatchHandler :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        for (var i = 0; i < 100; i++)
        {
            var tasks = new Task[100];
            for (var j = 0; j < 100; j++)
            {
                var options = new SendOptions();
                options.RequireImmediateDispatch();
                var myMessage = new MyMessage();
                tasks[j] = context.Send(myMessage, options);
            }
            await Task.WhenAll(tasks)
                .ConfigureAwait(false);
        }
    }
}

It is also possible to limit the concurrency by using SemaphoreSlim as shown below.

public class LimitConcurrencyImmediateDispatchHandler :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var semaphore = new SemaphoreSlim(100);

        var tasks = new Task[10000];
        for (var i = 0; i < 10000; i++)
        {
            await semaphore.WaitAsync()
                .ConfigureAwait(false);

            tasks[i] = Send(context, semaphore);
        }
        await Task.WhenAll(tasks)
            .ConfigureAwait(false);
    }

    static async Task Send(IMessageHandlerContext context, SemaphoreSlim semaphore)
    {
        try
        {
            var options = new SendOptions();
            options.RequireImmediateDispatch();
            var message = new MyMessage();
            await context.Send(message, options)
                .ConfigureAwait(false);
        }
        finally
        {
            semaphore.Release();
        }
    }
}

In practice, packaging operations together has proven to be more effective with regard to both memory allocations and performance. The SemaphoreSlim snippet is shown nonetheless for completeness, and because SemaphoreSlim is a useful concurrency primitive in many scenarios.
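
The packaging approach can be generalized into a small helper. The following is a minimal sketch under the assumption that each outgoing operation can be represented as a Func<Task>; RunInBatchesAsync, ConcurrencyProbe, and Packaging are hypothetical names, and Task.Delay stands in for an immediate-dispatch send. The probe records the peak number of operations in flight, which should never exceed the package size.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Tracks how many operations are in flight and the highest count observed.
public sealed class ConcurrencyProbe
{
    int current;
    int peak;
    int completed;

    public int Peak => Volatile.Read(ref peak);
    public int Completed => Volatile.Read(ref completed);

    public async Task RunAsync(Func<Task> operation)
    {
        var inFlight = Interlocked.Increment(ref current);
        int observed;
        // Lock-free update of the peak value.
        while (inFlight > (observed = Volatile.Read(ref peak)))
        {
            if (Interlocked.CompareExchange(ref peak, inFlight, observed) == observed)
            {
                break;
            }
        }
        try
        {
            await operation().ConfigureAwait(false);
        }
        finally
        {
            Interlocked.Decrement(ref current);
            Interlocked.Increment(ref completed);
        }
    }
}

public static class Packaging
{
    // Runs `total` operations in packages of at most `batchSize`, awaiting
    // each package before starting the next one.
    public static async Task RunInBatchesAsync(int total, int batchSize, Func<Task> operation)
    {
        if (batchSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(batchSize));
        }
        for (var start = 0; start < total; start += batchSize)
        {
            var size = Math.Min(batchSize, total - start);
            var tasks = new Task[size];
            for (var i = 0; i < size; i++)
            {
                tasks[i] = operation();
            }
            await Task.WhenAll(tasks).ConfigureAwait(false);
        }
    }

    public static (int Peak, int Completed) Measure(int total, int batchSize)
    {
        var probe = new ConcurrencyProbe();
        RunInBatchesAsync(total, batchSize, () => probe.RunAsync(() => Task.Delay(1)))
            .GetAwaiter()
            .GetResult();
        return (probe.Peak, probe.Completed);
    }

    public static void Main()
    {
        var (peak, completed) = Measure(1000, 100);
        Console.WriteLine($"Completed {completed} operations, peak concurrency {peak}");
    }
}
```

Unlike the nested loop above, the helper also handles totals that are not a multiple of the package size by making the last package smaller.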

Integration with non-task-based APIs

Events

Sometimes it is necessary to call, from an asynchronous handler, APIs that use events as the trigger for completion. Before async/await was introduced, ManualResetEvent or AutoResetEvent were usually used to synchronize runtime code flow. Unfortunately, these synchronization primitives are blocking. For asynchronous one-time event synchronization, TaskCompletionSource<TResult> can be used.

public class HandlerWhichIntegratesWithEvent :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        // Cancel the wait if the event has not been raised within 10 seconds
        using (var tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10)))
        {
            var taskCompletionSource = new TaskCompletionSource<object>();

            using (tokenSource.Token.Register(
                callback: state =>
                {
                    var completionSource = (TaskCompletionSource<object>) state;
                    completionSource.TrySetCanceled();
                },
                state: taskCompletionSource))
            {
                var dependency = new DependencyWhichRaisedEvent();
                dependency.MyEvent += (sender, args) =>
                {
                    // Complete the task when the event is raised
                    taskCompletionSource.TrySetResult(null);
                };

                await taskCompletionSource.Task
                    .ConfigureAwait(false);
            }
        }
    }
}

The above snippet shows how a TaskCompletionSource<TResult> can be used to asynchronously wait for an event to happen, and to cancel the wait if the event is not raised within a timeout.
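
The same pattern can be exercised without NServiceBus. In this minimal sketch, DependencyWhichRaisesEvent, its Completed event and Start method, and EventAwaiter are hypothetical names; the dependency is assumed to raise its event once, some time after Start is called. The helper subscribes before starting the dependency so the event cannot be missed, and unsubscribes afterwards.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical dependency that signals completion through an event.
public class DependencyWhichRaisesEvent
{
    public event EventHandler Completed;

    // Raises Completed on a thread pool thread after the given delay.
    public void Start(TimeSpan delay)
    {
        Task.Delay(delay).ContinueWith(_ => Completed?.Invoke(this, EventArgs.Empty));
    }
}

public static class EventAwaiter
{
    // Completes when the dependency raises Completed; the returned task is
    // canceled if the event is not raised within the timeout.
    public static async Task WaitForCompletedAsync(
        DependencyWhichRaisesEvent dependency, TimeSpan delay, TimeSpan timeout)
    {
        var completionSource = new TaskCompletionSource<object>();
        EventHandler handler = (sender, args) => completionSource.TrySetResult(null);

        using (var tokenSource = new CancellationTokenSource(timeout))
        using (tokenSource.Token.Register(
            state => ((TaskCompletionSource<object>) state).TrySetCanceled(),
            completionSource))
        {
            dependency.Completed += handler;
            try
            {
                dependency.Start(delay);
                await completionSource.Task.ConfigureAwait(false);
            }
            finally
            {
                dependency.Completed -= handler;
            }
        }
    }

    public static void Main()
    {
        WaitForCompletedAsync(new DependencyWhichRaisesEvent(),
                TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(5))
            .GetAwaiter()
            .GetResult();
        Console.WriteLine("Event received");
    }
}
```

When the timeout elapses first, awaiting the task throws a TaskCanceledException, which derives from OperationCanceledException.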

Asynchronous programming model (APM) pattern

For existing code which uses the Asynchronous Programming Model (APM), it is best to use Task.Factory.FromAsync to wrap the BeginOperationName and EndOperationName methods with a task object.

public class HandlerWhichIntegratesWithAPM :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var dependency = new DependencyWhichUsesAPM();

        var result = await Task.Factory.FromAsync(
                beginMethod: (callback, state) =>
                {
                    var d = (DependencyWhichUsesAPM) state;
                    return d.BeginCall(callback, state);
                },
                endMethod: asyncResult =>
                {
                    var d = (DependencyWhichUsesAPM) asyncResult.AsyncState;
                    return d.EndCall(asyncResult);
                },
                state: dependency)
            .ConfigureAwait(false);

        // Use the result in some way
        Trace.WriteLine(result);
    }
}
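
Task.Factory.FromAsync works with any Begin/End pair, including those in the base class library. As a minimal, self-contained sketch, the example below wraps Stream.BeginRead and Stream.EndRead of a MemoryStream in a Task<int>; the class name ApmWrapper and its methods are hypothetical.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class ApmWrapper
{
    // Wraps the APM pair Stream.BeginRead/EndRead in a Task<int>
    // that completes with the number of bytes read.
    public static Task<int> ReadViaApmAsync(Stream stream, byte[] buffer)
    {
        return Task<int>.Factory.FromAsync(
            stream.BeginRead, stream.EndRead,
            buffer, 0, buffer.Length, null);
    }

    public static async Task<string> ReadAllAsync(byte[] data)
    {
        using (var stream = new MemoryStream(data))
        {
            var buffer = new byte[data.Length];
            var read = await ReadViaApmAsync(stream, buffer).ConfigureAwait(false);
            return Encoding.UTF8.GetString(buffer, 0, read);
        }
    }

    public static void Main()
    {
        var text = ReadAllAsync(Encoding.UTF8.GetBytes("apm")).GetAwaiter().GetResult();
        Console.WriteLine(text);
    }
}
```

Passing the Begin and End methods as method groups lets FromAsync infer the argument types from buffer, 0, and buffer.Length, so no casts on the state object are needed.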

Asynchronous RPC calls

The APM approach described above can be used to integrate with remote procedure calls as shown in this snippet:

public class HandlerWhichIntegratesWithRemotingWithAPM :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var asyncClient = new AsyncClient();
        var result = await asyncClient.Run()
            .ConfigureAwait(false);
        // Use the result in some way
        Trace.WriteLine(result);
    }
}

public class AsyncClient :
    MarshalByRefObject
{
    [OneWay]
    public string Callback(IAsyncResult ar)
    {
        var asyncDelegate = (Func<string>) ((AsyncResult) ar).AsyncDelegate;
        return asyncDelegate.EndInvoke(ar);
    }

    public Task<string> Run()
    {
        var remoteService = new RemoteService();

        Func<string> remoteCall = remoteService.TimeConsumingRemoteCall;

        return Task.Factory.FromAsync(
            beginMethod: (callback, state) =>
            {
                var call = (Tuple<Func<string>, AsyncClient>) state;
                return call.Item1.BeginInvoke(callback, state);
            },
            endMethod: asyncResult =>
            {
                var call = (Tuple<Func<string>, AsyncClient>) asyncResult.AsyncState;
                return call.Item2.Callback(asyncResult);
            },
            state: Tuple.Create(remoteCall, this));
    }
}

or use Task.Run directly in a message handler:

public class HandlerWhichIntegratesWithRemotingWithTask :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var result = await Task.Run(
            function: () =>
            {
                var remoteService = new RemoteService();
                return remoteService.TimeConsumingRemoteCall();
            })
            .ConfigureAwait(false);
        // Use the result in some way
        Trace.WriteLine(result);
    }
}

Task.Run can have significantly less overhead than using a delegate with BeginInvoke/EndInvoke. By default, both APIs use the worker thread pool as the underlying scheduling engine. Analyze and measure for the business scenarios involved.
