Message Throughput Throttling

Component: NServiceBus
NuGet Package NServiceBus (5.x)

Systems often need to integrate with third-party services, some of which limit the number of concurrent requests they process.

This sample demonstrates an integration with the GitHub API using the Octokit library. It runs in unauthenticated mode, which is limited to a fixed number of requests per minute. When this limit is hit, the endpoint delays processing of further messages until the limit resets.

The solution consists of two endpoints: Sender and Limited. Two endpoints are required because NServiceBus does not support limiting throughput by message type. To limit only a specific message type, that type is handled in a separate endpoint.

Sender

The Sender is a normal endpoint that sends the SearchGitHub message and handles the SearchResponse reply message.
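
The two message types are not shown in this sample text. The following is a minimal sketch of what they might look like, inferred only from the properties the snippets on this page read and write. The property types are assumptions, and the classes are plain objects here so the sketch stands alone. In the actual endpoints they would also need to be identified as messages, for example by implementing the NServiceBus IMessage marker interface.

```csharp
// Hypothetical message contracts, inferred from how the sample uses them.
// Property types are assumptions; the real sample may differ.

// Request sent from Sender to Limited.
public class SearchGitHub
{
    public string Repository { get; set; }
    public string Owner { get; set; }
    public string SearchFor { get; set; }
}

// Reply sent from Limited back to Sender.
public class SearchResponse
{
    public string SearchedFor { get; set; }
    public int TotalCount { get; set; }
}
```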

Sending

At startup, the Sender sends 100 SearchGitHub messages to the Limited endpoint.

for (var i = 0; i < 100; i++)
{
    var searchGitHub = new SearchGitHub
    {
        Repository = "NServiceBus",
        Owner = "Particular",
        SearchFor = "IBus"
    };
    bus.Send("Samples.Throttling.Limited", searchGitHub);
}

Handling the response

Handles the SearchResponse message.

public class GitHubSearchResponseHandler :
    IHandleMessages<SearchResponse>
{
    static ILog log = LogManager.GetLogger<GitHubSearchResponseHandler>();

    public void Handle(SearchResponse message)
    {
        log.Info($"Found {message.TotalCount} results for {message.SearchedFor}.");
    }
}

Limited

This endpoint is limited to processing one message at a time and uses a pipeline behavior to handle the case where the GitHub API rate limit, as reported by Octokit, has been exceeded.

Configure to process only one concurrent message

Limits the endpoint to one message at a time by setting MaximumConcurrencyLevel to 1.

public class LimitConcurrencyLevel :
    IProvideConfiguration<TransportConfig>
{
    public TransportConfig GetConfiguration()
    {
        return new TransportConfig
        {
            MaximumConcurrencyLevel = 1,
        };
    }
}

Search Handler

Performs the search using Octokit and replies with the number of results.

public class GitHubSearchHandler :
    IHandleMessages<SearchGitHub>
{
    IBus bus;
    static ILog log = LogManager.GetLogger<GitHubSearchHandler>();

    // use anonymous access which has strict rate limitations
    GitHubClient GitHubClient = new GitHubClient(new ProductHeaderValue("ThroughputThrottling"));

    public GitHubSearchHandler(IBus bus)
    {
        this.bus = bus;
    }

    public void Handle(SearchGitHub message)
    {
        log.Info($"Received search for '{message.SearchFor}' on {message.Owner}/{message.Repository}");

        var request = new SearchCodeRequest(
            message.SearchFor,
            message.Owner,
            message.Repository);
        var result = GitHubClient.Search.SearchCode(request).GetAwaiter().GetResult();
        log.Info($"Found {result.TotalCount} results for {message.SearchFor}. Replying.");
        var response = new SearchResponse
        {
            SearchedFor = message.SearchFor,
            TotalCount = result.TotalCount
        };
        bus.Reply(response);
    }
}

Registering the behavior in the pipeline

busConfiguration.Pipeline.Register<ThrottlingBehavior.Registration>();

The pipeline behavior

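Detects Octokit.RateLimitExceededException and defers the message until the rate limit resets. While a previously recorded reset time is still in the future, incoming messages are deferred without invoking the handler.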

public class ThrottlingBehavior :
    IBehavior<IncomingContext>
{
    IBus bus;
    static ILog log = LogManager.GetLogger<ThrottlingBehavior>();
    static DateTime? nextRateLimitReset;

    public ThrottlingBehavior(IBus bus)
    {
        this.bus = bus;
    }

    public void Invoke(IncomingContext context, Action next)
    {
        var rateLimitReset = nextRateLimitReset;
        if (rateLimitReset.HasValue && rateLimitReset >= DateTime.UtcNow)
        {
            var localTime = rateLimitReset?.ToLocalTime();
            log.Info($"Rate limit exceeded. Retry after {rateLimitReset} UTC ({localTime} local).");
            DelayMessage(context, rateLimitReset.Value);
            return;
        }

        try
        {
            next();
        }
        catch (RateLimitExceededException exception)
        {
            var nextReset = nextRateLimitReset = exception.Reset.UtcDateTime;
            var localTime = nextReset?.ToLocalTime();
            log.Info($"Rate limit exceeded. Limit resets at {nextReset} UTC ({localTime} local).");
            DelayMessage(context, nextReset.Value);
        }
    }

    void DelayMessage(IncomingContext context, DateTime deliverAt)
    {
        var message = context.LogicalMessages.Single().Instance;
        // maintain the original ReplyTo address
        if (context.PhysicalMessage.Headers.TryGetValue(Headers.ReplyToAddress, out var replyAddress))
        {
            bus.SetMessageHeader(message, Headers.ReplyToAddress, replyAddress);
        }
        // delay the message to the specified delivery date
        bus.Defer(deliverAt, message);
    }

    public class Registration :
        RegisterStep
    {
        public Registration()
            : base("GitHubApiThrottling", typeof(ThrottlingBehavior), "API throttling for GitHub")
        {
            InsertBefore(WellKnownStep.InvokeHandlers);
        }
    }
}
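
The decision the behavior makes (is a recorded reset time still in the future?) can be looked at separately from NServiceBus. The following is a minimal sketch of that check, not the sample's implementation. RateLimitGate, RecordReset and IsThrottled are hypothetical names introduced for illustration. The sketch stores the reset time as ticks in a long and accesses it through Interlocked, which is an assumption about what might be wanted if the concurrency limit were ever raised above 1. With MaximumConcurrencyLevel set to 1, as in this sample, the static nullable DateTime field is only touched by one message at a time.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of the sample or of NServiceBus.
public class RateLimitGate
{
    // Reset time as UTC ticks; 0 means no reset has been recorded yet.
    long resetTicks;

    // Remember the time at which the API reported the limit will reset.
    public void RecordReset(DateTime resetUtc)
    {
        Interlocked.Exchange(ref resetTicks, resetUtc.Ticks);
    }

    // Returns true while a recorded reset time is now or in the future,
    // mirroring the ">=" comparison used in the behavior.
    public bool IsThrottled(DateTime utcNow, out DateTime resetUtc)
    {
        var ticks = Interlocked.Read(ref resetTicks);
        resetUtc = new DateTime(ticks, DateTimeKind.Utc);
        return ticks != 0 && resetUtc >= utcNow;
    }
}
```

A behavior built on this sketch would call IsThrottled with DateTime.UtcNow before invoking the next pipeline step, and call RecordReset from the catch block that handles RateLimitExceededException.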
