Message Mutators


This topic contains pre-release documentation (for Version 6-pre) and is subject to change prior to the final release.
  1. Run the solution.
  2. Press 's' followed by 'Enter' in the console window, then press 'e' followed by 'Enter'. The console output will look something like this (the exception message is expected):
Press 's' to send a valid message, press 'e' to send a failed message. To exit, 'q'

s
INFO  ValidationMessageMutator Validation succeeded for message: CreateProductCommand: ProductId=XJ128, ProductName=Milk, ListPrice=4 Image (length)=7340032
INFO  TransportMessageCompressionMutator transportMessage.Body size before compression: 9787013
INFO  TransportMessageCompressionMutator transportMessage.Body size after compression: 9761
INFO  ValidationMessageMutator Validation succeeded for message: CreateProductCommand: ProductId=XJ128, ProductName=Milk, ListPrice=4 Image (length)=7340032
INFO  Handler Received a CreateProductCommand message: CreateProductCommand: ProductId=XJ128, ProductName=Milk, ListPrice=4 Image (length)=7340032

e
ERROR ValidationMessageMutator Validation failed for message CreateProductCommand: ProductId=XJ128, ProductName=Milk Milk Milk Milk Milk, ListPrice=15 Image (length)=7340032, with the following error/s:
The Product Name value cannot exceed 20 characters.
The field ListPrice must be between 1 and 5.

Now look at the code.

Code walk-through

This sample shows how to create a custom message mutator.

The IMutateTransportMessages and IMessageMutator interfaces give access to the message so that the inbound and/or outbound message can be modified.

To use a mutator, implement the desired interface and register it in the NServiceBus container.

IMutateTransportMessages is the transport-level counterpart of IMessageMutator: it mutates transport messages rather than individual messages. The main difference from IMessageMutator is that a single transport message may carry several messages.

This sample implements two mutators:

ValidationMessageMutator

This message mutator validates all DataAnnotations attributes that exist in the message.

6-pre NServiceBus
public class ValidationMessageMutator : IMutateIncomingMessages, IMutateOutgoingMessages
{
    static ILog logger = LogManager.GetLogger("ValidationMessageMutator");

    public Task MutateOutgoing(MutateOutgoingMessageContext context)
    {
        ValidateDataAnnotations(context.OutgoingMessage);
        return Task.FromResult(0);
    }

    public Task MutateIncoming(MutateIncomingMessageContext context)
    {
        ValidateDataAnnotations(context.Message);
        return Task.FromResult(0);
    }

    static void ValidateDataAnnotations(object message)
    {
        var context = new ValidationContext(message, null, null);
        var results = new List<ValidationResult>();

        var isValid = Validator.TryValidateObject(message, context, results, true);

        if (isValid)
        {
            logger.Info($"Validation succeeded for message: {message}");
            return;
        }

        var errorMessage = new StringBuilder();
        var error = string.Format("Validation failed for message {0}, with the following error/s: " + Environment.NewLine, message);
        errorMessage.Append(error);

        foreach (var validationResult in results)
        {
            errorMessage.Append(validationResult.ErrorMessage + Environment.NewLine);
        }

        logger.Error(errorMessage.ToString());
        throw new Exception(errorMessage.ToString());
    }
}
5.x NServiceBus
public class ValidationMessageMutator : IMessageMutator
{
    static ILog logger = LogManager.GetLogger("ValidationMessageMutator");

    public object MutateOutgoing(object message)
    {
        ValidateDataAnnotations(message);
        return message;
    }

    public object MutateIncoming(object message)
    {
        ValidateDataAnnotations(message);
        return message;
    }

    static void ValidateDataAnnotations(object message)
    {
        var context = new ValidationContext(message, null, null);
        var results = new List<ValidationResult>();

        var isValid = Validator.TryValidateObject(message, context, results, true);

        if (isValid)
        {
            logger.InfoFormat("Validation succeeded for message: {0}", message);
            return;
        }

        var errorMessage = new StringBuilder();
        var error = string.Format("Validation failed for message {0}, with the following error/s: " + Environment.NewLine, message);
        errorMessage.Append(error);

        foreach (var validationResult in results)
        {
            errorMessage.Append(validationResult.ErrorMessage + Environment.NewLine);
        }

        logger.Error(errorMessage.ToString());
        throw new Exception(errorMessage.ToString());
    }
}
4.x NServiceBus
public class ValidationMessageMutator : IMessageMutator
{
    static ILog logger = LogManager.GetLogger("ValidationMessageMutator");

    public object MutateOutgoing(object message)
    {
        ValidateDataAnnotations(message);
        return message;
    }

    public object MutateIncoming(object message)
    {
        ValidateDataAnnotations(message);
        return message;
    }

    static void ValidateDataAnnotations(object message)
    {
        var context = new ValidationContext(message, null, null);
        var results = new List<ValidationResult>();

        var isValid = Validator.TryValidateObject(message, context, results, true);

        if (isValid)
        {
            logger.Info($"Validation succeeded for message: {message}");
            return;
        }

        var errorMessage = new StringBuilder();
        var error = string.Format("Validation failed for message {0}, with the following error/s: " + Environment.NewLine, message);
        errorMessage.Append(error);

        foreach (var validationResult in results)
        {
            errorMessage.Append(validationResult.ErrorMessage + Environment.NewLine);
        }

        logger.Error(errorMessage.ToString());
        throw new Exception(errorMessage.ToString());
    }
}

ValidationMessageMutator implements both interface methods, outgoing and incoming. As the code shows, both methods contain exactly the same logic, so the mutation is symmetrical.

Both call a private static method called ValidateDataAnnotations.

This means that both outgoing and incoming messages are validated. The mutator applies to all incoming and outgoing message types.

It is possible to examine the message type and mutate only certain types of messages by checking the type of the message object received as a parameter to the method.

In this sample, if any validation fails, an exception is thrown that details the failed validations.
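The following is a minimal sketch of type-specific validation with DataAnnotations, independent of NServiceBus. The attributes on CreateProductCommand are assumptions inferred from the error messages in the console output above, and the MessageValidation helper is a hypothetical name used only for illustration; the sample's actual message class may differ.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;

// Hypothetical message type. The attributes are assumptions inferred
// from the validation errors shown in the sample's console output.
public class CreateProductCommand
{
    public string ProductId { get; set; }

    [StringLength(20, ErrorMessage = "The Product Name value cannot exceed 20 characters.")]
    public string ProductName { get; set; }

    [Range(1, 5)]
    public int ListPrice { get; set; }
}

// Hypothetical helper, not part of the sample.
public static class MessageValidation
{
    // Returns the validation error messages. The list is empty when the
    // message is valid or is not a type this sketch chooses to validate.
    public static List<string> Validate(object message)
    {
        var errors = new List<string>();

        // Examine the message type and skip everything except CreateProductCommand.
        if (!(message is CreateProductCommand))
        {
            return errors;
        }

        var context = new ValidationContext(message, null, null);
        var results = new List<ValidationResult>();

        // 'true' validates all properties, not only those marked [Required].
        Validator.TryValidateObject(message, context, results, true);

        foreach (var result in results)
        {
            errors.Add(result.ErrorMessage);
        }
        return errors;
    }
}

public static class Program
{
    public static void Main()
    {
        var invalid = new CreateProductCommand
        {
            ProductId = "XJ128",
            ProductName = "Really long product name",
            ListPrice = 15
        };

        // Both the name length and the list price violate their attributes.
        foreach (var error in MessageValidation.Validate(invalid))
        {
            Console.WriteLine(error);
        }

        // Any other message type is ignored by the type check.
        Console.WriteLine(MessageValidation.Validate("some other message").Count);
    }
}
```

In a mutator, the same type check could be placed at the top of the incoming or outgoing method so that only selected message types are validated.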

TransportMessageCompressionMutator

This transport mutator compresses the whole transport message.

6-pre NServiceBus
public class TransportMessageCompressionMutator : IMutateIncomingTransportMessages, IMutateOutgoingTransportMessages
{
    static ILog logger = LogManager.GetLogger("TransportMessageCompressionMutator");

    public Task MutateOutgoing(MutateOutgoingTransportMessageContext context)
    {
        logger.Info($"transportMessage.Body size before compression: {context.OutgoingBody.Length}");

        var mStream = new MemoryStream(context.OutgoingBody);
        var outStream = new MemoryStream();

        using (var tinyStream = new GZipStream(outStream, CompressionMode.Compress))
        {
            mStream.CopyTo(tinyStream);
        }
        // copy the compressed buffer only after the GZipStream is disposed,
        // otherwise, not all the compressed message will be copied.
        context.OutgoingBody = outStream.ToArray();
        context.OutgoingHeaders["IWasCompressed"] = "true";
        logger.Info($"transportMessage.Body size after compression: {context.OutgoingBody.Length}");
        return Task.FromResult(0);
    }

    public async Task MutateIncoming(MutateIncomingTransportMessageContext context)
    {
        if (!context.Headers.ContainsKey("IWasCompressed"))
        {
            return;
        }
        var memoryStream = new MemoryStream(context.Body);
        using (var bigStream = new GZipStream(memoryStream, CompressionMode.Decompress))
        {
            var bigStreamOut = new MemoryStream();
            await bigStream.CopyToAsync(bigStreamOut)
                .ConfigureAwait(false);
            context.Body = bigStreamOut.ToArray();
        }
    }
}
5.x NServiceBus
public class TransportMessageCompressionMutator : IMutateTransportMessages
{
    static ILog logger = LogManager.GetLogger("TransportMessageCompressionMutator");

    public void MutateOutgoing(LogicalMessage message, TransportMessage transportMessage)
    {
        logger.InfoFormat("transportMessage.Body size before compression: {0}", transportMessage.Body.Length);

        var mStream = new MemoryStream(transportMessage.Body);
        var outStream = new MemoryStream();

        using (var tinyStream = new GZipStream(outStream, CompressionMode.Compress))
        {
            mStream.CopyTo(tinyStream);
        }
        // copy the compressed buffer only after the GZipStream is disposed, 
        // otherwise, not all the compressed message will be copied.
        transportMessage.Body = outStream.ToArray();
        transportMessage.Headers["IWasCompressed"] = "true";
        logger.InfoFormat("transportMessage.Body size after compression: {0}", transportMessage.Body.Length);
    }

    public void MutateIncoming(TransportMessage transportMessage)
    {
        if (!transportMessage.Headers.ContainsKey("IWasCompressed"))
        {
            return;
        }
        var memoryStream = new MemoryStream(transportMessage.Body);
        using (var bigStream = new GZipStream(memoryStream, CompressionMode.Decompress))
        {
            var bigStreamOut = new MemoryStream();
            bigStream.CopyTo(bigStreamOut);
            transportMessage.Body = bigStreamOut.ToArray();
        }
    }
}
4.x NServiceBus
public class TransportMessageCompressionMutator : IMutateTransportMessages
{
    static ILog logger = LogManager.GetLogger("TransportMessageCompressionMutator");

    public void MutateOutgoing(object[] messages, TransportMessage transportMessage)
    {
        logger.Info($"transportMessage.Body size before compression: {transportMessage.Body.Length}");

        var mStream = new MemoryStream(transportMessage.Body);
        var outStream = new MemoryStream();

        using (var tinyStream = new GZipStream(outStream, CompressionMode.Compress))
        {
            mStream.CopyTo(tinyStream);
        }
        // copy the compressed buffer only after the GZipStream is disposed, 
        // otherwise, not all the compressed message will be copied.
        transportMessage.Body = outStream.ToArray();
        transportMessage.Headers["IWasCompressed"] = "true";
        logger.Info($"transportMessage.Body size after compression: {transportMessage.Body.Length}");
    }

    public void MutateIncoming(TransportMessage transportMessage)
    {
        if (!transportMessage.Headers.ContainsKey("IWasCompressed"))
        {
            return;
        }
        var memoryStream = new MemoryStream(transportMessage.Body);
        using (var bigStream = new GZipStream(memoryStream, CompressionMode.Decompress))
        {
            var bigStreamOut = new MemoryStream();
            bigStream.CopyTo(bigStreamOut);
            transportMessage.Body = bigStreamOut.ToArray();
        }
    }
}

The TransportMessageCompressionMutator is a transport message mutator, meaning that NServiceBus allows it to mutate the outgoing and/or incoming transport message.

In the TransportMessageCompressionMutator class, both the incoming and outgoing methods are implemented.

This mutator acts on all transport messages, regardless of what message types the transport message carries.

The compression code is straightforward and uses the .NET Framework GZipStream class to do the compression.

After the compression is done, the compressed array is placed back in the transport message Body property.

This sample signals to the receiving end that this transport message was mutated (compressed) by placing a "true" string in the header key IWasCompressed.

Decompression is done in the incoming method if the key IWasCompressed exists.

If the key is missing, the method returns and leaves the message unmutated.

Otherwise, the incoming method replaces the compressed content of the transport message Body with the uncompressed content.
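The header-flag round trip described above can be sketched without NServiceBus, using a plain dictionary in place of the transport message headers. The CompressionSketch class and its method names are hypothetical and exist only for illustration; only the IWasCompressed header key is taken from the sample.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Text;

// Hypothetical stand-in for the mutator logic; headers are a plain dictionary.
public static class CompressionSketch
{
    const string HeaderKey = "IWasCompressed";

    public static byte[] Compress(byte[] body, IDictionary<string, string> headers)
    {
        var output = new MemoryStream();
        using (var gzip = new GZipStream(output, CompressionMode.Compress))
        {
            gzip.Write(body, 0, body.Length);
        }
        // Read the buffer only after the GZipStream is disposed,
        // otherwise the compressed data may be incomplete.
        headers[HeaderKey] = "true";
        return output.ToArray();
    }

    public static byte[] Decompress(byte[] body, IDictionary<string, string> headers)
    {
        // No flag: the body was never compressed, so return it unchanged.
        if (!headers.ContainsKey(HeaderKey))
        {
            return body;
        }

        using (var input = new MemoryStream(body))
        using (var gzip = new GZipStream(input, CompressionMode.Decompress))
        using (var output = new MemoryStream())
        {
            gzip.CopyTo(output);
            return output.ToArray();
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var headers = new Dictionary<string, string>();
        var original = Encoding.UTF8.GetBytes(new string('a', 10000));

        var compressed = CompressionSketch.Compress(original, headers);
        var restored = CompressionSketch.Decompress(compressed, headers);

        Console.WriteLine($"Original: {original.Length} bytes, compressed: {compressed.Length} bytes");
        Console.WriteLine($"Round trip intact: {restored.SequenceEqual(original)}");
    }
}
```

Keeping the flag in the headers rather than in the body lets the receiving side decide whether to decompress before it touches the payload.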

Configuring NServiceBus to use the message mutators

To hook the sample message mutators into the NServiceBus messaging flow:

6-pre NServiceBus
endpointConfiguration.RegisterComponents(components =>
{
    components.ConfigureComponent<ValidationMessageMutator>(DependencyLifecycle.InstancePerCall);
    components.ConfigureComponent<TransportMessageCompressionMutator>(DependencyLifecycle.InstancePerCall);
});
5.x NServiceBus
busConfiguration.RegisterComponents(components =>
{
    components.ConfigureComponent<ValidationMessageMutator>(DependencyLifecycle.InstancePerCall);
    components.ConfigureComponent<TransportMessageCompressionMutator>(DependencyLifecycle.InstancePerCall);
});
4.x NServiceBus
configure.Configurer.ConfigureComponent<ValidationMessageMutator>(DependencyLifecycle.InstancePerCall);
configure.Configurer.ConfigureComponent<TransportMessageCompressionMutator>(DependencyLifecycle.InstancePerCall);

The sending code

6-pre NServiceBus
var createProductCommand = new CreateProductCommand
{
    ProductId = "XJ128",
    ProductName = "Milk",
    ListPrice = 4,
    // 7MB. MSMQ should throw an exception, but it will not since the buffer will be compressed
    // before it reaches MSMQ.
    Image = new byte[1024 * 1024 * 7]
};
await endpointInstance.SendLocal(createProductCommand)
    .ConfigureAwait(false);
4.x - 5.x NServiceBus
bus.SendLocal(new CreateProductCommand
{
    ProductId = "XJ128",
    ProductName = "Milk",
    ListPrice = 4,
    // 7MB. MSMQ should throw an exception, but it will not since the buffer will be compressed 
    // before it reaches MSMQ.
    Image = new byte[1024 * 1024 * 7]
});

Since the Image buffer contains only zeros, the GZipStream in the outgoing transport message mutator easily compresses it to a size under the MSMQ limit of 4MB, and the message reaches the server.

The following code sends an invalid message that will never be received, because the outgoing message mutator throws an exception:
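As a rough check of that claim, the sketch below compresses a raw 7MB zero-filled array and compares the result with the 4MB limit mentioned above. It is an illustration only: in the sample the mutator compresses the serialized transport message body rather than the raw array, and the ZeroBufferCompression class name is hypothetical.

```csharp
using System;
using System.IO;
using System.IO.Compression;

// Hypothetical helper used only to measure the compressed size.
public static class ZeroBufferCompression
{
    public static int CompressedLength(byte[] data)
    {
        var output = new MemoryStream();
        using (var gzip = new GZipStream(output, CompressionMode.Compress))
        {
            gzip.Write(data, 0, data.Length);
        }
        // ToArray is still valid after the GZipStream has closed the MemoryStream.
        return output.ToArray().Length;
    }

    public static void Main()
    {
        const int fourMegabytes = 4 * 1024 * 1024;

        // Same size as the Image property in the sample; a new array is all zeros.
        var image = new byte[1024 * 1024 * 7];
        var compressedLength = CompressedLength(image);

        Console.WriteLine($"Before: {image.Length} bytes, after: {compressedLength} bytes");
        Console.WriteLine($"Under 4MB: {compressedLength < fourMegabytes}");
    }
}
```

Real image data is usually far less compressible than a zero-filled buffer, so this margin should not be expected for arbitrary payloads.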

6-pre NServiceBus
var productCommand = new CreateProductCommand
{
    ProductId = "XJ128",
    ProductName = "Really long product name",
    ListPrice = 15,
    // 7MB. MSMQ should throw an exception, but it will not since the buffer will be compressed
    // before it reaches MSMQ.
    Image = new byte[1024 * 1024 * 7]
};
await endpointInstance.SendLocal(productCommand)
    .ConfigureAwait(false);
4.x - 5.x NServiceBus
bus.SendLocal(new CreateProductCommand
{
    ProductId = "XJ128",
    ProductName = "Really long product name",
    ListPrice = 15,
    // 7MB. MSMQ should throw an exception, but it will not since the buffer will be compressed 
    // before it reaches MSMQ.
    Image = new byte[1024 * 1024 * 7]
});

The message is invalid for two reasons: the product name exceeds the 20-character limit, and the list price is outside the allowed range of 1 to 5. The thrown exception lists those validation errors. The server code is simple and straightforward:

6-pre NServiceBus
public class Handler : IHandleMessages<CreateProductCommand>
{
    static ILog logger = LogManager.GetLogger<Handler>();

    public Task Handle(CreateProductCommand message, IMessageHandlerContext context)
    {
        logger.Info($"Received a CreateProductCommand message: {message}");
        return Task.FromResult(0);
    }
}
5.x NServiceBus
public class Handler : IHandleMessages<CreateProductCommand>
{
    static ILog logger = LogManager.GetLogger<Handler>();

    public void Handle(CreateProductCommand createProductCommand)
    {
        logger.InfoFormat("Received a CreateProductCommand message: {0}", createProductCommand);
    }
}
4.x NServiceBus
public class Handler : IHandleMessages<CreateProductCommand>
{
    static ILog logger = LogManager.GetLogger(typeof(Handler));

    public void Handle(CreateProductCommand createProductCommand)
    {
        logger.Info($"Received a CreateProductCommand message: {createProductCommand}");
    }
}

The handler code does not need to change on account of the message mutation.


Last modified 2016-05-17 23:14:55Z