Azure Blob Storage Data Bus Cleanup with Azure Functions

Component: Azure Blob Storage Databus
NuGet Package: NServiceBus.DataBus.AzureBlobStorage (5-pre)
Target NServiceBus Version: 8.x
This page targets a pre-release version and is subject to change prior to the final release.

This sample shows how to use Azure Functions to automatically trigger blob cleanup.

Prerequisites

  1. Make sure Azure Functions Tools for Visual Studio are set up correctly.
  2. Start Azure Storage Emulator. Ensure the latest version is installed.
  3. Run the solution. Two console applications start.
  4. Find the SenderAndReceiver application by looking for the one with SenderAndReceiver in its path and press Enter to send a large message. NServiceBus sends it as an attachment via Azure Storage. In the Functions window, the DataBusBlobCreated trigger function runs first, followed by the DataBusCleanupOrchestrator orchestrator function, which invokes the DeleteBlob activity function to delete the blob once the message's time to live is reached.

Code walk-through

This sample contains two projects:

  • DataBusBlobCleanupFunctions - An Azure Function project that contains the three Azure Functions that perform the cleanup.
  • SenderAndReceiver - A console application responsible for sending and receiving the large message.

DataBusBlobCleanupFunctions

DataBusBlobCreated

This project includes the following Azure Function, which is triggered whenever a blob is created or updated in the data bus path of the storage account.

[FunctionName(nameof(DataBusBlobCreated))]
public async Task Run([BlobTrigger("databus/{name}", Connection = "DataBusStorageAccount")] CloudBlockBlob myBlob, [DurableClient] IDurableOrchestrationClient starter, ILogger log)
{
    log.LogInformation($"Blob created at {myBlob.Uri}");

    var instanceId = myBlob.Name;

    var existingInstance = await starter.GetStatusAsync(instanceId);
    if (existingInstance != null)
    {
        log.LogInformation($"{nameof(DataBusCleanupOrchestrator)} has already been started for blob {myBlob.Uri}.");
        return;
    }

    var validUntilUtc = calculator.GetValidUntil(myBlob);

    if (validUntilUtc == DateTime.MaxValue)
    {
        log.LogError($"Could not parse the 'ValidUntilUtc' value for blob {myBlob.Uri}. Cleanup will not happen on this blob. You may consider manually removing this entry if non-expiry is incorrect.");
        return;
    }

    await starter.StartNewAsync(nameof(DataBusCleanupOrchestrator), instanceId, new DataBusBlobData
    {
        Path = myBlob.Uri.ToString(),
        ValidUntilUtc = calculator.ToWireFormattedString(validUntilUtc)
    });
}

The execution uses a singleton orchestration pattern, passing the blob name as the orchestration instance ID when starting the DataBusCleanupOrchestrator function. This prevents multiple timeouts from being started for the same blob.

The GetValidUntil method uses logic that reproduces the cleanup functionality of the NServiceBus.DataBus.AzureBlobStorage package.

public DateTime GetValidUntil(CloudBlockBlob blockBlob)
{
    if (blockBlob.Metadata.TryGetValue("ValidUntilUtc", out var validUntilUtcString))
    {
        return ToUtcDateTime(validUntilUtcString);
    }

    return DateTime.MaxValue;
}

The method inspects the blob's metadata, looking for a previously stored ValidUntilUtc value. If none is found, DateTime.MaxValue is returned and the caller skips cleanup for that blob.

The timeout value is passed in when the DataBusCleanupOrchestrator orchestration function is executed.
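The conversion helpers referenced above (ToWireFormattedString and ToUtcDateTime) are not shown in this walk-through. A minimal sketch, assuming the NServiceBus wire date-time format (`yyyy-MM-dd HH:mm:ss:ffffff Z`) is used for the metadata value:

```csharp
using System;
using System.Globalization;

public static class DateTimeHelper
{
    const string Format = "yyyy-MM-dd HH:mm:ss:ffffff Z";

    // Serializes a UTC DateTime into the string stored in the blob's ValidUntilUtc metadata
    public static string ToWireFormattedString(DateTime dateTime)
    {
        return dateTime.ToUniversalTime().ToString(Format, CultureInfo.InvariantCulture);
    }

    // Parses the metadata string back into a UTC DateTime
    public static DateTime ToUtcDateTime(string wireFormattedString)
    {
        return DateTime.ParseExact(
            wireFormattedString,
            Format,
            CultureInfo.InvariantCulture,
            DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal);
    }
}
```

Using a fixed, culture-invariant format keeps the round trip between metadata string and DateTime lossless.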

DataBusCleanupOrchestrator

[FunctionName(nameof(DataBusCleanupOrchestrator))]
public async Task RunOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
{
    var blobData = context.GetInput<DataBusBlobData>();

    log.LogInformation($"Orchestrating deletion for blob at {blobData.Path} with ValidUntilUtc of {blobData.ValidUntilUtc}");

    var validUntilUtc = calculator.ToUtcDateTime(blobData.ValidUntilUtc);

    DateTime timeoutUntil;

    // Durable timers currently have a 7-day limit; wait in 6-day increments
    // until the remaining wait fits within a single timer
    do
    {
        timeoutUntil = validUntilUtc > context.CurrentUtcDateTime.AddDays(6) ? context.CurrentUtcDateTime.AddDays(6) : validUntilUtc;
        log.LogInformation($"Waiting until {timeoutUntil}/{validUntilUtc} for blob at {blobData.Path}. Currently {context.CurrentUtcDateTime}.");
        await context.CreateTimer(timeoutUntil, CancellationToken.None);
    } while (validUntilUtc > timeoutUntil);

    await context.CallActivityAsync("DeleteBlob", blobData);
}

The function uses a Durable Functions timer to delay execution of the blob deletion from Azure Storage.
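The chunking logic inside the loop can be isolated into a small pure helper, which makes the 6-day increment rule easier to reason about. A sketch (the NextTimeout name is hypothetical, not part of the sample):

```csharp
using System;

public static class TimeoutCalculator
{
    // Durable timers are capped, so wait in chunks of at most 6 days
    // until the blob's expiry is within reach of a single timer.
    public static DateTime NextTimeout(DateTime currentUtc, DateTime validUntilUtc)
    {
        var maxChunk = currentUtc.AddDays(6);
        return validUntilUtc > maxChunk ? maxChunk : validUntilUtc;
    }
}
```

The orchestrator would call this inside the loop with context.CurrentUtcDateTime rather than DateTime.UtcNow, since orchestrator code must stay deterministic across replays.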

DeleteBlob

[FunctionName("DeleteBlob")]
public async Task DeleteBlob([ActivityTrigger] DataBusBlobData blobData, ILogger log)
{
    var blob = await cloudBlobClient.GetBlobReferenceFromServerAsync(new Uri(blobData.Path));
    log.LogInformation($"Deleting blob at {blobData.Path}");
    await blob.DeleteIfExistsAsync();
}

The function performs the actual work of deleting the blob.

Configuring time to live for large binary objects

The default time to live for all large binary objects is configured by setting the DefaultTimeToLiveInSeconds environment variable. This can be set during debugging by adding the appropriate Values setting in the local.settings.json file:

{
  "IsEncrypted": false,
  "Values": {
     "DefaultTimeToLiveInSeconds":  "180"
  }
}

In production this is set using an application settings value named DefaultTimeToLiveInSeconds in the Azure portal.
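App settings surface as environment variables in Azure Functions, so the calculator can read this value with standard environment variable access. A sketch (the helper name and fallback value are hypothetical):

```csharp
using System;

public static class TimeToLiveSettings
{
    public static TimeSpan GetDefaultTimeToLive()
    {
        // Reads the DefaultTimeToLiveInSeconds app setting, whether it comes
        // from local.settings.json during debugging or the Azure portal in production
        var raw = Environment.GetEnvironmentVariable("DefaultTimeToLiveInSeconds");

        if (int.TryParse(raw, out var seconds) && seconds > 0)
        {
            return TimeSpan.FromSeconds(seconds);
        }

        // Hypothetical fallback; pick a default appropriate for the system
        return TimeSpan.FromDays(30);
    }
}
```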

A message with a time-to-be-received set will override the default time to live for the large binary object; that value is used instead when determining when to clean up the blob.
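For example, a message can declare its time-to-be-received with the standard NServiceBus attribute; the blob created for its payload would then be cleaned up on that schedule rather than after the default time to live. A sketch of how the sample's MessageWithLargePayload could be declared (the attribute value is illustrative):

```csharp
using NServiceBus;

// The message expires 30 minutes after being sent; the data bus blob's
// ValidUntilUtc metadata is derived from this value instead of the default TTL.
[TimeToBeReceived("00:30:00")]
public class MessageWithLargePayload : ICommand
{
    public DataBusProperty<byte[]> LargeBlob { get; set; }
}
```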

Configuring the data bus location

The DataBusBlobCleanupFunctions project needs to access the large binary objects. This is done by specifying an Azure storage connection string in the DataBusStorageAccount environment variable. This can be set during debugging by adding the appropriate Values setting in the local.settings.json file:

{
  "IsEncrypted": false,
  "Values": {
    "DataBusStorageAccount": "UseDevelopmentStorage=true"
  }
}

In production this is set using an application settings value named DataBusStorageAccount in the Azure portal.

Migrating existing projects

In environments where NServiceBus.DataBus.AzureBlobStorage is already in use, the cleanup orchestration must be triggered for the existing attachments.

A manually-triggered function called DataBusOrchestrateExistingBlobs is included to start the orchestration for every existing blob in the container. It is an HTTP-triggered function that can be invoked from a browser.

[FunctionName(nameof(DataBusOrchestrateExistingBlobs))]
public async Task<IActionResult> Run([HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req, [DurableClient] IDurableOrchestrationClient starter, ILogger log)
{
    var counter = 0;

    try
    {
        BlobContinuationToken token = null;
        do
        {
            var segment = await container.ListBlobsSegmentedAsync(token).ConfigureAwait(false);

            token = segment.ContinuationToken;

            foreach (var blockBlob in segment.Results.Where(blob => blob is CloudBlockBlob).Cast<CloudBlockBlob>())
            {
                var instanceId = blockBlob.Name;

                var existingInstance = await starter.GetStatusAsync(instanceId);
                if (existingInstance != null)
                {
                    log.LogInformation($"{nameof(DataBusCleanupOrchestrator)} has already been started for blob {blockBlob.Uri}.");
                    continue;
                }

                var validUntilUtc = calculator.GetValidUntil(blockBlob);

                if (validUntilUtc == DateTime.MaxValue)
                {
                    log.LogError($"Could not parse the 'ValidUntilUtc' value for blob {blockBlob.Uri}. Cleanup will not happen on this blob. You may consider manually removing this entry if non-expiry is incorrect.");
                    continue;
                }

                await starter.StartNewAsync(nameof(DataBusCleanupOrchestrator), instanceId, new DataBusBlobData
                {
                    Path = blockBlob.Uri.ToString(),
                    ValidUntilUtc = calculator.ToWireFormattedString(validUntilUtc)
                });

                counter++;
            }

        } while (token != null);
    }
    catch (Exception exception)
    {
        var result = new ObjectResult(exception.Message)
        {
            StatusCode = (int) HttpStatusCode.InternalServerError
        };

        return result;
    }

    var message = "DataBusOrchestrateExistingBlobs has completed." + (counter > 0 ? $" {counter} blob{(counter > 1 ? "s" : string.Empty)} will be tracked for clean-up." : string.Empty);

    return new OkObjectResult(message);
}

The function is very similar to the DataBusBlobCreated function, but instead of working on a single blob at a time, it iterates over every existing blob in the container.

This function does not require downtime, as the singleton orchestration pattern prevents cleanup orchestrations from being duplicated for blobs that are already tracked.

SenderAndReceiver project

The project sends the MessageWithLargePayload message to itself, using the NServiceBus attachment mechanism.
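The sending side follows the standard data bus usage. A condensed sketch of what the endpoint setup and send might look like (the endpoint name and payload size are illustrative; the two-type-parameter UseDataBus call follows the NServiceBus 8 data bus API):

```csharp
using NServiceBus;

var endpointConfiguration = new EndpointConfiguration("Samples.AzureBlobStorageDataBus");
endpointConfiguration.UseDataBus<AzureDataBus, SystemJsonDataBusSerializer>()
    .ConnectionString("UseDevelopmentStorage=true");

var endpointInstance = await Endpoint.Start(endpointConfiguration);

await endpointInstance.SendLocal(new MessageWithLargePayload
{
    // Payloads wrapped in DataBusProperty are written to blob storage;
    // only a reference travels in the message body itself.
    LargeBlob = new DataBusProperty<byte[]>(new byte[1024 * 1024 * 5])
});
```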

Related Articles

  • Data Bus
    How to handle messages that are too large to be sent by a transport natively.
