Getting Started

Message Throughput Throttling

Component: NServiceBus
NuGet Package: NServiceBus (8-pre)
This page targets a pre-release version. Pre-releases are subject to change and samples are not guaranteed to be fully functional.

Systems often need to integrate with 3rd party services, some of which may limit the number of concurrent requests they process.

This sample demonstrates an integration with the GitHub API using the Octokit library. It runs in unauthenticated mode, which is limited to a fixed number of requests per minute. Upon hitting this limit, the endpoint will delay processing additional messages, until the limit resets.

The solution consists of two endpoints; Sender and Limited. The reason two endpoints are required in this scenario is that NServiceBus does not support limiting messages by message type. So, to limit only a specific message type, a separate endpoint is used for it.


The Sender is a normal endpoint that sends the SearchGitHub message and the handles the reply SearchResponse message.


The message sending occurs at startup.

var endpointInstance = await Endpoint.Start(endpointConfiguration)
Console.WriteLine("Sending messages...");
for (var i = 0; i < 100; i++)
    var searchGitHub = new SearchGitHub
        Repository = "NServiceBus",
        Owner = "Particular",
        SearchFor = "IBus"
    await endpointInstance.Send("Samples.Throttling.Limited", searchGitHub)

Handling the response

Handling the GitHubSearchResponse message.

public class GitHubSearchResponseHandler :
    static ILog log = LogManager.GetLogger<GitHubSearchResponseHandler>();

    public Task Handle(SearchResponse message, IMessageHandlerContext context)
        log.Info($"Found {message.TotalCount} results for {message.SearchedFor}.");
        return Task.CompletedTask;


This endpoint is limited to processing one message at a time and uses a pipeline behavior to handle when the processing limit of Octokit has been exceeded.

Configure to process only one concurrent message

Limits the endpoint concurrency.

var endpointConfiguration = new EndpointConfiguration("Samples.Throttling.Limited");

Search Handler

Performs the Octokit search.

public class GitHubSearchHandler :
    static ILog log = LogManager.GetLogger<GitHubSearchHandler>();

    // use anonymous access which has strict rate limitations
    GitHubClient GitHubClient = new GitHubClient(new ProductHeaderValue("ThroughputThrottling"));

    public async Task Handle(SearchGitHub message, IMessageHandlerContext context)
        log.Info($"Received search for '{message.SearchFor}' on {message.Owner}/{message.Repository}");

        var request = new SearchCodeRequest(
        var result = await GitHubClient.Search.SearchCode(request)
        log.Info($"Found {result.TotalCount} results for {message.SearchFor}. Replying.");
        var response = new SearchResponse
            SearchedFor = message.SearchFor,
            TotalCount = result.TotalCount
        await context.Reply(response)

Registering the behavior in pipeline

endpointConfiguration.Pipeline.Register(typeof(ThrottlingBehavior), "API throttling for GitHub");

The pipeline behavior

Handles the detection of Octokit.RateLimitExceededException and defers the message.

public class ThrottlingBehavior :
    static ILog log = LogManager.GetLogger<ThrottlingBehavior>();
    static DateTime? nextRateLimitReset;

    public override async Task Invoke(IInvokeHandlerContext context, Func<Task> next)
        var rateLimitReset = nextRateLimitReset;
        if (rateLimitReset.HasValue && rateLimitReset >= DateTime.UtcNow)
            var localTime = rateLimitReset?.ToLocalTime();
            log.Info($"Rate limit exceeded. Retry after {rateLimitReset} UTC ({localTime} local).");
            await DelayMessage(context, rateLimitReset.Value)

            await next()
        catch (RateLimitExceededException exception)
            var nextReset = nextRateLimitReset = exception.Reset.UtcDateTime;
            var localTime = nextReset?.ToLocalTime();
            log.Info($"Rate limit exceeded. Limit resets at {nextReset} UTC ({localTime} local).");
            await DelayMessage(context, nextReset.Value)

    Task DelayMessage(IInvokeHandlerContext context, DateTime deliverAt)
        var sendOptions = new SendOptions();

        // delay the message to the specified delivery date
        // send message to this endpoint
        // maintain the original ReplyTo address
        if (context.Headers.TryGetValue(Headers.ReplyToAddress, out var replyAddress))

        return context.Send(context.MessageBeingHandled, sendOptions);
The behavior sends a copy of the original message, but does not copy the headers of the original message. If the headers of the original message are required, they must be copied from context.Headers to SendOptions.

Related Articles

Last modified