Systems often need to integrate with 3rd party services, some of which may limit the number of concurrent requests they process.
This sample demonstrates an integration with the GitHub API using the Octokit library. It runs in unauthenticated mode, which is limited to a fixed number of requests per minute. Upon hitting this limit, the endpoint will delay processing additional messages, until the limit resets.
The solution consists of two endpoints; Sender and Limited. The reason two endpoints are required in this scenario is that NServiceBus does not support limiting messages by message type. So, to limit only a specific message type, a separate endpoint is used for it.
Sender
The Sender is a normal endpoint that sends the SearchGitHub
message and the handles the reply SearchResponse
message.
Sending
The message sending occurs at startup.
var endpointInstance = await Endpoint.Start(endpointConfiguration);
Console.WriteLine("Sending messages...");
for (var i = 0; i < 100; i++)
{
var searchGitHub = new SearchGitHub
{
Repository = "NServiceBus",
Owner = "Particular",
Branch = "master"
};
await endpointInstance.Send("Samples.Throttling.Limited", searchGitHub);
}
Handling the response
Handling the GitHubSearchResponse message.
public class GitHubSearchResponseHandler :
IHandleMessages<SearchResponse>
{
static ILog log = LogManager.GetLogger<GitHubSearchResponseHandler>();
public Task Handle(SearchResponse message, IMessageHandlerContext context)
{
log.Info($"Found commit '{message.CommitSha}' for branch '{message.Branch}'.");
return Task.CompletedTask;
}
}
Limited
This endpoint is limited to processing one message at a time and uses a pipeline behavior to handle when the processing limit of Octokit has been exceeded.
Configure to process only one concurrent message
Limits the endpoint concurrency.
var endpointConfiguration = new EndpointConfiguration("Samples.Throttling.Limited");
endpointConfiguration.LimitMessageProcessingConcurrencyTo(1);
Search Handler
Performs the Octokit search.
public class GitHubSearchHandler :
IHandleMessages<SearchGitHub>
{
static ILog log = LogManager.GetLogger<GitHubSearchHandler>();
// use anonymous access which has strict rate limitations
GitHubClient GitHubClient = new GitHubClient(new ProductHeaderValue("ThroughputThrottling"));
public async Task Handle(SearchGitHub message, IMessageHandlerContext context)
{
log.Info($"Received search request for branch '{message.Branch}' on '{message.Owner}/{message.Repository}'");
var result = await GitHubClient.Repository.Branch.Get(message.Owner, message.Repository, "master");
log.Info($"Found commit '{result.Commit.Sha}' for branch '{message.Branch}'. Replying.");
var response = new SearchResponse
{
Branch = message.Branch,
CommitSha = result.Commit.Sha
};
await context.Reply(response);
}
}
Registering the behavior in pipeline
endpointConfiguration.Pipeline.Register(typeof(ThrottlingBehavior), "API throttling for GitHub");
The pipeline behavior
Handles the detection of Octokit.
and defers the message.
public class ThrottlingBehavior :
Behavior<IInvokeHandlerContext>
{
static ILog log = LogManager.GetLogger<ThrottlingBehavior>();
static DateTime? nextRateLimitReset;
public override async Task Invoke(IInvokeHandlerContext context, Func<Task> next)
{
var rateLimitReset = nextRateLimitReset;
if (rateLimitReset.HasValue && rateLimitReset >= DateTime.UtcNow)
{
var localTime = rateLimitReset?.ToLocalTime();
log.Info($"Rate limit exceeded. Retry after {rateLimitReset} UTC ({localTime} local).");
await DelayMessage(context, rateLimitReset.Value);
return;
}
try
{
await next();
}
catch (RateLimitExceededException exception)
{
var nextReset = nextRateLimitReset = exception.Reset.UtcDateTime;
var localTime = nextReset?.ToLocalTime();
log.Info($"Rate limit exceeded. Limit resets at {nextReset} UTC ({localTime} local).");
await DelayMessage(context, nextReset.Value);
}
}
Task DelayMessage(IInvokeHandlerContext context, DateTime deliverAt)
{
var sendOptions = new SendOptions();
// delay the message to the specified delivery date
sendOptions.DoNotDeliverBefore(deliverAt);
// send message to this endpoint
sendOptions.RouteToThisEndpoint();
// maintain the original ReplyTo address
if (context.Headers.TryGetValue(Headers.ReplyToAddress, out var replyAddress))
{
sendOptions.RouteReplyTo(replyAddress);
}
return context.Send(context.MessageBeingHandled, sendOptions);
}
}
The behavior sends a copy of the original message, but does not copy the headers of the original message. If the headers of the original message are required, they must be copied from context.
to SendOptions
.