Message Throughput Throttling

Component: NServiceBus
NuGet Package NServiceBus (5.x)

This sample demonstrates performing a search query against the GitHub API using the Octokit library. It runs in unauthenticated mode, which is limited to a fixed number of requests per minute. Upon hitting this limit, the endpoint will delay processing additional messages, until the limit resets.

The solution consists of two endpoints; Sender and Limited. The reason two multiple are required in this scenario is are that NServiceBus does not support limiting messages by message type. So to limit only a specific message type then an endpoint is required to handle that message type.

Sender

The Sender is a normal endpoint that sends the SearchGitHub message and the handles the reply SearchResponse message.

Sending

The message sending occurs at startup.

for (var i = 0; i < 100; i++)
{
    var searchGitHub = new SearchGitHub
    {
        Repository = "NServiceBus",
        Owner = "Particular",
        SearchFor = "IBus"
    };
    bus.Send("Samples.Throttling.Limited",searchGitHub);
}

Search Response

Handling the reply.

public class GitHubSearchResponseHandler :
    IHandleMessages<SearchResponse>
{
    static ILog log = LogManager.GetLogger<GitHubSearchResponseHandler>();

    public void Handle(SearchResponse message)
    {
        log.Info($"Found {message.TotalCount} results for {message.SearchedFor}.");
    }
}

Limited

This endpoint is limited to 1 concurrent message processing and uses a pipeline behavior to handle when the processing limit of Octokit has been exceeded.

Configure to use 1 concurrent process

Limits the endpoint concurrency.

public class LimitConcurrencyLevel :
    IProvideConfiguration<TransportConfig>
{
    public TransportConfig GetConfiguration()
    {
        return new TransportConfig
        {
            MaximumConcurrencyLevel = 1,
        };
    }
}

Search Handler

Performs the Octokit search.

public class GitHubSearchHandler :
    IHandleMessages<SearchGitHub>
{
    IBus bus;
    static ILog log = LogManager.GetLogger<GitHubSearchHandler>();

    // use anonymous access which has strict rate limitations
    GitHubClient GitHubClient = new GitHubClient(new ProductHeaderValue("ThroughputThrottling"));

    public GitHubSearchHandler(IBus bus)
    {
        this.bus = bus;
    }

    public void Handle(SearchGitHub message)
    {
        log.Info($"Received search for '{message.SearchFor}' on {message.Owner}/{message.Repository}");

        var request = new SearchCodeRequest(
            message.SearchFor,
            message.Owner,
            message.Repository);
        var result = GitHubClient.Search.SearchCode(request).GetAwaiter().GetResult();
        log.Info($"Found {result.TotalCount} results for {message.SearchFor}. Replying.");
        var response = new SearchResponse
        {
            SearchedFor = message.SearchFor,
            TotalCount = result.TotalCount
        };
        bus.Reply(response);
    }
}

Registering the behavior in pipeline

busConfiguration.Pipeline.Register<ThrottlingRegistration>();
class ThrottlingRegistration :
    RegisterStep
{
    public ThrottlingRegistration()
        : base("GitHubApiThrottling", typeof(ThrottlingBehavior), "API throttling for GitHub")
    {
        InsertBefore(WellKnownStep.InvokeHandlers);
    }
}

The Behavior

Handles detection of Octokit.RateLimitExceededException and defers the message.

public class ThrottlingBehavior :
    IBehavior<IncomingContext>
{
    IBus bus;
    static ILog log = LogManager.GetLogger<ThrottlingBehavior>();
    static DateTime? nextRateLimitReset;

    public ThrottlingBehavior(IBus bus)
    {
        this.bus = bus;
    }

    public void Invoke(IncomingContext context, Action next)
    {
        var rateLimitReset = nextRateLimitReset;
        if (rateLimitReset.HasValue && rateLimitReset >= DateTime.UtcNow)
        {
            var localTime = rateLimitReset?.ToLocalTime();
            log.Info($"Rate limit exceeded. Retry after {rateLimitReset} UTC ({localTime} local).");
            DelayMessage(context, rateLimitReset.Value);
            return;
        }

        try
        {
            next();
        }
        catch (RateLimitExceededException exception)
        {
            var nextReset = nextRateLimitReset = exception.Reset.UtcDateTime;
            var localTime = nextReset?.ToLocalTime();
            log.Info($"Rate limit exceeded. Limit resets at {nextReset} UTC ({localTime} local).");
            DelayMessage(context, nextReset.Value);
        }
    }

    void DelayMessage(IncomingContext context, DateTime deliverAt)
    {
        var message = context.LogicalMessages.Single().Instance;
        // maintain the original ReplyTo address
        if (context.PhysicalMessage.Headers.TryGetValue(Headers.ReplyToAddress, out var replyAddress))
        {
            bus.SetMessageHeader(message, Headers.ReplyToAddress, replyAddress);
        }
        // delay the message to the specified delivery date
        bus.Defer(deliverAt, message);
    }
}

Related Articles


Last modified