Control of serialization via the pipeline

Component: NServiceBus
NuGet Package NServiceBus (7.x)
A subset of the functionality described in this sample was made a part of NServiceBus version 6. See Serialization in NServiceBus for more information.

Introduction

This sample leverages the pipeline to provide an attribute-based message serialization feature. It is currently hard-coded to support only XML and JSON serialization. It uses attributes, defined at the message level, to switch messages between different serializations, but any code could be substituted here to control the choice of serialization.

This sample is not compatible with messages serialized by NServiceBus version 4 and below. To keep the sample simple, some wire compatibility workarounds are excluded. Review the current serialization behaviors in the core of NServiceBus for more details.

Code walk-through

The solution contains three projects:

  • Shared contains the common message declarations and the behavior functionality
  • Sender and Receiver are the endpoints using the behaviors

The attribute definitions

These attributes decorate message classes to select the serializer for that message type.

[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public class SerializeWithJsonAttribute :
    Attribute
{
}
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public class SerializeWithXmlAttribute :
    Attribute
{
}

The message definitions

The messages use the above attributes to control how they are serialized.

[Serializable]
[SerializeWithXml]
public class MessageWithXml :
    IMessage
{
    public string SomeProperty { get; set; }
}

[SerializeWithJson]
public class MessageWithJson :
    IMessage
{
    public string SomeProperty { get; set; }
}

Serialization mapper

This class determines which serializer to use. For outgoing messages it inspects the attributes on the message type; for incoming messages it inspects the content type header.

public class SerializationMapper
{
    IMessageSerializer jsonSerializer;
    IMessageSerializer xmlSerializer;

    public SerializationMapper(IMessageMapper mapper, ReadOnlySettings settings)
    {
        xmlSerializer = new XmlSerializer()
            .Configure(settings)(mapper);
        jsonSerializer = new NewtonsoftSerializer()
            .Configure(settings)(mapper);
    }

    public IMessageSerializer GetSerializer(Dictionary<string, string> headers)
    {
        if (!headers.TryGetValue(Headers.ContentType, out var contentType))
        {
            // default to JSON when no content type header is present
            return jsonSerializer;
        }
        if (contentType == jsonSerializer.ContentType)
        {
            return jsonSerializer;
        }
        if (contentType == xmlSerializer.ContentType)
        {
            return xmlSerializer;
        }
        throw new Exception($"Could not derive serializer for contentType='{contentType}'");
    }

    public IMessageSerializer GetSerializer(Type messageType)
    {
        var isJsonMessage = messageType.ContainsAttribute<SerializeWithJsonAttribute>();
        var isXmlMessage = messageType.ContainsAttribute<SerializeWithXmlAttribute>();
        if (isXmlMessage && isJsonMessage)
        {
            throw new Exception($"Choose either [SerializeWithXml] or [SerializeWithJson] for serialization of '{messageType.Name}'.");
        }
        if (isXmlMessage)
        {
            return xmlSerializer;
        }
        // default to JSON when no serialization attribute is present
        return jsonSerializer;
    }
}
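
The mapper calls ContainsAttribute<T>() on the message type, but that helper is not shown in this walk-through. The following is a minimal sketch of how such an extension method might look, assuming it is a thin wrapper over reflection. The class name TypeExtensions and the choice of Type.IsDefined are assumptions, not necessarily what the sample's Shared project uses.

```csharp
using System;

// Hypothetical helper: one possible shape for the ContainsAttribute<T>()
// extension method used by SerializationMapper.
public static class TypeExtensions
{
    public static bool ContainsAttribute<T>(this Type type)
        where T : Attribute
    {
        // inherit: false matches the Inherited = false setting on the
        // attribute declarations, so only the message class itself is checked.
        return type.IsDefined(typeof(T), inherit: false);
    }
}
```

With a helper like this, typeof(MessageWithXml).ContainsAttribute<SerializeWithXmlAttribute>() would return true, while the same check on MessageWithJson would return false.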

Behavior configuration

This replaces the existing serialization behaviors and also adds the serialization mapper to dependency injection.

public class MultiSerializerFeature :
    Feature
{
    internal MultiSerializerFeature()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        var pipeline = context.Pipeline;
        pipeline.Replace("DeserializeLogicalMessagesConnector", typeof(DeserializeConnector));
        pipeline.Replace("SerializeMessageConnector", typeof(SerializeConnector));
        var container = context.Container;
        container.ConfigureComponent<SerializationMapper>(DependencyLifecycle.SingleInstance);
    }
}

Behaviors

DeserializeBehavior.cs and SerializeBehavior.cs are mostly copies of the core NServiceBus behaviors. The difference is that instead of using the core default serializer, they request a serializer from the serialization mapper.

Serialization behavior

class SerializeConnector :
    StageConnector<IOutgoingLogicalMessageContext, IOutgoingPhysicalMessageContext>
{
    SerializationMapper serializationMapper;
    MessageMetadataRegistry messageMetadataRegistry;

    public SerializeConnector(
        SerializationMapper serializationMapper,
        MessageMetadataRegistry messageMetadataRegistry)
    {
        this.serializationMapper = serializationMapper;
        this.messageMetadataRegistry = messageMetadataRegistry;
    }

    public override async Task Invoke(IOutgoingLogicalMessageContext context, Func<IOutgoingPhysicalMessageContext, Task> stage)
    {
        if (context.ShouldSkipSerialization())
        {
            var emptyMessageContext = this.CreateOutgoingPhysicalMessageContext(
                messageBody: new byte[0],
                routingStrategies: context.RoutingStrategies,
                sourceContext: context);
            await stage(emptyMessageContext)
                .ConfigureAwait(false);
            return;
        }

        var messageType = context.Message.MessageType;
        var messageSerializer = serializationMapper.GetSerializer(messageType);
        var headers = context.Headers;
        headers[Headers.ContentType] = messageSerializer.ContentType;
        headers[Headers.EnclosedMessageTypes] = SerializeEnclosedMessageTypes(messageType);

        var array = Serialize(messageSerializer, context);
        var physicalMessageContext = this.CreateOutgoingPhysicalMessageContext(
            messageBody: array,
            routingStrategies: context.RoutingStrategies,
            sourceContext: context);
        await stage(physicalMessageContext)
            .ConfigureAwait(false);
    }

    byte[] Serialize(IMessageSerializer messageSerializer, IOutgoingLogicalMessageContext context)
    {
        using (var stream = new MemoryStream())
        {
            messageSerializer.Serialize(context.Message.Instance, stream);
            return stream.ToArray();
        }
    }

    string SerializeEnclosedMessageTypes(Type messageType)
    {
        var metadata = messageMetadataRegistry.GetMessageMetadata(messageType);
        var distinctTypes = metadata.MessageHierarchy.Distinct();
        return string.Join(";", distinctTypes.Select(t => t.AssemblyQualifiedName));
    }
}
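
The enclosed message types header is a semicolon-separated list of assembly-qualified type names, which the receiving side splits on ';' before looking up metadata. The round trip can be sketched without NServiceBus as follows. The class and method names are hypothetical and exist only for illustration.

```csharp
using System;
using System.Linq;

// Hypothetical illustration of the header format; not part of the sample.
public static class EnclosedMessageTypesSketch
{
    // Joins the distinct assembly-qualified names with ';', in the same way
    // as SerializeEnclosedMessageTypes.
    public static string Join(params Type[] messageHierarchy)
    {
        return string.Join(";",
            messageHierarchy.Distinct().Select(t => t.AssemblyQualifiedName));
    }

    // Splits the header value back into individual type names, as the
    // deserialization side does before resolving message metadata.
    public static string[] Split(string headerValue)
    {
        return headerValue.Split(';');
    }
}
```

Assembly-qualified names contain commas and spaces but no semicolons, so splitting on ';' recovers each name intact.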

Deserialization behavior

class DeserializeConnector :
    StageConnector<IIncomingPhysicalMessageContext, IIncomingLogicalMessageContext>
{
    SerializationMapper serializationMapper;
    MessageMetadataRegistry messageMetadataRegistry;
    LogicalMessageFactory logicalMessageFactory;
    static ILog log = LogManager.GetLogger<DeserializeConnector>();

    public DeserializeConnector(
        SerializationMapper serializationMapper,
        MessageMetadataRegistry messageMetadataRegistry,
        LogicalMessageFactory logicalMessageFactory)
    {
        this.serializationMapper = serializationMapper;
        this.messageMetadataRegistry = messageMetadataRegistry;
        this.logicalMessageFactory = logicalMessageFactory;
    }

    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<IIncomingLogicalMessageContext, Task> stage)
    {
        var incomingMessage = context.Message;

        var messages = ExtractWithExceptionHandling(incomingMessage);

        foreach (var message in messages)
        {
            var logicalMessageContext = this.CreateIncomingLogicalMessageContext(message, context);
            await stage(logicalMessageContext)
                .ConfigureAwait(false);
        }
    }

    List<LogicalMessage> ExtractWithExceptionHandling(IncomingMessage message)
    {
        try
        {
            return Extract(message);
        }
        catch (Exception exception)
        {
            throw new MessageDeserializationException(message.MessageId, exception);
        }
    }

    List<LogicalMessage> Extract(IncomingMessage physicalMessage)
    {
        if (physicalMessage.Body == null || physicalMessage.Body.Length == 0)
        {
            return new List<LogicalMessage>();
        }

        var messageMetadata = new List<MessageMetadata>();

        var headers = physicalMessage.Headers;
        if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageTypeIdentifier))
        {
            foreach (var messageTypeString in messageTypeIdentifier.Split(';'))
            {
                var typeString = messageTypeString;
                var metadata = messageMetadataRegistry.GetMessageMetadata(typeString);
                if (metadata == null)
                {
                    continue;
                }

                messageMetadata.Add(metadata);
            }

            if (
                messageMetadata.Count == 0 &&
                physicalMessage.GetMessageIntent() != MessageIntentEnum.Publish)
            {
                log.Warn($"Could not determine message type from message header '{messageTypeIdentifier}'. MessageId: {physicalMessage.MessageId}");
            }
        }

        using (var stream = new MemoryStream(physicalMessage.Body))
        {
            var messageSerializer = serializationMapper.GetSerializer(headers);
            var typesToDeserialize = messageMetadata
                .Select(metadata => metadata.MessageType)
                .ToList();
            return messageSerializer.Deserialize(stream, typesToDeserialize)
                .Select(x => logicalMessageFactory.Create(x.GetType(), x))
                .ToList();
        }
    }
}

Running the code

A simple execution

  • Set both Receiver and Sender as startup projects
  • Run the solution
  • In Sender press J to send a JSON message or X to send an XML message
  • The message will be received by Receiver

The message on the wire

  • Start only the Sender
  • Send both a JSON and an XML message

Now inspect the learning transport's message storage. The Receiver folder will contain two messages.

An XML message with the following content

<?xml version="1.0"?>
<MessageWithXml xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns="http://tempuri.net/">
<SomeProperty>Some content in an XML message</SomeProperty>
</MessageWithXml>

And a JSON message with the following content

{"SomeProperty":"Some content in a JSON message"}
