Getting Started
Architecture
NServiceBus
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Modernization
Samples

Avro Serializer sample

This sample uses Apache.Avro data serialization system to serialize and deserialize the messages.

Limitations

This sample has the following limitations:

Configuration

Configure the endpoint to use the Avro serializer as follows:

var endpointConfiguration = new EndpointConfiguration("Samples.Serialization.Avro");

endpointConfiguration.UseSerialization<AvroSerializer>();

Schema registry

The sample expects the message schema to be present as an embedded resource in the same folder where the message type resides. The schemas are read on startup and cached into a SchemaRegistry. This choice was made to simplify the use of this sample. In production, it's recommended to use a central registry like:

Code

The serializer is implemented as a custom serializer by creating a SerializerDefinition:

public class AvroSerializer : SerializationDefinition
{
    public override Func<IMessageMapper, IMessageSerializer> Configure(IReadOnlySettings settings)
    {
        var registry = settings.Get<MessageMetadataRegistry>();
        var messageTypes = registry.GetAllMessages().Select(m => m.MessageType);
        var schemaCache = new SchemaRegistry();
        var assembly = Assembly.GetExecutingAssembly();

        foreach (var messageType in messageTypes)
        {
            var manifestNamespace = "Sample.";
            var schemaResourceName = manifestNamespace + messageType.Name + ".avsc";
            using var stream = assembly.GetManifestResourceStream(schemaResourceName);

            if (stream == null)
            {
                throw new InvalidOperationException(
                    $"Resource '{schemaResourceName}' not found in assembly '{assembly.FullName}'.");
            }

            // Load the schema from the embedded resource
            using var reader = new StreamReader(stream);
            var schemaJson = reader.ReadToEnd();

            // Parse and cache the schema
            schemaCache.Add(messageType, Schema.Parse(schemaJson));
        }

        return _ => new AvroMessageSerializer(schemaCache, new ClassCache());
    }
}

and also implementing the message serializer interface:

public class AvroMessageSerializer(SchemaRegistry schemaRegistry, ClassCache classCache) : IMessageSerializer
{
    public string ContentType => "avro/json";

    public void Serialize(object message, Stream stream)
    {
        var messageType = message.GetType();
        var schema = schemaRegistry.GetSchema(messageType);
        var writer = new ReflectDefaultWriter(messageType, schema, classCache);

        var encoder = new JsonEncoder(schema, stream);

        writer.Write(message, encoder);

        encoder.Flush();
    }

    public object[] Deserialize(ReadOnlyMemory<byte> body, IList<Type> messageTypes = null)
    {
        if (messageTypes == null)
        {
            throw new MessageDeserializationException(
                "Avro is not able to infer message types from the body content only," +
                "the NServiceBus.EnclosedMessageTypes header must be present");
        }

        var messages = new List<object>();
        foreach (var messageType in messageTypes)
        {
            try
            {
                var schema = schemaRegistry.GetSchema(messageType);
                var reader = new ReflectDefaultReader(messageType, schema, schema, classCache);
                using var stream = new ReadOnlyStream(body);
                var message = reader.Read(null, schema, schema, new JsonDecoder(schema, stream));
                messages.Add(message);
            }
            catch (KeyNotFoundException)
            {
                throw new MessageDeserializationException(
                    $"No schema found for message type {messageType.FullName}");
            }
        }

        return messages.ToArray();
    }
}

Sending the message

Prepare and send an order message with sample data:

var message = new CreateOrder
{
    OrderId = 9,
    Date = DateTime.Now,
    CustomerId = 12,
    OrderItems =
    [
        new OrderItem
        {
            ItemId = 6,
            Quantity = 2
        },

        new OrderItem
        {
            ItemId = 5,
            Quantity = 4
        }
    ]
};

await messageSession.SendLocal(message, cancellationToken: stoppingToken);

Output

The serialized message output appears as follows:

{
  "OrderId": 9,
  "Date": "2015-09-15T10:23:44.9367871+10:00",
  "CustomerId": 12,
  "OrderItems": [
    {
      "ItemId": 6,
      "Quantity": 2
    },
    {
      "ItemId": 5,
      "Quantity": 4
    }
  ]
}

Related Articles

  • Serialization
    .NET messaging systems require serialization and deserialization of objects sent/received over transports. NServiceBus achieves this using serializers.