Long-running operations with Azure Service Bus Transport

In version 3 and higher of the Azure Service Bus transport, lock renewal is built into the transport, and custom lock renewal as shown in this sample is no longer required.

Prerequisites

An environment variable named AzureServiceBus_ConnectionString with the connection string for the Azure Service Bus namespace.

An environment variable named AzureStoragePersistence.ConnectionString with the connection string for the Azure Storage account.

Code walk-through

This sample shows a simplified long-running process:

  • Client makes a request for processing with a unique ID.
  • Server enqueues requests from Client to be processed by Processor.
  • Processor raises events about successful or failed results.
  • Server issues a warning to Client if the estimated processing time is exceeded.

sequenceDiagram
    Participant Client
    Participant Server
    Participant Storage Table
    Note over Processor: Polling Storage Table for requests
    Participant Processor
    Note left of Client: Publish Message1 occurs
    Client ->> Server: LongProcessingRequest (ID, Processing Time)
    Server ->> Storage Table: Store RequestRecord
    Server ->> Server: Set timeout
    Processor ->> Processor: process request
    Processor ->> Storage Table: Update RequestRecord
    Note over Server: If the processing takes longer than anticipated
    Server ->> Server: Timeout raised
    Server ->> Client: LongProcessingWarning
    Note over Server: Otherwise
    Processor ->> Server: LongProcessingFinished / LongProcessingFailed
    Processor ->> Client: LongProcessingFinished / LongProcessingFailed

Performing processing outside a message handler

When processing takes a long time, the message lock can be renewed. Lock renewal is initiated by the client, not the broker. If the renewal request fails after all of the SDK's built-in retries (e.g., due to connection loss), the lock is not renewed and the message becomes available to competing consumers. Treat lock renewal as a best-effort operation, not a guaranteed one.

An alternative approach is to perform the long-running operation in an external service, outside of a message handler context, and notify interested parties of the results.

This sample uses a standalone process, Processor, to run emulated long-running work and raise events for successful or failed outcomes. Server and Processor communicate through RequestRecords stored in the Requests table in Azure Storage.

For simplicity, Processor is not scaled out. If scaling out is required, work on an individual request must be locked to a single Processor instance. A common way to achieve this is to create a blob on the storage account named after the request ID and acquire a lease on that blob.

Also note that the sample processes requests serially, one at a time. For concurrent processing, Processor could spin up a task per request. That would require an additional throttling mechanism to ensure Processor is not overwhelmed.
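One possible shape for such a throttling mechanism is sketched below. It is not part of the sample: the ThrottledProcessor class, the processRequest delegate, and the maxConcurrency parameter are hypothetical names chosen for illustration. The sketch starts one task per request but uses a SemaphoreSlim so that no more than maxConcurrency requests are in flight at once.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the sample.
public static class ThrottledProcessor
{
    // Starts one task per request ID, allowing at most maxConcurrency
    // requests to be processed at the same time.
    public static async Task ProcessAllAsync(
        IEnumerable<Guid> requestIds,
        Func<Guid, CancellationToken, Task> processRequest,
        int maxConcurrency,
        CancellationToken cancellationToken = default)
    {
        var throttle = new SemaphoreSlim(maxConcurrency);
        var running = new List<Task>();

        foreach (var id in requestIds)
        {
            // Wait for a free slot before starting another task.
            await throttle.WaitAsync(cancellationToken);
            running.Add(RunOneAsync(id));
        }

        // Surface any processing exceptions once all tasks have finished.
        await Task.WhenAll(running);

        async Task RunOneAsync(Guid id)
        {
            try
            {
                await processRequest(id, cancellationToken);
            }
            finally
            {
                // Free the slot whether processing succeeded or failed.
                throttle.Release();
            }
        }
    }
}
```

A suitable value for maxConcurrency depends on how much CPU, memory, or downstream capacity each request consumes, so it would normally be configurable.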

Making a request from the client

var message = new LongProcessingRequest
{
    Id = Guid.NewGuid(),
    // set to a longer period of time to emulate longer processing
    EstimatedProcessingTime = Constants.EstimatedProcessingTime
};

Business logic with saga and timeout

Setting a timeout:

var timeoutToBeInvokedAt = DateTime.Now + message.EstimatedProcessingTime + TimeSpan.FromSeconds(10);
var timeoutMessage = new ProcessingPossiblyFailed
{
    Id = message.Id
};
await RequestTimeout(context, timeoutToBeInvokedAt, timeoutMessage);

On timeout:

var processingWarning = new LongProcessingWarning
{
    Id = timeoutMessage.Id
};
MarkAsComplete();
return context.Publish(processingWarning);

Server communication

Server enqueues requests for Processor using an Azure Storage table and replies to Client to indicate that processing is pending.

// Saga enqueues the request to process in a storage table. This is the
// logical equivalent of adding a message to a queue. If there would be
// business specific work to perform here, that work should be done by
// sending a message to a handler instead and not handled in the saga.

var request = new RequestRecord(message.Id, Status.Pending, message.EstimatedProcessingTime);
await table.ExecuteAsync(TableOperation.Insert(request));

var processingReply = new LongProcessingReply
{
    Id = message.Id
};
await context.Reply(processingReply);

Normally, work would not be done by a saga, but would be delegated to a dedicated handler. For simplicity, a handler was omitted in this sample.

Processor logic

Processor runs two long-lived tasks: one polls for pending requests every 5 seconds, and the other processes those requests.

pollingTask = Task.Run(() => StartPolling(cancellationToken));
processingTask = Task.Run(() => StartProcessing(cancellationToken));
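The sample's own StartPolling and StartProcessing methods are not shown here. As a rough sketch of how two such loops could cooperate, the following uses an in-memory channel to hand request IDs from the polling loop to the processing loop. The ProcessorLoops class and the fetchPending and process delegates are hypothetical stand-ins for the storage table query and the actual work, and fetchPending is assumed to return only requests that have not already been handed over.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sketch, not the sample's implementation.
public sealed class ProcessorLoops
{
    readonly Channel<Guid> pending = Channel.CreateUnbounded<Guid>();
    readonly Func<CancellationToken, Task<IReadOnlyList<Guid>>> fetchPending; // stand-in for the table query
    readonly Func<Guid, CancellationToken, Task> process;                     // stand-in for the work
    readonly TimeSpan pollInterval;

    public ProcessorLoops(
        Func<CancellationToken, Task<IReadOnlyList<Guid>>> fetchPending,
        Func<Guid, CancellationToken, Task> process,
        TimeSpan pollInterval)
    {
        this.fetchPending = fetchPending;
        this.process = process;
        this.pollInterval = pollInterval;
    }

    // Fetches pending request IDs at a fixed interval and queues them.
    public async Task StartPolling(CancellationToken cancellationToken)
    {
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                foreach (var id in await fetchPending(cancellationToken))
                {
                    await pending.Writer.WriteAsync(id, cancellationToken);
                }
                await Task.Delay(pollInterval, cancellationToken);
            }
        }
        catch (OperationCanceledException)
        {
            // Shutdown was requested.
        }
        finally
        {
            // Let the processing loop finish once polling stops.
            pending.Writer.TryComplete();
        }
    }

    // Processes queued request IDs one at a time, in arrival order.
    public async Task StartProcessing(CancellationToken cancellationToken)
    {
        try
        {
            await foreach (var id in pending.Reader.ReadAllAsync(cancellationToken))
            {
                await process(id, cancellationToken);
            }
        }
        catch (OperationCanceledException)
        {
            // Shutdown was requested.
        }
    }
}
```

Both loops stop when the shared cancellation token is cancelled, which mirrors how the two tasks above receive the same cancellationToken.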

During processing, an exception is thrown at random to emulate a failure scenario.

// emulate failure
if (DateTime.UtcNow.Ticks % 2 == 0)
{
    throw new Exception("Some exception during processing.");
}
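The walk-through states that Processor raises events for successful or failed results. A minimal sketch of how an exception like the one above might be turned into one of those two events follows. The event names come from the diagram, but their properties are assumptions, and OutcomeReporter, work, and publish are hypothetical names, with publish standing in for whatever the endpoint uses to publish an event.

```csharp
using System;
using System.Threading.Tasks;

// Assumed event shapes; the sample's real classes may differ.
public class LongProcessingFinished
{
    public Guid Id { get; set; }
}

public class LongProcessingFailed
{
    public Guid Id { get; set; }
    public string Reason { get; set; }
}

// Hypothetical helper, not part of the sample.
public static class OutcomeReporter
{
    // Runs the work and publishes exactly one event describing the outcome.
    public static async Task RunAndReport(
        Guid requestId,
        Func<Task> work,
        Func<object, Task> publish)
    {
        object outcome;
        try
        {
            await work();
            outcome = new LongProcessingFinished { Id = requestId };
        }
        catch (Exception ex)
        {
            // Any failure in the work is reported as a failed result.
            outcome = new LongProcessingFailed { Id = requestId, Reason = ex.Message };
        }

        await publish(outcome);
    }
}
```

Publishing outside the try block keeps a failure to publish from being misreported as a processing failure.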
