Control of serialization via the pipeline

Component: NServiceBus | Nuget: NServiceBus (Version: 5.x)
A subset of the functionality described in this sample was made a part of NServiceBus Version 6. See Serialization in NServiceBus for more information.

Introduction

This sample leverages the pipeline to provide an attribute based message serialization feature.

It is currently hard coded to only support xml and json serialization. It uses attributes, defined at the message level, to switch messages between different serializations but any code could be substituted here to control the choice of serialization.

This sample is not compatible with message serialization against Version 4 and below of NServiceBus. The reason is that, for simplicity of the sample, some wire compatibility workarounds are excluded. Have a look at the current serialization behaviors in the core of NServiceBus for more details.

Code Walk Through

The solution contains 3 projects

  • Shared contains the common message declarations and the actual behavior functionality.
  • Sender and Receiver are the actual endpoint using the behaviors.

The attribute definitions

These can be used to decorate messages.

Edit
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public class SerializeWithJsonAttribute :
    Attribute
{
}
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public class SerializeWithXmlAttribute :
    Attribute
{
}

The message definitions

The messages use of the above attributes to control how they are serialized.

Edit
[Serializable]
[SerializeWithXml]
public class MessageWithXml :
    IMessage
{
    public string SomeProperty { get; set; }
}

[SerializeWithJson]
public class MessageWithJson :
    IMessage
{
    public string SomeProperty { get; set; }
}

Serialization Mapper

This class interrogates the message information and derives what serializer to use.

Edit
public class SerializationMapper
{
    JsonMessageSerializer jsonSerializer;
    XmlMessageSerializer xmlSerializer;

    public SerializationMapper(IMessageMapper mapper, Conventions conventions, Configure configure)
    {
        jsonSerializer = new JsonMessageSerializer(mapper);
        xmlSerializer = new XmlMessageSerializer(mapper, conventions);
        var messageTypes = configure.TypesToScan.Where(conventions.IsMessageType).ToList();
        xmlSerializer.Initialize(messageTypes);
    }

    public IMessageSerializer GetSerializer(Dictionary<string, string> headers)
    {
        string contentType;
        if (!headers.TryGetValue(Headers.ContentType, out contentType))
        {
            // default to Json
            return jsonSerializer;
        }
        if (contentType == jsonSerializer.ContentType)
        {
            return jsonSerializer;
        }
        if (contentType == xmlSerializer.ContentType)
        {
            return xmlSerializer;
        }
        var message = $"Could not derive serializer for contentType='{contentType}'";
        throw new Exception(message);
    }

    public IMessageSerializer GetSerializer(Type messageType)
    {
        var isJsonMessage = messageType.ContainsAttribute<SerializeWithJsonAttribute>();
        var isXmlMessage = messageType.ContainsAttribute<SerializeWithXmlAttribute>();
        if (isXmlMessage && isJsonMessage)
        {
            var message = $"Choose either [SerializeWithXml] or [SerializeWithJson] for serialization of '{messageType.Name}'.";
            throw new Exception(message);
        }
        if (isXmlMessage)
        {
            return xmlSerializer;
        }
        // default to json
        return jsonSerializer;
    }
}

Behavior Configuration

This replaces the existing serialization behavior and also injects the Serialization Mapper into the container.

Edit
public class MultiSerializerFeature :
    Feature
{
    internal MultiSerializerFeature()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        var pipeline = context.Pipeline;
        pipeline.Replace(WellKnownStep.DeserializeMessages, typeof(DeserializeBehavior));
        pipeline.Replace(WellKnownStep.SerializeMessage, typeof(SerializeBehavior));
        var container = context.Container;
        container.ConfigureComponent<SerializationMapper>(DependencyLifecycle.SingleInstance);
    }
}

Behaviors

DeserializeBehavior.cs and SerializeBehavior.cs are mostly a copies of the core NServiceBus behaviors. The main difference is that instead of using the core default serializer they request a serializer from Serialization Mapper.

Serialization Behavior

Edit
class SerializeBehavior :
    IBehavior<OutgoingContext>
{
    SerializationMapper serializationMapper;

    public SerializeBehavior(SerializationMapper serializationMapper)
    {
        this.serializationMapper = serializationMapper;
    }

    public void Invoke(OutgoingContext context, Action next)
    {
        var transportMessage = context.OutgoingMessage;
        if (!transportMessage.IsControlMessage())
        {
            var logicalMessage = context.OutgoingLogicalMessage;
            var messageInstance = logicalMessage.Instance;
            var messageType = messageInstance.GetType();

            var messageSerializer = serializationMapper.GetSerializer(messageType);
            transportMessage.Body = Serialize(messageSerializer, messageInstance);

            var headers = transportMessage.Headers;
            headers[Headers.ContentType] = messageSerializer.ContentType;
            headers[Headers.EnclosedMessageTypes] = SerializeEnclosedMessageTypes(logicalMessage);

            foreach (var headerEntry in logicalMessage.Headers)
            {
                headers[headerEntry.Key] = headerEntry.Value;
            }
        }

        next();
    }

    static byte[] Serialize(IMessageSerializer messageSerializer, object messageInstance)
    {
        using (var stream = new MemoryStream())
        {
            messageSerializer.Serialize(messageInstance, stream);
            return stream.ToArray();
        }
    }

    string SerializeEnclosedMessageTypes(LogicalMessage message)
    {
        var distinctTypes = message.Metadata.MessageHierarchy.Distinct();
        return string.Join(";", distinctTypes.Select(t => t.AssemblyQualifiedName));
    }

}

Deserialization Behavior

Edit
class DeserializeBehavior :
    IBehavior<IncomingContext>
{
    SerializationMapper serializationMapper;
    MessageMetadataRegistry messageMetadataRegistry;
    LogicalMessageFactory messageFactory;
    static ILog log = LogManager.GetLogger<DeserializeBehavior>();

    public DeserializeBehavior(SerializationMapper serializationMapper, MessageMetadataRegistry messageMetadataRegistry, LogicalMessageFactory messageFactory)
    {
        this.serializationMapper = serializationMapper;
        this.messageMetadataRegistry = messageMetadataRegistry;
        this.messageFactory = messageFactory;
    }

    public void Invoke(IncomingContext context, Action next)
    {
        var transportMessage = context.PhysicalMessage;

        if (transportMessage.IsControlMessage())
        {
            log.Info("Received a control message. Skipping deserialization as control message data is contained in the header.");
            next();
            return;
        }
        try
        {
            context.LogicalMessages = Extract(transportMessage);
        }
        catch (Exception exception)
        {
            throw new MessageDeserializationException(transportMessage.Id, exception);
        }

        next();
    }

    List<LogicalMessage> Extract(TransportMessage physicalMessage)
    {
        if (physicalMessage.Body == null || physicalMessage.Body.Length == 0)
        {
            return new List<LogicalMessage>();
        }

        string typeIdentifier;
        if (!physicalMessage.Headers.TryGetValue(Headers.EnclosedMessageTypes, out typeIdentifier))
        {
            return Deserialize(physicalMessage, new List<MessageMetadata>());
        }
        var messageMetadata = GetMessageMetadata(typeIdentifier)
            .ToList();
        if (messageMetadata.Count != 0 || physicalMessage.MessageIntent == MessageIntentEnum.Publish)
        {
            return Deserialize(physicalMessage, messageMetadata);
        }
        log.Warn($"Could not determine message type from message header '{typeIdentifier}'. MessageId: {physicalMessage.Id}");
        return Deserialize(physicalMessage, messageMetadata);
    }

    IEnumerable<MessageMetadata> GetMessageMetadata(string messageTypeIdentifier)
    {
        return messageTypeIdentifier
            .Split(';')
            .Select(type => messageMetadataRegistry.GetMessageMetadata(type))
            .Where(metadata => metadata != null);
    }

    List<LogicalMessage> Deserialize(TransportMessage physicalMessage, List<MessageMetadata> messageMetadata)
    {
        var messageSerializer = serializationMapper.GetSerializer(physicalMessage.Headers);
        var typesToDeserialize = messageMetadata.Select(x => x.MessageType)
            .ToList();
        using (var stream = new MemoryStream(physicalMessage.Body))
        {
            return messageSerializer.Deserialize(stream, typesToDeserialize)
                .Select(x => messageFactory.Create(x.GetType(), x, physicalMessage.Headers))
                .ToList();
        }
    }

}

Running the Code

A simple execution

  • Set both Receiver and Sender as startup projects.
  • Run the solution.
  • In Sender press J (for a json message) and X (for a xml message).
  • The message will be received at Receiver.

The message on the wire

  • Start only the Sender.
  • Send both json and xml.

Now have a look in msmq and there will be two messages in the Receiver queue.

A xml message with the content

<?xml version="1.0"?>
<MessageWithXml xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns="http://tempuri.net/">
<SomeProperty>Some content in a Xml message</SomeProperty>
</MessageWithXml>

And a Json message with the content

{"SomeProperty":"Some content in a json message"}

Related Articles


Last modified