Introduction
This sample leverages the pipeline to provide an attribute-based message serialization feature.
It is currently hard-coded to support only XML and JSON serialization. It uses attributes, defined at the message level, to switch messages between different serializations, but any code could be substituted here to control the choice of serialization.
Code Walk Through
The solution contains three projects:
Shared
contains the common message declarations and the behavior functionality. Sender
and Receiver
are the endpoints using the behaviors.
The attribute definitions
These can be used to decorate messages.
/// <summary>
/// Marker attribute for message classes. <c>SerializationMapper.GetSerializer(Type)</c>
/// checks for it when choosing a serializer for an outgoing message.
/// </summary>
[AttributeUsage(validOn: AttributeTargets.Class, Inherited = false)]
public class SerializeWithJsonAttribute : Attribute { }
/// <summary>
/// Marker attribute for message classes. <c>SerializationMapper.GetSerializer(Type)</c>
/// returns the XML serializer for a message type that carries it.
/// </summary>
[AttributeUsage(validOn: AttributeTargets.Class, Inherited = false)]
public class SerializeWithXmlAttribute : Attribute { }
The message definitions
The messages use the above attributes to control how they are serialized.
/// <summary>
/// Sample message decorated with <c>[SerializeWithXml]</c>.
/// </summary>
[Serializable]
[SerializeWithXml]
public class MessageWithXml : IMessage
{
    /// <summary>Payload carried by the message.</summary>
    public string SomeProperty { get; set; }
}
/// <summary>
/// Sample message decorated with <c>[SerializeWithJson]</c>.
/// </summary>
[SerializeWithJson]
public class MessageWithJson : IMessage
{
    /// <summary>Payload carried by the message.</summary>
    public string SomeProperty { get; set; }
}
Serialization Mapper
This class inspects the message type (when sending) or the message headers (when receiving) and derives which serializer to use.
/// <summary>
/// Selects between the XML and the JSON serializer, either from the attributes on an
/// outgoing message type or from the content-type header of an incoming message.
/// </summary>
public class SerializationMapper
{
    readonly IMessageSerializer jsonSerializer;
    readonly IMessageSerializer xmlSerializer;

    /// <summary>
    /// Creates one XML and one JSON serializer, both configured from
    /// <paramref name="settings"/> and <paramref name="mapper"/>.
    /// </summary>
    public SerializationMapper(IMessageMapper mapper, ReadOnlySettings settings)
    {
        xmlSerializer = new XmlSerializer()
            .Configure(settings)(mapper);
        jsonSerializer = new NewtonsoftSerializer()
            .Configure(settings)(mapper);
    }

    /// <summary>
    /// Picks the serializer whose content type matches the <c>Headers.ContentType</c> header.
    /// </summary>
    /// <param name="headers">Headers of the incoming message.</param>
    /// <returns>The JSON serializer when the header is absent; otherwise the matching serializer.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="headers"/> is null.</exception>
    /// <exception cref="NotSupportedException">The header names a content type neither serializer handles.</exception>
    public IMessageSerializer GetSerializer(Dictionary<string, string> headers)
    {
        if (headers == null)
        {
            throw new ArgumentNullException(nameof(headers));
        }
        if (!headers.TryGetValue(Headers.ContentType, out var contentType))
        {
            // default to Json
            return jsonSerializer;
        }
        if (HasContentType(jsonSerializer, contentType))
        {
            return jsonSerializer;
        }
        if (HasContentType(xmlSerializer, contentType))
        {
            return xmlSerializer;
        }
        throw new NotSupportedException($"Could not derive serializer for contentType='{contentType}'");
    }

    /// <summary>
    /// Picks the serializer requested by the attributes on <paramref name="messageType"/>.
    /// </summary>
    /// <param name="messageType">Type of the outgoing message.</param>
    /// <returns>The XML serializer when <c>[SerializeWithXml]</c> is present; otherwise the JSON serializer.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="messageType"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The type carries both serialization attributes.</exception>
    public IMessageSerializer GetSerializer(Type messageType)
    {
        if (messageType == null)
        {
            throw new ArgumentNullException(nameof(messageType));
        }
        var isJsonMessage = messageType.ContainsAttribute<SerializeWithJsonAttribute>();
        var isXmlMessage = messageType.ContainsAttribute<SerializeWithXmlAttribute>();
        if (isXmlMessage && isJsonMessage)
        {
            throw new InvalidOperationException($"Choose either [SerializeWithXml] or [SerializeWithJson] for serialization of '{messageType.Name}'.");
        }
        if (isXmlMessage)
        {
            return xmlSerializer;
        }
        // default to json
        return jsonSerializer;
    }

    // Media type names are case-insensitive, so "Application/JSON" must match "application/json".
    static bool HasContentType(IMessageSerializer serializer, string contentType)
    {
        return string.Equals(contentType, serializer.ContentType, StringComparison.OrdinalIgnoreCase);
    }
}
Behavior Configuration
This replaces the existing serialization behaviors and also adds the Serialization Mapper to dependency injection.
/// <summary>
/// Feature that replaces the two serialization pipeline steps with the connectors from this
/// sample and registers <see cref="SerializationMapper"/> in the container.
/// </summary>
public class MultiSerializerFeature : Feature
{
    internal MultiSerializerFeature() => EnableByDefault();

    /// <summary>
    /// Replaces the steps registered as "DeserializeLogicalMessagesConnector" and
    /// "SerializeMessageConnector", then registers the mapper as a single instance.
    /// </summary>
    protected override void Setup(FeatureConfigurationContext context)
    {
        var steps = context.Pipeline;
        steps.Replace("DeserializeLogicalMessagesConnector", typeof(DeserializeConnector));
        steps.Replace("SerializeMessageConnector", typeof(SerializeConnector));

        context.Container.ConfigureComponent<SerializationMapper>(DependencyLifecycle.SingleInstance);
    }
}
Behaviors
DeserializeBehavior
and SerializeBehavior
(the DeserializeConnector and SerializeConnector classes shown below) are mostly copies of the core NServiceBus behaviors. The main difference is that, instead of using the core default serializer, they request a serializer from the Serialization Mapper.
Serialization Behavior
/// <summary>
/// Outgoing connector that turns a logical message into a physical one, using the serializer
/// that <see cref="SerializationMapper"/> returns for the message type.
/// </summary>
class SerializeConnector : StageConnector<IOutgoingLogicalMessageContext, IOutgoingPhysicalMessageContext>
{
    SerializationMapper serializationMapper;
    MessageMetadataRegistry messageMetadataRegistry;

    public SerializeConnector(SerializationMapper serializationMapper, MessageMetadataRegistry messageMetadataRegistry)
    {
        this.serializationMapper = serializationMapper;
        this.messageMetadataRegistry = messageMetadataRegistry;
    }

    /// <summary>
    /// Builds the message body and passes a physical message context to <paramref name="stage"/>.
    /// When the context asks to skip serialization, an empty body is forwarded and the headers
    /// are left untouched.
    /// </summary>
    public override async Task Invoke(IOutgoingLogicalMessageContext context, Func<IOutgoingPhysicalMessageContext, Task> stage)
    {
        byte[] body;
        if (context.ShouldSkipSerialization())
        {
            body = new byte[0];
        }
        else
        {
            var type = context.Message.MessageType;
            var serializer = serializationMapper.GetSerializer(type);

            // Headers are stamped before the body is produced, so the receiver can pick
            // the matching serializer from the content type.
            var outgoingHeaders = context.Headers;
            outgoingHeaders[Headers.ContentType] = serializer.ContentType;
            outgoingHeaders[Headers.EnclosedMessageTypes] = SerializeEnclosedMessageTypes(type);

            body = Serialize(serializer, context);
        }

        var physicalContext = this.CreateOutgoingPhysicalMessageContext(
            messageBody: body,
            routingStrategies: context.RoutingStrategies,
            sourceContext: context);
        await stage(physicalContext)
            .ConfigureAwait(false);
    }

    // Serializes the message instance into an in-memory buffer and returns its bytes.
    byte[] Serialize(IMessageSerializer messageSerializer, IOutgoingLogicalMessageContext context)
    {
        using (var buffer = new MemoryStream())
        {
            var instance = context.Message.Instance;
            messageSerializer.Serialize(instance, buffer);
            return buffer.ToArray();
        }
    }

    // Joins the distinct assembly-qualified names of the message hierarchy with ';'.
    string SerializeEnclosedMessageTypes(Type messageType)
    {
        var hierarchy = messageMetadataRegistry.GetMessageMetadata(messageType).MessageHierarchy;
        var typeNames = hierarchy
            .Distinct()
            .Select(type => type.AssemblyQualifiedName);
        return string.Join(";", typeNames);
    }
}
Deserialization Behavior
/// <summary>
/// Incoming connector that turns a physical message into logical messages, using the
/// serializer that <see cref="SerializationMapper"/> returns for the message headers.
/// </summary>
class DeserializeConnector : StageConnector<IIncomingPhysicalMessageContext, IIncomingLogicalMessageContext>
{
    static ILog log = LogManager.GetLogger<DeserializeConnector>();

    SerializationMapper serializationMapper;
    MessageMetadataRegistry messageMetadataRegistry;
    LogicalMessageFactory logicalMessageFactory;

    public DeserializeConnector(
        SerializationMapper serializationMapper,
        MessageMetadataRegistry messageMetadataRegistry,
        LogicalMessageFactory logicalMessageFactory)
    {
        this.serializationMapper = serializationMapper;
        this.messageMetadataRegistry = messageMetadataRegistry;
        this.logicalMessageFactory = logicalMessageFactory;
    }

    /// <summary>
    /// Extracts the logical messages from the incoming physical message and invokes
    /// <paramref name="stage"/> once per logical message, in order.
    /// </summary>
    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<IIncomingLogicalMessageContext, Task> stage)
    {
        var logicalMessages = ExtractWithExceptionHandling(context.Message);
        foreach (var logicalMessage in logicalMessages)
        {
            var logicalContext = this.CreateIncomingLogicalMessageContext(logicalMessage, context);
            await stage(logicalContext)
                .ConfigureAwait(false);
        }
    }

    // Wraps any failure from Extract in a MessageDeserializationException carrying the message id.
    List<LogicalMessage> ExtractWithExceptionHandling(IncomingMessage message)
    {
        try
        {
            return Extract(message);
        }
        catch (Exception cause)
        {
            throw new MessageDeserializationException(message.MessageId, cause);
        }
    }

    // Resolves the enclosed message types from the headers and deserializes the body.
    List<LogicalMessage> Extract(IncomingMessage physicalMessage)
    {
        // A missing or empty body yields no logical messages.
        if (physicalMessage.Body == null || physicalMessage.Body.Length == 0)
        {
            return new List<LogicalMessage>();
        }

        var knownMetadata = new List<MessageMetadata>();
        var headers = physicalMessage.Headers;
        if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var enclosedTypes))
        {
            // Type identifiers the registry returns null for are skipped.
            knownMetadata.AddRange(
                enclosedTypes.Split(';')
                    .Select(typeName => messageMetadataRegistry.GetMessageMetadata(typeName))
                    .Where(metadata => metadata != null));

            var nothingResolved = knownMetadata.Count == 0;
            if (nothingResolved && physicalMessage.GetMessageIntent() != MessageIntentEnum.Publish)
            {
                log.Warn($"Could not determine message type from message header '{enclosedTypes}'. MessageId: {physicalMessage.MessageId}");
            }
        }

        using (var bodyStream = new MemoryStream(physicalMessage.Body))
        {
            var serializer = serializationMapper.GetSerializer(headers);
            var messageTypes = knownMetadata
                .Select(metadata => metadata.MessageType)
                .ToList();

            var logicalMessages = new List<LogicalMessage>();
            foreach (var instance in serializer.Deserialize(bodyStream, messageTypes))
            {
                logicalMessages.Add(logicalMessageFactory.Create(instance.GetType(), instance));
            }
            return logicalMessages;
        }
    }
}
Running the Code
A simple execution
- Set both Receiver and Sender as startup projects.
- Run the solution.
- In Sender, press J (for a JSON message) and X (for an XML message).
- The message will be received at Receiver.
The message on the wire
- Start only the Sender.
- Send both a JSON and an XML message.
Now have a look at the Learning Transport's message storage; there will be two messages in the Receiver folder.
An XML message with the content:
<?xml version="1.0"?>
<MessageWithXml xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns="http://tempuri.net/">
<SomeProperty>Some content in a Xml message</SomeProperty>
</MessageWithXml>
And a JSON message with the content:
{"SomeProperty":"Some content in a json message"}