Prerequisites
An environment variable named AzureServiceBus_ConnectionString with the connection string for the Azure Service Bus namespace.
Important information about lock renewal
- The transport must use the default SendsAtomicWithReceive transaction mode for the sample to work.
- When using lock renewal with the outbox feature, the transport transaction mode has to be explicitly set to SendsAtomicWithReceive in the endpoint configuration code.
- For a lock to be extended for longer than 10 minutes, the value of TransactionManager.MaxTimeout must be changed to the maximum time allowed to process a message. This is a machine-wide setting and should be treated carefully.
- Message lock renewal is initiated by client code, not the broker. If the request to renew the lock fails after all the SDK built-in retries, the lock won't be renewed, and the message will become unlocked and available for processing by competing consumers. Lock renewal should be treated as best-effort and not as a guaranteed operation.
- Message lock renewal applies only to the message currently being processed. Prefetched messages that are not handled within the LockDuration time will lose their lock, indicated by a LockLostException in the log when the transport attempts to complete them. The number of prefetched messages may be adjusted, which may help to prevent exceptions with lock renewal. Alternatively, the endpoint's concurrency limit may be raised to increase the rate of message processing, but this may also increase resource contention.
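The transaction mode, prefetch, and concurrency points above might be expressed in endpoint configuration roughly as follows. This is a sketch only: it assumes the NServiceBus 7-style transport API (UseTransport, Transactions, PrefetchCount, LimitMessageProcessingConcurrencyTo), whereas newer transport versions configure these through properties on the transport object. The TransportSetup class and the numeric values are hypothetical and not taken from the sample.

```csharp
using System;
using NServiceBus;

// Hypothetical helper, not part of the sample.
// Assumes the NServiceBus 7-style configuration API.
public static class TransportSetup
{
    public static void Configure(EndpointConfiguration endpointConfiguration)
    {
        // The connection string comes from the environment variable named in the prerequisites.
        var connectionString = Environment.GetEnvironmentVariable("AzureServiceBus_ConnectionString");

        var transport = endpointConfiguration.UseTransport<AzureServiceBusTransport>();
        transport.ConnectionString(connectionString);

        // State the transaction mode explicitly; this matters when the outbox is enabled.
        transport.Transactions(TransportTransactionMode.SendsAtomicWithReceive);

        // Illustrative value: fewer prefetched messages means fewer locks
        // that can expire while messages wait to be processed.
        transport.PrefetchCount(1);

        // Illustrative value: higher concurrency drains prefetched messages faster,
        // at the cost of more resource contention.
        endpointConfiguration.LimitMessageProcessingConcurrencyTo(4);
    }
}
```

Which of the two knobs helps more depends on the workload, so the values are best chosen by measurement.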
Code walk-through
The sample contains a single executable project, LockRenewal, that sends a LongProcessingMessage message to itself. The time taken to process that message exceeds the maximum lock duration on the endpoint's input queue.
Lock renewal feature
Lock renewal is enabled by the LockRenewalFeature, which is configured to be enabled by default.
EnableByDefault();
Defaults(settings =>
{
    settings.SetDefault<LockRenewalOptions>(new LockRenewalOptions
    {
        // NServiceBus.Transport.AzureServiceBus sets LockDuration to 5 minutes by default
        LockDuration = TimeSpan.FromMinutes(5),
        RenewalInterval = TimeSpan.FromMinutes(1)
    });
});
The Azure Service Bus transport sets LockDuration to 5 minutes by default, so the default LockDuration for the feature has the same value. ExecuteRenewalBefore is a TimeSpan specifying how soon to attempt lock renewal before the lock expires. The default is 10 seconds. Both settings may be overridden using the EndpointConfiguration API.
var lockDuration = TimeSpan.FromSeconds(30);
var renewalInterval = TimeSpan.FromSeconds(5);
endpointConfiguration.LockRenewal(options =>
{
    options.LockDuration = lockDuration;
    options.RenewalInterval = renewalInterval;
});
In the sample, LockDuration is set to 30 seconds, and ExecuteRenewalBefore is set to 5 seconds.
Lock renewal behavior
The LockRenewalFeature uses the two settings to register the LockRenewalBehavior pipeline behavior. With LockDuration set to 30 seconds and ExecuteRenewalBefore set to 5 seconds, the lock will be renewed every 25 seconds (LockDuration - ExecuteRenewalBefore).
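The interval arithmetic above can be sketched as a small helper. The names RenewalSchedule and ComputeRenewalDelay are hypothetical and do not appear in the sample, and the guard against a non-positive delay is an assumption about sensible behavior rather than something the sample is confirmed to do.

```csharp
using System;

// Hypothetical helper, not part of the sample.
public static class RenewalSchedule
{
    // Returns how long to wait before each renewal attempt:
    // LockDuration - ExecuteRenewalBefore.
    public static TimeSpan ComputeRenewalDelay(TimeSpan lockDuration, TimeSpan executeRenewalBefore)
    {
        var delay = lockDuration - executeRenewalBefore;
        if (delay <= TimeSpan.Zero)
        {
            // Assumed guard: renewing at or after expiry could never keep the lock alive.
            throw new ArgumentException(
                "ExecuteRenewalBefore must be shorter than LockDuration.",
                nameof(executeRenewalBefore));
        }
        return delay;
    }
}
```

With the sample's values, RenewalSchedule.ComputeRenewalDelay(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(5)) yields 00:00:25.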
The behavior processes every incoming message and uses native message access to get the message's lock token, which is required for lock renewal.
var message = context.Extensions.Get<ServiceBusReceivedMessage>();
The request to renew the lock must use the same Azure Service Bus connection object and queue path used to receive the incoming message. These items are available in the TransportTransaction:
var transportTransaction = context.Extensions.Get<TransportTransaction>();
var serviceBusClient = transportTransaction.Get<ServiceBusClient>();
With the lock token, connection object, and queue path, an Azure Service Bus ServiceBusReceiver object may be created to renew the lock. This is done in a background task that loops until the supplied cancellation token is canceled.
async Task RenewLockToken(CancellationToken cancellationToken)
{
    try
    {
        int attempts = 0;
        while (!cancellationToken.IsCancellationRequested)
        {
            await Task.Delay(renewLockTokenIn, cancellationToken).ConfigureAwait(false);
            try
            {
                await messageReceiver.RenewMessageLockAsync(message, cancellationToken).ConfigureAwait(false);
                attempts = 0;
                Log.Info($"{message.MessageId}: Lock renewed until {message.LockedUntil:s}Z.");
            }
            catch (ServiceBusException e) when (e.Reason == ServiceBusFailureReason.MessageLockLost)
            {
                Log.Error($"{message.MessageId}: Lock lost.", e);
                return;
            }
            catch (Exception e) when (!(e is OperationCanceledException))
            {
                ++attempts;
                Log.Warn($"{message.MessageId}: Failed to renew lock (#{attempts:N0}), if lock cannot be renewed within {message.LockedUntil:s}Z message will reappear.", e);
            }
        }
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // Expected, no need to process
    }
    catch (Exception e)
    {
        Log.Fatal($"{message.MessageId}: RenewLockToken: " + e.Message, e);
    }
}
The behavior executes the rest of the pipeline and cancels the background task when execution is completed.
try
{
    await next().ConfigureAwait(false);
}
finally
{
    remaining = message.LockedUntil - DateTimeOffset.UtcNow;
    if (remaining < renewLockTokenIn)
    {
        Log.Warn($"{message.MessageId}: Processing completed but LockedUntil {message.LockedUntil:s}Z less than {renewLockTokenIn}. This could indicate issues during lock renewal.");
    }
    cts.Cancel();
}
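How the background task and the pipeline invocation fit together can be sketched without any NServiceBus or Azure types. Everything below is a simplified, hypothetical stand-in: RenewalWrapper, InvokeWithRenewal, and the renew delegate are illustrative names, and awaiting the renewal task after cancelling it is an addition made here so the sketch has a deterministic end, not something the sample is confirmed to do.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified stand-in for the behavior.
public static class RenewalWrapper
{
    // Runs 'next' while 'renew' is invoked every 'interval' in the background.
    // The renewal loop is cancelled as soon as 'next' completes or throws.
    public static async Task InvokeWithRenewal(
        Func<Task> next, Func<CancellationToken, Task> renew, TimeSpan interval)
    {
        using (var cts = new CancellationTokenSource())
        {
            var renewalTask = RenewLoop(renew, interval, cts.Token);
            try
            {
                await next().ConfigureAwait(false);
            }
            finally
            {
                cts.Cancel();
                // Wait for the loop to observe cancellation so that
                // no renewal attempt outlives message processing.
                await renewalTask.ConfigureAwait(false);
            }
        }
    }

    static async Task RenewLoop(
        Func<CancellationToken, Task> renew, TimeSpan interval, CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                await Task.Delay(interval, token).ConfigureAwait(false);
                try
                {
                    await renew(token).ConfigureAwait(false);
                }
                catch (Exception e) when (!(e is OperationCanceledException))
                {
                    // Best effort: a failed renewal is retried on the next interval.
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when processing finishes before the next renewal is due.
        }
    }
}
```

Placing the cancellation in a finally block means the renewal loop stops whether the handler succeeds or throws, which mirrors the try/finally shape of the behavior.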
Long-running handler
The handler emulates long-running processing with a delay of 45 seconds, which exceeds the 30-second LockDuration of the input queue.
public async Task Handle(LongProcessingMessage message, IMessageHandlerContext context)
{
    log.Info($"--- Received a message with processing duration of {message.ProcessingDuration}");
    await Task.Delay(message.ProcessingDuration).ConfigureAwait(false);
    log.Info("--- Processing completed");
}
Running the sample
Running the sample produces output similar to the following:
Press any key to exit
INFO LockRenewalBehavior Incoming message ID: 940e9a1f-fd8e-4e48-96b7-4604a544d8f2
INFO LockRenewalBehavior Lock will be renewed in 00:00:25
INFO LongProcessingMessageHandler --- Received a message with processing duration of 00:00:45
INFO LockRenewalBehavior Lock renewed till 2021-02-22 05:47:40 UTC / 2021-02-21 22:47:40 local
INFO LockRenewalBehavior Lock will be renewed in 00:00:25
INFO LongProcessingMessageHandler --- Processing completed
INFO LockRenewalBehavior Cancelling renewal task for incoming message ID: 940e9a1f-fd8e-4e48-96b7-4604a544d8f2
INFO LockRenewalBehavior Lock renewal task for incoming message ID: 940e9a1f-fd8e-4e48-96b7-4604a544d8f2 was cancelled.
Message processing takes 45 seconds, and the LockDuration is 30 seconds, so the message will have its lock renewed once, and processing will finish successfully.
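The timeline behind that statement can be worked through with a small model. LockTimeline and CountRenewals are hypothetical names; the model assumes every renewal succeeds and that a renewal extends the lock by LockDuration measured from the moment of renewal. Times are offsets from the moment the message is received.

```csharp
using System;

// Hypothetical model, not part of the sample.
public static class LockTimeline
{
    // Returns the number of renewals performed before processing completes
    // and reports when the lock would finally expire.
    public static int CountRenewals(
        TimeSpan lockDuration, TimeSpan renewalDelay, TimeSpan processingTime,
        out TimeSpan lockedUntil)
    {
        if (renewalDelay <= TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(renewalDelay));
        }

        var renewals = 0;
        lockedUntil = lockDuration;      // initial lock granted on receive
        var nextRenewal = renewalDelay;  // time of the first renewal attempt

        // Renewals only happen while the handler is still running.
        while (nextRenewal < processingTime)
        {
            renewals++;
            lockedUntil = nextRenewal + lockDuration;
            nextRenewal += renewalDelay;
        }
        return renewals;
    }
}
```

For the sample's values (30-second lock, renewal after 25 seconds, 45 seconds of processing) the model performs one renewal at the 25-second mark, which moves the lock expiry to the 55-second mark, after processing ends.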
Overriding the value of TransactionManager.MaxTimeout
.NET Framework
The setting can be modified using a machine-level configuration file:
<system.transactions>
  <machineSettings maxTimeout="01:00:00" />
</system.transactions>
or using reflection:
static void ConfigureTransactionTimeoutNetFramework(TimeSpan timeout)
{
    SetTransactionManagerField("_cachedMaxTimeout", true);
    SetTransactionManagerField("_maximumTimeout", timeout);

    void SetTransactionManagerField(string fieldName, object value) =>
        typeof(TransactionManager)
            .GetField(fieldName, BindingFlags.NonPublic | BindingFlags.Static)
            .SetValue(null, value);
}
.NET Core
The setting can be modified using reflection:
static void ConfigureTransactionTimeoutCore(TimeSpan timeout)
{
    SetTransactionManagerField("s_cachedMaxTimeout", true);
    SetTransactionManagerField("s_maximumTimeout", timeout);

    void SetTransactionManagerField(string fieldName, object value) =>
        typeof(TransactionManager)
            .GetField(fieldName, BindingFlags.NonPublic | BindingFlags.Static)
            .SetValue(null, value);
}
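On newer runtimes the reflection may be avoidable. The sketch below assumes .NET 7 or later, where the public TransactionManager.MaximumTimeout property is understood to have a setter; on earlier runtimes the property is read-only and this code does not compile. The class and method names are hypothetical.

```csharp
using System;
using System.Transactions;

// Hypothetical helper, not part of the sample.
public static class TransactionTimeoutSetup
{
    // Assumes .NET 7 or later, where MaximumTimeout can be assigned directly.
    public static void ConfigureTransactionTimeout(TimeSpan timeout)
    {
        TransactionManager.MaximumTimeout = timeout;
    }
}
```

As with the reflection-based variants, the change affects every transaction in the process, so it is best applied once at startup before any transaction is created.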