Transition serialization formats

Component: NServiceBus | Nuget: NServiceBus (Version: 6.x)

This sample illustrates an approach for introducing a breaking change to the message serialization format in a way that requires no endpoint downtime and no manipulation of message bodies.

This sample uses four "Phase" endpoint projects, all in one solution, to illustrate the iterations of a single endpoint.

The external Json.NET serializer is used in this sample, but the phased upgrade approach is equally applicable to changing the format of any serializer, or even to moving between different serializers.

Message Contracts

public class Order :
    IMessage
{
    public int OrderId { get; set; }

    // Where the dictionary key is the id of the order line item
    public Dictionary<int, OrderItem> OrderItems { get; set; }
}

public class OrderItem
{
    public int Quantity { get; set; }
}

Message Compatibility

To highlight compatibility between iterations of the endpoint, each Phase sends a message to itself, to the next phase, and to the previous phase.

For example, Phase3 sends to itself, Phase2, and Phase4.

var message = MessageCreator.NewOrder();
await endpointInstance.SendLocal(message)
    .ConfigureAwait(false);
await endpointInstance.Send("Samples.Serialization.TransitionPhase2", message)
    .ConfigureAwait(false);
await endpointInstance.Send("Samples.Serialization.TransitionPhase4", message)
    .ConfigureAwait(false);

Serialization format change

For demonstration purposes, this sample uses a hypothetical scenario in which the JSON serialization format needs to change how dictionaries are serialized: from using the dictionary contents as the JSON property names and values, to using explicitly named Key and Value properties.

JSON using the standard approach

{
  "OrderId": 9,
  "OrderItems": {
    "3": {
      "Quantity": 2
    },
    "8": {
      "Quantity": 7
    }
  }
}

JSON using the key-value approach

{
  "OrderId": 9,
  "OrderItems": [
    {
      "Key": 3,
      "Value": {
        "Quantity": 2
      }
    },
    {
      "Key": 8,
      "Value": {
        "Quantity": 7
      }
    }
  ]
}

Implementing the change

This is implemented with a custom Json.NET ContractResolver that uses an array contract for dictionaries instead of the default dictionary contract.

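using System;
using System.Collections;
using System.Linq;
using Newtonsoft.Json.Serialization;

public class ExtendedResolver :
    DefaultContractResolver
{
    protected override JsonContract CreateContract(Type objectType)
    {
        // Treat any dictionary type as an array of Key/Value entries
        if (objectType.GetInterfaces().Any(IsDictionary))
        {
            return CreateArrayContract(objectType);
        }

        return base.CreateContract(objectType);
    }

    // True for the non-generic IDictionary and for any IDictionary<TKey, TValue>
    bool IsDictionary(Type i)
    {
        return i == typeof(IDictionary) ||
               (i.IsGenericType &&
                i.GetGenericTypeDefinition() == typeof(IDictionary<,>));
    }
}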
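
The effect of the resolver can be checked outside NServiceBus with a small console sketch such as the one below. It is an illustration only: ResolverDemo is a hypothetical name, Order and OrderItem are plain copies of the message contract with IMessage omitted so that the snippet depends only on Json.NET, and the resolver is repeated so the snippet compiles on its own.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;

// Plain copies of the message contract (IMessage omitted for this sketch).
public class Order
{
    public int OrderId { get; set; }
    public Dictionary<int, OrderItem> OrderItems { get; set; }
}

public class OrderItem
{
    public int Quantity { get; set; }
}

// Same logic as the resolver in this sample, repeated for self-containment.
public class ExtendedResolver : DefaultContractResolver
{
    protected override JsonContract CreateContract(Type objectType)
    {
        if (objectType.GetInterfaces().Any(IsDictionary))
        {
            return CreateArrayContract(objectType);
        }
        return base.CreateContract(objectType);
    }

    static bool IsDictionary(Type i)
    {
        return i == typeof(IDictionary) ||
               (i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IDictionary<,>));
    }
}

// Hypothetical demo class, not part of the sample.
public static class ResolverDemo
{
    public static void Main()
    {
        var order = new Order
        {
            OrderId = 9,
            OrderItems = new Dictionary<int, OrderItem>
            {
                [3] = new OrderItem { Quantity = 2 },
                [8] = new OrderItem { Quantity = 7 }
            }
        };

        var settingsV1 = new JsonSerializerSettings { Formatting = Formatting.Indented };
        var settingsV2 = new JsonSerializerSettings
        {
            Formatting = Formatting.Indented,
            ContractResolver = new ExtendedResolver()
        };

        // V1: dictionary keys become JSON property names.
        Console.WriteLine(JsonConvert.SerializeObject(order, settingsV1));

        // V2: each entry is written as an object with Key and Value properties.
        var jsonV2 = JsonConvert.SerializeObject(order, settingsV2);
        Console.WriteLine(jsonV2);

        // Reading the V2 text back with the V2 settings restores the dictionary.
        var roundTripped = JsonConvert.DeserializeObject<Order>(jsonV2, settingsV2);
        Console.WriteLine(roundTripped.OrderItems[8].Quantity);
    }
}
```

The two printed documents should correspond to the standard and key-value JSON shown in the previous section.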

Diagnostic helpers

To help visualize the serialization changes, two behaviors log the contents of each incoming and outgoing message.

public class IncomingWriter :
    Behavior<IIncomingPhysicalMessageContext>
{
    static ILog log = LogManager.GetLogger<IncomingWriter>();

    public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        var builder = new StringBuilder(Environment.NewLine);
        builder.AppendLine($"OriginatingEndpoint: {context.ReplyToAddress}");
        var bodyAsString = Encoding.UTF8
            .GetString(context.Message.Body);
        builder.AppendLine("MessageBody:");
        builder.AppendLine(bodyAsString);
        log.Info(builder.ToString());
        return next();
    }
}

The corresponding behavior for outgoing messages:

public class OutgoingWriter :
    Behavior<IOutgoingPhysicalMessageContext>
{
    static ILog log = LogManager.GetLogger<OutgoingWriter>();

    public override Task Invoke(IOutgoingPhysicalMessageContext context, Func<Task> next)
    {
        var builder = new StringBuilder(Environment.NewLine);
        var routingStrategy = context.RoutingStrategies.Single();
        var targetEndpoint = routingStrategy.Apply(null);
        builder.AppendLine($"TargetEndpoint: {targetEndpoint}");
        var bodyAsString = Encoding.UTF8
            .GetString(context.Body);
        builder.AppendLine("MessageBody:");
        builder.AppendLine(bodyAsString);
        log.Info(builder.ToString());
        return next();
    }
}

Both Behaviors are registered at configuration time:

endpointConfiguration.AddMessageBodyWriter();

The AddMessageBodyWriter extension method is defined as:

public static void AddMessageBodyWriter(this EndpointConfiguration endpointConfiguration)
{
    var pipeline = endpointConfiguration.Pipeline;
    pipeline.Register(
        stepId: "OutgoingWriter",
        behavior: typeof(OutgoingWriter),
        description: "Logs the contents of each outgoing message and the TargetEndpoint");
    pipeline.Register(
        stepId: "IncomingWriter",
        behavior: typeof(IncomingWriter),
        description: "Logs the contents of each incoming message and the OriginatingEndpoint");
}

Phases

Note that in a production system each of the phases would need to be applied to every endpoint that needs to communicate using the new serialization format. At any point in time, each endpoint can be at most one phase out of sync with any other endpoint it needs to communicate with.

Phase 1

Endpoint is running:

  • Version 1 serialization

This is the initial state where all endpoints are using the standard JsonSerializerSettings registered with the ContentTypeKey of jsonv1. All endpoints are sending and receiving V1 messages.

var settingsV1 = new JsonSerializerSettings
{
    Formatting = Formatting.Indented
};
var serializationV1 = endpointConfiguration.UseSerialization<NewtonsoftSerializer>();
serializationV1.Settings(settingsV1);
serializationV1.ContentTypeKey("jsonv1");

Phase 2

Endpoint is running:

  • Version 1 serialization
  • Version 2 deserialization

The new JsonSerializerSettings are registered as a deserializer with the ContentTypeKey of jsonv2. This makes all endpoints capable of receiving V2 messages while still sending V1 messages.

var settingsV1 = new JsonSerializerSettings
{
    Formatting = Formatting.Indented
};
var serializationV1 = endpointConfiguration.UseSerialization<NewtonsoftSerializer>();
serializationV1.Settings(settingsV1);
serializationV1.ContentTypeKey("jsonv1");

var settingsV2 = new JsonSerializerSettings
{
    Formatting = Formatting.Indented,
    ContractResolver = new ExtendedResolver()
};
var serializationV2 = endpointConfiguration.AddDeserializer<NewtonsoftSerializer>();
serializationV2.Settings(settingsV2);
serializationV2.ContentTypeKey("jsonv2");

Phase 3

Endpoint is running:

  • Version 2 serialization
  • Version 1 deserialization

The serializer and deserializer are switched, so all endpoints can still receive V1 messages but now send V2 messages.

var settingsV2 = new JsonSerializerSettings
{
    Formatting = Formatting.Indented,
    ContractResolver = new ExtendedResolver()
};
var serializationV2 = endpointConfiguration.UseSerialization<NewtonsoftSerializer>();
serializationV2.Settings(settingsV2);
serializationV2.ContentTypeKey("jsonv2");

var settingsV1 = new JsonSerializerSettings
{
    Formatting = Formatting.Indented
};
var serializationV1 = endpointConfiguration.AddDeserializer<NewtonsoftSerializer>();
serializationV1.Settings(settingsV1);
serializationV1.ContentTypeKey("jsonv1");

Phase 4

Endpoint is running:

  • Version 2 serialization

All endpoints are now sending and receiving V2 messages.

var settingsV2 = new JsonSerializerSettings
{
    Formatting = Formatting.Indented,
    ContractResolver = new ExtendedResolver()
};
var serializationV2 = endpointConfiguration.UseSerialization<NewtonsoftSerializer>();
serializationV2.Settings(settingsV2);
serializationV2.ContentTypeKey("jsonv2");
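
The four configurations differ only in which settings serialize and which (if any) are added as an extra deserializer. As a rough sketch, that could be folded into a single helper driven by a phase number. SerializationPhaseExtensions and UseSerializationPhase are hypothetical names that are not part of NServiceBus or of this sample; the calls inside are the same ones used in the phase snippets, and ExtendedResolver is the resolver defined earlier.

```csharp
using System;
using Newtonsoft.Json;
using NServiceBus;

// Hypothetical helper, shown only to summarize the four phases.
public static class SerializationPhaseExtensions
{
    public static void UseSerializationPhase(
        this EndpointConfiguration endpointConfiguration,
        int phase)
    {
        if (phase < 1 || phase > 4)
        {
            throw new ArgumentOutOfRangeException(nameof(phase), "Phase must be between 1 and 4.");
        }

        var settingsV1 = new JsonSerializerSettings
        {
            Formatting = Formatting.Indented
        };
        var settingsV2 = new JsonSerializerSettings
        {
            Formatting = Formatting.Indented,
            ContractResolver = new ExtendedResolver()
        };

        // Phases 1 and 2 send V1; phases 3 and 4 send V2.
        var sendV2 = phase >= 3;
        var serializer = endpointConfiguration.UseSerialization<NewtonsoftSerializer>();
        serializer.Settings(sendV2 ? settingsV2 : settingsV1);
        serializer.ContentTypeKey(sendV2 ? "jsonv2" : "jsonv1");

        // Phases 2 and 3 additionally accept the format they do not send.
        if (phase == 2 || phase == 3)
        {
            var deserializer = endpointConfiguration.AddDeserializer<NewtonsoftSerializer>();
            deserializer.Settings(sendV2 ? settingsV1 : settingsV2);
            deserializer.ContentTypeKey(sendV2 ? "jsonv1" : "jsonv2");
        }
    }
}
```

With such a helper, moving an endpoint to the next phase would be a one-argument change, for example endpointConfiguration.UseSerializationPhase(2), which may make it easier to see that adjacent phases always share a format they can both read.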

Messages in transit

It is important to consider both Discard Old Messages and how the Error Queue is handled. For example, the following timeline could be problematic:

  • A message makes use of the old serialization format.
  • The message has a long Time-To-Be-Received (TTBR).
  • The message fails processing and is forwarded to the error queue.
  • The above change in serialization format is performed.
  • A retry of the message is attempted.

The resulting behavior depends on the type of serialization change that has occurred, the structure of the message contract, and the serializer being used, so it cannot be predicted in general. The serializer may throw an exception for the old format, or it may swallow the error and proceed with missing data.

So either:

  • Delay the upgrade to each Phase so that the overlap is longer than the maximum TTBR of the message contracts, or
  • During the deployment of each Phase, verify that messages in the error queue will not exhibit the above problem.
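
To get a feel for what a format mismatch looks like with the dictionary change used in this sample, the sketch below reads a V1 body and a V2 body with each of the two settings objects. It is an illustration under assumptions: MismatchDemo and TryRead are hypothetical names, and the contract and resolver are repeated as plain copies so the snippet depends only on Json.NET. For this particular contract both mismatched combinations are expected to fail with a Json.NET exception rather than lose data silently, but that outcome should not be assumed for other contracts or serializers.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;

// Plain copies of the message contract (IMessage omitted for this sketch).
public class Order
{
    public int OrderId { get; set; }
    public Dictionary<int, OrderItem> OrderItems { get; set; }
}

public class OrderItem
{
    public int Quantity { get; set; }
}

// Same logic as the resolver in this sample, repeated for self-containment.
public class ExtendedResolver : DefaultContractResolver
{
    protected override JsonContract CreateContract(Type objectType)
    {
        if (objectType.GetInterfaces().Any(IsDictionary))
        {
            return CreateArrayContract(objectType);
        }
        return base.CreateContract(objectType);
    }

    static bool IsDictionary(Type i)
    {
        return i == typeof(IDictionary) ||
               (i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IDictionary<,>));
    }
}

// Hypothetical demo class, not part of the sample.
public static class MismatchDemo
{
    public const string JsonV1 = "{\"OrderId\":9,\"OrderItems\":{\"3\":{\"Quantity\":2}}}";
    public const string JsonV2 = "{\"OrderId\":9,\"OrderItems\":[{\"Key\":3,\"Value\":{\"Quantity\":2}}]}";

    // Describes what the serializer did with the given body.
    public static string TryRead(string json, JsonSerializerSettings settings)
    {
        try
        {
            var order = JsonConvert.DeserializeObject<Order>(json, settings);
            var count = order?.OrderItems?.Count ?? 0;
            return $"read {count} item(s)";
        }
        catch (JsonException exception)
        {
            return $"failed: {exception.GetType().Name}";
        }
    }

    public static void Main()
    {
        var settingsV1 = new JsonSerializerSettings();
        var settingsV2 = new JsonSerializerSettings { ContractResolver = new ExtendedResolver() };

        Console.WriteLine($"V1 body, V1 settings: {TryRead(JsonV1, settingsV1)}");
        Console.WriteLine($"V2 body, V2 settings: {TryRead(JsonV2, settingsV2)}");
        Console.WriteLine($"V1 body, V2 settings: {TryRead(JsonV1, settingsV2)}");
        Console.WriteLine($"V2 body, V1 settings: {TryRead(JsonV2, settingsV1)}");
    }
}
```

A check along these lines, run against representative bodies for each message contract, is one way to find out before a deployment whether an old message would fail loudly or be read with missing data.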
