Using NServiceBus and Kafka in Azure Functions (isolated)

NuGet Package: NServiceBus.AzureFunctions.Worker.ServiceBus (4.x)
Target Version: NServiceBus 8.x

This sample shows how to process Kafka events using an Azure Functions trigger and how to follow up on those events using an NServiceBus SendOnly endpoint in an Azure Function.

Kafka and NServiceBus

Kafka is an event streaming broker, similar to Azure Event Hubs. Event streaming brokers are used to store massive amounts of incoming events, for example from IoT devices. In contrast, NServiceBus works on top of messaging brokers like Azure Service Bus, RabbitMQ, and Amazon SQS/SNS. They can complement each other as shown in this sample consisting of two projects.

  • The ConsoleEndpoint is the starting point of the sample, which produces numerous Kafka events.
  • The Azure Function uses a Kafka trigger to consume the events and send NServiceBus messages back to the ConsoleEndpoint via Azure Service Bus.

For more information about Kafka and NServiceBus, read the blog post Let's talk about Kafka.

Prerequisites

Service Bus connection string

To use the sample, a valid Service Bus connection string must be provided in the local.settings.json file in the AzureFunctions.KafkaTrigger.FunctionsHostBuilder project and as an environment variable named AzureServiceBus_ConnectionString.
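
As a rough illustration, a local.settings.json for the function project might look like the fragment below. The AzureWebJobsServiceBus key matches the variable name read by the endpoint setup code in this sample; the other two entries are the usual settings for a locally run isolated-worker function and are assumptions here, not taken from the sample. The connection string value is a placeholder.

```json
{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "AzureWebJobsServiceBus": "<your Service Bus connection string>"
  }
}
```

The AzureServiceBus_ConnectionString environment variable is set separately, outside this file, presumably to the same connection string.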

Kafka broker

This sample requires Kafka to store the produced events.

Setting up a Kafka Docker container

The sample contains a docker-compose.yml file to set up a Docker container with Kafka. From a CLI like PowerShell, use the following command in the solution folder to download and execute a Docker image:

docker-compose up

Sample structure

The sample contains the following projects:

  • AzureFunctions.KafkaTrigger.FunctionsHostBuilder - Azure function with Kafka trigger and NServiceBus SendOnly endpoint
  • AzureFunctions.Messages - message definitions
  • ConsoleEndpoint - NServiceBus endpoint and Kafka producer

Running the sample

The solution requires both AzureFunctions.KafkaTrigger.FunctionsHostBuilder and ConsoleEndpoint to run.

After the sample is running with both projects:

  1. Press ENTER in the ConsoleEndpoint window to start producing Kafka events.
  2. The Azure Function will consume each Kafka event and check if it contains information indicating a specific threshold has been reached. When it has, the function will send a FollowUp Azure Service Bus message to the ConsoleEndpoint using NServiceBus.
  3. The ConsoleEndpoint console window will receive the FollowUp message and process it with NServiceBus.

Code walk-through

The project ConsoleEndpoint produces the events as follows:

var electricityUsage = new ElectricityUsage() { CustomerId = 42, CurrentUsage = i, UnitId = 1337 };

var message = new Message<string, string>
{
    Value = ElectricityUsage.Serialize(electricityUsage)
};

var deliveryResult = await producer.ProduceAsync("myKafkaTopic", message);

In this sample, each Kafka event carries a string value (Kafka itself stores keys and values as byte arrays). The Kafka SDK for .NET supports schemas and typed serialization of events, but for simplicity, the ElectricityUsage event is serialized using Newtonsoft.Json.
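
The event class itself is not shown on this page. A minimal sketch of what it could look like, assuming integer properties and static Serialize/Deserialize helpers built on JsonConvert, is given below. Only the member names come from the sample; the property types and the null check are assumptions.

```csharp
using Newtonsoft.Json;

// Hypothetical shape of the event class; only the member names come from the sample.
public class ElectricityUsage
{
    public int CustomerId { get; set; }
    public int UnitId { get; set; }
    public int CurrentUsage { get; set; }

    // Turns the event into a JSON string that fits Message<string, string>.Value.
    public static string Serialize(ElectricityUsage usage) =>
        JsonConvert.SerializeObject(usage);

    // Restores the event from its JSON string and rejects a JSON "null" payload.
    public static ElectricityUsage Deserialize(string json) =>
        JsonConvert.DeserializeObject<ElectricityUsage>(json)
            ?? throw new JsonSerializationException("Payload did not contain an ElectricityUsage event.");
}
```

Keeping both helpers on the event type means the producer and the Azure Function share one definition of the wire format.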

Kafka trigger

A Kafka trigger in the AzureFunctions.KafkaTrigger.FunctionsHostBuilder project consumes each event and checks whether the electricity usage for any unit of any customer exceeds a specified threshold. If so, an NServiceBus SendOnly endpoint is used to send a message. The following code shows how to set up an NServiceBus endpoint in Azure Functions and register the IMessageSession instance with the dependency injection container:

var host = new HostBuilder()
    .ConfigureServices(services =>
    {
        var cfg = new EndpointConfiguration("SendOnly");
        cfg.SendOnly();
        cfg.UseSerialization<SystemJsonSerializer>();

        var connectionString = Environment.GetEnvironmentVariable("AzureWebJobsServiceBus");
        var transport = new AzureServiceBusTransport(connectionString);
        var routing = cfg.UseTransport(transport);

        routing.RouteToEndpoint(typeof(FollowUp), "Samples.KafkaTrigger.ConsoleEndpoint");

        // ConfigureServices takes a synchronous delegate, so block until the endpoint has started.
        // An async lambda would return at the first await, before the registration below runs.
        var endpoint = Endpoint.Start(cfg).GetAwaiter().GetResult();

        // Register the started endpoint in the DI container
        services.AddSingleton<IMessageSession>(endpoint);
    })
    .ConfigureFunctionsWorkerDefaults()
    .Build();

The Kafka trigger checks for an event value of 42 (hard-coded for simplicity's sake); when it finds one, a message is sent back to the ConsoleEndpoint.

[Function(nameof(ElectricityUsage))]
public async Task ElectricityUsage([KafkaTrigger("LocalKafkaBroker", "topic", ConsumerGroup = "$Default")] string eventData,
        FunctionContext context)
{

    var eventValue = JObject.Parse(eventData)["Value"]?.ToString();
    var electricityUsage = Messages.KafkaMessages.ElectricityUsage.Deserialize(eventValue);

    logger.LogInformation("Received Kafka event with usage: {CurrentUsage}", electricityUsage.CurrentUsage);

    if (IsUsageAboveAverage(electricityUsage.CustomerId, electricityUsage.UnitId, electricityUsage.CurrentUsage))
    {
        var message = new FollowUp
        {
            CustomerId = electricityUsage.CustomerId,
            UnitId = electricityUsage.UnitId,
            Description = $"Usage over monthly average at [{electricityUsage.CurrentUsage}] units"
        };

        await messageSession.Send(message);
    }
}
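
The IsUsageAboveAverage check called in the trigger is not shown here. A minimal sketch, assuming it compares the current usage against the hard-coded value 42, could look like the following. The UsageCheck class name, the int parameter types, and the use of >= rather than == or > are all assumptions made for illustration.

```csharp
// Hypothetical helper; in the sample the method is a member of the function class.
public static class UsageCheck
{
    // Assumed threshold, based on the hard-coded value 42 mentioned in the text.
    const int Threshold = 42;

    // customerId and unitId are accepted but ignored in this sketch; a real check
    // would presumably look up a per-customer, per-unit average instead.
    public static bool IsUsageAboveAverage(int customerId, int unitId, int currentUsage) =>
        currentUsage >= Threshold;
}
```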

The FollowUp message is then received by the FollowUpHandler NServiceBus message handler in the ConsoleEndpoint.
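
The handler code is not reproduced on this page. As a minimal sketch, assuming the handler only logs the incoming message, it might look like the following. The FollowUp property types, the IMessage marker interface, and the log text are assumptions; the property names come from the trigger code.

```csharp
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Logging;

// Assumed message shape; property names are taken from the trigger code, types are guesses.
public class FollowUp : IMessage
{
    public int CustomerId { get; set; }
    public int UnitId { get; set; }
    public string Description { get; set; }
}

// NServiceBus discovers this class through the IHandleMessages<FollowUp> interface.
public class FollowUpHandler : IHandleMessages<FollowUp>
{
    static readonly ILog log = LogManager.GetLogger<FollowUpHandler>();

    public Task Handle(FollowUp message, IMessageHandlerContext context)
    {
        // This sketch only logs; follow-up business logic would go here.
        log.Info($"Follow-up for customer {message.CustomerId}, unit {message.UnitId}: {message.Description}");
        return Task.CompletedTask;
    }
}
```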
