Long running operations with Azure Service Bus Transport

Component: Azure Service Bus Transport | Nuget: NServiceBus.Azure.Transports.WindowsAzureServiceBus (Version: 7.x)
Target NServiceBus Version: 6.x

Prerequisites

An environment variable named AzureServiceBus.ConnectionString with the connection string for the Azure Service Bus namespace.

An environment variable named AzureStoragePersistence.ConnectionString with the connection string for the Azure Storage account.

Azure Service Bus Transport

This sample utilizes the Azure Service Bus Transport.

Code walk-through

This sample shows a simplified long running process

  • Client makes a request for processing with a unique ID.
  • Server enqueues requests from Client to be processed by Processor.
  • Processor raises events about successful or failed results.
  • Server issues warnings for Client if estimated processing time is going to be violated.
sequenceDiagram Participant Client Participant Server Participant Storage Table Note over Processor: Polling Storage Table for requests Participant Processor Note left of Client: Publish Message1 occurs Client ->> Server: LongProcessingRequest (ID, Processing Time) Server ->> Storage Table: Store RequestRecord Server ->> Server: Set timeout Processor ->> Processor: process request Processor ->> Storage Table: Update RequestRecord Note over Server: If processing takes longer than anticipated Server ->> Server: Timeout raised Server ->> Client: LongProcessingWarning Note over Server: Otherwise Processor ->> Server: LongProcessingFinished / LongProcessingFailed Processor ->> Client: LongProcessingFinished / LongProcessingFailed

Performing processing outside of a message handler

When processing is taking a long time, message lock renewal can be used, but should be avoided to keep message locking to the minimum. The alternative approach is to perform a long running operation in an external service, outside of a message handler context and notify the interested parts of the results.

This sample is using a standalone process Processor to run an emulated long running work and raises events for successful or failed outcomes. Server and Processor use Azure Storage table to communicate RequestRecords in the Requests table.

For simplicity of this sample, Processor is not scaled out. In case scale out is required, work on an individual request has to be locked to a single instance of a processor. A common way to achieve that would be creating a blob with request ID as a name on storage account and get a lease to that file. Also, the sample processes files in a serial manner, one at a time. For concurrent processing, Processor could spin a task per request. That would require an additional throttling mechanism to be implemented to ensure Processor is not overwhelmed.

Making a request from the client

Edit
var message = new LongProcessingRequest
{
    Id = Guid.NewGuid(),
    // set to a longer period of time to emulate longer processing
    EstimatedProcessingTime = Constants.EstimatedProcessingTime
};

Business logic with saga and timeout

Setting a timeout:

Edit
var timeoutToBeInvokedAt = DateTime.Now + message.EstimatedProcessingTime + TimeSpan.FromSeconds(10);
var timeoutMessage = new ProcessingPossiblyFailed
{
    Id = message.Id
};
await RequestTimeout(context, timeoutToBeInvokedAt, timeoutMessage)
    .ConfigureAwait(false);

On timeout:

Edit
var processingWarning = new LongProcessingWarning
{
    Id = timeoutMessage.Id
};
await context.Publish(processingWarning)
    .ConfigureAwait(false);
MarkAsComplete();

Server communication

Server enqueues request for the processor using Azure Storage table and replying back to the Client to indicate that processing is pending.

Edit
// Saga enqueues the request to process in a storage table. This is the
// logical equivalent of adding a message to a queue. If there would be
// business specific work to perform here, that work should be done by
// sending a message to a handler instead and not handled in the saga.

var request = new RequestRecord(message.Id, Status.Pending, message.EstimatedProcessingTime);
await table.ExecuteAsync(TableOperation.Insert(request))
    .ConfigureAwait(false);

var processingReply = new LongProcessingReply
{
    Id = message.Id
};
await context.Reply(processingReply)
    .ConfigureAwait(false);
normally, actual work would not be done by the Sagas, and would be delegated to a dedicated handler. For simplicity of this sample, a handler was omitted.

Processor logic

Processor performs to never ending tasks - polling every 5 seconds for pending requests and processing those

Edit
pollingTask = Task.Run(() => StartPolling(cancellationToken));
processingTask = Task.Run(() => StartProcessing(cancellationToken));

During processing, a processing exception is emulated randomly to demonstrate a failing scenario.

Edit
// emulate failure
if (DateTime.UtcNow.Ticks%2 == 0)
{
    throw new Exception("Some exception during processing.");
}

Related Articles


Last modified