Asynchronous Handlers

Component: NServiceBus | Nuget: NServiceBus (Version: 6.x)

Introduction

It is difficult to give generic advice on how asynchronous code should be structured. It is important to understand the difference between compute-bound and IO-bound operations. Avoid copy-pasting snippets without first analyzing whether they provide a benefit for the business scenarios involved. Don't assume. Measure it.

Handlers and sagas are invoked from an IO thread pool thread. Typically, message handlers and sagas issue IO-bound work such as sending or publishing messages, storing information in databases, or calling web services. In other cases, message handlers are used to schedule compute-bound work. To write efficient message handlers and sagas it is crucial to understand the difference between compute-bound and IO-bound work and the thread pools involved.

Thread Pools

There are two thread pools: the worker thread pool and the IO thread pool.
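The two pools can be inspected at runtime. The following sketch (the class name is illustrative) uses ThreadPool.GetMaxThreads and ThreadPool.GetAvailableThreads, which report worker threads and IO completion port threads separately:

```csharp
using System;
using System.Threading;

public static class ThreadPoolInfo
{
    public static void Main()
    {
        int maxWorker;
        int maxIo;
        int availableWorker;
        int availableIo;

        // The CLR reports the two pools separately: worker threads for
        // compute-bound work and completion port threads for IO callbacks.
        ThreadPool.GetMaxThreads(out maxWorker, out maxIo);
        ThreadPool.GetAvailableThreads(out availableWorker, out availableIo);

        Console.WriteLine($"Worker threads available: {availableWorker}/{maxWorker}");
        Console.WriteLine($"IO threads available:     {availableIo}/{maxIo}");
    }
}
```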

Worker thread pool

Parallel and compute-bound (blocking) work happens on the worker thread pool. APIs like Task.Run, Task.Factory.StartNew, and Parallel.For schedule tasks on the worker thread pool.

When more compute-bound work is scheduled than the available worker threads can handle, the worker thread pool starts expanding its number of worker threads (ramp-up phase). Ramping up more worker threads is expensive, and the thread injection rate of the worker thread pool is limited.

Compute bound recommendations:

  • Offloading compute-bound work to the worker thread pool is a top-level concern only. Use Task.Run or Task.Factory.StartNew as high up in the call hierarchy as possible (e.g. in the Handle method of a handler or saga).
  • Avoid those operations deeper down in the call hierarchy.
  • Group compute-bound operations together as much as possible.
  • Make them coarse-grained instead of fine-grained.
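To illustrate the last two recommendations, the difference between a fine-grained and a coarse-grained offload might look as follows; ProcessItem is a hypothetical compute-bound method:

```csharp
using System.Linq;
using System.Threading.Tasks;

public static class ComputeOffloading
{
    // Hypothetical compute-bound operation.
    static void ProcessItem(int item)
    {
        // CPU-intensive work would go here.
    }

    // Fine-grained: schedules one task per item, flooding the worker
    // thread pool and paying per-task scheduling overhead.
    public static Task FineGrained(int[] items)
    {
        return Task.WhenAll(items.Select(item => Task.Run(() => ProcessItem(item))));
    }

    // Coarse-grained: a single top-level offload processes all items
    // on one worker thread.
    public static Task CoarseGrained(int[] items)
    {
        return Task.Run(() =>
        {
            foreach (var item in items)
            {
                ProcessItem(item);
            }
        });
    }
}
```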

IO thread pool

IO-bound work is scheduled on the IO thread pool. The IO thread pool has a fixed number of threads (usually the number of cores) which can work concurrently on thousands of IO-bound tasks. IO-bound work under Windows uses so-called IO completion ports (IOCP) to get notified when an IO-bound operation completes. IOCP enables efficient offloading of IO-bound work from the user code to the kernel, driver, and hardware without blocking the user code until the IO work is done. To achieve that, the user code registers a notification in the form of a callback. The callback occurs on an IO thread, a pool thread managed by the IO system that is made available to the user code.

IO-bound operations typically take a long time, while the callback code executed on completion is comparatively cheap. The IO system is optimized to keep the thread count low and to schedule all callbacks, and therefore the execution of the interleaved user code, on as few threads as possible. Due to those optimizations the work gets serialized, and there is minimal context switching because the OS scheduler owns the threads. In general, asynchronous code can handle bursts of traffic much better because of the "always-on" nature of IOCP.
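As an illustration of IOCP-backed IO (a sketch, not part of the NServiceBus API): opening a FileStream with useAsync: true makes ReadAsync use overlapped IO on Windows, so no thread is blocked while the device completes the read; the continuation runs on an IO thread once data arrives.

```csharp
using System.IO;
using System.Threading.Tasks;

public static class AsyncFileReader
{
    public static async Task<int> ReadHeaderAsync(string path)
    {
        // useAsync: true requests overlapped (IOCP-based) IO on Windows.
        using (var stream = new FileStream(
            path, FileMode.Open, FileAccess.Read, FileShare.Read,
            bufferSize: 4096, useAsync: true))
        {
            var buffer = new byte[4096];

            // No thread is blocked while the read is in flight; the
            // continuation is scheduled on an IO thread via a completion port.
            return await stream.ReadAsync(buffer, 0, buffer.Length)
                .ConfigureAwait(false);
        }
    }
}
```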

Memory and allocations

Asynchronous code tends to use much less memory, because the amount of memory saved by freeing up a thread in the worker thread pool (each thread reserves around one megabyte for its stack alone) dwarfs the amount of memory used for all the compiler-generated async state machines combined.

Synchronous vs. Asynchronous

If each request is examined in isolation, asynchronous code is slightly slower than its synchronous counterpart: there may be extra kernel transitions, task scheduling, and so on involved. But the improved scalability more than makes up for it.

From a server perspective, comparing asynchronous to synchronous code one method or one request at a time can make synchronous look more sensible. But when the server is watched as a whole, asynchronous wins. Every worker thread that can be freed up on a server is worth freeing up: it reduces the amount of memory needed and frees up the CPU for compute-bound work while fully saturating the IO system.

Calling short running compute-bound code

Short running compute-bound code that is executed in the handler should be executed directly on the IO-thread that is executing the handler code.

public class ShortComputeBoundHandler :
    IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        ComputeBoundComponent.BlocksForAShortTime();
        return Task.CompletedTask;
    }
}

Call the code directly; do not wrap it in Task.Run or Task.Factory.StartNew.

For the majority of business scenarios this approach is acceptable: since many of the asynchronous base class library methods in the .NET Framework schedule their continuations on the worker thread pool, the likelihood that an IO thread stays blocked is low.

Calling long-running compute-bound code

This approach should only be used after a thorough analysis of the runtime behavior and of the code involved in the call hierarchy of a handler. Wrapping code inside a handler in Task.Run or Task.Factory.StartNew can seriously harm throughput if applied incorrectly. It should only be used when multiple long-running compute-bound tasks need to be executed in parallel.

Long running compute-bound code that is executed in a handler could be offloaded to the worker thread pool.

public class LongComputeBoundHandler :
    IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var longRunning1 = Task.Run(() => ComputeBoundComponent.BlocksForALongTime());
        var longRunning2 = Task.Run(() => ComputeBoundComponent.BlocksForALongTime());
        return Task.WhenAll(longRunning1, longRunning2);
    }
}

Wrap the compute-bound code in Task.Run or Task.Factory.StartNew and return or await the resulting tasks.

Return or await

Awaits the task

For the majority of cases it is sufficient to mark the handler's Handle method with the async keyword and await all asynchronous calls inside the method.

public class HandlerAwaitsTheTask :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        await SomeLibrary.SomeAsyncMethod(message);
    }
}

Returns the task

For high-throughput scenarios, and when there are only one or two asynchronous exit points in the Handle method, the async keyword can be avoided completely by returning the task instead of awaiting it. This omits the creation of the state machine that drives the async code and reduces the number of allocations on the given code path.

public class HandlerReturnsATask :
    IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var task = SomeLibrary.SomeAsyncMethod(message);
        return task;
    }
}
public class HandlerReturnsTwoTasks :
    IHandleMessages<MyMessage>
{
    bool someCondition = true;

    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        if (someCondition)
        {
            return Task.CompletedTask;
        }

        return SomeLibrary.SomeAsyncMethod(message);
    }
}

Usage of ConfigureAwait

By default, when a task is awaited, a mechanism called context capturing kicks in: the current context is captured and restored for the continuation that is scheduled after the awaited task completes.

public class HandlerConfigureAwaitNotSpecified :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        await SomeLibrary.SomeAsyncMethod(message);
        await SomeLibrary.AnotherAsyncMethod(message);
    }
}

In the snippet above, SomeAsyncMethod and AnotherAsyncMethod are awaited. When SomeAsyncMethod is awaited, the context is captured and restored before the code that follows (the call to AnotherAsyncMethod) is executed. The context capturing mechanism is almost never needed in code executed inside handlers or sagas, and NServiceBus itself never captures the context anywhere in the framework. So the following approach is preferred:

public class HandlerConfigureAwaitSpecified :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        await SomeLibrary.SomeAsyncMethod(message)
            .ConfigureAwait(false);
        await SomeLibrary.AnotherAsyncMethod(message)
            .ConfigureAwait(false);
    }
}

Specify ConfigureAwait(false) on each awaited statement. Apply this principle to all asynchronous code that is called from handlers and sagas.

Concurrency

Task-based APIs make it easier to compose asynchronous code and to make conscious decisions about whether to execute the asynchronous operations sequentially or concurrently.

Small number of concurrent message operations

Batched

By default, all outgoing message operations on the message handler context are batched. Batching means messages are kept in memory and sent out when the handler completes, so the IO-bound work happens outside the execution scope of the handler (individual transports may apply optimizations). For a few outgoing message operations it makes sense, to reduce complexity, to sequentially await all the outgoing operations as shown below.

public class BatchedDispatchHandler :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        for (var i = 0; i < 100; i++)
        {
            var myMessage = new MyMessage();
            await context.Send(myMessage)
                .ConfigureAwait(false);
        }
    }
}

Immediate dispatch

Immediate dispatch means outgoing message operations are dispatched to the underlying transport immediately. For immediate dispatch operations it might make sense to execute them concurrently, as shown below.

public class ImmediateDispatchHandler :
    IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var tasks = new Task[100];
        for (var i = 0; i < 100; i++)
        {
            var options = new SendOptions();
            options.RequireImmediateDispatch();

            var myMessage = new MyMessage();
            tasks[i] = context.Send(myMessage, options);
        }
        return Task.WhenAll(tasks);
    }
}

Large number of concurrent message operations

Unbounded concurrency can be problematic. For large numbers of concurrent message operations it might make sense to package multiple outgoing operations together into batches, thereby limiting the concurrency to the size of an individual batch (divide and conquer).

public class PacketsImmediateDispatchHandler :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        for (var i = 0; i < 100; i++)
        {
            var tasks = new Task[100];
            for (var j = 0; j < 100; j++)
            {
                var options = new SendOptions();
                options.RequireImmediateDispatch();
                var myMessage = new MyMessage();
                tasks[j] = context.Send(myMessage, options);
            }
            await Task.WhenAll(tasks)
                .ConfigureAwait(false);
        }
    }
}

It is also possible to limit the concurrency by using SemaphoreSlim, as shown below.

public class LimitConcurrencyImmediateDispatchHandler :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var semaphore = new SemaphoreSlim(100);

        var tasks = new Task[10000];
        for (var i = 0; i < 10000; i++)
        {
            await semaphore.WaitAsync()
                .ConfigureAwait(false);

            tasks[i] = Send(context, semaphore);
        }
        await Task.WhenAll(tasks)
            .ConfigureAwait(false);
    }

    static async Task Send(IMessageHandlerContext context, SemaphoreSlim semaphore)
    {
        try
        {
            var options = new SendOptions();
            options.RequireImmediateDispatch();
            var message = new MyMessage();
            await context.Send(message, options)
                .ConfigureAwait(false);
        }
        finally
        {
            semaphore.Release();
        }
    }
}

In practice, packaging operations together has proved to be more effective with regard to both memory allocations and performance. The snippet above is nonetheless shown for completeness, and because SemaphoreSlim is a useful concurrency primitive in various scenarios.

Integration with non-tasked based APIs

Events

Sometimes it is necessary to call APIs from an asynchronous handler that use events as the trigger for completion. Before async/await was introduced, ManualResetEvent or AutoResetEvent were typically used to synchronize code flow at runtime. Unfortunately, these synchronization primitives are blocking in nature. For asynchronous one-time event synchronization, TaskCompletionSource<TResult> can be used instead.

public class HandlerWhichIntegratesWithEvent :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var cancellationTokenSource = new CancellationTokenSource();
        cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(10));

        var taskCompletionSource = new TaskCompletionSource<object>();

        using (cancellationTokenSource.Token.Register(
            callback: state =>
            {
                var completionSource = (TaskCompletionSource<object>) state;
                completionSource.TrySetCanceled();
            },
            state: taskCompletionSource))
        {
            var dependency = new DependencyWhichRaisedEvent();
            dependency.MyEvent += (sender, args) =>
            {
                taskCompletionSource.TrySetResult(null);
            };

            await taskCompletionSource.Task
                .ConfigureAwait(false);
        }
    }
}

The above snippet shows how a TaskCompletionSource<TResult> can be used to asynchronously wait for an event to happen and to optionally cancel the wait.

Asynchronous programming model (APM) pattern

For existing code which uses the Asynchronous Programming Model (APM) it is best to use Task.Factory.FromAsync to wrap the BeginOperationName and EndOperationName methods with a task object.

public class HandlerWhichIntegratesWithAPM :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var dependency = new DependencyWhichUsesAPM();

        var result = await Task.Factory.FromAsync(
                beginMethod: (callback, state) =>
                {
                    var d = (DependencyWhichUsesAPM) state;
                    return d.BeginCall(callback, state);
                },
                endMethod: asyncResult =>
                {
                    var d = (DependencyWhichUsesAPM) asyncResult.AsyncState;
                    return d.EndCall(asyncResult);
                },
                state: dependency)
            .ConfigureAwait(false);

        // Use the result in some way
        Trace.WriteLine(result);
    }
}

Asynchronous RPC calls

The APM approach described above can be used to integrate with remote procedure calls as shown in this snippet:

public class HandlerWhichIntegratesWithRemotingWithAPM :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var asyncClient = new AsyncClient();
        var result = await asyncClient.Run()
            .ConfigureAwait(false);
        // Use the result in some way
        Trace.WriteLine(result);
    }
}

public class AsyncClient :
    MarshalByRefObject
{
    [OneWay]
    public string Callback(IAsyncResult ar)
    {
        var asyncDelegate = (Func<string>) ((AsyncResult) ar).AsyncDelegate;
        return asyncDelegate.EndInvoke(ar);
    }

    public Task<string> Run()
    {
        var remoteService = new RemoteService();

        Func<string> remoteCall = remoteService.TimeConsumingRemoteCall;

        return Task.Factory.FromAsync(
            beginMethod: (callback, state) =>
            {
                var call = (Tuple<Func<string>, AsyncClient>) state;
                return call.Item1.BeginInvoke(callback, state);
            },
            endMethod: asyncResult =>
            {
                var call = (Tuple<Func<string>, AsyncClient>) asyncResult.AsyncState;
                return call.Item2.Callback(asyncResult);
            },
            state: Tuple.Create(remoteCall, this));
    }
}

Alternatively, use Task.Run directly in the message handler:

public class HandlerWhichIntegratesWithRemotingWithTask :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var result = await Task.Run(
            function: () =>
            {
                var remoteService = new RemoteService();
                return remoteService.TimeConsumingRemoteCall();
            })
            .ConfigureAwait(false);
        // Use the result in some way
        Trace.WriteLine(result);
    }
}

Task.Run can have significantly less overhead than using a delegate with BeginInvoke/EndInvoke. By default, both APIs use the worker thread pool as the underlying scheduling engine. Analyze and measure for the business scenarios involved.
