Prerequisites
An environment variable named AzureServiceBus_ConnectionString
with the connection string for the Azure Service Bus namespace.
An environment variable named AzureStoragePersistence.
with the connection string for the Azure Storage account.
Code walk-through
This sample shows a simplified long-running process:
Client makes a request for processing with a unique ID.
Server enqueues requests from Client to be processed by Processor.
Processor raises events about successful or failed results.
Server issues warnings for Client if the estimated processing time is going to be violated.
Performing processing outside a message handler
When processing takes a long time, message lock renewal is possible. The client initiates lock renewal, not the broker. If the request to renew the lock fails after all the SDK built-in retries (e.g., due to connection loss), the lock won't be renewed, and the message will be made available for processing by competing consumers. Lock renewal should be treated as a best effort, not as a guaranteed operation.
An alternative approach is to perform the long-running operation in an external service, outside of a message handler context, and notify the interested parties of the results.
This sample uses a standalone process, Processor, to run emulated long-running work and raise events for successful or failed outcomes. Server and Processor use an Azure Storage table, the Requests table, to communicate RequestRecord entries.
For simplicity, Processor is not scaled out. If scaling out is required, work on an individual request must be locked to a single instance of a processor. A common way to achieve this is to create a blob on the storage account, named after the request ID, and acquire a lease on that blob.
Also, note that the sample processes requests serially, one at a time. For concurrent processing, Processor could spin up a task per request. That would require an additional throttling mechanism to ensure Processor is not overwhelmed.
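The task-per-request idea with throttling can be sketched as follows. This is a minimal illustration, not part of the sample: the ThrottledProcessing class, the ProcessAllAsync method, and the processRequest delegate are hypothetical names, and SemaphoreSlim is just one way to cap concurrency.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the sample.
public static class ThrottledProcessing
{
    // Starts one task per request, but lets at most maxConcurrency of them
    // run processRequest at the same time. Returns the highest number of
    // requests that were observed running concurrently.
    public static async Task<int> ProcessAllAsync(
        IEnumerable<Guid> requestIds,
        Func<Guid, Task> processRequest,
        int maxConcurrency)
    {
        using var throttle = new SemaphoreSlim(maxConcurrency);
        var gate = new object();
        var running = 0;
        var peak = 0;

        var tasks = requestIds.Select(async id =>
        {
            // Wait for a free slot before doing any work.
            await throttle.WaitAsync().ConfigureAwait(false);
            try
            {
                lock (gate)
                {
                    running++;
                    peak = Math.Max(peak, running);
                }

                await processRequest(id).ConfigureAwait(false);
            }
            finally
            {
                lock (gate)
                {
                    running--;
                }

                // Free the slot even if processing failed.
                throttle.Release();
            }
        }).ToList();

        await Task.WhenAll(tasks).ConfigureAwait(false);
        return peak;
    }
}
```

Calling ProcessAllAsync(ids, ProcessOne, 3), where ProcessOne is whatever performs the work for a single request, would keep at most three requests in flight. The returned peak is only there to make the limit observable; a real processor would not need it.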
Making a request from the client
var message = new LongProcessingRequest
{
    Id = Guid.NewGuid(),
    // set to a longer period of time to emulate longer processing
    EstimatedProcessingTime = Constants.EstimatedProcessingTime
};
Business logic with saga and timeout
Setting a timeout:
var timeoutToBeInvokedAt = DateTime.Now + message.EstimatedProcessingTime + TimeSpan.FromSeconds(10);
var timeoutMessage = new ProcessingPossiblyFailed
{
    Id = message.Id
};
await RequestTimeout(context, timeoutToBeInvokedAt, timeoutMessage)
    .ConfigureAwait(false);
On timeout:
var processingWarning = new LongProcessingWarning
{
    Id = timeoutMessage.Id
};
MarkAsComplete();
return context.Publish(processingWarning);
Server communication
Server enqueues requests for Processor using an Azure Storage table and replies to Client to indicate that processing is pending.
// Saga enqueues the request to process in a storage table. This is the
// logical equivalent of adding a message to a queue. If there were
// business-specific work to perform here, that work should be done by
// sending a message to a handler rather than being handled in the saga.
var request = new RequestRecord(message.Id, Status.Pending, message.EstimatedProcessingTime);
await table.ExecuteAsync(TableOperation.Insert(request))
    .ConfigureAwait(false);
var processingReply = new LongProcessingReply
{
    Id = message.Id
};
await context.Reply(processingReply)
    .ConfigureAwait(false);
Processor logic
Processor performs two never-ending tasks: polling every 5 seconds for pending requests, and processing those requests.
pollingTask = Task.Run(() => StartPolling(cancellationToken));
processingTask = Task.Run(() => StartProcessing(cancellationToken));
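One way the two tasks could cooperate is shown below. This is a sketch under stated assumptions, not the sample's implementation: the PollAndProcessLoop class and the fetchPending, process, and onFailure delegates are hypothetical, and an in-memory Channel stands in for whatever hand-off the sample uses between polling and processing.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical illustration of a polling task feeding a processing task.
public sealed class PollAndProcessLoop
{
    readonly Channel<Guid> pending = Channel.CreateUnbounded<Guid>();
    readonly Func<CancellationToken, Task<IReadOnlyList<Guid>>> fetchPending;
    readonly Func<Guid, Task> process;
    readonly Action<Guid, Exception> onFailure;
    readonly TimeSpan pollInterval;

    public PollAndProcessLoop(
        Func<CancellationToken, Task<IReadOnlyList<Guid>>> fetchPending,
        Func<Guid, Task> process,
        Action<Guid, Exception> onFailure,
        TimeSpan pollInterval)
    {
        this.fetchPending = fetchPending;
        this.process = process;
        this.onFailure = onFailure;
        this.pollInterval = pollInterval;
    }

    // Runs both loops until the token is cancelled.
    public Task RunAsync(CancellationToken cancellationToken)
    {
        var pollingTask = Task.Run(() => StartPolling(cancellationToken));
        var processingTask = Task.Run(() => StartProcessing(cancellationToken));
        return Task.WhenAll(pollingTask, processingTask);
    }

    async Task StartPolling(CancellationToken cancellationToken)
    {
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                // fetchPending is assumed to return each request only once.
                foreach (var id in await fetchPending(cancellationToken).ConfigureAwait(false))
                {
                    pending.Writer.TryWrite(id);
                }

                await Task.Delay(pollInterval, cancellationToken).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException)
        {
            // Expected on shutdown.
        }
        finally
        {
            pending.Writer.TryComplete();
        }
    }

    async Task StartProcessing(CancellationToken cancellationToken)
    {
        try
        {
            // Requests are handled one at a time, in arrival order.
            await foreach (var id in pending.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
            {
                try
                {
                    await process(id).ConfigureAwait(false);
                }
                catch (Exception exception)
                {
                    // A failed request is reported and the loop keeps going.
                    onFailure(id, exception);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected on shutdown.
        }
    }
}
```

With a five-second interval, fetchPending would be the place to query the Requests table for pending records, and onFailure the place to raise the failure event. Both are left to the caller here.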
During processing, an exception is emulated randomly to demonstrate a failing scenario.
// emulate failure
if (DateTime.UtcNow.Ticks % 2 == 0)
{
    throw new Exception("Some exception during processing.");
}