Getting Started
Architecture
NServiceBus
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

Azure Blob Storage Data Bus Cleanup with Azure Functions

NuGet Package: NServiceBus.DataBus.AzureBlobStorage (6.x)
Target Version: NServiceBus 9.x

This sample shows how to use Azure Functions to automatically trigger blob cleanup.

Prerequisites

  1. Azure Functions Tools for Visual Studio
  2. Azurite Emulator

Running the sample

  1. Start Azurite Emulator.
  2. Run the solution—two console applications will start.
  3. Switch to the console window with SenderAndReceiver in its path, and press enter to send a large message.

Code walk-through

This sample contains two projects:

  • SenderAndReceiver—a console application which sends and receives a large message.
  • DataBusBlobCleanupFunctions—an Azure Functions project with three Azure Functions that perform cleanup.

SenderAndReceiver

This project sends a MessageWithLargePayload to itself. The message is sent using an attachment stored in Azure Storage.

DatabusBlobCleanupFunctions

DataBusBlobCreated

This Azure Function is triggered when a blob is created or updated in the data bus path in the storage account.

[Function(nameof(DataBusBlobCreated))]
public async Task Run([BlobTrigger("databus/{name}", Connection = "DataBusStorageAccount")] Stream blob, string name, Uri uri, IDictionary<string, string> metadata, [DurableClient] DurableTaskClient durableTaskClient, CancellationToken cancellationToken)
{
    logger.LogInformation("Blob created at {uri}", uri);

    var instanceId = name;
    var existingInstance = await durableTaskClient.GetInstanceAsync(instanceId, cancellationToken);

    if (existingInstance != null)
    {
        logger.LogInformation("{DataBusCleanupOrchestratorName} has already been started for blob {uri}.", DataBusCleanupOrchestratorName, uri);
        return;
    }

    var validUntilUtc = DataBusBlobTimeoutCalculator.GetValidUntil(metadata);

    if (validUntilUtc == DateTime.MaxValue)
    {
        logger.LogError("Could not parse the 'ValidUntil' value for blob {uri}. Cleanup will not happen on this blob. You may consider manually removing this entry if non-expiry is incorrect.", uri);
        return;
    }

    await durableTaskClient.ScheduleNewOrchestrationInstanceAsync(DataBusCleanupOrchestratorName, new DataBusBlobData
    {
        Name = name,
        ValidUntilUtc = DataBusBlobTimeoutCalculator.ToWireFormattedString(validUntilUtc)
    },
    new StartOrchestrationOptions()
    {
        InstanceId = instanceId
    }, cancellationToken);
}

To prevent multiple timeouts from starting, the function uses the singleton orchestration pattern, using the blob name, when starting the DataBusCleanupOrchestrator function.

The GetValidUntil method imitates the behavior of the NServiceBus.DataBus.AzureBlobStorage package.

public static DateTime GetValidUntil(IDictionary<string, string> blobMetadata)
{
    if (blobMetadata.TryGetValue("ValidUntilUtc", out var validUntilUtcString))
    {
        return ToUtcDateTime(validUntilUtcString);
    }

    return DateTime.MaxValue;
}

The method looks for a previously set timeout value in the blob metadata. If none is found, the default time to live (DateTime.MaxValue) is returned.

The timeout value is passed to the DataBusCleanupOrchestrator function.

DataBusCleanupOrchestrator

[Function(nameof(DataBusCleanupOrchestrator))]
public async Task RunOrchestrator([OrchestrationTrigger] TaskOrchestrationContext context)
{
    var blobData = context.GetInput<DataBusBlobData>();

    logger.LogInformation("Orchestrating deletion for blob at {name} with ValidUntilUtc of {validUntilUtc}", blobData.Name, blobData.ValidUntilUtc);

    var validUntilUtc = DataBusBlobTimeoutCalculator.ToUtcDateTime(blobData.ValidUntilUtc);

    DateTime timeoutUntil;

    //Timeouts currently have a 7 day limit, use 6 day loops until the wait is less than 6 days
    do
    {
        timeoutUntil = validUntilUtc > context.CurrentUtcDateTime.AddDays(6) ? context.CurrentUtcDateTime.AddDays(6) : validUntilUtc;

        logger.LogInformation("Waiting until {timeoutUntil}/{validUntilUtc} for blob at {blobData.Name}. Currently {context.CurrentUtcDateTime}.", timeoutUntil, validUntilUtc, blobData.Name, context.CurrentUtcDateTime);

        await context.CreateTimer(DataBusBlobTimeoutCalculator.ToUtcDateTime(blobData.ValidUntilUtc), CancellationToken.None);
    } while (validUntilUtc > timeoutUntil);

    await context.CallActivityAsync("DeleteBlob", blobData);
}

The function uses a durable function timer to delete the blob from Azure Storage after the timeout period has elapsed.

DeleteBlob

[Function("DeleteBlob")]
public async Task DeleteBlob([ActivityTrigger] DataBusBlobData blobData)
{
    var blob = containerClient.GetBlobClient(blobData.Name);

    logger.LogInformation($"Deleting blob at {blobData.Name}");

    await blob.DeleteIfExistsAsync();
}

The function is executing the actual work to delete a blob.

Configuring time to live for large binary objects

If a message has a specific time to be received, that value will be used to determine when to clean up the blob.

Configuring the data bus location

The DataBusBlobCleanupFunctions project requires access to the large binary objects. This is provided by an Azure Storage connection string in the DataBusStorageAccount environment variable. This can be set during debugging by adding the appropriate Values setting in the local.settings.json file:

{
  "IsEncrypted": false,
  "Values": {
    "DataBusStorageAccount": "UseDevelopmentStorage=true"
  }
}

In production this is set using an applications settings value named DataBusStorageAccount in the Azure portal.

Migrating existing projects

In environments where NServiceBus.DataBus.AzureBlobStorage is already in use, the timeout function must be triggered for the existing attachments.

DataBusOrchestrateExistingBlobs is used to trigger orchestration for every existing blob in the container. It's an HTTP triggered function that can be invoked manually using a browser.

[Function(nameof(DataBusOrchestrateExistingBlobs))]
public async Task<IActionResult> Run([HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequest req, [DurableClient] DurableTaskClient durableTaskClient, CancellationToken cancellationToken)
{
    var counter = 0;

    try
    {
        var segment = blobContainerClient.GetBlobsAsync(traits: BlobTraits.Metadata, cancellationToken: cancellationToken).AsPages();

        await foreach (var blobPage in segment)
        {
            foreach (var blobItem in blobPage.Values)
            {
                var instanceId = blobItem.Name;

                var existingInstance = await durableTaskClient.GetInstanceAsync(instanceId, cancellationToken);

                if (existingInstance != null)
                {
                    logger.LogInformation("{name} has already been started for blob {blobItemName}.", nameof(DataBusCleanupOrchestrator), blobItem.Name);
                    continue;
                }

                var validUntilUtc = DataBusBlobTimeoutCalculator.GetValidUntil(blobItem.Metadata);

                if (validUntilUtc == DateTime.MaxValue)
                {
                    logger.LogError("Could not parse the 'ValidUntilUtc' value for blob {name}. Cleanup will not happen on this blob. You may consider manually removing this entry if non-expiry is incorrect.", blobItem.Name);
                    continue;
                }

                await durableTaskClient.ScheduleNewOrchestrationInstanceAsync(nameof(DataBusCleanupOrchestrator), new DataBusBlobData
                {
                    Name = blobItem.Name,
                    ValidUntilUtc = DataBusBlobTimeoutCalculator.ToWireFormattedString(validUntilUtc)
                },
                new StartOrchestrationOptions()
                {
                    InstanceId = instanceId
                }, cancellationToken);

                counter++;
            }
        }
    }
    catch (Exception exception)
    {
        var result = new ObjectResult(exception.Message)
        {
            StatusCode = (int)HttpStatusCode.InternalServerError
        };

        return result;
    }

    var message = "DataBusOrchestrateExistingBlobs has completed." + (counter > 0 ? $" {counter} blob{(counter > 1 ? "s" : string.Empty)} will be tracked for clean-up." : string.Empty);

    return new OkObjectResult(message);
}

The function is very similar to the DataBusBlobCreated function, but instead of working on a single blob, it iterates over every blob in the container.

This function does not require downtime as the implemented singleton orchestration pattern prevents existing timeouts from being duplicated.

Related Articles