This sample shows how to process Kafka events using an Azure Functions trigger and how to follow up on those events using an NServiceBus SendOnly endpoint in an Azure Function.
Kafka and NServiceBus
Kafka is an event streaming broker, similar to Azure Event Hubs. Event streaming brokers are used to store massive amounts of incoming events, for example from IoT devices. In contrast, NServiceBus works on top of messaging brokers like Azure Service Bus, RabbitMQ, and Amazon SQS/SNS. They can complement each other as shown in this sample consisting of two projects.
- The ConsoleEndpoint is the starting point of the sample, which produces numerous Kafka events.
- The Azure Function uses a Kafka trigger to consume the events and send NServiceBus messages back to the ConsoleEndpoint via Azure Service Bus.
For more information about Kafka and NServiceBus, read the blog post Let's talk about Kafka.
Prerequisites
Service bus connection string
To use the sample, a valid Service Bus connection string must be provided in the local. file in the AzureFunctions. project and as an environment variable named AzureServiceBus_ConnectionString.
Kafka broker
This sample requires Kafka to store the produced events.
Setting up a Kafka Docker container
The sample contains a docker-compose. file to set up a Docker container with Kafka. From a CLI like PowerShell, use the following command in the solution folder to download and execute a Docker image:
docker-compose up
When running Docker in Windows, it's possible to get an error saying no matching manifest for windows/amd64 in the manifest list entries. This can be solved by running the daemon in experimental mode.
Sample structure
The sample contains the following projects:
- AzureFunctions.KafkaTrigger.FunctionsHostBuilder: Azure function with Kafka trigger and NServiceBus SendOnly endpoint
- AzureFunctions.Messages: message definitions
- ConsoleEndpoint: NServiceBus endpoint and Kafka producer
Running the sample
The solution requires both AzureFunctions. and ConsoleEndpoint to run.
After the sample is running with both projects:
- Press ENTER in the ConsoleEndpoint window to start producing Kafka events.
- The Azure Function consumes each Kafka event and checks whether it contains information indicating that a specific threshold has been reached. When it has, the function sends a FollowUp Azure Service Bus message to the ConsoleEndpoint using NServiceBus.
- The ConsoleEndpoint console window receives the FollowUp message and processes it with NServiceBus.
Code walk-through
The project ConsoleEndpoint produces the events as follows:
var electricityUsage = new ElectricityUsage() { CustomerId = 42, CurrentUsage = i, UnitId = 1337 };
var message = new Message<string, string>
{
    Value = ElectricityUsage.Serialize(electricityUsage)
};
var deliveryResult = await producer.ProduceAsync("myKafkaTopic", message);
This sample sends each Kafka event value as a string. The Kafka SDK for .NET supports schemas and typed serialization of events, but for simplicity, the ElectricityUsage event is serialized to JSON using Newtonsoft.Json.
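The Serialize and Deserialize helpers called in the snippets are not shown in this walk-through. A minimal sketch of what they might look like, assuming the event has the three integer properties set by the producer and that the Newtonsoft.Json package is referenced (the property types and the null check are assumptions, not the sample's actual code):

```csharp
using Newtonsoft.Json;

// Hypothetical shape of the event; property names follow the producer code,
// the int types are an assumption.
public class ElectricityUsage
{
    public int CustomerId { get; set; }
    public int UnitId { get; set; }
    public int CurrentUsage { get; set; }

    // Turns the event into a JSON string that fits Message<string, string>.Value.
    public static string Serialize(ElectricityUsage usage) =>
        JsonConvert.SerializeObject(usage);

    // Reverses Serialize; a payload holding the JSON literal null is rejected
    // instead of being returned as a null reference.
    public static ElectricityUsage Deserialize(string json) =>
        JsonConvert.DeserializeObject<ElectricityUsage>(json)
            ?? throw new JsonSerializationException("Event payload was empty.");
}
```

Keeping both directions on the event type means the producer and the function share a single definition of the wire format.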
Kafka trigger
A Kafka trigger in project AzureFunctions. consumes the event and checks whether the electricity usage of any customer's unit exceeds a specified threshold. If so, an NServiceBus SendOnly endpoint is used to send a message. The following code shows how to set up an NServiceBus endpoint in Azure Functions and register the IMessageSession instance with the dependency injection container:
var host = new HostBuilder()
    .ConfigureServices(services =>
    {
        var cfg = new EndpointConfiguration("SendOnly");
        cfg.SendOnly();
        cfg.UseSerialization<SystemJsonSerializer>();
        var connectionString = Environment.GetEnvironmentVariable("AzureWebJobsServiceBus");
        var transport = new AzureServiceBusTransport(connectionString);
        var routing = cfg.UseTransport(transport);
        routing.RouteToEndpoint(typeof(FollowUp), "Samples.KafkaTrigger.ConsoleEndpoint");
        // ConfigureServices takes a synchronous delegate, so block until the endpoint
        // has started; an async lambda here could register the session after Build() runs.
        var endpoint = Endpoint.Start(cfg).GetAwaiter().GetResult();
        // Inject the endpoint in the DI container
        services.AddSingleton<IMessageSession>(endpoint);
    })
    .ConfigureFunctionsWorkerDefaults()
    .Build();
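Because the endpoint is registered as IMessageSession, a function class can receive it through constructor injection. A minimal sketch, assuming a hypothetical class name KafkaTriggerFunctions and references to the NServiceBus and Microsoft.Extensions.Logging packages; the field names are chosen to match the ones the trigger method uses:

```csharp
using Microsoft.Extensions.Logging;
using NServiceBus;

// Hypothetical function class; the dependency injection container supplies
// both constructor arguments when the function is invoked.
public class KafkaTriggerFunctions
{
    readonly IMessageSession messageSession;
    readonly ILogger<KafkaTriggerFunctions> logger;

    public KafkaTriggerFunctions(IMessageSession messageSession, ILogger<KafkaTriggerFunctions> logger)
    {
        this.messageSession = messageSession;
        this.logger = logger;
    }

    // The Kafka-triggered method would live here and use both fields.
}
```

Registering the session as a singleton means every function invocation reuses the same started endpoint rather than starting a new one.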
The Kafka trigger checks each event for a usage value of 42 (hard-coded for simplicity's sake); when it finds one, a message is sent back to the ConsoleEndpoint.
// The topic name must match the topic the ConsoleEndpoint produces to.
[Function(nameof(ElectricityUsage))]
public async Task ElectricityUsage([KafkaTrigger("LocalKafkaBroker", "myKafkaTopic", ConsumerGroup = "$Default")] string eventData,
    FunctionContext context)
{
    // The trigger delivers a JSON envelope; the produced payload is in its "Value" property.
    var eventValue = JObject.Parse(eventData)["Value"]?.ToString();
    var electricityUsage = Messages.KafkaMessages.ElectricityUsage.Deserialize(eventValue);
    logger.LogInformation("Received Kafka event with usage: {CurrentUsage}", electricityUsage.CurrentUsage);
    if (IsUsageAboveAverage(electricityUsage.CustomerId, electricityUsage.UnitId, electricityUsage.CurrentUsage))
    {
        var message = new FollowUp
        {
            CustomerId = electricityUsage.CustomerId,
            UnitId = electricityUsage.UnitId,
            Description = $"Usage over monthly average at [{electricityUsage.CurrentUsage}] units"
        };
        await messageSession.Send(message);
    }
}
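The trigger reads the payload from a "Value" property, which suggests the function receives a JSON envelope around the produced string, for example something like {"Topic":"myKafkaTopic","Value":"{\"CurrentUsage\":42}"}. The sketch below isolates that extraction step together with a stand-in for the threshold check. The envelope shape, the class names KafkaEventEnvelope and UsageRules, and the body of IsUsageAboveAverage are all assumptions for illustration; it requires the Newtonsoft.Json package.

```csharp
using Newtonsoft.Json.Linq;

public static class KafkaEventEnvelope
{
    // Returns the producer's payload from the trigger's JSON envelope,
    // or null when the envelope has no "Value" property.
    public static string ExtractValue(string eventData) =>
        JObject.Parse(eventData)["Value"]?.ToString();
}

public static class UsageRules
{
    // Hypothetical stand-in for IsUsageAboveAverage: it only flags the
    // hard-coded usage value 42 and ignores the customer and unit.
    public static bool IsUsageAboveAverage(int customerId, int unitId, int currentUsage) =>
        currentUsage == 42;
}
```

A real implementation would presumably compare the usage against stored averages per customer and unit; the fixed value only keeps the sketch short.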
The FollowUp message is then received in the ConsoleEndpoint by the NServiceBus FollowUpHandler message handler.
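The handler itself is not shown above. A minimal sketch of what it might look like, assuming the FollowUp contract carries the three properties set in the trigger and is marked as a command (the ICommand marker, the property types, and the console output are assumptions; the NServiceBus package is required):

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

// Assumed message contract, inferred from the properties the trigger sets.
public class FollowUp : ICommand
{
    public int CustomerId { get; set; }
    public int UnitId { get; set; }
    public string Description { get; set; }
}

// NServiceBus discovers handlers through the IHandleMessages<T> interface.
public class FollowUpHandler : IHandleMessages<FollowUp>
{
    public Task Handle(FollowUp message, IMessageHandlerContext context)
    {
        // Write the follow-up to the console so it is visible in the endpoint window.
        Console.WriteLine($"Follow-up for customer {message.CustomerId}, unit {message.UnitId}: {message.Description}");
        return Task.CompletedTask;
    }
}
```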