Getting Started
Architecture
NServiceBus
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

Transition serialization formats

Component: NServiceBus
NuGet Package: NServiceBus (7.x)

This sample illustrates an approach for introducing a breaking change to the message serialization format in a way that requires no endpoint downtime and no manipulation of message bodies.

The external Json.NET serializer is used in this sample, but the phased upgrade approach is applicable to changing the format of any serializer or moving between different serializers.

Message contracts

public class Order :
    IMessage
{
    public int OrderId { get; set; }

    // Where the dictionary key is the id of the order line item
    public Dictionary<int, OrderItem> OrderItems { get; set; }
}

public class OrderItem
{
    public int Quantity { get; set; }
}

Message compatibility

To highlight compatibility between iterations of the endpoint, each phase sends a message to itself, the upper phase, and the lower phase.

For example, Phase3 sends to itself as well as to Phase2 and Phase4.

var message = MessageCreator.NewOrder();
await endpointInstance.SendLocal(message);
await endpointInstance.Send("Samples.Serialization.TransitionPhase2", message);
await endpointInstance.Send("Samples.Serialization.TransitionPhase4", message);

Serialization format change

This sample uses a hypothetical scenario where the JSON serialization format needs to change how dictionaries are serialized from using contents for the key and value to an explicitly named key and value approach.

JSON using a standard approach

{
  "OrderId": 9,
  "OrderItems": {
    "3": {
      "Quantity": 2
    },
    "8": {
      "Quantity": 7
    }
  }
}

JSON using a key-value approach

{
  "OrderId": 9,
  "OrderItems": [
    {
      "Key": 3,
      "Value": {
        "Quantity": 2
      }
    },
    {
      "Key": 8,
      "Value": {
        "Quantity": 7
      }
    }
  ]
}

Implementing the change

This is implemented using the NewtonSoft ContractResolver that changes the array contract for the dictionary from the default.

public class ExtendedResolver :
    DefaultContractResolver
{
    protected override JsonContract CreateContract(Type objectType)
    {
        if (objectType.GetInterfaces().Any(IsDictionary))
        {
            return CreateArrayContract(objectType);
        }

        return base.CreateContract(objectType);
    }

    bool IsDictionary(Type type)
    {
        return type == typeof(IDictionary) ||
               type.IsGenericType &&
               type.GetGenericTypeDefinition() == typeof(IDictionary<,>);
    }
}

Diagnostic helpers

To help visualize the serialization changes, there are two behaviors that write the contents of each incoming and outgoing message.

public class IncomingWriter :
    Behavior<IIncomingPhysicalMessageContext>
{
    static ILog log = LogManager.GetLogger<IncomingWriter>();

    public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        var builder = new StringBuilder(Environment.NewLine);
        builder.AppendLine($"OriginatingEndpoint: {context.ReplyToAddress}");
        var bodyAsString = Encoding.UTF8
            .GetString(context.Message.Body);
        builder.AppendLine("MessageBody:");
        builder.AppendLine(bodyAsString);
        log.Info(builder.ToString());
        return next();
    }
}
public class OutgoingWriter :
    Behavior<IOutgoingPhysicalMessageContext>
{
    static ILog log = LogManager.GetLogger<OutgoingWriter>();

    public override Task Invoke(IOutgoingPhysicalMessageContext context, Func<Task> next)
    {
        var builder = new StringBuilder(Environment.NewLine);
        var routingStrategy = context.RoutingStrategies.Single();
        var targetEndpoint = routingStrategy.Apply(new Dictionary<string, string>());
        builder.AppendLine($"TargetEndpoint: {targetEndpoint}");
        var bodyAsString = Encoding.UTF8
            .GetString(context.Body);
        builder.AppendLine("MessageBody:");
        builder.AppendLine(bodyAsString);
        log.Info(builder.ToString());
        return next();
    }
}

Both behaviors are registered at configuration time:

endpointConfiguration.AddMessageBodyWriter();
public static void AddMessageBodyWriter(this EndpointConfiguration endpointConfiguration)
{
    var pipeline = endpointConfiguration.Pipeline;
    pipeline.Register(
        stepId: "OutgoingWriter",
        behavior: typeof(OutgoingWriter),
        description: "Logs the contents of each outgoing message and the TargetEndpoint");
    pipeline.Register(
        stepId: "IncomingWriter",
        behavior: typeof(IncomingWriter),
        description: "Logs the contents of each incoming message and the OriginatingEndpoint");
}

Phases

Note that, in production, each of the phases must be applied to every endpoint that needs to communicate with the new serialization format. So each endpoint can be, at most, one phase out of sync with any other endpoint it needs to communicate with.

Phase 1

Endpoint is running:

  • Version 1 serialization.

This is the initial state where all endpoints are using the standard JsonSerializerSettings registered with the ContentTypeKey of jsonv1. All endpoints are sending and receiving V1 messages.

var settingsV1 = new JsonSerializerSettings
{
    Formatting = Formatting.Indented
};
var serializationV1 = endpointConfiguration.UseSerialization<NewtonsoftJsonSerializer>();
serializationV1.Settings(settingsV1);
serializationV1.ContentTypeKey("jsonv1");

Phase 2

Endpoint is running:

  • Version 1 serialization.
  • Version 2 deserialization.

The new JsonSerializerSettings registered as a deserializer with the ContentTypeKey of jsonv2. This makes all endpoints capable of receiving V2 messages while still sending V1 messages.

var settingsV1 = new JsonSerializerSettings
{
    Formatting = Formatting.Indented
};
var serializationV1 = endpointConfiguration.UseSerialization<NewtonsoftJsonSerializer>();
serializationV1.Settings(settingsV1);
serializationV1.ContentTypeKey("jsonv1");

var settingsV2 = new JsonSerializerSettings
{
    Formatting = Formatting.Indented,
    ContractResolver = new ExtendedResolver()
};
var serializationV2 = endpointConfiguration.AddDeserializer<NewtonsoftJsonSerializer>();
serializationV2.Settings(settingsV2);
serializationV2.ContentTypeKey("jsonv2");

Phase 3

Endpoint is running:

  • Version 2 serialization
  • Version 1 deserialization

The serializer and deserializer are switched. So all endpoints can still receive V1 messages but will send messages in the V2 format.

var settingsV2 = new JsonSerializerSettings
{
    Formatting = Formatting.Indented,
    ContractResolver = new ExtendedResolver()
};
var serializationV2 = endpointConfiguration.UseSerialization<NewtonsoftJsonSerializer>();
serializationV2.Settings(settingsV2);
serializationV2.ContentTypeKey("jsonv2");

var settingsV1 = new JsonSerializerSettings
{
    Formatting = Formatting.Indented
};
var serializationV1 = endpointConfiguration.AddDeserializer<NewtonsoftJsonSerializer>();
serializationV1.Settings(settingsV1);
serializationV1.ContentTypeKey("jsonv1");

Phase 4

Endpoint is running:

  • Version 2 serialization

All endpoints are now sending and receiving V2 messages.

var settingsV2 = new JsonSerializerSettings
{
    Formatting = Formatting.Indented,
    ContractResolver = new ExtendedResolver()
};
var serializationV2 = endpointConfiguration.UseSerialization<NewtonsoftJsonSerializer>();
serializationV2.Settings(settingsV2);
serializationV2.ContentTypeKey("jsonv2");

Messages in transit

It is important to consider both discarding of old messages and how the error queue is handled. For example, the following time-line could be problematic:

  • A message makes use of the old serialization format.
  • The message has a long Time-To-Be-Received (TTBR).
  • The message fails processing and is forwarded to the error queue.
  • The above change in serialization format is performed.
  • A retry of the message is attempted.

Given the type of serialization change that has occurred, the structure of the message contract, and the serializer being used, the resulting behavior is non-deterministic. The serializer may throw an exception for the old format or it may swallow the error and proceed with missing data.

So either:

  • Delay the upgrading of each phase so the overlapping time is greater than the maximum TTBR of message contracts, or
  • During the deployment of each phase verify that messages in the error queue will not exhibit the above problem.

Related Articles

  • Serialization
    .NET messaging systems require serialization and deserialization of objects sent/received over transports. NServiceBus achieves this using serializers.