Control of serialization via the pipeline

Component: NServiceBus | Nuget: NServiceBus (Version: 6.x)
A subset of the functionality described in this sample was made a part of NServiceBus Version 6. See Serialization in NServiceBus for more information.

Introduction

This sample leverages the pipeline to provide an attribute-based message serialization feature.

It is currently hard-coded to support only XML and JSON serialization. Attributes defined at the message level switch messages between the two serializers, but any code could be substituted here to control the choice of serialization.

This sample is not compatible with message serialization against Version 4 and below of NServiceBus. The reason is that, for simplicity of the sample, some wire compatibility workarounds are excluded. Have a look at the current serialization behaviors in the core of NServiceBus for more details.

Code Walk Through

The solution contains three projects:

  • Shared contains the common message declarations and the behavior implementations.
  • Sender and Receiver are the endpoints that use the behaviors.

The attribute definitions

These can be used to decorate messages.

[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public class SerializeWithJsonAttribute :
    Attribute
{
}
[AttributeUsage(AttributeTargets.Class, Inherited = false)]
public class SerializeWithXmlAttribute :
    Attribute
{
}

The message definitions

The messages use the above attributes to control how they are serialized.

[Serializable]
[SerializeWithXml]
public class MessageWithXml :
    IMessage
{
    public string SomeProperty { get; set; }
}

[SerializeWithJson]
public class MessageWithJson :
    IMessage
{
    public string SomeProperty { get; set; }
}

Serialization Mapper

This class inspects the message type (when sending) or the message headers (when receiving) and derives which serializer to use.

public class SerializationMapper
{
    IMessageSerializer jsonSerializer;
    IMessageSerializer xmlSerializer;

    public SerializationMapper(IMessageMapper mapper, ReadOnlySettings settings)
    {
        xmlSerializer = new XmlSerializer()
            .Configure(settings)(mapper);
        jsonSerializer = new JsonSerializer()
            .Configure(settings)(mapper);
    }

    public IMessageSerializer GetSerializer(Dictionary<string, string> headers)
    {
        string contentType;
        if (!headers.TryGetValue(Headers.ContentType, out contentType))
        {
            // default to Json
            return jsonSerializer;
        }
        if (contentType == jsonSerializer.ContentType)
        {
            return jsonSerializer;
        }
        if (contentType == xmlSerializer.ContentType)
        {
            return xmlSerializer;
        }
        throw new Exception($"Could not derive serializer for contentType='{contentType}'");
    }

    public IMessageSerializer GetSerializer(Type messageType)
    {
        var isJsonMessage = messageType.ContainsAttribute<SerializeWithJsonAttribute>();
        var isXmlMessage = messageType.ContainsAttribute<SerializeWithXmlAttribute>();
        if (isXmlMessage && isJsonMessage)
        {
            throw new Exception($"Choose either [SerializeWithXml] or [SerializeWithJson] for serialization of '{messageType.Name}'.");
        }
        if (isXmlMessage)
        {
            return xmlSerializer;
        }
        // default to json
        return jsonSerializer;
    }
}
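
The `ContainsAttribute<T>` helper called by `GetSerializer(Type)` is not shown in this walk-through. A minimal sketch of how such an extension method might look follows, assuming it only checks for the attribute on the type itself and does not search base classes (which matches `Inherited = false` on the attribute definitions). The class name `TypeExtensions` is hypothetical, and the sample's own implementation may differ.

```csharp
using System;

// Hypothetical helper; the actual helper used by the sample may differ.
public static class TypeExtensions
{
    // Returns true when an attribute of type T is applied directly to the given type.
    // inherit: false means attributes declared on base classes are ignored.
    public static bool ContainsAttribute<T>(this Type type)
        where T : Attribute
    {
        return Attribute.IsDefined(type, typeof(T), inherit: false);
    }
}
```

With a helper like this in scope, `typeof(MessageWithXml).ContainsAttribute<SerializeWithXmlAttribute>()` would be expected to return true for the message definitions above, which is what lets the mapper pick the XML serializer.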

Behavior Configuration

This feature replaces the existing serialization and deserialization connectors in the pipeline and registers the Serialization Mapper in the container.

public class MultiSerializerFeature :
    Feature
{
    internal MultiSerializerFeature()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        var pipeline = context.Pipeline;
        pipeline.Replace("NServiceBus.DeserializeLogicalMessagesConnector", typeof(DeserializeConnector));
        pipeline.Replace("NServiceBus.SerializeMessageConnector", typeof(SerializeConnector));
        var container = context.Container;
        container.ConfigureComponent<SerializationMapper>(DependencyLifecycle.SingleInstance);
    }
}

Behaviors

DeserializeBehavior.cs and SerializeBehavior.cs are mostly copies of the core NServiceBus behaviors. The main difference is that, instead of using the core default serializer, they request a serializer from the Serialization Mapper.

Serialization Behavior

class SerializeConnector :
    StageConnector<IOutgoingLogicalMessageContext, IOutgoingPhysicalMessageContext>
{
    SerializationMapper serializationMapper;
    MessageMetadataRegistry messageMetadataRegistry;

    public SerializeConnector(
        SerializationMapper serializationMapper,
        MessageMetadataRegistry messageMetadataRegistry)
    {
        this.serializationMapper = serializationMapper;
        this.messageMetadataRegistry = messageMetadataRegistry;
    }

    public override async Task Invoke(IOutgoingLogicalMessageContext context, Func<IOutgoingPhysicalMessageContext, Task> stage)
    {
        if (context.ShouldSkipSerialization())
        {
            var emptyMessageContext = this.CreateOutgoingPhysicalMessageContext(
                messageBody: new byte[0],
                routingStrategies: context.RoutingStrategies,
                sourceContext: context);
            await stage(emptyMessageContext)
                .ConfigureAwait(false);
            return;
        }

        var messageType = context.Message.MessageType;
        var messageSerializer = serializationMapper.GetSerializer(messageType);
        var headers = context.Headers;
        headers[Headers.ContentType] = messageSerializer.ContentType;
        headers[Headers.EnclosedMessageTypes] = SerializeEnclosedMessageTypes(messageType);

        var array = Serialize(messageSerializer, context);
        var physicalMessageContext = this.CreateOutgoingPhysicalMessageContext(
            messageBody: array,
            routingStrategies: context.RoutingStrategies,
            sourceContext: context);
        await stage(physicalMessageContext)
            .ConfigureAwait(false);
    }

    byte[] Serialize(IMessageSerializer messageSerializer, IOutgoingLogicalMessageContext context)
    {
        using (var stream = new MemoryStream())
        {
            messageSerializer.Serialize(context.Message.Instance, stream);
            return stream.ToArray();
        }
    }

    string SerializeEnclosedMessageTypes(Type messageType)
    {
        var metadata = messageMetadataRegistry.GetMessageMetadata(messageType);
        var distinctTypes = metadata.MessageHierarchy.Distinct();
        return string.Join(";", distinctTypes.Select(t => t.AssemblyQualifiedName));
    }
}
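
The enclosed message types header written by `SerializeEnclosedMessageTypes` is a semicolon-separated list of assembly qualified type names, which the receiving side splits again. The round trip can be sketched without NServiceBus as below. This is an illustration only: the class `EnclosedMessageTypesHeader` and its methods are hypothetical, and `Type.GetType` stands in for the metadata registry lookup that the sample actually uses.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustrative only: mirrors the header format without depending on NServiceBus.
public static class EnclosedMessageTypesHeader
{
    // Joins the assembly qualified names of the given types with ';',
    // dropping duplicate types first.
    public static string Write(IEnumerable<Type> messageHierarchy)
    {
        var distinctTypes = messageHierarchy.Distinct();
        return string.Join(";", distinctTypes.Select(t => t.AssemblyQualifiedName));
    }

    // Splits the header value and resolves each entry to a type.
    // Entries that cannot be resolved are skipped rather than treated as errors.
    public static List<Type> Read(string headerValue)
    {
        return headerValue
            .Split(';')
            .Select(name => Type.GetType(name, throwOnError: false))
            .Where(type => type != null)
            .ToList();
    }
}
```

Skipping unresolvable entries mirrors the `continue` in the deserialization connector: a receiver does not need to know every type in the sender's message hierarchy, only at least one of them.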

Deserialization Behavior

class DeserializeConnector :
    StageConnector<IIncomingPhysicalMessageContext, IIncomingLogicalMessageContext>
{
    SerializationMapper serializationMapper;
    MessageMetadataRegistry messageMetadataRegistry;
    LogicalMessageFactory logicalMessageFactory;
    static ILog log = LogManager.GetLogger<DeserializeConnector>();

    public DeserializeConnector(
        SerializationMapper serializationMapper,
        MessageMetadataRegistry messageMetadataRegistry,
        LogicalMessageFactory logicalMessageFactory)
    {
        this.serializationMapper = serializationMapper;
        this.messageMetadataRegistry = messageMetadataRegistry;
        this.logicalMessageFactory = logicalMessageFactory;
    }

    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<IIncomingLogicalMessageContext, Task> stage)
    {
        var incomingMessage = context.Message;

        var messages = ExtractWithExceptionHandling(incomingMessage);

        foreach (var message in messages)
        {
            var logicalMessageContext = this.CreateIncomingLogicalMessageContext(message, context);
            await stage(logicalMessageContext)
                .ConfigureAwait(false);
        }
    }

    List<LogicalMessage> ExtractWithExceptionHandling(IncomingMessage message)
    {
        try
        {
            return Extract(message);
        }
        catch (Exception exception)
        {
            throw new MessageDeserializationException(message.MessageId, exception);
        }
    }

    List<LogicalMessage> Extract(IncomingMessage physicalMessage)
    {
        if (physicalMessage.Body == null || physicalMessage.Body.Length == 0)
        {
            return new List<LogicalMessage>();
        }

        string messageTypeIdentifier;
        var messageMetadata = new List<MessageMetadata>();

        var headers = physicalMessage.Headers;
        if (headers.TryGetValue(Headers.EnclosedMessageTypes, out messageTypeIdentifier))
        {
            foreach (var messageTypeString in messageTypeIdentifier.Split(';'))
            {
                var typeString = messageTypeString;
                var metadata = messageMetadataRegistry.GetMessageMetadata(typeString);
                if (metadata == null)
                {
                    continue;
                }

                messageMetadata.Add(metadata);
            }

            if (
                messageMetadata.Count == 0 &&
                physicalMessage.GetMesssageIntent() != MessageIntentEnum.Publish)
            {
                log.Warn($"Could not determine message type from message header '{messageTypeIdentifier}'. MessageId: {physicalMessage.MessageId}");
            }
        }

        using (var stream = new MemoryStream(physicalMessage.Body))
        {
            var messageSerializer = serializationMapper.GetSerializer(headers);
            var typesToDeserialize = messageMetadata
                .Select(metadata => metadata.MessageType)
                .ToList();
            return messageSerializer.Deserialize(stream, typesToDeserialize)
                .Select(x => logicalMessageFactory.Create(x.GetType(), x))
                .ToList();
        }
    }
}

Running the Code

A simple execution

  • Set both Receiver and Sender as startup projects.
  • Run the solution.
  • In Sender, press J (for a JSON message) or X (for an XML message).
  • The message will be received at Receiver.

The message on the wire

  • Start only the Sender.
  • Send both a JSON and an XML message.

Now have a look in MSMQ; there will be two messages in the Receiver queue.

An XML message with the content

<?xml version="1.0"?>
<MessageWithXml xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns="http://tempuri.net/">
<SomeProperty>Some content in a Xml message</SomeProperty>
</MessageWithXml>

And a JSON message with the content

{"SomeProperty":"Some content in a json message"}
