Azure Blob Storage DataBus Cleanup with Azure Functions

Component: Azure Blob Storage DataBus
NuGet Package: NServiceBus.DataBus.AzureBlobStorage (2.x)
Target NServiceBus Version: 7.x

NServiceBus.DataBus.AzureBlobStorage has a built-in cleanup mechanism that removes blobs after a configured timeout. By default this runs on every endpoint and can become slow in systems with a high volume of messages that contain data bus properties.

This sample shows how to use Azure Functions to automatically trigger blob cleanup as an alternative to using the NServiceBus.DataBus.AzureBlobStorage built-in cleanup mechanism. This has the advantage of reducing processing load on the endpoints, instead using the on-demand scaling of Azure Functions.

Prerequisites

  1. Make sure the Azure Functions Tools for Visual Studio 2017 are set up correctly.
  2. Start Azure Storage Emulator. Ensure the latest version is installed.
  3. Run the solution. Two console applications start.
  4. Find the SenderAndReceiver application by looking for the one with SenderAndReceiver in its path and press Enter to send a message. The message sent is larger than the 4MB allowed by MSMQ, so NServiceBus sends it as an attachment via Azure Storage. The DataBusBlobCreated Azure Function runs in the Functions window, followed by the DataBusCleanupOrchestrator, which deletes the blob when the message's time to live is reached.

Code walk-through

This sample contains two projects:

  • DataBusBlobCleanupFunctions - An Azure Function project that contains the three Azure Functions that perform the cleanup.
  • SenderAndReceiver - A console application responsible for sending and receiving the large message.

DataBusBlobCleanupFunctions

DataBusBlobCreated

This project includes the following Azure Function, which is triggered when a blob is created or updated in the data bus path of the storage account.

[FunctionName(nameof(DataBusBlobCreated))]
public static async Task Run(
    [BlobTrigger("databus/{name}", Connection = "DataBusStorageAccount")] CloudBlockBlob myBlob,
    [OrchestrationClient] DurableOrchestrationClient starter,
    TraceWriter log)
{
    log.Info($"Blob created at {myBlob.Uri}");

    var instanceId = myBlob.Name;

    var existingInstance = await starter.GetStatusAsync(instanceId);
    if (existingInstance != null)
    {
        log.Info($"{nameof(DataBusCleanupOrchestrator)} has already been started for blob {myBlob.Uri}.");
        return;
    }

    var validUntilUtc = DataBusBlobTimeoutCalculator.GetValidUntil(myBlob);

    if (validUntilUtc == DateTime.MaxValue)
    {
        log.Error($"Could not parse the 'ValidUntil' value `{myBlob.Metadata["ValidUntil"]}` for blob {myBlob.Uri}. Cleanup will not happen on this blob. You may consider manually removing this entry if non-expiry is incorrect.");
        return;
    }

    await starter.StartNewAsync(nameof(DataBusCleanupOrchestrator), instanceId, new DataBusBlobData
    {
        Path = myBlob.Uri.ToString(),
        ValidUntilUtc = DataBusBlobTimeoutCalculator.ToWireFormattedString(validUntilUtc)
    });
}

The function uses a singleton orchestration pattern, keyed on the blob name, when starting the DataBusCleanupOrchestrator function. This prevents multiple cleanup timers from being started for the same blob.

The GetValidUntil method uses logic that reproduces the cleanup functionality of the NServiceBus.DataBus.AzureBlobStorage package.

public static DateTime GetValidUntil(ICloudBlob blockBlob)
{
    if (blockBlob.Metadata.TryGetValue("ValidUntilUtc", out var validUntilUtcString))
    {
        return ToUtcDateTime(validUntilUtcString);
    }

    // A blob with neither ValidUntilUtc nor ValidUntil metadata falls back to the default TTL
    if (!blockBlob.Metadata.TryGetValue("ValidUntil", out var validUntilString))
    {
        if (!blockBlob.Properties.LastModified.HasValue)
        {
            return DateTime.UtcNow.Add(DefaultTtl);
        }

        try
        {
            return blockBlob.Properties.LastModified.Value.Add(DefaultTtl).UtcDateTime;
        }
        catch (ArgumentOutOfRangeException)
        {
            // fallback to now + defaultTtl
        }

        return DateTime.UtcNow.Add(DefaultTtl);
    }

    var style = DateTimeStyles.AssumeUniversal;
    if (!blockBlob.Metadata.ContainsKey("ValidUntilKind"))
    {
        style = DateTimeStyles.AdjustToUniversal;
    }

    // Since the old 'ValidUntil' format could have been written in any culture,
    // parsing may fail and must be handled
    return DateTime.TryParse(validUntilString, null, style, out var validUntil)
        ? validUntil.ToUniversalTime()
        // If the value cannot be parsed, assume data corruption and return max time
        : DateTime.MaxValue;
}

The method evaluates the blob's metadata, looking for previously provided timeout values. If none are found, a default time to live is calculated from the blob's last-modified time and returned.

The timeout value is passed in when the DataBusCleanupOrchestrator orchestration function is executed.
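The DataBusBlobTimeoutCalculator wire-format helpers (ToWireFormattedString and ToUtcDateTime) are not shown in this sample. A minimal sketch, assuming the round-trippable UTC wire format NServiceBus uses for DateTime values; the sample's actual implementation may differ:

```csharp
using System;
using System.Globalization;

public static class WireFormat
{
    // Assumed format string; matches the wire format NServiceBus uses for DateTime values
    const string Format = "yyyy-MM-dd HH:mm:ss:ffffff Z";

    public static string ToWireFormattedString(DateTime dateTime)
    {
        return dateTime.ToUniversalTime().ToString(Format, CultureInfo.InvariantCulture);
    }

    public static DateTime ToUtcDateTime(string wireFormattedString)
    {
        // AssumeUniversal + AdjustToUniversal yields a DateTime with Kind = Utc
        return DateTime.ParseExact(
            wireFormattedString,
            Format,
            CultureInfo.InvariantCulture,
            DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal);
    }
}
```

Using an invariant-culture, fixed-format string avoids the culture-dependent parsing problems that GetValidUntil has to work around for the legacy 'ValidUntil' metadata.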

DataBusCleanupOrchestrator

[FunctionName(nameof(DataBusCleanupOrchestrator))]
public static async Task RunOrchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context,
    TraceWriter log)
{
    var blobData = context.GetInput<DataBusBlobData>();

    log.Info($"Orchestrating deletion for blob at {blobData.Path} with ValidUntilUtc of {blobData.ValidUntilUtc}");

    var validUntilUtc = DataBusBlobTimeoutCalculator.ToUtcDateTime(blobData.ValidUntilUtc);

    DateTime timeoutUntil;

    // Durable timers currently have a 7-day limit, so wait in 6-day hops
    // until the remaining wait is less than 6 days
    do
    {
        timeoutUntil = validUntilUtc > context.CurrentUtcDateTime.AddDays(6) ? context.CurrentUtcDateTime.AddDays(6) : validUntilUtc;
        log.Info($"Waiting until {timeoutUntil}/{validUntilUtc} for blob at {blobData.Path}. Currently {context.CurrentUtcDateTime}.");
        await context.CreateTimer(timeoutUntil, CancellationToken.None);
    } while (validUntilUtc > timeoutUntil);

    // cloudBlobClient is a static CloudBlobClient field initialized from the
    // DataBusStorageAccount connection string
    var blob = await cloudBlobClient.GetBlobReferenceFromServerAsync(new Uri(blobData.Path));
    log.Info($"Deleting blob at {blobData.Path}");
    await blob.DeleteIfExistsAsync();
}

The function uses a Durable Functions timer to delay the deletion of the blob from Azure Storage.

Configuring time to live for large binary objects

The default time to live for all large binary objects is configured by setting the DefaultTimeToLiveInSeconds environment variable. During debugging, this can be set by adding the appropriate setting to the Values section of the local.settings.json file:

{
  "IsEncrypted": false,
  "Values": {
    "DefaultTimeToLiveInSeconds": "180"
  }
}

In production, this is set as an application setting named DefaultTimeToLiveInSeconds in the Azure portal.
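The function project can translate this setting into the DefaultTtl value used by GetValidUntil. A minimal sketch; the parsing logic and the 30-day fallback are assumptions, not part of the sample:

```csharp
using System;

public static class TtlSettings
{
    // Hypothetical helper: reads DefaultTimeToLiveInSeconds and falls back to
    // a 30-day TTL when the variable is missing or invalid (the fallback value
    // is an assumption)
    public static TimeSpan GetDefaultTtl()
    {
        var raw = Environment.GetEnvironmentVariable("DefaultTimeToLiveInSeconds");
        return int.TryParse(raw, out var seconds) && seconds > 0
            ? TimeSpan.FromSeconds(seconds)
            : TimeSpan.FromDays(30);
    }
}
```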

A message with TimeToBeReceived set will override the default time to live for its large binary object; that value is used instead when determining when to clean up the blob.
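For example, applying NServiceBus's TimeToBeReceived attribute to a message type caps the blob's lifetime at that interval. A sketch; the class name follows this sample, and the three-minute value is illustrative:

```csharp
using System;
using NServiceBus;

// The blob backing LargePayload is cleaned up after 3 minutes instead of
// the configured default time to live
[TimeToBeReceived("00:03:00")]
public class MessageWithLargePayload : ICommand
{
    public DataBusProperty<byte[]> LargePayload { get; set; }
}
```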

Configuring the DataBus location

The DataBusBlobCleanupFunctions project needs to access the large binary objects. This is done by specifying an Azure storage connection string in the DataBusStorageAccount environment variable. This can be set during debugging by adding the appropriate Values setting in the local.settings.json file:

{
  "IsEncrypted": false,
  "Values": {
    "DataBusStorageAccount": "UseDevelopmentStorage=true"
  }
}

In production, this is set as an application setting named DataBusStorageAccount in the Azure portal.

Migrating existing projects

In environments where NServiceBus.DataBus.AzureBlobStorage is already in use, the cleanup orchestration needs to be triggered for the existing attachments.

A manually-triggered function is included to trigger orchestration for every existing blob in the container.

[FunctionName(nameof(DataBusOrchestrateExistingBlobs))]
[NoAutomaticTrigger]
public static async Task Run(
    [OrchestrationClient] DurableOrchestrationClient starter,
    TraceWriter log)
{
    var storageConnectionString = Environment.GetEnvironmentVariable("DataBusStorageAccount");
    var cloudStorageAccount = CloudStorageAccount.Parse(storageConnectionString);
    var cloudBlobClient = cloudStorageAccount.CreateCloudBlobClient();
    var container = cloudBlobClient.GetContainerReference("databus");

    BlobContinuationToken token = null;
    do
    {
        var segment = await container.ListBlobsSegmentedAsync(token).ConfigureAwait(false);

        token = segment.ContinuationToken;

        foreach (var blockBlob in segment.Results.OfType<CloudBlockBlob>())
        {
            var instanceId = blockBlob.Name;

            var existingInstance = await starter.GetStatusAsync(instanceId);
            if (existingInstance != null)
            {
                log.Info($"{nameof(DataBusCleanupOrchestrator)} has already been started for blob {blockBlob.Uri}.");
                continue;
            }

            var validUntilUtc = DataBusBlobTimeoutCalculator.GetValidUntil(blockBlob);

            if (validUntilUtc == DateTime.MaxValue)
            {
                log.Error($"Could not parse the 'ValidUntil' value `{blockBlob.Metadata["ValidUntil"]}` for blob {blockBlob.Uri}. Cleanup will not happen on this blob. You may consider manually removing this entry if non-expiry is incorrect.");
                continue;
            }

            await starter.StartNewAsync(nameof(DataBusCleanupOrchestrator), instanceId, new DataBusBlobData
            {
                Path = blockBlob.Uri.ToString(),
                ValidUntilUtc = DataBusBlobTimeoutCalculator.ToWireFormattedString(validUntilUtc)
            });
        }

    } while (token != null);
}

The function is very similar to the DataBusBlobCreated function, but instead of acting on a single blob it iterates over every existing blob in the container.

This function does not require downtime, as the singleton orchestration pattern prevents cleanup orchestrations from being duplicated for blobs that already have one.

SenderAndReceiver project

The project sends the MessageWithLargePayload message to itself, utilizing the NServiceBus attachment mechanism.
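The send itself might look like the following sketch; the endpointInstance variable, the LargePayload property name, and the 5 MB payload size are assumptions, not taken from the sample:

```csharp
// Wrap the payload in DataBusProperty<byte[]> so NServiceBus stores it in
// blob storage instead of the message body
await endpointInstance.SendLocal(new MessageWithLargePayload
{
    LargePayload = new DataBusProperty<byte[]>(new byte[1024 * 1024 * 5])
});
```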

The built-in DataBus cleanup functionality for the endpoint is disabled by setting CleanupInterval to 0.

var dataBus = endpointConfiguration.UseDataBus<AzureDataBus>();
dataBus.ConnectionString("UseDevelopmentStorage=true");
dataBus.CleanupInterval(0);

Related Articles

  • DataBus
    How to handle messages that are too large to be sent by a transport.
