
Azure Blob Storage Data Bus Cleanup with Azure Functions

This sample shows how to use Azure Functions to automatically trigger blob cleanup.

Prerequisites

  1. Azure Functions Tools for Visual Studio
  2. Azurite Emulator

Running the sample

  1. Start Azurite Emulator.
  2. Run the solution. Two console applications will start.
  3. Switch to the console window with SenderAndReceiver in its path, and press enter to send a large message.

Code walk-through

This sample contains two projects:

  • SenderAndReceiver: a console application that sends and receives a large message.
  • DataBusBlobCleanupFunctions: an Azure Functions project with three Azure Functions that perform the cleanup.

SenderAndReceiver

This project sends a MessageWithLargePayload to itself. The large payload is not sent inline: it is stored as a data bus attachment in Azure Blob Storage, and the message carries only a reference to it.

DataBusBlobCleanupFunctions

DataBusBlobCreated

This Azure Function is triggered when a blob is created or updated in the databus container of the storage account identified by the DataBusStorageAccount connection setting.

[FunctionName(nameof(DataBusBlobCreated))]
public async Task Run([BlobTrigger("databus/{name}", Connection = "DataBusStorageAccount")] CloudBlockBlob myBlob, [DurableClient] IDurableOrchestrationClient starter, ILogger log)
{
    log.LogInformation($"Blob created at {myBlob.Uri}");

    var instanceId = myBlob.Name;

    var existingInstance = await starter.GetStatusAsync(instanceId);
    if (existingInstance != null)
    {
        log.LogInformation($"{nameof(DataBusCleanupOrchestrator)} has already been started for blob {myBlob.Uri}.");
        return;
    }

    var validUntilUtc = DataBusBlobTimeoutCalculator.GetValidUntil(myBlob);

    if (validUntilUtc == DateTime.MaxValue)
    {
        log.LogError($"Could not parse the 'ValidUntilUtc' value for blob {myBlob.Uri}. Cleanup will not happen on this blob. You may consider manually removing this entry if non-expiry is incorrect.");
        return;
    }

    await starter.StartNewAsync(nameof(DataBusCleanupOrchestrator), instanceId, new DataBusBlobData
    {
        Path = myBlob.Uri.ToString(),
        ValidUntilUtc = DataBusBlobTimeoutCalculator.ToWireFormattedString(validUntilUtc)
    });
}

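To prevent more than one cleanup timeout from being started for the same blob, the function uses the singleton orchestration pattern: the blob name is used as the instance ID when starting the DataBusCleanupOrchestrator function, and a new orchestration is started only if GetStatusAsync finds no existing instance with that ID.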
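The idea behind the singleton pattern can be illustrated without Azure. The following is a minimal, hypothetical in-memory sketch, not part of the sample: SingletonCleanupScheduler and TryStart are invented names, and a ConcurrentDictionary keyed by blob name stands in for the Durable Functions instance store. One difference to keep in mind: the sample's GetStatusAsync/StartNewAsync pair is a check followed by a start, whereas TryAdd performs both in a single atomic step.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical in-memory analogue of the singleton orchestration pattern (not the sample's code).
// The blob name plays the role of the orchestration instance ID.
public sealed class SingletonCleanupScheduler
{
    // Maps instance ID (blob name) to the expiry time the cleanup was scheduled for.
    private readonly ConcurrentDictionary<string, DateTime> instances =
        new ConcurrentDictionary<string, DateTime>();

    // Registers a cleanup for the blob and returns true, or returns false if one already exists.
    public bool TryStart(string blobName, DateTime validUntilUtc) =>
        instances.TryAdd(blobName, validUntilUtc);

    // Number of distinct blobs with a registered cleanup.
    public int Count => instances.Count;
}
```

Calling TryStart twice with the same blob name registers the cleanup only once, which is the property that keeps duplicate timeouts from being scheduled.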

The GetValidUntil method imitates how the NServiceBus.DataBus.AzureBlobStorage package determines when a blob expires.

public static DateTime GetValidUntil(CloudBlockBlob blockBlob)
{
    if (blockBlob.Metadata.TryGetValue("ValidUntilUtc", out var validUntilUtcString))
    {
        return ToUtcDateTime(validUntilUtcString);
    }

    return DateTime.MaxValue;
}

The method looks for a previously set ValidUntilUtc value in the blob metadata. If none is found, it returns DateTime.MaxValue, meaning the blob never expires; the calling function then logs an error and does not schedule cleanup for that blob.

The expiry time is converted to the wire format with ToWireFormattedString and passed, together with the blob URI, to the DataBusCleanupOrchestrator function.
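A self-contained sketch of the same lookup and round trip, operating on a plain metadata dictionary instead of a CloudBlockBlob so it runs without Azure Storage. BlobExpiry is a hypothetical name, and the wire format yyyy-MM-dd HH:mm:ss:ffffff Z is an assumption based on the timestamp format NServiceBus uses; check DataBusBlobTimeoutCalculator in the sample for the exact format.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical stand-in for DataBusBlobTimeoutCalculator that reads a plain metadata
// dictionary instead of a CloudBlockBlob, so it can run without Azure Storage.
public static class BlobExpiry
{
    // Assumed wire format; 'Z' is quoted so it is written and matched literally.
    private const string WireFormat = "yyyy-MM-dd HH:mm:ss:ffffff 'Z'";

    // Formats a timestamp for the orchestration input (the caller must pass a UTC value).
    public static string ToWireFormattedString(DateTime utc) =>
        utc.ToString(WireFormat, CultureInfo.InvariantCulture);

    // Parses a wire-formatted string back into a DateTime with Kind = Utc.
    public static DateTime ToUtcDateTime(string wireFormatted) =>
        DateTime.ParseExact(wireFormatted, WireFormat, CultureInfo.InvariantCulture,
            DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal);

    // Mirrors GetValidUntil: use ValidUntilUtc metadata if present, otherwise never expire.
    public static DateTime GetValidUntil(IDictionary<string, string> metadata) =>
        metadata.TryGetValue("ValidUntilUtc", out var value)
            ? ToUtcDateTime(value)
            : DateTime.MaxValue;
}
```

Passing the expiry as a string rather than a DateTime keeps the orchestration input independent of how the serializer handles DateTime kinds.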

DataBusCleanupOrchestrator

[FunctionName(nameof(DataBusCleanupOrchestrator))]
public async Task RunOrchestrator([OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
{
    var blobData = context.GetInput<DataBusBlobData>();

    log.LogInformation($"Orchestrating deletion for blob at {blobData.Path} with ValidUntilUtc of {blobData.ValidUntilUtc}");

    var validUntilUtc = DataBusBlobTimeoutCalculator.ToUtcDateTime(blobData.ValidUntilUtc);

    DateTime timeoutUntil;

    // Durable timers currently have a 7-day limit, so wait in steps of at most 6 days
    // until the remaining wait is shorter than that
    do
    {
        timeoutUntil = validUntilUtc > context.CurrentUtcDateTime.AddDays(6) ? context.CurrentUtcDateTime.AddDays(6) : validUntilUtc;
        log.LogInformation($"Waiting until {timeoutUntil}/{validUntilUtc} for blob at {blobData.Path}. Currently {context.CurrentUtcDateTime}.");
        // Wait for this step's deadline, not the final expiry, so each timer stays within the limit
        await context.CreateTimer(timeoutUntil, CancellationToken.None);
    } while (validUntilUtc > timeoutUntil);

    await context.CallActivityAsync("DeleteBlob", blobData);
}

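The orchestrator uses durable timers to wait until the blob's ValidUntilUtc time has passed, then calls the DeleteBlob activity to remove the blob from Azure Storage. Because the code assumes a single durable timer cannot exceed 7 days, longer waits are split into consecutive timers of at most 6 days each.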
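The timer loop can be checked in isolation. The following sketch is hypothetical (TimerPlanner and PlanTimers are not part of the sample): it reproduces the orchestrator's loop as a pure function that returns every deadline the orchestrator would wait for, assuming the orchestration clock is exactly at the deadline when each timer fires.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the sample) that reproduces the orchestrator's loop
// as a pure function, returning every timer deadline it would create.
public static class TimerPlanner
{
    public static List<DateTime> PlanTimers(DateTime nowUtc, DateTime validUntilUtc, TimeSpan maxStep)
    {
        var deadlines = new List<DateTime>();
        var current = nowUtc;
        DateTime timeoutUntil;
        do
        {
            // Cap each wait at maxStep (6 days in the sample) or the final expiry, whichever is earlier.
            timeoutUntil = validUntilUtc > current.Add(maxStep) ? current.Add(maxStep) : validUntilUtc;
            deadlines.Add(timeoutUntil);
            // Assumption: when the timer fires, the orchestration clock is exactly at the deadline.
            current = timeoutUntil;
        } while (validUntilUtc > timeoutUntil);
        return deadlines;
    }
}
```

For a blob that expires 14 days from now with a 6-day step, this yields deadlines at 6, 12, and 14 days; only the last timer targets ValidUntilUtc itself, after which the blob would be deleted.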

DeleteBlob

[FunctionName("DeleteBlob")]
public async Task DeleteBlob([ActivityTrigger] DataBusBlobData blobData, ILogger log)
{
    var blob = await cloudBlobClient.GetBlobReferenceFromServerAsync(new Uri(blobData.Path));
    log.LogInformation($"Deleting blob at {blobData.Path}");
    await blob.DeleteIfExistsAsync();
}

This activity function does the actual work of deleting the blob.

Configuring time to live for large binary objects

The default time to live for all large binary objects is configured by setting the DefaultTimeToLiveInSeconds environment variable. This can be set during debugging by adding the appropriate Values setting in the local.settings.json file:

{
  "IsEncrypted": false,
  "Values": {
    "DefaultTimeToLiveInSeconds": "180"
  }
}

In production, set this as an application setting named DefaultTimeToLiveInSeconds in the Azure portal.

If a message specifies its own time to be received, that value overrides the default time to live and determines when the blob is cleaned up.
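The precedence described above can be sketched as follows. This is a hypothetical helper, not code from the sample: TimeToLive, ReadDefaultTimeToLive, and ResolveValidUntil are invented names. It relies on Azure Functions exposing both the Values in local.settings.json and portal application settings to the function as environment variables.

```csharp
using System;
using System.Globalization;

// Hypothetical helpers (not from the sample) showing how the default TTL and a
// message-specific expiry could be combined.
public static class TimeToLive
{
    // Reads DefaultTimeToLiveInSeconds; returns null if it is missing, non-numeric, or not positive.
    public static TimeSpan? ReadDefaultTimeToLive()
    {
        var raw = Environment.GetEnvironmentVariable("DefaultTimeToLiveInSeconds");
        if (long.TryParse(raw, NumberStyles.Integer, CultureInfo.InvariantCulture, out var seconds) && seconds > 0)
        {
            return TimeSpan.FromSeconds(seconds);
        }
        return null;
    }

    // A message-specific expiry wins, then the default TTL; with neither, the blob never expires.
    public static DateTime ResolveValidUntil(DateTime createdUtc, DateTime? messageValidUntilUtc, TimeSpan? defaultTtl)
    {
        if (messageValidUntilUtc.HasValue)
        {
            return messageValidUntilUtc.Value;
        }
        return defaultTtl.HasValue ? createdUtc + defaultTtl.Value : DateTime.MaxValue;
    }
}
```

With the local.settings.json value of 180, a blob without a message-specific expiry would be considered expired three minutes after it was created.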

Configuring the data bus location

The DataBusBlobCleanupFunctions project requires access to the large binary objects. This is provided by an Azure Storage connection string in the DataBusStorageAccount environment variable. This can be set during debugging by adding the appropriate Values setting in the local.settings.json file:

{
  "IsEncrypted": false,
  "Values": {
    "DataBusStorageAccount": "UseDevelopmentStorage=true"
  }
}

In production, set this as an application setting named DataBusStorageAccount in the Azure portal.

Migrating existing projects

In environments where NServiceBus.DataBus.AzureBlobStorage is already in use, cleanup orchestrations must also be started for attachments that already exist in the container.

DataBusOrchestrateExistingBlobs starts a cleanup orchestration for every existing blob in the container. It is an HTTP-triggered function that can be invoked manually, for example from a browser.

[FunctionName(nameof(DataBusOrchestrateExistingBlobs))]
public async Task<IActionResult> Run([HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req, [DurableClient] IDurableOrchestrationClient starter, ILogger log)
{
    var counter = 0;

    try
    {
        BlobContinuationToken token = null;
        do
        {
            var segment = await container.ListBlobsSegmentedAsync(token).ConfigureAwait(false);

            token = segment.ContinuationToken;

            foreach (var blockBlob in segment.Results.Where(blob => blob is CloudBlockBlob).Cast<CloudBlockBlob>())
            {
                var instanceId = blockBlob.Name;

                var existingInstance = await starter.GetStatusAsync(instanceId);
                if (existingInstance != null)
                {
                    log.LogInformation($"{nameof(DataBusCleanupOrchestrator)} has already been started for blob {blockBlob.Uri}.");
                    continue;
                }

                var validUntilUtc = DataBusBlobTimeoutCalculator.GetValidUntil(blockBlob);

                if (validUntilUtc == DateTime.MaxValue)
                {
                    log.LogError($"Could not parse the 'ValidUntilUtc' value for blob {blockBlob.Uri}. Cleanup will not happen on this blob. You may consider manually removing this entry if non-expiry is incorrect.");
                    continue;
                }

                await starter.StartNewAsync(nameof(DataBusCleanupOrchestrator), instanceId, new DataBusBlobData
                {
                    Path = blockBlob.Uri.ToString(),
                    ValidUntilUtc = DataBusBlobTimeoutCalculator.ToWireFormattedString(validUntilUtc)
                });

                counter++;
            }

        } while (token != null);
    }
    catch (Exception exception)
    {
        var result = new ObjectResult(exception.Message)
        {
            StatusCode = (int)HttpStatusCode.InternalServerError
        };

        return result;
    }

    var message = "DataBusOrchestrateExistingBlobs has completed." + (counter > 0 ? $" {counter} blob{(counter > 1 ? "s" : string.Empty)} will be tracked for clean-up." : string.Empty);

    return new OkObjectResult(message);
}

The function is very similar to the DataBusBlobCreated function, but instead of working on a single blob, it iterates over every blob in the container.

Running this function does not require downtime: the singleton orchestration pattern skips any blob that already has a cleanup orchestration, so existing timeouts are not duplicated.
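The paging and skipping logic of DataBusOrchestrateExistingBlobs can be sketched without Azure Storage. In this hypothetical example, Page stands in for the segment returned by ListBlobsSegmentedAsync, the listPage delegate for the listing call, and a set of instance IDs for the Durable Functions instance store; none of these names come from the sample.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for one listing segment: the blobs on this page and a
// continuation token that is null when there are no more pages.
public sealed class Page
{
    public Page(IReadOnlyList<(string Name, DateTime ValidUntilUtc)> blobs, string continuationToken)
    {
        Blobs = blobs;
        ContinuationToken = continuationToken;
    }

    public IReadOnlyList<(string Name, DateTime ValidUntilUtc)> Blobs { get; }
    public string ContinuationToken { get; }
}

public static class ExistingBlobMigration
{
    // Walks all pages and starts tracking each expiring blob that is not tracked yet.
    // Returns the number of blobs newly tracked, like the counter in the sample.
    public static int TrackAll(Func<string, Page> listPage, ISet<string> startedInstances)
    {
        var counter = 0;
        string token = null;
        do
        {
            var page = listPage(token);
            token = page.ContinuationToken;

            foreach (var (name, validUntilUtc) in page.Blobs)
            {
                // Singleton check: a cleanup is already tracked for this blob name.
                if (startedInstances.Contains(name)) continue;
                // No ValidUntilUtc metadata: the blob never expires, so it is skipped.
                if (validUntilUtc == DateTime.MaxValue) continue;

                startedInstances.Add(name);
                counter++;
            }
        } while (token != null);

        return counter;
    }
}
```

Running TrackAll a second time over the same pages returns 0, because every expiring blob is already in the set; this mirrors why invoking the HTTP function again does not duplicate timeouts.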

Related Articles

  • Data Bus
    How to handle messages that are too large to be sent by a transport natively.
