Azure Service Bus lock renewal

Component: Azure Service Bus Transport
NuGet Package NServiceBus.Transport.AzureServiceBus (1.x)
Target NServiceBus Version: 7.x

Prerequisites

An environment variable named AzureServiceBus_ConnectionString with the connection string for the Azure Service Bus namespace.

Important information about lock renewal

  1. The transport must use the default SendsAtomicWithReceive transaction mode for the sample to work.
  2. When using lock renewal with the outbox feature, the transport transaction mode has to be explicitly set to SendsAtomicWithReceive in the endpoint configuration code.
  3. For a lock to be extended for longer than 10 minutes, the value of TransactionManager.MaxTimeout must be changed to the maximum time allowed to process a message. This is a machine wide setting and should be treated carefully.
  4. Message lock renewal is initiated by client code, not the broker. If the request to renew the lock fails after all the SDK built-in retries, the lock won't be renewed, and the message will become unlocked and available for processing by competing consumers. Lock renewal should be treated as best-effort and not as a guaranteed operation.
  5. Message lock renewal applies to only the message currently being processed. Prefetched messages that are not handled within the LockDuration time will lose their lock, indicated by a LockLostException in the log when the transport attempts to complete them. The number of prefetched messages may be adjusted, which may help to prevent exceptions with lock renewal. Alternatively, the endpoint's concurrency limit may be raised to increase the rate of message processing, but this may also increase resource contention.

Code walk-through

The sample contains a single executable project, LockRenewal, that sends a LongProcessingMessage message to itself, and the time taken to process that message exceeds the maximum lock duration on the endpoint's input queue.

Lock renewal feature

Lock renewal is enabled by the LockRenewalFeature, which is configured to be enabled by default.

EnableByDefault();

Defaults(settings =>
{
    settings.SetDefault<LockRenewalOptions>(new LockRenewalOptions
    {
        // NServiceBus.Transport.AzureServiceBus sets LockDuration to 5 minutes by default
        LockDuration = TimeSpan.FromMinutes(5),
        ExecuteRenewalBefore = TimeSpan.FromSeconds(10)
    });
});

The Azure Service Bus transport sets LockDuration to 5 minutes by default, so the default LockDuration for the feature has the same value. ExecuteRenewalBefore is a TimeSpan specifying how soon to attempt lock renewal before the lock expires. The default is 10 seconds. Both settings may be overridden using the EndpointConfiguration API.

endpointConfiguration.LockRenewal(options =>
{
    options.LockDuration = TimeSpan.FromSeconds(30);
    options.ExecuteRenewalBefore = TimeSpan.FromSeconds(5);
});

In the sample, LockDuration is set to 30 seconds, and ExecuteRenewalBefore is set to 5 seconds.

Lock renewal behavior

The LockRenewalFeature uses the two settings to register the LockRenewalBehavior pipeline behavior. With LockDuration set to 30 seconds and ExecuteRenewalBefore set to 5 seconds, the lock will be renewed every 25 seconds (LockDuration - ExecuteRenewalBefore).

The behavior processes every incoming message and uses native message access to get the message's lock token, which is required for lock renewal.

var message = context.Extensions.Get<Message>();
var lockToken = message.SystemProperties.LockToken;

The request to renew the lock must use the same Azure Service Bus connection object and queue path used to receive the incoming message. These items are available in the TransportTransaction:

var transportTransaction = context.Extensions.Get<TransportTransaction>();
var (serviceBusConnection, path) = transportTransaction.Get<(ServiceBusConnection, string)>();

With the lock token, connection object, and queue path, an Azure Service Bus MessageReceiver object may be created to renew the lock. This is done in a background task, running in an infinite loop until the specified cancellation token is signaled as canceled.

async Task RenewLockToken(CancellationToken cancellationToken)
{
    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            Log.Info($"Lock will be renewed in {renewLockTokenIn}");

            await Task.Delay(renewLockTokenIn, cancellationToken).ConfigureAwait(false);

            var time = await messageReceiver.RenewLockAsync(lockToken).ConfigureAwait(false);

            Log.Info($"Lock renewed till {time} UTC / {time.ToLocalTime()} local");
        }
    }
    catch (OperationCanceledException)
    {
        Log.Info($"Lock renewal task for incoming message ID: {message.MessageId} was cancelled.");
    }
    catch (Exception exception)
    {
        Log.Error($"Failed to renew lock for incoming message ID: {message.MessageId}", exception);
    }
}

The behavior executes the rest of the pipeline and cancels the background task when execution is completed.

try
{
    await next().ConfigureAwait(false);
}
finally
{
    Log.Info($"Cancelling renewal task for incoming message ID: {message.MessageId}");
    cts.Cancel();
    cts.Dispose();
}

Long-running handler

The handler emulates long-running processing with a delay of 45 seconds, which exceeds the LockDuration of the input queue, which is set to 30 seconds.

public async Task Handle(LongProcessingMessage message, IMessageHandlerContext context)
{
    log.Info($"--- Received a message with processing duration of {message.ProcessingDuration}");

    await Task.Delay(message.ProcessingDuration).ConfigureAwait(false);

    log.Info("--- Processing completed");
}

Running the sample

Running the sample produces output similar to the following:

Press any key to exit
INFO  LockRenewalBehavior Incoming message ID: 940e9a1f-fd8e-4e48-96b7-4604a544d8f2
INFO  LockRenewalBehavior Lock will be renewed in 00:00:25
INFO  LongProcessingMessageHandler --- Received a message with processing duration of 00:00:45
INFO  LockRenewalBehavior Lock renewed till 2021-02-22 05:47:40 UTC / 2021-02-21 22:47:40 local
INFO  LockRenewalBehavior Lock will be renewed in 00:00:25
INFO  LongProcessingMessageHandler --- Processing completed
INFO  LockRenewalBehavior Cancelling renewal task for incoming message ID: 940e9a1f-fd8e-4e48-96b7-4604a544d8f2
INFO  LockRenewalBehavior Lock renewal task for incoming message ID: 940e9a1f-fd8e-4e48-96b7-4604a544d8f2 was cancelled.

Message processing takes 45 seconds, and the LockDuration is 30 seconds, so the message will have its lock renewed once, and processing will finish successfully.

Overriding the value of TransactionManager.MaxTimeout

.NET Framework

The setting can be modified using a machine level-configuration file:

<system.transactions>
  <machineSettings maxTimeout="01:00:00" />
</system.transactions>

or using reflection:

static void ConfigureTransactionTimeoutNetFramework(TimeSpan timeout)
{
    SetTransactionManagerField("_cachedMaxTimeout", true);
    SetTransactionManagerField("_maximumTimeout", timeout);

    void SetTransactionManagerField(string fieldName, object value) =>
        typeof(TransactionManager)
            .GetField(fieldName, BindingFlags.NonPublic | BindingFlags.Static)
            .SetValue(null, value);
}

.NET Core

The setting can be modified using reflection:

static void ConfigureTransactionTimeoutCore(TimeSpan timeout)
{
    SetTransactionManagerField("s_cachedMaxTimeout", true);
    SetTransactionManagerField("s_maximumTimeout", timeout);

    void SetTransactionManagerField(string fieldName, object value) =>
        typeof(TransactionManager)
            .GetField(fieldName, BindingFlags.NonPublic | BindingFlags.Static)
            .SetValue(null, value);
}

Samples

Related Articles


Last modified