It is difficult to give generic advice on how asynchronous code should be structured. It is important to understand the difference between compute-bound and I/O-bound operations and to avoid copying and pasting snippets without analyzing the benefits they provide for a given business scenario. Don't assume; measure.
Handlers and sagas are executed by threads from the thread pool. Depending on the transport implementation, either a worker thread pool thread or an I/O thread pool thread might be used. Typically, message handlers and sagas issue I/O-bound work, such as sending or publishing messages, storing information in databases, and calling web services. In other cases, message handlers are used to schedule compute-bound work. To write efficient message handlers and sagas, it is crucial to understand the difference between those scenarios.
Thread pool
A thread pool is associated with a process and manages the execution of asynchronous callbacks on behalf of the application. Its primary purpose is to reduce the number of application threads and provide efficient management of threads. Every thread pool manages a pool of threads designated to handle one class of workload: either I/O-bound or compute-bound work.
Further reading:
- Overview of the Threadpool
- CLR 4.0 Threadpool improvements
- Thread Pools
- Thread Pooling
- I/O Completion Ports
Worker thread pool
Parallel and compute-bound blocking work happens on the worker thread pool. APIs such as Task.Run, Task.Factory.StartNew, and the methods of the Parallel class schedule tasks on the worker thread pool.
Whenever compute-bound work is scheduled, the worker thread pool starts adding worker threads (ramp-up phase). Ramping up more worker threads is expensive, and the thread injection rate of the worker thread pool is limited.
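To make the two kinds of pool threads visible, the following minimal sketch queries the limits the runtime currently applies. It only relies on ThreadPool.GetMinThreads, ThreadPool.GetMaxThreads, and ThreadPool.GetAvailableThreads; the class name ThreadPoolInspector is made up for illustration, and the reported numbers depend on the machine and the runtime.

```csharp
using System;
using System.Threading;

public static class ThreadPoolInspector
{
    // Builds a summary of the worker and I/O (completion port) thread limits.
    public static string Describe()
    {
        ThreadPool.GetMinThreads(out int minWorker, out int minIo);
        ThreadPool.GetMaxThreads(out int maxWorker, out int maxIo);
        ThreadPool.GetAvailableThreads(out int freeWorker, out int freeIo);

        return $"worker: min={minWorker}, max={maxWorker}, available={freeWorker}; " +
               $"I/O: min={minIo}, max={maxIo}, available={freeIo}";
    }

    public static void Main()
    {
        Console.WriteLine(Describe());
    }
}
```

The minimum worker value is the number of threads the pool creates on demand before it starts throttling thread creation, so it gives a rough idea of when the ramp-up phase begins.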
Compute-bound recommendations:
- Manual scheduling of compute-bound work to the worker thread pool is a top-level concern only. Use Task.Run or Task.Factory.StartNew as high up in the call hierarchy as possible (e.g. in the Handle methods of either a handler or saga).
- Avoid those operations deeper in the call hierarchy.
- Group compute-bound operations together as much as possible.
- Make compute-bound operations coarse-grained instead of fine-grained.
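The last two recommendations can be sketched as follows. This is an illustration only: Square is a hypothetical stand-in for a compute-bound operation, and the class and method names are not part of any library. The fine-grained variant schedules one work item per element, while the coarse-grained variant groups the whole batch into a single work item scheduled once at the top level.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class CoarseGrainedScheduling
{
    // Hypothetical compute-bound operation used only for illustration.
    static long Square(int value) => (long)value * value;

    // Fine-grained: one thread pool work item per element (avoid).
    public static async Task<long> SumOfSquaresFineGrained(int[] values)
    {
        var tasks = values.Select(v => Task.Run(() => Square(v)));
        var results = await Task.WhenAll(tasks);
        return results.Sum();
    }

    // Coarse-grained: the whole batch is a single work item.
    public static Task<long> SumOfSquaresCoarseGrained(int[] values)
    {
        return Task.Run(() =>
        {
            long sum = 0;
            foreach (var v in values)
            {
                sum += Square(v);
            }
            return sum;
        });
    }

    public static async Task Main()
    {
        var values = Enumerable.Range(1, 1000).ToArray();
        Console.WriteLine(await SumOfSquaresFineGrained(values));
        Console.WriteLine(await SumOfSquaresCoarseGrained(values));
    }
}
```

Both variants compute the same result; they differ only in how many work items the worker thread pool has to schedule.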
I/O-thread pool
I/O-bound work is scheduled on the I/O-thread pool. The I/O-bound thread pool has a fixed number of worker threads (usually equal to the number of cores) which can work concurrently on thousands of I/O-bound tasks. I/O-bound work under Windows uses I/O completion ports (IOCP) to get notifications when an I/O-bound operation is completed. IOCP enables efficient offloading of I/O-bound work from the user code to the kernel, driver, and hardware without blocking the user code until the I/O work is done. To achieve that, the user code registers notifications in the form of a callback. The callback occurs on an I/O thread which is a pool thread managed by the I/O system that is made available to the user code.
I/O-bound work typically takes longer to complete than compute-bound work. The I/O system is optimized to keep the thread count low and to schedule all callbacks, and therefore the execution of interleaved user code, on that one thread. Due to those optimizations, all work gets serialized, and there is minimal context switching because the OS scheduler owns the threads. In general, asynchronous code handles bursts of traffic much better because of the "always-on" nature of the IOCP.
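The effect of not holding a thread while an operation is pending can be observed with a small experiment. The sketch below is an approximation under a stated assumption: Task.Delay stands in for an I/O-bound call. It is timer based rather than IOCP based, but like real asynchronous I/O it occupies no thread while it is pending. The names ConcurrentWaits and SimulatedIoAsync are made up for illustration.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class ConcurrentWaits
{
    // Stand-in for an I/O-bound call (assumption: no thread is held while pending).
    static Task SimulatedIoAsync() => Task.Delay(TimeSpan.FromMilliseconds(200));

    // Starts `count` simulated operations concurrently and returns the elapsed wall-clock time.
    public static async Task<TimeSpan> RunAsync(int count)
    {
        var stopwatch = Stopwatch.StartNew();
        // ToArray forces all operations to start before any of them is awaited.
        var operations = Enumerable.Range(0, count).Select(_ => SimulatedIoAsync()).ToArray();
        await Task.WhenAll(operations);
        return stopwatch.Elapsed;
    }

    public static async Task Main()
    {
        var elapsed = await RunAsync(1000);
        Console.WriteLine($"1000 concurrent waits took {elapsed.TotalMilliseconds:F0} ms");
    }
}
```

Executed one after another, a thousand 200 ms waits would take more than three minutes; started concurrently, they should complete in roughly the time of a single wait. Measure on the target system rather than relying on this figure.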
Memory and allocations
Asynchronous code tends to use much less memory because the amount of memory saved by freeing up a thread in the worker thread pool dwarfs the amount of memory used for all the compiler-generated async structures combined.
Synchronous vs. asynchronous
If each request is examined in isolation, asynchronous code would be slightly slower than the corresponding synchronous version. There might be extra kernel transitions, task scheduling, etc. involved but the scalability more than makes up for it.
From a server perspective, if asynchronous code is compared to synchronous code by looking at one method or one request at a time, then synchronous might make more sense. But if asynchronous code is compared to parallelism — watching the server as a whole — asynchronous wins. Every worker thread that can be freed up on a server is worth freeing up. It reduces the amount of memory needed and frees up the CPU for compute-bound work while saturating the I/O system completely.
Calling short-running, compute-bound code
Short-running, compute-bound code that is executed in the handler should be executed directly on the I/O-thread that is executing the handler code.
public class ShortComputeBoundHandler :
    IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        ComputeBoundComponent.BlocksForAShortTime();
        return Task.CompletedTask;
    }
}
Call the code directly and do not wrap it with Task.Run or Task.Factory.StartNew.
For the majority of business scenarios, this approach is acceptable since many of the asynchronous base class library methods in the .NET Framework will schedule continuations on the worker thread pool; the likelihood that no I/O-thread is blocked is high.
Calling long-running, compute-bound code
This approach should be used only after a thorough analysis of the runtime behavior and the code involved in the call hierarchy of a handler. Wrapping code inside the handler with Task.Run or Task.Factory.StartNew can seriously harm the throughput if applied incorrectly. It should be used when multiple long-running compute-bound tasks need to be executed in parallel.
Long-running compute-bound code that is executed in a handler could be offloaded to the worker thread pool.
public class LongComputeBoundHandler :
    IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var longRunning1 = Task.Run(() => ComputeBoundComponent.BlocksForALongTime());
        var longRunning2 = Task.Run(() => ComputeBoundComponent.BlocksForALongTime());
        return Task.WhenAll(longRunning1, longRunning2);
    }
}
Wrap the compute-bound code in Task.Run or Task.Factory.StartNew and await the result of the task.
Return or await
Await the task
For the majority of cases, it is sufficient to mark the handler's Handle method with the async keyword and await all asynchronous calls inside the method.
public class HandlerAwaitsTheTask :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        await SomeLibrary.SomeAsyncMethod(message);
    }
}
Return the task
For high-throughput scenarios, and if there are only one or two asynchronous exit points in the Handle method, the async keyword can be avoided completely by returning the task instead of awaiting it. This omits the creation of the state machine that drives the async code and reduces the number of allocations on the given code path.
public class HandlerReturnsATask :
    IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var task = SomeLibrary.SomeAsyncMethod(message);
        return task;
    }
}
public class HandlerReturnsTwoTasks :
    IHandleMessages<MyMessage>
{
    bool someCondition = true;
    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        if (someCondition)
        {
            // Nothing to do: return an already completed task
            return Task.CompletedTask;
        }
        return SomeLibrary.SomeAsyncMethod(message);
    }
}
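Returning the task instead of awaiting it also changes how synchronous failures surface, which is worth knowing before applying the optimization. The following sketch illustrates the difference using only base class library types; FailsBeforeReturningATask is a hypothetical operation that throws before it produces a task.

```csharp
using System;
using System.Threading.Tasks;

public static class ReturnVersusAwait
{
    // Hypothetical operation that fails synchronously, before any task exists.
    static Task FailsBeforeReturningATask()
    {
        throw new InvalidOperationException("boom");
    }

    // With async: the exception is captured and stored in the returned task.
    public static async Task Awaits()
    {
        await FailsBeforeReturningATask();
    }

    // Without async: the exception escapes directly from the method call.
    public static Task Returns()
    {
        return FailsBeforeReturningATask();
    }

    public static void Main()
    {
        var task = Awaits();                // does not throw here
        Console.WriteLine(task.IsFaulted);  // the failure is observed on the task

        try
        {
            Returns();                      // throws at the call site
        }
        catch (InvalidOperationException)
        {
            Console.WriteLine("thrown synchronously");
        }
    }
}
```

A caller that awaits the returned task inside a try block sees the exception in both cases, so this distinction mostly matters for code that inspects tasks before awaiting them.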
Concurrency
Task-based APIs enable better composition of asynchronous code and allow conscious decisions on whether to execute the asynchronous code sequentially or concurrently.
Small amount of concurrent message operations
Batched
By default, all outgoing message operations on the message handler contexts are batched. Batching means messages are kept in memory and sent out when the handler is completed. So the I/O-bound work happens outside the execution scope of a handler (individual transports may apply optimizations). For a few outgoing message operations it makes sense, to reduce complexity, to sequentially await all the outgoing operations as shown below.
public class BatchedDispatchHandler :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        for (var i = 0; i < 100; i++)
        {
            var myMessage = new MyMessage();
            await context.Send(myMessage);
        }
    }
}
Immediate dispatch
Immediate dispatch means outgoing message operations will be immediately dispatched to the underlying transport. For immediate dispatch operations, it might make sense to execute them concurrently as shown below.
public class ImmediateDispatchHandler :
    IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var tasks = new Task[100];
        for (var i = 0; i < 100; i++)
        {
            var options = new SendOptions();
            options.RequireImmediateDispatch();
            var myMessage = new MyMessage();
            tasks[i] = context.Send(myMessage, options);
        }
        return Task.WhenAll(tasks);
    }
}
Large amount of concurrent message operations
Unbounded concurrency can be problematic. For large numbers of concurrent message operations, it might be preferable to package multiple outgoing operations together into batches, limiting the concurrency to the size of an individual batch (divide & conquer).
public class PacketsImmediateDispatchHandler :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        for (var i = 0; i < 100; i++)
        {
            var tasks = new Task[100];
            for (var j = 0; j < 100; j++)
            {
                var options = new SendOptions();
                options.RequireImmediateDispatch();
                var myMessage = new MyMessage();
                tasks[j] = context.Send(myMessage, options);
            }
            await Task.WhenAll(tasks);
        }
    }
}
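The packaging approach above assumes a total that divides evenly into batches. A possible generalization, sketched here with only base class library types, takes the operation as a delegate and handles a final, smaller batch. The helper name RunInBatchesAsync and its parameters are hypothetical and not part of any library; in a handler, the delegate would wrap the send call.

```csharp
using System;
using System.Threading.Tasks;

public static class BatchedExecution
{
    // Runs `operation` for indexes 0..total-1 in consecutive batches of at most
    // `batchSize` concurrent operations and returns the number of batches executed.
    public static async Task<int> RunInBatchesAsync(int total, int batchSize, Func<int, Task> operation)
    {
        if (batchSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(batchSize));
        }

        var batches = 0;
        for (var start = 0; start < total; start += batchSize)
        {
            // The last batch may be smaller than batchSize.
            var size = Math.Min(batchSize, total - start);
            var tasks = new Task[size];
            for (var j = 0; j < size; j++)
            {
                tasks[j] = operation(start + j);
            }
            // Concurrency is limited to the size of the current batch.
            await Task.WhenAll(tasks);
            batches++;
        }
        return batches;
    }

    public static async Task Main()
    {
        // Task.Delay is a stand-in for an outgoing message operation.
        var batches = await RunInBatchesAsync(250, 100, index => Task.Delay(10));
        Console.WriteLine($"{batches} batches");
    }
}
```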
It is also possible to limit the concurrency by using SemaphoreSlim as shown below.
public class LimitConcurrencyImmediateDispatchHandler :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var semaphore = new SemaphoreSlim(100);
        var tasks = new Task[10000];
        for (var i = 0; i < 10000; i++)
        {
            await semaphore.WaitAsync();
            tasks[i] = Send(context, semaphore);
        }
        await Task.WhenAll(tasks);
    }
    static async Task Send(IMessageHandlerContext context, SemaphoreSlim semaphore)
    {
        try
        {
            var options = new SendOptions();
            options.RequireImmediateDispatch();
            var message = new MyMessage();
            await context.Send(message, options);
        }
        finally
        {
            semaphore.Release();
        }
    }
}
In practice, packaging operations together has proven to be more effective with regard to both memory allocations and performance. The snippet is shown nonetheless for completeness, and because SemaphoreSlim is a useful concurrency primitive for various scenarios.
Integration with non-task-based APIs
Events
Sometimes it is necessary to call APIs from an asynchronous handler that use events as the trigger for completion. Before async/await was introduced, ManualResetEvent or AutoResetEvent were usually used to synchronize runtime code flow. Unfortunately, these synchronization primitives are of a blocking nature. For asynchronous one-time event synchronization, TaskCompletionSource can be used.
public class HandlerWhichIntegratesWithEvent :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        // Cancels the wait if the event has not fired within 10 seconds
        var cancellationTokenSource = new CancellationTokenSource();
        cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(10));
        var taskCompletionSource = new TaskCompletionSource<object>();
        using (cancellationTokenSource.Token.Register(
            callback: state =>
            {
                var completionSource = (TaskCompletionSource<object>)state;
                completionSource.TrySetCanceled();
            },
            state: taskCompletionSource))
        {
            var dependency = new DependencyWhichRaisedEvent();
            // Completes the task when the event is raised
            dependency.MyEvent += (sender, args) =>
            {
                taskCompletionSource.TrySetResult(null);
            };
            await taskCompletionSource.Task;
        }
    }
}
The above snippet shows how a TaskCompletionSource can be used to asynchronously wait for an event to happen and optionally cancel it.
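A self-contained variant of the same idea is sketched below so that it can be run outside a handler. The Worker class and its Completed event are hypothetical and exist only to make the example executable. The sketch additionally passes TaskCreationOptions.RunContinuationsAsynchronously so that the code awaiting the task does not run inline on the thread that raises the event, and it unsubscribes from the event once the wait is over.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical event-based component, defined only to make the sketch runnable.
public class Worker
{
    public event EventHandler Completed;

    public void Start()
    {
        // Raise the event from a thread pool thread after a short delay.
        Task.Delay(50).ContinueWith(_ => Completed?.Invoke(this, EventArgs.Empty));
    }
}

public static class EventToTask
{
    // Completes when Worker.Completed fires, or is canceled after `timeout`.
    public static async Task WaitForCompletionAsync(Worker worker, TimeSpan timeout)
    {
        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        EventHandler handler = (sender, args) => tcs.TrySetResult(true);

        using (var timeoutSource = new CancellationTokenSource(timeout))
        using (timeoutSource.Token.Register(() => tcs.TrySetCanceled()))
        {
            // Subscribe before starting so the event cannot be missed.
            worker.Completed += handler;
            try
            {
                worker.Start();
                await tcs.Task;
            }
            finally
            {
                worker.Completed -= handler;
            }
        }
    }

    public static async Task Main()
    {
        await WaitForCompletionAsync(new Worker(), TimeSpan.FromSeconds(10));
        Console.WriteLine("event received");
    }
}
```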
Asynchronous programming model (APM) pattern
For existing code which uses the Asynchronous Programming Model (APM), it is best to use Task.Factory.FromAsync to wrap the BeginOperationName and EndOperationName methods with a task object.
public class HandlerWhichIntegratesWithAPM :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var dependency = new DependencyWhichUsesAPM();
        var result = await Task.Factory.FromAsync(
            beginMethod: (callback, state) =>
            {
                var d = (DependencyWhichUsesAPM)state;
                return d.BeginCall(callback, state);
            },
            endMethod: asyncResult =>
            {
                var d = (DependencyWhichUsesAPM)asyncResult.AsyncState;
                return d.EndCall(asyncResult);
            },
            state: dependency);
        // Use the result in some way
        Trace.WriteLine(result);
    }
}
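The same wrapping technique can be tried against an APM pair that ships with the base class library. The sketch below wraps Stream.BeginRead and Stream.EndRead with Task<int>.Factory.FromAsync; the class name ApmToTask is made up for illustration, and a MemoryStream is used only to keep the example self-contained.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class ApmToTask
{
    // Wraps Stream.BeginRead/EndRead in a task that yields the number of bytes read.
    public static Task<int> ReadAsync(Stream stream, byte[] buffer)
    {
        return Task<int>.Factory.FromAsync(
            stream.BeginRead,
            stream.EndRead,
            buffer, 0, buffer.Length,
            null);
    }

    public static async Task Main()
    {
        var payload = Encoding.UTF8.GetBytes("hello");
        using (var stream = new MemoryStream(payload))
        {
            var buffer = new byte[16];
            var read = await ReadAsync(stream, buffer);
            Console.WriteLine(Encoding.UTF8.GetString(buffer, 0, read));
        }
    }
}
```

Streams already offer a task-based ReadAsync method, so this wrapper is only meant to demonstrate the pattern; it is useful mainly for older APIs that expose nothing but the Begin/End pair.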
Asynchronous RPC calls
The APM approach described above can be used to integrate with remote procedure calls as shown in this snippet:
public class HandlerWhichIntegratesWithRemotingWithAPM :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var asyncClient = new AsyncClient();
        var result = await asyncClient.Run();
        // Use the result in some way
        Trace.WriteLine(result);
    }
}
public class AsyncClient :
    MarshalByRefObject
{
    [OneWay]
    public string Callback(IAsyncResult ar)
    {
        var asyncDelegate = (Func<string>)((AsyncResult)ar).AsyncDelegate;
        return asyncDelegate.EndInvoke(ar);
    }
    public Task<string> Run()
    {
        var remoteService = new RemoteService();
        Func<string> remoteCall = remoteService.TimeConsumingRemoteCall;
        return Task.Factory.FromAsync(
            beginMethod: (callback, state) =>
            {
                var call = (Tuple<Func<string>, AsyncClient>)state;
                return call.Item1.BeginInvoke(callback, state);
            },
            endMethod: asyncResult =>
            {
                var call = (Tuple<Func<string>, AsyncClient>)asyncResult.AsyncState;
                return call.Item2.Callback(asyncResult);
            },
            state: Tuple.Create(remoteCall, this));
    }
}
or use Task.Run directly in a message handler:
public class HandlerWhichIntegratesWithRemotingWithTask :
    IHandleMessages<MyMessage>
{
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var result = await Task.Run(
            function: () =>
            {
                var remoteService = new RemoteService();
                return remoteService.TimeConsumingRemoteCall();
            });
        // Use the result in some way
        Trace.WriteLine(result);
    }
}
Task.Run can have significantly less overhead than using a delegate with BeginInvoke/EndInvoke. By default, both APIs use the worker thread pool as the underlying scheduling engine. Analyze and measure for the business scenarios involved.