Introduction
This sample leverages the pipeline to provide an attribute-based message serialization feature. It is currently hard-coded to support only XML and JSON serialization. It uses attributes, defined at the message level, to switch messages between different serializations, but any code could be substituted here to control the choice of serialization.
Code walk-through
The solution contains 3 projects:
- Shared contains the common message declarations and the behavior functionality
- Sender and Receiver are the endpoints using the behaviors
The attribute definitions
These can be used to decorate messages.
/// <summary>
/// Marker attribute for message classes that should be serialized as JSON.
/// It can only be applied to classes and is not inherited by derived classes.
/// </summary>
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)]
public class SerializeWithJsonAttribute : Attribute
{
}
/// <summary>
/// Marker attribute for message classes that should be serialized as XML.
/// It can only be applied to classes and is not inherited by derived classes.
/// </summary>
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false, Inherited = false)]
public class SerializeWithXmlAttribute : Attribute
{
}
The message definitions
The messages use the above attributes to control how they are serialized.
/// <summary>
/// Sample message decorated with <see cref="SerializeWithXmlAttribute"/>.
/// </summary>
[Serializable, SerializeWithXml]
public class MessageWithXml : IMessage
{
    /// <summary>Free-form payload carried by the message.</summary>
    public string SomeProperty { get; set; }
}
/// <summary>
/// Sample message decorated with <see cref="SerializeWithJsonAttribute"/>.
/// </summary>
[SerializeWithJson]
public class MessageWithJson : IMessage
{
    /// <summary>Free-form payload carried by the message.</summary>
    public string SomeProperty { get; set; }
}
Serialization mapper
This class interrogates the message information and uses that to determine which serializer to use.
/// <summary>
/// Selects the serializer (XML or JSON) to use for a message, based either on the
/// content-type header of an incoming message or on the attributes of an outgoing message type.
/// </summary>
public class SerializationMapper
{
    // Both serializers are created once in the constructor and never reassigned.
    readonly IMessageSerializer jsonSerializer;
    readonly IMessageSerializer xmlSerializer;

    /// <summary>
    /// Creates the XML and JSON serializers from the supplied settings and message mapper.
    /// </summary>
    /// <param name="mapper">Message mapper handed to each serializer factory.</param>
    /// <param name="settings">Settings used to configure each serializer definition.</param>
    public SerializationMapper(IMessageMapper mapper, ReadOnlySettings settings)
    {
        xmlSerializer = new XmlSerializer()
            .Configure(settings)(mapper);
        jsonSerializer = new NewtonsoftJsonSerializer()
            .Configure(settings)(mapper);
    }

    /// <summary>
    /// Picks the serializer matching the content-type header of a message.
    /// </summary>
    /// <param name="headers">Message headers; the content-type entry is optional.</param>
    /// <returns>
    /// The JSON serializer when no content-type header is present or when it matches JSON,
    /// the XML serializer when it matches XML.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="headers"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The content type matches neither serializer.</exception>
    public IMessageSerializer GetSerializer(Dictionary<string, string> headers)
    {
        if (headers == null)
        {
            throw new ArgumentNullException(nameof(headers));
        }

        if (!headers.TryGetValue(Headers.ContentType, out var contentType))
        {
            // default to Json
            return jsonSerializer;
        }
        if (contentType == jsonSerializer.ContentType)
        {
            return jsonSerializer;
        }
        if (contentType == xmlSerializer.ContentType)
        {
            return xmlSerializer;
        }

        // A specific exception type instead of the reserved System.Exception; it still derives
        // from Exception, so existing catch blocks keep working.
        throw new InvalidOperationException($"Could not derive serializer for contentType='{contentType}'");
    }

    /// <summary>
    /// Picks the serializer for an outgoing message type based on its serialization attributes.
    /// </summary>
    /// <param name="messageType">The message type to inspect.</param>
    /// <returns>
    /// The XML serializer when the type carries <see cref="SerializeWithXmlAttribute"/>;
    /// otherwise the JSON serializer.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="messageType"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The type carries both serialization attributes.</exception>
    public IMessageSerializer GetSerializer(Type messageType)
    {
        if (messageType == null)
        {
            throw new ArgumentNullException(nameof(messageType));
        }

        var isJsonMessage = messageType.ContainsAttribute<SerializeWithJsonAttribute>();
        var isXmlMessage = messageType.ContainsAttribute<SerializeWithXmlAttribute>();
        if (isXmlMessage && isJsonMessage)
        {
            throw new InvalidOperationException($"Choose either [SerializeWithXml] or [SerializeWithJson] for serialization of '{messageType.Name}'.");
        }
        if (isXmlMessage)
        {
            return xmlSerializer;
        }

        // default to json
        return jsonSerializer;
    }
}
Behavior configuration
This replaces the existing serialization behaviors and also adds the serialization mapper to dependency injection.
/// <summary>
/// Feature that swaps the serialize and deserialize pipeline connectors for the
/// multi-serializer versions and registers the <see cref="SerializationMapper"/> as a single instance.
/// </summary>
public class MultiSerializerFeature : Feature
{
    internal MultiSerializerFeature()
    {
        EnableByDefault();
    }

    /// <summary>
    /// Replaces the two pipeline steps by name and registers the serialization mapper in the container.
    /// </summary>
    protected override void Setup(FeatureConfigurationContext context)
    {
        context.Pipeline.Replace("DeserializeLogicalMessagesConnector", typeof(DeserializeConnector));
        context.Pipeline.Replace("SerializeMessageConnector", typeof(SerializeConnector));

        context.Container.ConfigureComponent<SerializationMapper>(DependencyLifecycle.SingleInstance);
    }
}
Behaviors
DeserializeBehavior and SerializeBehavior (implemented by the DeserializeConnector and SerializeConnector classes shown below) are mostly copies of the core NServiceBus behaviors. The difference is that instead of using the core default serializer, they request a serializer from the serialization mapper.
Serialization behavior
/// <summary>
/// Connector from the outgoing logical message stage to the outgoing physical message stage.
/// The serializer is obtained from the <see cref="SerializationMapper"/> based on the message type.
/// </summary>
class SerializeConnector : StageConnector<IOutgoingLogicalMessageContext, IOutgoingPhysicalMessageContext>
{
    SerializationMapper serializationMapper;
    MessageMetadataRegistry messageMetadataRegistry;

    public SerializeConnector(SerializationMapper serializationMapper, MessageMetadataRegistry messageMetadataRegistry)
    {
        this.serializationMapper = serializationMapper;
        this.messageMetadataRegistry = messageMetadataRegistry;
    }

    /// <summary>
    /// Produces the message body (empty when serialization should be skipped), stamps the
    /// content-type and enclosed-message-types headers when serializing, and invokes the next stage.
    /// </summary>
    public override async Task Invoke(IOutgoingLogicalMessageContext context, Func<IOutgoingPhysicalMessageContext, Task> stage)
    {
        byte[] body;
        if (context.ShouldSkipSerialization())
        {
            // No headers are touched on this path; the next stage receives an empty body.
            body = new byte[0];
        }
        else
        {
            var logicalType = context.Message.MessageType;
            var serializer = serializationMapper.GetSerializer(logicalType);

            var outgoingHeaders = context.Headers;
            outgoingHeaders[Headers.ContentType] = serializer.ContentType;
            outgoingHeaders[Headers.EnclosedMessageTypes] = JoinEnclosedMessageTypes(logicalType);

            body = ToBytes(serializer, context);
        }

        var physicalContext = this.CreateOutgoingPhysicalMessageContext(
            messageBody: body,
            routingStrategies: context.RoutingStrategies,
            sourceContext: context);

        await stage(physicalContext).ConfigureAwait(false);
    }

    // Serializes the message instance into a fresh byte array.
    byte[] ToBytes(IMessageSerializer serializer, IOutgoingLogicalMessageContext context)
    {
        using (var buffer = new MemoryStream())
        {
            serializer.Serialize(context.Message.Instance, buffer);
            return buffer.ToArray();
        }
    }

    // Builds the ';'-separated list of assembly-qualified names from the message hierarchy, without duplicates.
    string JoinEnclosedMessageTypes(Type messageType)
    {
        var hierarchy = messageMetadataRegistry
            .GetMessageMetadata(messageType)
            .MessageHierarchy;

        var qualifiedNames = hierarchy
            .Distinct()
            .Select(type => type.AssemblyQualifiedName);

        return string.Join(";", qualifiedNames);
    }
}
Deserialization behavior
/// <summary>
/// Connector from the incoming physical message stage to the incoming logical message stage.
/// The serializer is obtained from the <see cref="SerializationMapper"/> based on the message headers.
/// </summary>
class DeserializeConnector : StageConnector<IIncomingPhysicalMessageContext, IIncomingLogicalMessageContext>
{
    SerializationMapper serializationMapper;
    MessageMetadataRegistry messageMetadataRegistry;
    LogicalMessageFactory logicalMessageFactory;
    static ILog log = LogManager.GetLogger<DeserializeConnector>();

    public DeserializeConnector(
        SerializationMapper serializationMapper,
        MessageMetadataRegistry messageMetadataRegistry,
        LogicalMessageFactory logicalMessageFactory)
    {
        this.serializationMapper = serializationMapper;
        this.messageMetadataRegistry = messageMetadataRegistry;
        this.logicalMessageFactory = logicalMessageFactory;
    }

    /// <summary>
    /// Extracts the logical messages from the physical message and invokes the next stage once per
    /// logical message. Any failure during extraction is rethrown as a
    /// <see cref="MessageDeserializationException"/> carrying the message id.
    /// </summary>
    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<IIncomingLogicalMessageContext, Task> stage)
    {
        var incoming = context.Message;

        List<LogicalMessage> logicalMessages;
        try
        {
            logicalMessages = Extract(incoming);
        }
        catch (Exception exception)
        {
            throw new MessageDeserializationException(incoming.MessageId, exception);
        }

        foreach (var logicalMessage in logicalMessages)
        {
            var logicalContext = this.CreateIncomingLogicalMessageContext(logicalMessage, context);
            await stage(logicalContext).ConfigureAwait(false);
        }
    }

    // Turns the body of a physical message into logical messages; an absent or empty body yields an empty list.
    List<LogicalMessage> Extract(IncomingMessage physicalMessage)
    {
        var body = physicalMessage.Body;
        if (body == null || body.Length == 0)
        {
            return new List<LogicalMessage>();
        }

        var headers = physicalMessage.Headers;

        List<MessageMetadata> knownMetadata;
        if (headers.TryGetValue(Headers.EnclosedMessageTypes, out var enclosedTypes))
        {
            // Resolve each ';'-separated type identifier, dropping the ones the registry does not know.
            knownMetadata = enclosedTypes
                .Split(';')
                .Select(identifier => messageMetadataRegistry.GetMessageMetadata(identifier))
                .Where(metadata => metadata != null)
                .ToList();

            // The warning is not logged for messages whose intent is Publish.
            if (knownMetadata.Count == 0 &&
                physicalMessage.GetMessageIntent() != MessageIntentEnum.Publish)
            {
                log.Warn($"Could not determine message type from message header '{enclosedTypes}'. MessageId: {physicalMessage.MessageId}");
            }
        }
        else
        {
            knownMetadata = new List<MessageMetadata>();
        }

        using (var stream = new MemoryStream(body))
        {
            var serializer = serializationMapper.GetSerializer(headers);
            var typesToDeserialize = knownMetadata.ConvertAll(metadata => metadata.MessageType);

            var logicalMessages = new List<LogicalMessage>();
            foreach (var instance in serializer.Deserialize(stream, typesToDeserialize))
            {
                logicalMessages.Add(logicalMessageFactory.Create(instance.GetType(), instance));
            }
            return logicalMessages;
        }
    }
}
Running the code
A simple execution
- Set both Receiver and Sender as startup projects
- Run the solution
- In Sender, press J (for a JSON message) or X (for an XML message)
- The message will be received by Receiver
The message on the wire
- Start only the Sender
- Send both a JSON and an XML message
Now look at the learning transport's message storage; there will be two messages in the Receiver folder.
An XML message with the following content
<?xml version="1.0"?>
<MessageWithXml xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns="http://tempuri.net/">
<SomeProperty>Some content in an XML message</SomeProperty>
</MessageWithXml>
And a JSON message with the following content
{"SomeProperty":"Some content in a JSON message"}