This sample shows how to use Azure Functions to automatically trigger blob cleanup.
Prerequisites
- Make sure Azure Functions Tools for Visual Studio are set up correctly.
- Start Azure Storage Emulator. Ensure the latest version is installed.
- Run the solution. Two console applications start.
- Find the SenderAndReceiver application by looking for the one with SenderAndReceiver in its path and press Enter to send a large message. NServiceBus sends it as an attachment via Azure Storage. The DataBusBlobCreated trigger function runs in the Function window, followed by the DataBusCleanupOrchestrator orchestrator function, which invokes the DeleteBlob activity function to delete the blob when the time to live for the message is reached.
Code walk-through
This sample contains two projects:
- DataBusBlobCleanupFunctions - An Azure Functions project that contains the three Azure Functions that perform the cleanup.
- SenderAndReceiver - A console application responsible for sending and receiving the large message.
DataBusBlobCleanupFunctions
DataBusBlobCreated
This project includes the following Azure Function, which is triggered when a blob is created or updated in the data bus path of the storage account.
[FunctionName(nameof(DataBusBlobCreated))]
public async Task Run([BlobTrigger("databus/{name}", Connection = "DataBusStorageAccount")] CloudBlockBlob myBlob, [DurableClient] IDurableOrchestrationClient starter, ILogger log)
{
    log.LogInformation($"Blob created at {myBlob.Uri}");

    // The blob name doubles as the orchestration instance ID (singleton pattern).
    var instanceId = myBlob.Name;

    var existingInstance = await starter.GetStatusAsync(instanceId);
    if (existingInstance != null)
    {
        log.LogInformation($"{nameof(DataBusCleanupOrchestrator)} has already been started for blob {myBlob.Uri}.");
        return;
    }

    var validUntilUtc = DataBusBlobTimeoutCalculator.GetValidUntil(myBlob);

    // DateTime.MaxValue signals that no expiry could be determined.
    if (validUntilUtc == DateTime.MaxValue)
    {
        log.LogError($"Could not parse the 'ValidUntilUtc' value for blob {myBlob.Uri}. Cleanup will not happen on this blob. You may consider manually removing this entry if non-expiry is incorrect.");
        return;
    }

    await starter.StartNewAsync(nameof(DataBusCleanupOrchestrator), instanceId, new DataBusBlobData
    {
        Path = myBlob.Uri.ToString(),
        ValidUntilUtc = DataBusBlobTimeoutCalculator.ToWireFormattedString(validUntilUtc)
    });
}
The function follows the singleton orchestration pattern: it uses the blob name as the instance ID when starting the DataBusCleanupOrchestrator function. This prevents multiple timeouts from being started for the same blob.
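The singleton check can be illustrated without the Durable Functions runtime. The following is a minimal sketch, assuming a hypothetical in-memory tracker (InMemoryOrchestrationTracker is not part of the sample) in place of the GetStatusAsync and StartNewAsync calls shown above:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the orchestration client; not part of the sample.
public class InMemoryOrchestrationTracker
{
    private readonly HashSet<string> startedInstances = new HashSet<string>();

    // Starts tracking an orchestration only if none exists for the instance ID.
    // Returns true when a new instance was started, false when one already existed.
    public bool TryStart(string instanceId)
    {
        // HashSet<T>.Add returns false when the ID is already present.
        return startedInstances.Add(instanceId);
    }
}
```

Calling TryStart twice with the same blob name starts only one instance, which mirrors how a second trigger for the same blob returns early in DataBusBlobCreated.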
The GetValidUntil
method uses logic that reproduces the cleanup functionality of the NServiceBus.
package.
public static DateTime GetValidUntil(CloudBlockBlob blockBlob)
{
    if (blockBlob.Metadata.TryGetValue("ValidUntilUtc", out var validUntilUtcString))
    {
        return ToUtcDateTime(validUntilUtcString);
    }

    return DateTime.MaxValue;
}
The method inspects the blob's metadata for a previously provided timeout value. If none is found, the default time to live is calculated for the blob and returned.
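The ToUtcDateTime and ToWireFormattedString helpers are referenced but not shown. As a rough illustration of the round trip between a UTC timestamp and a metadata string, here is a sketch that assumes the .NET round-trip ("o") format; the sample's actual wire format may differ, and the ValidUntilSketch class and its dictionary-based GetValidUntil overload are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical helper; the format used here is an assumption, not the sample's wire format.
public static class ValidUntilSketch
{
    // Serializes a timestamp as UTC using the round-trip ("o") format.
    public static string ToWireFormattedString(DateTime value) =>
        value.ToUniversalTime().ToString("o", CultureInfo.InvariantCulture);

    // Reads "ValidUntilUtc" from blob-style metadata.
    // Returns DateTime.MaxValue when the entry is missing or cannot be parsed.
    public static DateTime GetValidUntil(IDictionary<string, string> metadata)
    {
        if (metadata.TryGetValue("ValidUntilUtc", out var raw) &&
            DateTime.TryParse(raw, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind, out var parsed))
        {
            return parsed.ToUniversalTime();
        }

        return DateTime.MaxValue;
    }
}
```

Returning DateTime.MaxValue as a sentinel matches the check in the trigger function, which logs an error and skips cleanup for that blob.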
The timeout value is passed in when the DataBusCleanupOrchestrator
orchestration function is executed.
DataBusCleanupOrchestrator
[FunctionName(nameof(DataBusCleanupOrchestrator))]
public async Task RunOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
{
    var blobData = context.GetInput<DataBusBlobData>();

    log.LogInformation($"Orchestrating deletion for blob at {blobData.Path} with ValidUntilUtc of {blobData.ValidUntilUtc}");

    var validUntilUtc = DataBusBlobTimeoutCalculator.ToUtcDateTime(blobData.ValidUntilUtc);

    DateTime timeoutUntil;

    // Timeouts currently have a 7 day limit, use 6 day loops until the wait is less than 6 days
    do
    {
        timeoutUntil = validUntilUtc > context.CurrentUtcDateTime.AddDays(6) ? context.CurrentUtcDateTime.AddDays(6) : validUntilUtc;
        log.LogInformation($"Waiting until {timeoutUntil}/{validUntilUtc} for blob at {blobData.Path}. Currently {context.CurrentUtcDateTime}.");

        // Wait on the capped deadline, not the final one, so each timer stays under the limit.
        await context.CreateTimer(timeoutUntil, CancellationToken.None);
    } while (validUntilUtc > timeoutUntil);

    await context.CallActivityAsync("DeleteBlob", blobData);
}
The function uses a durable function timer to delay deletion of the blob from Azure Storage until the blob's time to live has elapsed.
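To make the six-day looping easier to follow, the deadline calculation can be pulled out into a pure function. This is a sketch only: TimerChunkSketch and GetTimerDeadlines are hypothetical names, and the clock is simulated by assuming each timer fires exactly at its deadline.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper that mirrors the orchestrator's loop without the Durable Functions runtime.
public static class TimerChunkSketch
{
    static readonly TimeSpan MaxWait = TimeSpan.FromDays(6);

    // Returns the successive timer deadlines needed to wait from nowUtc until validUntilUtc,
    // with no single wait longer than six days.
    public static List<DateTime> GetTimerDeadlines(DateTime nowUtc, DateTime validUntilUtc)
    {
        var deadlines = new List<DateTime>();
        var current = nowUtc;
        DateTime timeoutUntil;

        do
        {
            // Cap the wait at six days from the current (simulated) time.
            timeoutUntil = validUntilUtc > current + MaxWait ? current + MaxWait : validUntilUtc;
            deadlines.Add(timeoutUntil);

            // Simulate the timer firing at its deadline; never move the clock backwards.
            if (timeoutUntil > current)
            {
                current = timeoutUntil;
            }
        } while (validUntilUtc > timeoutUntil);

        return deadlines;
    }
}
```

For a blob that expires 15 days from now, this yields three deadlines: day 6, day 12, and day 15. A blob whose expiry is already in the past yields a single deadline equal to that expiry.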
DeleteBlob
[FunctionName("DeleteBlob")]
public async Task DeleteBlob([ActivityTrigger] DataBusBlobData blobData, ILogger log)
{
    var blob = await cloudBlobClient.GetBlobReferenceFromServerAsync(new Uri(blobData.Path));
    log.LogInformation($"Deleting blob at {blobData.Path}");
    await blob.DeleteIfExistsAsync();
}
This function performs the actual deletion of the blob.
Configuring time to live for large binary objects
The default time to live for all large binary objects is configured by setting the DefaultTimeToLiveInSeconds
environment variable. This can be set during debugging by adding the appropriate Values
setting in the local.
file:
{
  "IsEncrypted": false,
  "Values": {
    "DefaultTimeToLiveInSeconds": "180"
  }
}
In production, this is set using an application setting named DefaultTimeToLiveInSeconds
in the Azure portal.
A message that has a time to be received set overrides the default time to live for its large binary object; that value is used instead to determine when the blob is cleaned up.
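The precedence described above can be sketched as follows. The TimeToLiveSketch class, its method names, and the fallback parameter are hypothetical; how the sample itself combines these values is not shown here.

```csharp
using System;

// Hypothetical helper illustrating the precedence of time to be received over the default TTL.
public static class TimeToLiveSketch
{
    // Parses the raw DefaultTimeToLiveInSeconds setting.
    // Falls back to the supplied value (an assumption) when the setting is missing or invalid.
    public static TimeSpan ParseDefaultTimeToLive(string rawSeconds, TimeSpan fallback)
    {
        return int.TryParse(rawSeconds, out var seconds) && seconds > 0
            ? TimeSpan.FromSeconds(seconds)
            : fallback;
    }

    // A message's time to be received, when present, wins over the default time to live.
    public static DateTime ResolveValidUntil(DateTime createdUtc, TimeSpan? timeToBeReceived, TimeSpan defaultTimeToLive)
    {
        return createdUtc + (timeToBeReceived ?? defaultTimeToLive);
    }
}
```

In a function app, the raw value would typically come from Environment.GetEnvironmentVariable("DefaultTimeToLiveInSeconds"), since application settings are exposed to the process as environment variables.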
Configuring the data bus location
The DataBusBlobCleanupFunctions
project needs to access the large binary objects. This is done by specifying an Azure storage connection string in the DataBusStorageAccount
environment variable. This can be set during debugging by adding the appropriate Values
setting in the local.
file:
{
  "IsEncrypted": false,
  "Values": {
    "DataBusStorageAccount": "UseDevelopmentStorage=true"
  }
}
In production, this is set using an application setting named DataBusStorageAccount
in the Azure portal.
Migrating existing projects
In environments where NServiceBus.
is already in use, the timeout function will need to be triggered for the existing attachments.
A manually-triggered function called DataBusOrchestrateExistingBlobs
is included to trigger orchestration for every existing blob in the container. It's an HTTP-triggered function that can be invoked using a browser.
[FunctionName(nameof(DataBusOrchestrateExistingBlobs))]
public async Task<IActionResult> Run([HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req, [DurableClient] IDurableOrchestrationClient starter, ILogger log)
{
    var counter = 0;

    try
    {
        BlobContinuationToken token = null;
        do
        {
            // List the container one segment at a time, following the continuation token.
            var segment = await container.ListBlobsSegmentedAsync(token).ConfigureAwait(false);
            token = segment.ContinuationToken;

            foreach (var blockBlob in segment.Results.OfType<CloudBlockBlob>())
            {
                var instanceId = blockBlob.Name;

                var existingInstance = await starter.GetStatusAsync(instanceId);
                if (existingInstance != null)
                {
                    log.LogInformation($"{nameof(DataBusCleanupOrchestrator)} has already been started for blob {blockBlob.Uri}.");
                    continue;
                }

                var validUntilUtc = DataBusBlobTimeoutCalculator.GetValidUntil(blockBlob);

                if (validUntilUtc == DateTime.MaxValue)
                {
                    log.LogError($"Could not parse the 'ValidUntilUtc' value for blob {blockBlob.Uri}. Cleanup will not happen on this blob. You may consider manually removing this entry if non-expiry is incorrect.");
                    continue;
                }

                await starter.StartNewAsync(nameof(DataBusCleanupOrchestrator), instanceId, new DataBusBlobData
                {
                    Path = blockBlob.Uri.ToString(),
                    ValidUntilUtc = DataBusBlobTimeoutCalculator.ToWireFormattedString(validUntilUtc)
                });

                counter++;
            }
        } while (token != null);
    }
    catch (Exception exception)
    {
        var result = new ObjectResult(exception.Message)
        {
            StatusCode = (int)HttpStatusCode.InternalServerError
        };

        return result;
    }

    var message = "DataBusOrchestrateExistingBlobs has completed." + (counter > 0 ? $" {counter} blob{(counter > 1 ? "s" : string.Empty)} will be tracked for clean-up." : string.Empty);

    return new OkObjectResult(message);
}
The function is very similar to the DataBusBlobCreated function, but instead of working on a single blob, it iterates over every existing blob in the container.
This function does not require downtime, because the singleton orchestration pattern prevents existing timeouts from being duplicated.
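Besides a browser, the function can also be invoked from code. The sketch below assumes the default Azure Functions HTTP route prefix ("api") and a caller-supplied host address; MigrationTriggerSketch and its methods are hypothetical and not part of the sample.

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;

// Hypothetical client-side helper for invoking the migration function over HTTP.
public static class MigrationTriggerSketch
{
    // Builds the function URL, assuming the default "api" route prefix.
    public static Uri BuildFunctionUri(Uri hostBaseAddress, string functionName) =>
        new Uri(hostBaseAddress, $"api/{functionName}");

    // Sends a GET request and returns the response body, which contains the completion message.
    public static async Task<string> TriggerAsync(HttpClient client, Uri functionUri)
    {
        using (var response = await client.GetAsync(functionUri))
        {
            // Throws when the function reports a failure, such as a 500 status code.
            response.EnsureSuccessStatusCode();
            return await response.Content.ReadAsStringAsync();
        }
    }
}
```

When running locally, the base address is commonly http://localhost:7071/, which gives http://localhost:7071/api/DataBusOrchestrateExistingBlobs; the actual port depends on the local host configuration.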
SenderAndReceiver project
The project sends the MessageWithLargePayload
message to itself, utilizing the NServiceBus attachment mechanism.