Message Throughput Throttling

Component: NServiceBus
NuGet Package NServiceBus (7.x)

Systems often need to integrate with third-party services, some of which limit the number of requests they will process in a given period.

This sample demonstrates an integration with the GitHub API using the Octokit library. It runs in unauthenticated mode, which GitHub limits to a fixed number of requests per minute. When this limit is hit, the endpoint delays processing further messages until the limit resets.

The solution consists of two endpoints: Sender and Limited. Two endpoints are required because NServiceBus does not support throttling by message type, only per endpoint. To throttle a specific message type, that message type is handled by its own dedicated endpoint.

Sender

The Sender is a normal endpoint that sends the SearchGitHub message and then handles the SearchResponse reply.
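The two message contracts are not listed on this page. A minimal sketch, inferred from how the messages are used in the code here, might look like the following. The property names come from that usage; the IMessage marker interface and the int type for TotalCount are assumptions, and the sample may define these contracts differently.

```csharp
using NServiceBus;

// Request sent by Sender and handled by Limited.
// Assumption: the contract is marked with IMessage rather than identified by conventions.
public class SearchGitHub : IMessage
{
    public string Repository { get; set; }
    public string Owner { get; set; }
    public string SearchFor { get; set; }
}

// Reply returned to Sender with the number of matches.
// Assumption: TotalCount is an int.
public class SearchResponse : IMessage
{
    public string SearchedFor { get; set; }
    public int TotalCount { get; set; }
}
```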

Sending

The message sending occurs at startup.

var endpointInstance = await Endpoint.Start(endpointConfiguration)
    .ConfigureAwait(false);
Console.WriteLine("Sending messages...");
for (var i = 0; i < 100; i++)
{
    var searchGitHub = new SearchGitHub
    {
        Repository = "NServiceBus",
        Owner = "Particular",
        SearchFor = "IBus"
    };
    await endpointInstance.Send("Samples.Throttling.Limited", searchGitHub)
        .ConfigureAwait(false);
}

Handling the response

Handles the SearchResponse message and logs the result.

public class GitHubSearchResponseHandler :
    IHandleMessages<SearchResponse>
{
    static ILog log = LogManager.GetLogger<GitHubSearchResponseHandler>();

    public Task Handle(SearchResponse message, IMessageHandlerContext context)
    {
        log.Info($"Found {message.TotalCount} results for {message.SearchedFor}.");
        return Task.CompletedTask;
    }
}

Limited

This endpoint is limited to processing one message at a time. It uses a pipeline behavior to handle the case where the GitHub API rate limit, as reported by Octokit, has been exceeded.

Configure to process only one concurrent message

Limits the endpoint concurrency.

var endpointConfiguration = new EndpointConfiguration("Samples.Throttling.Limited");
endpointConfiguration.LimitMessageProcessingConcurrencyTo(1);

Search Handler

Performs the Octokit search.

public class GitHubSearchHandler :
    IHandleMessages<SearchGitHub>
{
    static ILog log = LogManager.GetLogger<GitHubSearchHandler>();

    // use anonymous access which has strict rate limitations
    GitHubClient GitHubClient = new GitHubClient(new ProductHeaderValue("ThroughputThrottling"));

    public async Task Handle(SearchGitHub message, IMessageHandlerContext context)
    {
        log.Info($"Received search for '{message.SearchFor}' on {message.Owner}/{message.Repository}");

        var request = new SearchCodeRequest(
            message.SearchFor,
            message.Owner,
            message.Repository);
        var result = await GitHubClient.Search.SearchCode(request)
            .ConfigureAwait(false);
        log.Info($"Found {result.TotalCount} results for {message.SearchFor}. Replying.");
        var response = new SearchResponse
        {
            SearchedFor = message.SearchFor,
            TotalCount = result.TotalCount
        };
        await context.Reply(response)
            .ConfigureAwait(false);
    }
}

Registering the behavior in the pipeline

endpointConfiguration.Pipeline.Register(typeof(ThrottlingBehavior), "API throttling for GitHub");

The pipeline behavior

Catches Octokit.RateLimitExceededException, records when the rate limit resets, and defers the message until that time.

public class ThrottlingBehavior :
    Behavior<IInvokeHandlerContext>
{
    static ILog log = LogManager.GetLogger<ThrottlingBehavior>();
    static DateTime? nextRateLimitReset;

    public override async Task Invoke(IInvokeHandlerContext context, Func<Task> next)
    {
        var rateLimitReset = nextRateLimitReset;
        if (rateLimitReset.HasValue && rateLimitReset >= DateTime.UtcNow)
        {
            var localTime = rateLimitReset?.ToLocalTime();
            log.Info($"Rate limit exceeded. Retry after {rateLimitReset} UTC ({localTime} local).");
            await DelayMessage(context, rateLimitReset.Value)
                .ConfigureAwait(false);
            // the message has been deferred, so do not invoke the handler for this attempt
            return;
        }

        try
        {
            await next()
                .ConfigureAwait(false);
        }
        catch (RateLimitExceededException exception)
        {
            var nextReset = nextRateLimitReset = exception.Reset.UtcDateTime;
            var localTime = nextReset?.ToLocalTime();
            log.Info($"Rate limit exceeded. Limit resets at {nextReset} UTC ({localTime} local).");
            await DelayMessage(context, nextReset.Value)
                .ConfigureAwait(false);
        }
    }

    Task DelayMessage(IInvokeHandlerContext context, DateTime deliverAt)
    {
        var sendOptions = new SendOptions();

        // delay the message to the specified delivery date
        sendOptions.DoNotDeliverBefore(deliverAt);
        // send message to this endpoint
        sendOptions.RouteToThisEndpoint();
        // maintain the original ReplyTo address
        if (context.Headers.TryGetValue(Headers.ReplyToAddress, out var replyAddress))
        {
            sendOptions.RouteReplyTo(replyAddress);
        }

        return context.Send(context.MessageBeingHandled, sendOptions);
    }
}
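The check at the top of the behavior can be isolated as a pure function, which makes the decision easy to reason about and to unit test. The following is a minimal sketch that does not depend on NServiceBus or Octokit. ThrottleGate and DelayFor are hypothetical names used only for illustration and are not part of the sample.

```csharp
using System;

// Hypothetical helper, not part of the sample: isolates the "should this message wait?" decision.
public static class ThrottleGate
{
    // Returns how long to defer the message, or null when it can be processed now.
    // rateLimitResetUtc: the last known reset time in UTC, or null if no limit has been hit.
    // utcNow: the current time in UTC, passed in so the function is deterministic.
    public static TimeSpan? DelayFor(DateTime? rateLimitResetUtc, DateTime utcNow)
    {
        // Same comparison as the behavior: wait while the reset time has not yet passed.
        if (rateLimitResetUtc.HasValue && rateLimitResetUtc.Value >= utcNow)
        {
            return rateLimitResetUtc.Value - utcNow;
        }

        return null;
    }
}
```

For example, with a reset time 30 seconds in the future, DelayFor returns a 30-second TimeSpan. With no recorded reset time, or one that has already passed, it returns null. Passing the current time as a parameter, rather than reading DateTime.UtcNow inside the method, is a design choice that keeps the function testable.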
