Using NServiceBus in Azure Functions with Storage Queue triggers

Component: NServiceBus.AzureFunctions.StorageQueues
NuGet Package NServiceBus.AzureFunctions.StorageQueues (0.x)
This is a Preview project
Target NServiceBus Version: 7.x

This sample shows how to host NServiceBus within an Azure Function, in this case, a function triggered by an incoming Storage Queues message. This enables hosting message handlers in Azure Functions, gaining the abstraction of message handlers implemented using IHandleMessages<T> and also taking advantage of NServiceBus's extensible message processing pipeline.

The sample demonstrates two configuration approaches that achieve the same outcome:

  1. Integrating with the IFunctionHostBuilder, including the host managed DI container.
  2. Configuring the endpoint inside the trigger class as a static field.

When hosting NServiceBus within Azure Functions, each Function (as identified by the [FunctionName] attribute) hosts an NServiceBus endpoint that is capable of processing multiple different message types.

The Azure Functions SDK enforces certain constraints that are also applied to NServiceBus endpoints. Review these constraints before running the sample.

Prerequisites

Unlike a traditional NServiceBus endpoint, an endpoint hosted in Azure Functions cannot create its own input queue. In this sample, that queue name is ASQTriggerQueue.

To create the queue with the Azure CLI, execute the following Azure CLI command:

 az storage queue create --name ASQTriggerQueue --connection-string "<storage-account-connection-string>"

To use the sample, a valid Queue Storage connection string must be provided in the local.settings.json file.

Sample structure

The sample contains the following projects:

  • AzureFunctions.ASQTrigger.FunctionsHostBuilder - Using the IFunctionHostBuilder approach to host the NServiceBus endpoint
  • AzureFunctions.ASQTrigger.Static - Using a static approach to host the NServiceBus endpoint
AzureFunctions.ASQTrigger.FunctionsHostBuilder and AzureFunctions.ASQTrigger.Static are both using the same trigger queue and should not be executed simultaneously.

Running the sample

Each Functions project contains two functions:

  1. Storage Queue-triggered function.
  2. HTTP-triggered function.

Running the sample will launch the Azure Functions runtime window.

To try the Azure Function:

  1. Open a browser and navigate to http://localhost:7071/api/HttpSender. The port number might be different and will be indicated when the function project is started.
  2. The queue-triggered function will receive the TriggerMessage and process it with NServiceBus.
  3. The NServiceBus message handler for TriggerMessage sends a FollowUpMessage.
  4. The queue-triggered function will receive the FollowUpMessage and process it with NServiceBus.

Code walk-through

IFunctionHostBuilder approach

The NServiceBus endpoint configured using IFunctionHostBuilder is using the convention and is wired using Startup class like this:

[assembly: FunctionsStartup(typeof(Startup))]

public class Startup : FunctionsStartup
{
    public override void Configure(IFunctionsHostBuilder builder)
    {
        var services = builder.Services;

        // register custom service in the container
        services.AddSingleton(_ =>
        {
            var configurationRoot = builder.GetContext().Configuration;
            var customComponentInitializationValue = configurationRoot.GetValue<string>("CustomComponentValue");

            return new CustomComponent(customComponentInitializationValue);
        });

        builder.UseNServiceBus(() =>
        {
            var configuration = new StorageQueueTriggeredEndpointConfiguration(AzureStorageQueueTriggerFunction.EndpointName);

            configuration.UseSerialization<NewtonsoftSerializer>();
            
            // Disable persistence requirement
            configuration.Transport.DisablePublishing();

            // optional: log startup diagnostics using Functions provided logger
            configuration.LogDiagnostics();

            return configuration;
        });
    }
}

IFunctionEndpoint is then injected into the function class:

public AzureStorageQueueTriggerFunction(IFunctionEndpoint endpoint)
{
    this.endpoint = endpoint;
}

It is invoked in the following manner:

[FunctionName(EndpointName)]
public async Task QueueTrigger(
    [QueueTrigger(EndpointName)]
    CloudQueueMessage message,
    ILogger logger,
    ExecutionContext context)
{
    await endpoint.Process(message, context, logger);
}

Static approach

The static NServiceBus endpoint must be configured using details that come from the Azure Functions ExecutionContext. Since that is not available until a message is handled by the function, the NServiceBus endpoint instance is deferred until the first message is processed, using a lambda expression like this:

private static readonly IFunctionEndpoint endpoint = new FunctionEndpoint(executionContext =>
{
    // endpoint name, logger, and connection strings are automatically derived from FunctionName and QueueTrigger attributes
    var configuration = StorageQueueTriggeredEndpointConfiguration.FromAttributes();

    configuration.UseSerialization<NewtonsoftSerializer>();

    // optional: log startup diagnostics using Functions provided logger
    configuration.LogDiagnostics();

    // Disable persistence requirement
    configuration.Transport.DisablePublishing();

    configuration.AdvancedConfiguration.RegisterComponents(r => r.ConfigureComponent(() =>
    {
        var customComponentInitializationValue = Environment.GetEnvironmentVariable("CustomComponentValue");
        return new CustomComponent(customComponentInitializationValue);
    }, DependencyLifecycle.SingleInstance));

    return configuration;
});

The same class defines the Azure Function which makes up the hosting for the NServiceBus endpoint. The Function hands off processing of the message to NServiceBus:

[FunctionName(EndpointName)]
public static async Task QueueTrigger(
    [QueueTrigger(EndpointName)]
    CloudQueueMessage message,
    ILogger logger,
    ExecutionContext context)
{
    await endpoint.Process(message, context, logger);
}

Meanwhile, the message handlers for TriggerMessage and FollowUpMessage, also hosted within the Azure Functions project, are normal NServiceBus message handlers, which are also capable of sending messages themselves.

Handlers

Both approaches use the same message handlers, with a CustomDependency passed in.

public class TriggerMessageHandler : IHandleMessages<TriggerMessage>
{
    static readonly ILog Log = LogManager.GetLogger<TriggerMessageHandler>();

    readonly CustomComponent customComponent;

    public TriggerMessageHandler(CustomComponent customComponent)
    {
        this.customComponent = customComponent;
    }

    public Task Handle(TriggerMessage message, IMessageHandlerContext context)
    {
        Log.Warn($"Handling {nameof(TriggerMessage)} in {nameof(TriggerMessageHandler)}");
        Log.Warn($"Custom component returned: {customComponent.GetValue()}");
        return context.SendLocal(new FollowupMessage());
    }
}
public class FollowupMessageHandler : IHandleMessages<FollowupMessage>
{
    static readonly ILog Log = LogManager.GetLogger<FollowupMessageHandler>();

    readonly CustomComponent customComponent;

    public FollowupMessageHandler(CustomComponent customComponent)
    {
        this.customComponent = customComponent;
    }

    public Task Handle(FollowupMessage message, IMessageHandlerContext context)
    {
        Log.Warn($"Handling {nameof(FollowupMessage)} in {nameof(FollowupMessageHandler)}.");
        Log.Warn($"Custom component returned: {customComponent.GetValue()}");
        return Task.CompletedTask;
    }
}

Related Articles


Last modified