This sample showcases how serialization using Avro can work, but it is not a supported serializer and some use cases may not work. See our serializer documentation for a full list of supported serializers.
This sample uses the Apache.Avro data serialization system to serialize and deserialize messages.
Limitations
This sample has the following limitations:
- It does not support message types defined using C# interfaces
- It is not able to infer the message type from the payload like some other serializers; therefore, the NServiceBus.EnclosedMessageTypes header must be present on all messages.
Configuration
Configure the endpoint to use the Avro serializer as follows:
// Create the endpoint configuration, then register the sample's Avro serializer on it.
EndpointConfiguration endpointConfiguration = new("Samples.Serialization.Avro");
endpointConfiguration.UseSerialization<AvroSerializer>();
Schema registry
The sample expects the message schema to be present as an embedded resource in the same folder where the message type resides. The schemas are read on startup and cached into a SchemaRegistry. This choice was made to simplify the use of this sample. In production, it's recommended to use a central registry like:
- Event Hubs Schema Registry
- Confluent Schema Registry
- AWS Glue Schema Registry
- Apicurio Registry
- Redpanda Schema Registry
When a schema is not found, a MessageDeserializationException is thrown, which causes the message to be moved to the configured error queue without retries.
Code
The serializer is implemented as a custom serializer by creating a SerializationDefinition:
/// <summary>
/// Serialization definition that plugs <see cref="AvroMessageSerializer"/> into the endpoint.
/// During configuration, one Avro schema per registered message type is read from an
/// embedded resource of the executing assembly and stored in a <see cref="SchemaRegistry"/>.
/// </summary>
public class AvroSerializer : SerializationDefinition
{
    // Prefix of the embedded resource names; the full name is "Sample.{TypeName}.avsc".
    const string ManifestNamespace = "Sample.";

    /// <summary>
    /// Loads the schemas for every message type known to the <see cref="MessageMetadataRegistry"/>
    /// and returns the factory that creates the serializer.
    /// </summary>
    /// <param name="settings">Endpoint settings from which the message metadata registry is resolved.</param>
    /// <returns>A factory producing an <see cref="AvroMessageSerializer"/> backed by the loaded schemas.</returns>
    /// <exception cref="InvalidOperationException">
    /// The embedded schema resource for a message type is missing.
    /// </exception>
    public override Func<IMessageMapper, IMessageSerializer> Configure(IReadOnlySettings settings)
    {
        var metadataRegistry = settings.Get<MessageMetadataRegistry>();
        var schemas = new SchemaRegistry();
        var resourceAssembly = Assembly.GetExecutingAssembly();

        foreach (var metadata in metadataRegistry.GetAllMessages())
        {
            var type = metadata.MessageType;
            var resourceName = $"{ManifestNamespace}{type.Name}.avsc";

            // Parse and cache the schema
            schemas.Add(type, ReadEmbeddedSchema(resourceAssembly, resourceName));
        }

        // A new ClassCache is created each time the factory is invoked.
        return _ => new AvroMessageSerializer(schemas, new ClassCache());
    }

    // Reads the named embedded resource as text and parses it as an Avro schema.
    static Schema ReadEmbeddedSchema(Assembly resourceAssembly, string resourceName)
    {
        using var resource = resourceAssembly.GetManifestResourceStream(resourceName);
        if (resource == null)
        {
            throw new InvalidOperationException(
                $"Resource '{resourceName}' not found in assembly '{resourceAssembly.FullName}'.");
        }

        // Load the schema from the embedded resource
        using var text = new StreamReader(resource);
        return Schema.Parse(text.ReadToEnd());
    }
}
and also implementing the message serializer interface:
/// <summary>
/// Message serializer that writes messages with Avro's <c>JsonEncoder</c> and reads them back
/// with <c>JsonDecoder</c>, using the schema registered for each message type.
/// </summary>
/// <param name="schemaRegistry">Lookup of the Avro schema for a given message type.</param>
/// <param name="classCache">Class cache passed to the Avro reflect reader and writer.</param>
public class AvroMessageSerializer(SchemaRegistry schemaRegistry, ClassCache classCache) : IMessageSerializer
{
    /// <summary>Content type reported for messages produced by this serializer.</summary>
    public string ContentType => "avro/json";

    /// <summary>
    /// Serializes <paramref name="message"/> into <paramref name="stream"/> using the schema
    /// registered for the message's runtime type.
    /// </summary>
    /// <param name="message">The message instance to serialize.</param>
    /// <param name="stream">The stream that receives the encoded message.</param>
    public void Serialize(object message, Stream stream)
    {
        var messageType = message.GetType();
        var schema = schemaRegistry.GetSchema(messageType);
        var writer = new ReflectDefaultWriter(messageType, schema, classCache);
        var encoder = new JsonEncoder(schema, stream);
        writer.Write(message, encoder);

        // Flush so everything the encoder has buffered reaches the stream before returning.
        encoder.Flush();
    }

    /// <summary>
    /// Deserializes <paramref name="body"/> once for each entry in <paramref name="messageTypes"/>.
    /// </summary>
    /// <param name="body">The serialized message body.</param>
    /// <param name="messageTypes">
    /// The message types to deserialize the body as. Must not be <c>null</c>, because the types
    /// cannot be inferred from the body alone.
    /// </param>
    /// <returns>One deserialized object per requested message type, in the same order.</returns>
    /// <exception cref="MessageDeserializationException">
    /// <paramref name="messageTypes"/> is <c>null</c>, or a <see cref="KeyNotFoundException"/>
    /// was raised while processing one of the requested types.
    /// </exception>
    public object[] Deserialize(ReadOnlyMemory<byte> body, IList<Type> messageTypes = null)
    {
        if (messageTypes is null)
        {
            // The separating space after the comma keeps the concatenated message readable.
            throw new MessageDeserializationException(
                "Avro is not able to infer message types from the body content only, " +
                "the NServiceBus.EnclosedMessageTypes header must be present");
        }

        var messages = new List<object>();
        foreach (var messageType in messageTypes)
        {
            try
            {
                var schema = schemaRegistry.GetSchema(messageType);

                // The same schema is used as both writer and reader schema.
                var reader = new ReflectDefaultReader(messageType, schema, schema, classCache);

                // Each message type is decoded from its own stream over the same body.
                using var stream = new ReadOnlyStream(body);
                var message = reader.Read(null, schema, schema, new JsonDecoder(schema, stream));
                messages.Add(message);
            }
            catch (KeyNotFoundException)
            {
                // NOTE(review): presumably raised by SchemaRegistry.GetSchema when no schema is
                // cached for the type; this catch also covers the read itself - confirm intent.
                throw new MessageDeserializationException(
                    $"No schema found for message type {messageType.FullName}");
            }
        }

        return messages.ToArray();
    }
}
Sending the message
Prepare and send an order message with sample data:
// Build a CreateOrder carrying two order items and hand it to SendLocal.
var createOrder = new CreateOrder
{
    OrderId = 9,
    Date = DateTime.Now,
    CustomerId = 12,
    OrderItems =
    [
        new OrderItem { ItemId = 6, Quantity = 2 },
        new OrderItem { ItemId = 5, Quantity = 4 }
    ]
};

await messageSession.SendLocal(createOrder, cancellationToken: stoppingToken);
Output
The serialized message output appears as follows:
{
"OrderId": 9,
"Date": "2015-09-15T10:23:44.9367871+10:00",
"CustomerId": 12,
"OrderItems": [
{
"ItemId": 6,
"Quantity": 2
},
{
"ItemId": 5,
"Quantity": 4
}
]
}