This sample demonstrates performing a search query against the GitHub API using the Octokit library. It runs in unauthenticated mode, which is limited to a fixed number of requests per minute. Upon hitting this limit, the endpoint will delay processing additional messages, until the limit resets.
The solution consists of two endpoints: Sender and Limited. Two endpoints are required in this scenario because NServiceBus does not support limiting messages by message type. So, to limit only a specific message type, a dedicated endpoint is required to handle that message type.
Sender
The Sender is a normal endpoint that sends the SearchGitHub
message and then handles the reply SearchResponse
message.
Sending
The message sending occurs at startup.
// Send 100 identical search requests to the Limited endpoint.
for (var remaining = 100; remaining > 0; remaining--)
{
    bus.Send(
        "Samples.Throttling.Limited",
        new SearchGitHub
        {
            Repository = "NServiceBus",
            Owner = "Particular",
            SearchFor = "IBus"
        });
}
Search Response
Handling the reply.
/// <summary>
/// Handles <see cref="SearchResponse"/> replies by logging the number of results
/// found for the searched term.
/// </summary>
public class GitHubSearchResponseHandler :
    IHandleMessages<SearchResponse>
{
    static ILog log = LogManager.GetLogger<GitHubSearchResponseHandler>();

    /// <summary>Logs the total count and search term carried by the reply.</summary>
    /// <param name="message">The reply describing the search outcome.</param>
    public void Handle(SearchResponse message)
    {
        var total = message.TotalCount;
        var term = message.SearchedFor;
        log.Info($"Found {total} results for {term}.");
    }
}
Limited
This endpoint is limited to processing 1 message at a time and uses a pipeline behavior to handle the case where the processing limit of Octokit has been exceeded.
Configure to use 1 concurrent process
Limits the endpoint concurrency.
/// <summary>
/// Supplies a <see cref="TransportConfig"/> whose maximum concurrency level is 1.
/// </summary>
public class LimitConcurrencyLevel :
    IProvideConfiguration<TransportConfig>
{
    /// <summary>Builds the transport configuration.</summary>
    /// <returns>A <see cref="TransportConfig"/> with <c>MaximumConcurrencyLevel</c> set to 1.</returns>
    public TransportConfig GetConfiguration()
    {
        var transportConfig = new TransportConfig();
        transportConfig.MaximumConcurrencyLevel = 1;
        return transportConfig;
    }
}
Search Handler
Performs the Octokit search.
/// <summary>
/// Handles <see cref="SearchGitHub"/> by running a code search through Octokit and
/// replying with a <see cref="SearchResponse"/> that carries the result count.
/// </summary>
public class GitHubSearchHandler :
    IHandleMessages<SearchGitHub>
{
    static readonly ILog log = LogManager.GetLogger<GitHubSearchHandler>();

    // Use anonymous access which has strict rate limitations.
    // A single client is shared by all handler instances, so constructing a
    // handler does not also construct a new GitHub client.
    static readonly GitHubClient client = new GitHubClient(new ProductHeaderValue("ThroughputThrottling"));

    readonly IBus bus;

    /// <summary>Creates the handler.</summary>
    /// <param name="bus">Bus used to reply with the search outcome.</param>
    public GitHubSearchHandler(IBus bus)
    {
        this.bus = bus;
    }

    /// <summary>
    /// Searches code in the requested repository and replies with the total result count.
    /// </summary>
    /// <param name="message">The search term plus the owner and repository to search in.</param>
    public void Handle(SearchGitHub message)
    {
        log.Info($"Received search for '{message.SearchFor}' on {message.Owner}/{message.Repository}");

        var request = new SearchCodeRequest(
            message.SearchFor,
            message.Owner,
            message.Repository);

        // Handle is synchronous, so block on the Octokit task.
        // GetAwaiter().GetResult() rethrows the original exception rather than an
        // AggregateException, so callers can catch the Octokit exception type directly.
        var result = client.Search.SearchCode(request).GetAwaiter().GetResult();
        log.Info($"Found {result.TotalCount} results for {message.SearchFor}. Replying.");

        var response = new SearchResponse
        {
            SearchedFor = message.SearchFor,
            TotalCount = result.TotalCount
        };
        bus.Reply(response);
    }
}
Registering the behavior in pipeline
// Add the step described by ThrottlingRegistration to the endpoint's pipeline.
busConfiguration.Pipeline.Register<ThrottlingRegistration>();
/// <summary>
/// Describes the pipeline step that runs <see cref="ThrottlingBehavior"/>,
/// inserted before the well-known handler invocation step.
/// </summary>
class ThrottlingRegistration :
    RegisterStep
{
    // Named to avoid hiding any members declared on the base class.
    const string ThrottlingStepId = "GitHubApiThrottling";
    const string ThrottlingStepDescription = "API throttling for GitHub";

    public ThrottlingRegistration()
        : base(ThrottlingStepId, typeof(ThrottlingBehavior), ThrottlingStepDescription)
    {
        InsertBefore(WellKnownStep.InvokeHandlers);
    }
}
The Behavior
Handles detection of Octokit.RateLimitExceededException
and defers the message.
/// <summary>
/// Incoming pipeline behavior that defers messages while a previously reported
/// rate limit is still in effect, and records the reset time whenever the rest
/// of the pipeline throws <see cref="RateLimitExceededException"/>.
/// </summary>
public class ThrottlingBehavior :
    IBehavior<IncomingContext>
{
    IBus bus;
    static ILog log = LogManager.GetLogger<ThrottlingBehavior>();

    // UTC reset time taken from the most recent RateLimitExceededException;
    // null until one has been caught.
    static DateTime? nextRateLimitReset;

    public ThrottlingBehavior(IBus bus)
    {
        this.bus = bus;
    }

    /// <summary>
    /// Defers the message when the recorded reset time has not passed yet;
    /// otherwise runs the rest of the pipeline and defers on a rate limit failure.
    /// </summary>
    /// <param name="context">Context of the incoming message.</param>
    /// <param name="next">Continuation that runs the remainder of the pipeline.</param>
    public void Invoke(IncomingContext context, Action next)
    {
        // Take one snapshot of the shared value so the check and the use agree.
        var knownReset = nextRateLimitReset;
        var stillLimited = knownReset.HasValue && knownReset >= DateTime.UtcNow;

        if (stillLimited)
        {
            var rateLimitReset = knownReset.Value;
            var localTime = rateLimitReset.ToLocalTime();
            log.Info($"Rate limit exceeded. Retry after {rateLimitReset} UTC ({localTime} local).");
            DelayMessage(context, rateLimitReset);
            return;
        }

        try
        {
            next();
        }
        catch (RateLimitExceededException exception)
        {
            var nextReset = exception.Reset.UtcDateTime;
            nextRateLimitReset = nextReset;

            var localTime = nextReset.ToLocalTime();
            log.Info($"Rate limit exceeded. Limit resets at {nextReset} UTC ({localTime} local).");
            DelayMessage(context, nextReset);
        }
    }

    /// <summary>
    /// Defers the single logical message of the current context until <paramref name="deliverAt"/>.
    /// </summary>
    /// <param name="context">Context holding the logical and physical message.</param>
    /// <param name="deliverAt">Time at which the deferred message should be delivered.</param>
    void DelayMessage(IncomingContext context, DateTime deliverAt)
    {
        var deferred = context.LogicalMessages.Single().Instance;
        var incomingHeaders = context.PhysicalMessage.Headers;

        // Maintain the original ReplyTo address on the deferred copy.
        if (incomingHeaders.TryGetValue(Headers.ReplyToAddress, out var replyAddress))
        {
            bus.SetMessageHeader(deferred, Headers.ReplyToAddress, replyAddress);
        }

        // Delay the message to the specified delivery date.
        bus.Defer(deliverAt, deferred);
    }
}