Long-running operations with Azure Service Bus Transport

NuGet Package: NServiceBus.Transport.AzureServiceBus (1.x)
Target Version: NServiceBus 7.x
This page refers to the legacy Azure Service Bus transport that uses the WindowsAzure.ServiceBus NuGet package. This package is currently on extended support until May 2022, after which it will be out of support according to the support policy. All users should migrate to the Azure Service Bus transport transport.


An environment variable named AzureServiceBus_ConnectionString with the connection string for the Azure Service Bus namespace.

An environment variable named AzureStoragePersistence.ConnectionString with the connection string for the Azure Storage account.

Azure Service Bus Transport

This sample uses the Azure Service Bus Transport (Legacy).

Code walk-through

This sample shows a simplified long-running process

  • Client makes a request for processing with a unique ID.
  • Server enqueues requests from Client to be processed by Processor.
  • Processor raises events about successful or failed results.
  • Server issues warnings for Client if the estimated processing time is going to be violated.
sequenceDiagram Participant Client Participant Server Participant Storage Table Note over Processor: Polling Storage Table for requests Participant Processor Note left of Client: Publish Message1 occurs Client ->> Server: LongProcessingRequest (ID, Processing Time) Server ->> Storage Table: Store RequestRecord Server ->> Server: Set timeout Processor ->> Processor: process request Processor ->> Storage Table: Update RequestRecord Note over Server: If the processing takes longer than anticipated Server ->> Server: Timeout raised Server ->> Client: LongProcessingWarning Note over Server: Otherwise Processor ->> Server: LongProcessingFinished / LongProcessingFailed Processor ->> Client: LongProcessingFinished / LongProcessingFailed

Performing processing outside a message handler

When processing takes a long time, message lock renewal is possible, but should be avoided to keep message locking to a minimum.

Message lock renewal operation is initiated by the Azure Service Bus client, not the broker. If it fails after all the retries, the lock won't be re-acquired, and the message will become unlocked and available for processing. Lock renewal should be treated as best-effort and not as a guaranteed operation.

An alternative approach is to perform a long-running operation in an external service, outside of a message handler context and notify the interested parts of the results.

This sample is using a standalone process Processor to run an emulated long running work and raises events for successful or failed outcomes. Server and Processor use Azure Storage table to communicate RequestRecords in the Requests table.

For simplicity, Processor is not scaled out. If scaling out is required, work on an individual request must be locked to a single instance of a processor. A common way to achieve this is to create a blob with a request ID as a name on the storage account and get a lease to that file.

Also, note that the sample processes files in a serial manner, one at a time. For concurrent processing, Processor could spin a task per request. That would require an additional throttling mechanism to be implemented to ensure Processor is not overwhelmed.

Making a request from the client

var message = new LongProcessingRequest
    Id = Guid.NewGuid(),
    // set to a longer period of time to emulate longer processing
    EstimatedProcessingTime = Constants.EstimatedProcessingTime

Business logic with saga and timeout

Setting a timeout:

var timeoutToBeInvokedAt = DateTime.Now + message.EstimatedProcessingTime + TimeSpan.FromSeconds(10);
var timeoutMessage = new ProcessingPossiblyFailed
    Id = message.Id
await RequestTimeout(context, timeoutToBeInvokedAt, timeoutMessage)

On timeout:

var processingWarning = new LongProcessingWarning
    Id = timeoutMessage.Id
return context.Publish(processingWarning);

Server communication

Server enqueues requests for the processor using an Azure Storage table and replying back to the Client to indicate that processing is pending.

// Saga enqueues the request to process in a storage table. This is the
// logical equivalent of adding a message to a queue. If there would be
// business specific work to perform here, that work should be done by
// sending a message to a handler instead and not handled in the saga.

var request = new RequestRecord(message.Id, Status.Pending, message.EstimatedProcessingTime);
await table.ExecuteAsync(TableOperation.Insert(request))

var processingReply = new LongProcessingReply
    Id = message.Id
await context.Reply(processingReply)
Normally, work would not be done by a saga, but would be delegated to a dedicated handler. For simplicity, a handler was omitted in this sample.

Processor logic

Processor performs two never-ending tasks - polling every 5 seconds for pending requests, and processing those requests.

pollingTask = Task.Run(() => StartPolling(cancellationToken));
processingTask = Task.Run(() => StartProcessing(cancellationToken));

During processing, an exception is emulated randomly to demonstrate a failing scenario.

// emulate failure
if (DateTime.UtcNow.Ticks % 2 == 0)
    throw new Exception("Some exception during processing.");


Related Articles

Last modified