Using NServiceBus and Kafka in Azure Functions (isolated)

This sample shows how to process Kafka events using an Azure Functions trigger and how to follow up on those events using an NServiceBus SendOnly endpoint in an Azure Function.

Kafka and NServiceBus

Kafka is an event streaming broker, similar to Azure Event Hubs. Event streaming brokers are used to store massive amounts of incoming events, for example from IoT devices. NServiceBus, on the other hand, works on top of messaging brokers like Azure Service Bus, RabbitMQ, and Amazon SQS/SNS. The two can complement each other, as shown in this sample, which is built around two projects:

  • The ConsoleEndpoint project is the starting point of the sample and produces numerous Kafka events.
  • The Azure Function uses a Kafka trigger to consume those events and sends NServiceBus messages back to the ConsoleEndpoint via Azure Service Bus.

For more information about Kafka and NServiceBus, read the blog post Let's talk about Kafka.

Prerequisites

Configure Connection string

To use the sample, a valid Azure Service Bus connection string must be provided in the local.settings.json file in the AzureFunctions.KafkaTrigger.FunctionsHostBuilder project and as an environment variable named AzureWebJobsServiceBus (the setting name read by the endpoint configuration code shown below).
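
For reference, a minimal local.settings.json could look like the following sketch. The setting names match the ones used by the code in this sample, the values are placeholders, and it is assumed that the LocalKafkaBroker name used by the trigger is resolved from application settings:

{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "AzureWebJobsServiceBus": "<your Azure Service Bus connection string>",
    "LocalKafkaBroker": "localhost:9092"
  }
}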

Kafka broker

This sample requires a Kafka broker to be available to store the events being produced.

Set up Kafka Docker container

To set up Kafka in Docker containers, the sample contains a docker-compose.yml file. From a CLI such as PowerShell, run the following command in the solution folder to download the required images and start the containers:

docker-compose up
When running Docker on Windows, it's possible to get an error saying no matching manifest for windows/amd64 in the manifest list entries. This can be solved by running the Docker daemon in experimental mode.
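
As a rough illustration of what such a compose file can contain (the file shipped with the sample may differ), a minimal single-broker setup based on the Confluent images looks like this:

version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Advertise one listener for in-network traffic and one for the host machine
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1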

Sample structure

The sample contains the following projects:

  • AzureFunctions.KafkaTrigger.FunctionsHostBuilder - Azure function with Kafka trigger and NServiceBus SendOnly endpoint
  • AzureFunctions.Messages - message definitions
  • ConsoleEndpoint - NServiceBus endpoint and Kafka producer

Running the sample

The solution requires both AzureFunctions.KafkaTrigger.FunctionsHostBuilder and ConsoleEndpoint to run.

After the sample is running with both projects:

  1. Press ENTER in the ConsoleEndpoint window to start producing Kafka events.
  2. The Azure Function will consume each Kafka event and check whether it indicates that a certain threshold has been reached. When it has, the function will send a FollowUp message to the ConsoleEndpoint over Azure Service Bus using NServiceBus.
  3. The ConsoleEndpoint console window will receive the FollowUp message and process it with NServiceBus.

Code walk-through

The project ConsoleEndpoint produces the events as follows:

var electricityUsage = new ElectricityUsage() { CustomerId = 42, CurrentUsage = i, UnitId = 1337 };

var message = new Message<string, string>
{
    Value = ElectricityUsage.Serialize(electricityUsage)
};

var deliveryResult = await producer.ProduceAsync("myKafkaTopic", message);

Kafka events can only contain strings as values. The Kafka SDK for .NET supports schemas and event serialization, but for simplicity the ElectricityUsage event is serialized using Newtonsoft.Json.
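
The ElectricityUsage event class itself is not shown above. A minimal sketch of what it could look like, with the Newtonsoft.Json-based Serialize and Deserialize helpers used in this sample (the exact definition in the sample may differ):

using Newtonsoft.Json;

public class ElectricityUsage
{
    public int CustomerId { get; set; }
    public int UnitId { get; set; }
    public int CurrentUsage { get; set; }

    // Serialize the event to a JSON string so it can be used as a Kafka value
    public static string Serialize(ElectricityUsage usage) =>
        JsonConvert.SerializeObject(usage);

    // Deserialize a Kafka value back into an ElectricityUsage instance
    public static ElectricityUsage Deserialize(string value) =>
        JsonConvert.DeserializeObject<ElectricityUsage>(value);
}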

Kafka trigger

A Kafka trigger in the AzureFunctions.KafkaTrigger.FunctionsHostBuilder project consumes the events and verifies whether the electricity usage for any customer and any of its units exceeds a certain threshold. If so, an NServiceBus SendOnly endpoint is used to send a follow-up message. The following code shows how to set up an NServiceBus endpoint in Azure Functions and register the IMessageSession instance with the dependency injection container:

var host = new HostBuilder()
    .ConfigureServices(services =>
    {
        var cfg = new EndpointConfiguration("SendOnly");
        cfg.SendOnly();

        var connectionString = Environment.GetEnvironmentVariable("AzureWebJobsServiceBus");
        var transport = new AzureServiceBusTransport(connectionString);
        var routing = cfg.UseTransport(transport);

        routing.RouteToEndpoint(typeof(FollowUp), "Samples.KafkaTrigger.ConsoleEndpoint");

        // Start the send-only endpoint before the host is built so the
        // message session is available for registration
        var endpoint = Endpoint.Start(cfg).GetAwaiter().GetResult();

        // Register the endpoint's message session in the DI container
        services.AddSingleton<IMessageSession>(endpoint);
    })
    .ConfigureFunctionsWorkerDefaults()
    .Build();
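
The registered IMessageSession can then be resolved by the function class through constructor injection. A minimal sketch of how the messageSession and logger fields used below might be wired up (the class name here is illustrative):

using Microsoft.Extensions.Logging;
using NServiceBus;

public class ElectricityUsageFunction
{
    readonly IMessageSession messageSession;
    readonly ILogger<ElectricityUsageFunction> logger;

    // Both dependencies are resolved from the container configured above
    public ElectricityUsageFunction(IMessageSession messageSession, ILogger<ElectricityUsageFunction> logger)
    {
        this.messageSession = messageSession;
        this.logger = logger;
    }
}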

In the Kafka trigger, again for simplicity, the function checks whether the event contains the value 42 and, if it does, sends a message back to the ConsoleEndpoint.

[Function(nameof(ElectricityUsage))]
public async Task ElectricityUsage([KafkaTrigger("LocalKafkaBroker", "myKafkaTopic", ConsumerGroup = "$Default")] string eventData,
        FunctionContext context)
{
    // The trigger hands over the Kafka record serialized as JSON;
    // its Value property holds the serialized ElectricityUsage event
    var eventValue = JObject.Parse(eventData)["Value"]?.ToString();
    var electricityUsage = Messages.KafkaMessages.ElectricityUsage.Deserialize(eventValue);

    logger.LogInformation("Received Kafka event with usage: {CurrentUsage}", electricityUsage.CurrentUsage);

    if (IsUsageAboveAverage(electricityUsage.CustomerId, electricityUsage.UnitId, electricityUsage.CurrentUsage))
    {
        var message = new FollowUp
        {
            CustomerId = electricityUsage.CustomerId,
            UnitId = electricityUsage.UnitId,
            Description = $"Usage over monthly average at [{electricityUsage.CurrentUsage}] units"
        };

        await messageSession.Send(message);
    }
}
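
The IsUsageAboveAverage helper is not shown above. Given the simplification described earlier, it could be as simple as the following sketch (the actual logic in the sample may differ):

// Simplified stand-in for a real monthly-average lookup:
// treat a usage value of 42 as crossing the threshold
static bool IsUsageAboveAverage(int customerId, int unitId, int currentUsage) =>
    currentUsage == 42;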

The FollowUp message is then received in the ConsoleEndpoint and processed by the NServiceBus FollowUpHandler message handler.
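
The handler itself is not shown here; a minimal sketch of what it could look like:

using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Logging;

public class FollowUpHandler : IHandleMessages<FollowUp>
{
    static readonly ILog log = LogManager.GetLogger<FollowUpHandler>();

    // Log the follow-up so it is visible in the ConsoleEndpoint window
    public Task Handle(FollowUp message, IMessageHandlerContext context)
    {
        log.InfoFormat("Received FollowUp for customer {0}, unit {1}: {2}",
            message.CustomerId, message.UnitId, message.Description);

        return Task.CompletedTask;
    }
}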

Related Articles

  • Isolated Worker
    Using NServiceBus with the Azure Functions isolated worker hosting model.
