Getting Started
NServiceBus
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Previews
Samples

Azure Service Bus lock renewal

NuGet Package: NServiceBus.Transport.AzureServiceBus (2.x)
Target Version: NServiceBus 7.x

Prerequisites

An environment variable named AzureServiceBus_ConnectionString with the connection string for the Azure Service Bus namespace.

Important information about lock renewal

  1. For a lock to be extended for longer than 10 minutes, the value of TransactionManager.MaxTimeout must be changed to the maximum time allowed to process a message. This is a machine wide setting and should be treated carefully.
  2. Message lock renewal is initiated by client code, not the broker. If the request to renew the lock fails after all the SDK built-in retries, the lock won't be renewed, and the message will become unlocked and available for processing by competing consumers. Lock renewal should be treated as best-effort and not as a guaranteed operation.
  3. Message lock renewal applies to only the message currently being processed. Prefetched messages that are not handled within the LockDuration time will lose their lock, indicated by a LockLostException in the log when the transport attempts to complete them. The number of prefetched messages may be adjusted, which may help to prevent exceptions with lock renewal. Alternatively, the endpoint's concurrency limit may be raised to increase the rate of message processing, but this may also increase resource contention.

Code walk-through

The sample contains a single executable project, LockRenewal, that sends a LongProcessingMessage message to itself, and the time taken to process that message exceeds the maximum lock duration on the endpoint's input queue.

Configuration

Lock renewal is enabled by the LockRenewalFeature, which is configured to be enabled by default.

The feature does not require any additional configuration.

Behavior

The lock will be renewed 10 seconds before the LockUntil value from the message. By default the lock will thus be renewed after 4m50s.

The sample overrides the queue lock duration to 30 seconds for demo purposes which means in the sample the lock is renewed every 20s (30s-10s). The buffer value is hardcoded in the sample.

The behavior processes every incoming message and uses native message access to get the message's lock token, which is required for lock renewal.

var message = context.Extensions.Get<ServiceBusReceivedMessage>();

Obtain the message receiver to renew the lock:

var messageReceiver = context.Extensions.Get<ServiceBusReceiver>();

A background task is started that will renew the lock until the completion of message processing signals the cancellation token.

async Task RenewLockToken(CancellationToken cancellationToken)
{
    try
    {
        int attempts = 0;

        while (!cancellationToken.IsCancellationRequested)
        {
            now = DateTimeOffset.UtcNow;
            remaining = message.LockedUntil - now;

            var buffer = TimeSpan.FromTicks(Math.Min(remaining.Ticks / 2, MaximumRenewBufferDuration.Ticks));
            var renewAfter = remaining - buffer;

            await Task.Delay(renewAfter, cancellationToken).ConfigureAwait(false);

            try
            {
                await messageReceiver.RenewMessageLockAsync(message, cancellationToken).ConfigureAwait(false);
                attempts = 0;
                Log.Info($"{message.MessageId}: Lock renewed until {message.LockedUntil:s}Z.");
            }
            catch (ServiceBusException e) when (e.Reason == ServiceBusFailureReason.MessageLockLost)
            {
                Log.Error($"{message.MessageId}: Lock lost.", e);
                return;
            }
            catch (Exception e) when (!(e is OperationCanceledException))
            {
                ++attempts;
                Log.Warn($"{message.MessageId}: Failed to renew lock (#{attempts:N0}), if lock cannot be renewed within {message.LockedUntil:s}Z message will reappear.", e);
            }
        }
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // Expected, no need to process
    }
    catch (Exception e)
    {
        Log.Fatal($"{message.MessageId}: RenewLockToken: " + e.Message, e);
    }
}

The behavior executes the rest of the pipeline and cancels the background task when execution is completed.

try
{
    await next().ConfigureAwait(false);
}
finally
{
    remaining = message.LockedUntil - DateTimeOffset.UtcNow;

    if (remaining < MaximumRenewBufferDuration)
    {
        Log.Warn($"{message.MessageId}: Processing completed but LockedUntil {message.LockedUntil:s}Z less than {MaximumRenewBufferDuration}. This could indicate issues during lock renewal.");
    }

    cts.Cancel();
}

Prefetching

Disable prefetching if the majority of messages in the queue result in long processing durations. Otherwise, the lock for prefetched messages could expire before processing even starts.

Long-running handler

The handler emulates long-running processing with a delay of 45 seconds, which exceeds the LockDuration of the input queue, which is set to 30 seconds.

public async Task Handle(LongProcessingMessage message, IMessageHandlerContext context)
{
    log.Info($"--- Received a message with processing duration of {message.ProcessingDuration}");

    await Task.Delay(message.ProcessingDuration).ConfigureAwait(false);

    log.Info("--- Processing completed");
}

Running the sample

Running the sample produces output similar to the following:

Press any key to exit
INFO  LockRenewalBehavior Incoming message ID: 940e9a1f-fd8e-4e48-96b7-4604a544d8f2
INFO  LockRenewalBehavior Lock will be renewed in 00:00:25
INFO  LongProcessingMessageHandler --- Received a message with processing duration of 00:00:45
INFO  LockRenewalBehavior Lock renewed till 2021-02-22 05:47:40 UTC / 2021-02-21 22:47:40 local
INFO  LockRenewalBehavior Lock will be renewed in 00:00:25
INFO  LongProcessingMessageHandler --- Processing completed
INFO  LockRenewalBehavior Cancelling renewal task for incoming message ID: 940e9a1f-fd8e-4e48-96b7-4604a544d8f2
INFO  LockRenewalBehavior Lock renewal task for incoming message ID: 940e9a1f-fd8e-4e48-96b7-4604a544d8f2 was cancelled.

Message processing takes 45 seconds, and the LockDuration is 30 seconds, so the message will have its lock renewed once, and processing will finish successfully.

Overriding the value of TransactionManager.MaxTimeout

.NET Framework

The setting can be modified using a machine level-configuration file:

<system.transactions>
  <machineSettings maxTimeout="01:00:00" />
</system.transactions>

or using reflection:

static void ConfigureTransactionTimeoutNetFramework(TimeSpan timeout)
{
    SetTransactionManagerField("_cachedMaxTimeout", true);
    SetTransactionManagerField("_maximumTimeout", timeout);

    void SetTransactionManagerField(string fieldName, object value) =>
        typeof(TransactionManager)
            .GetField(fieldName, BindingFlags.NonPublic | BindingFlags.Static)
            .SetValue(null, value);
}

.NET Core

The setting can be modified using reflection:

static void ConfigureTransactionTimeoutCore(TimeSpan timeout)
{
    SetTransactionManagerField("s_cachedMaxTimeout", true);
    SetTransactionManagerField("s_maximumTimeout", timeout);

    void SetTransactionManagerField(string fieldName, object value) =>
        typeof(TransactionManager)
            .GetField(fieldName, BindingFlags.NonPublic | BindingFlags.Static)
            .SetValue(null, value);
}

Samples

Related Articles


Last modified