Message Mutators


  1. Run the solution.
  2. Press 's' and 'Enter' in the window. Then press 'e' followed by 'Enter'. The Console output will look something like this (the exception message is expected):
Press 's' to send a valid message, press 'e' to send a failed message. To exit, 'q'

s
INFO  ValidationMessageMutator Validation succeeded for message: CreateProductCommand: ProductId=XJ128, ProductName=Milk, ListPrice=4 Image (length)=7340032
INFO  TransportMessageCompressionMutator transportMessage.Body size before compression: 9787013
INFO  TransportMessageCompressionMutator transportMessage.Body size after compression: 9761
INFO  ValidationMessageMutator Validation succeeded for message: CreateProductCommand: ProductId=XJ128, ProductName=Milk, ListPrice=4 Image (length)=7340032
INFO  Handler Received a CreateProductCommand message: CreateProductCommand: ProductId=XJ128, ProductName=Milk, ListPrice=4 Image (length)=7340032

e
ERROR ValidationMessageMutator Validation failed for message CreateProductCommand: ProductId=XJ128, ProductName=Milk Milk Milk Milk Milk, ListPrice=15 Image (length)=7340032, with the following error/s:
The Product Name value cannot exceed 20 characters.
The field ListPrice must be between 1 and 5.

Now let's look at the code.

Code walk-through

This sample shows how to create a custom message mutator.

The IMutateTransportMessages and IMessageMutator interfaces give access to the message so that you can mutate the inbound and/or outbound message.

All you have to do as a consumer is implement the desired interface and load it into the NServiceBus container.

IMutateTransportMessages is the transport-level counterpart of IMessageMutator: it mutates transport messages. The main difference from IMessageMutator is that a single transport message may carry several messages.

This sample implements two mutators:

ValidationMessageMutator

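This message mutator validates all DataAnnotations attributes that exist in the message. The snippets below show the same mutator written against successive versions of the NServiceBus API.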

public class ValidationMessageMutator : IMessageMutator
{
    static ILog logger = LogManager.GetLogger("ValidationMessageMutator");

    public object MutateOutgoing(object message)
    {
        ValidateDataAnnotations(message);
        return message;
    }

    public object MutateIncoming(object message)
    {
        ValidateDataAnnotations(message);
        return message;
    }

    static void ValidateDataAnnotations(object message)
    {
        ValidationContext context = new ValidationContext(message, null, null);
        List<ValidationResult> results = new List<ValidationResult>();

        bool isValid = Validator.TryValidateObject(message, context, results, true);

        if (isValid)
        {
            logger.Info(string.Format("Validation succeeded for message: {0}", message));
            return;
        }

        StringBuilder errorMessage = new StringBuilder();
        string error = string.Format("Validation failed for message {0}, with the following error/s: " + Environment.NewLine,message);
        errorMessage.Append(error);

        foreach (ValidationResult validationResult in results)
        {
            errorMessage.Append(validationResult.ErrorMessage + Environment.NewLine);
        }

        logger.Error(errorMessage.ToString());
        throw new Exception(errorMessage.ToString());
    }
}
public class ValidationMessageMutator : IMessageMutator
{
    static ILog logger = LogManager.GetLogger("ValidationMessageMutator");

    public object MutateOutgoing(object message)
    {
        ValidateDataAnnotations(message);
        return message;
    }

    public object MutateIncoming(object message)
    {
        ValidateDataAnnotations(message);
        return message;
    }

    static void ValidateDataAnnotations(object message)
    {
        ValidationContext context = new ValidationContext(message, null, null);
        List<ValidationResult> results = new List<ValidationResult>();

        bool isValid = Validator.TryValidateObject(message, context, results, true);

        if (isValid)
        {
            logger.InfoFormat("Validation succeeded for message: {0}", message);
            return;
        }

        StringBuilder errorMessage = new StringBuilder();
        string error = string.Format("Validation failed for message {0}, with the following error/s: " + Environment.NewLine,message);
        errorMessage.Append(error);

        foreach (ValidationResult validationResult in results)
        {
            errorMessage.Append(validationResult.ErrorMessage + Environment.NewLine);
        }

        logger.Error(errorMessage.ToString());
        throw new Exception(errorMessage.ToString());
    }
}
public class ValidationMessageMutator : IMutateIncomingMessages, IMutateOutgoingMessages
{
    static ILog logger = LogManager.GetLogger("ValidationMessageMutator");

    public Task MutateOutgoing(MutateOutgoingMessageContext context)
    {
        ValidateDataAnnotations(context.OutgoingMessage);
        return Task.FromResult(0);
    }

    public Task MutateIncoming(MutateIncomingMessageContext context)
    {
        ValidateDataAnnotations(context.Message);
        return Task.FromResult(0);
    }

    static void ValidateDataAnnotations(object message)
    {
        ValidationContext context = new ValidationContext(message, null, null);
        List<ValidationResult> results = new List<ValidationResult>();

        bool isValid = Validator.TryValidateObject(message, context, results, true);

        if (isValid)
        {
            logger.InfoFormat("Validation succeeded for message: {0}", message);
            return;
        }

        StringBuilder errorMessage = new StringBuilder();
        string error = string.Format("Validation failed for message {0}, with the following error/s: " + Environment.NewLine,message);
        errorMessage.Append(error);

        foreach (ValidationResult validationResult in results)
        {
            errorMessage.Append(validationResult.ErrorMessage + Environment.NewLine);
        }

        logger.Error(errorMessage.ToString());
        throw new Exception(errorMessage.ToString());
    }
}

ValidationMessageMutator implements both mutation methods, outgoing and incoming. As the code shows, the two methods contain exactly the same code, so the mutation is symmetrical.

Both call a private static method called ValidateDataAnnotations.

This means that both the outgoing message and the incoming message are validated. The mutator applies to all incoming and outgoing message types.

It is possible to examine the message type and mutate only certain types of messages by checking the type of the message object received as a parameter to the method.
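
As a minimal sketch of that type check, the helper below validates only one message type and passes everything else through untouched. It relies only on System.ComponentModel.DataAnnotations; the CreateOrder and Heartbeat classes and the SelectiveValidation helper are hypothetical names invented for this illustration and are not part of the sample. In a real mutator, the same check would presumably sit at the top of the incoming or outgoing method.

```csharp
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;

// Hypothetical message types, used only for this illustration.
public class CreateOrder
{
    [Required]
    public string OrderId { get; set; }
}

public class Heartbeat
{
    [Required]
    public string Source { get; set; }
}

public static class SelectiveValidation
{
    // Returns the validation errors for CreateOrder messages.
    // Any other message type is skipped and yields an empty list.
    public static List<string> Validate(object message)
    {
        List<string> errors = new List<string>();

        // The type check: only CreateOrder messages are validated.
        if (!(message is CreateOrder))
        {
            return errors;
        }

        ValidationContext context = new ValidationContext(message, null, null);
        List<ValidationResult> results = new List<ValidationResult>();
        Validator.TryValidateObject(message, context, results, true);

        foreach (ValidationResult result in results)
        {
            errors.Add(result.ErrorMessage);
        }
        return errors;
    }
}
```

Calling SelectiveValidation.Validate(new Heartbeat()) returns an empty list even though Source is missing, while SelectiveValidation.Validate(new CreateOrder()) reports the missing OrderId.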

Browse the code. In this sample, if any validation fails, an exception is thrown that lists the failed validations.
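
The message class itself is not shown on this page. The sketch below is one way such a class could be annotated to produce the two errors seen in the console output. The choice of attributes, the property types, and the DataAnnotationsSketch helper are assumptions made for illustration, not the sample's actual source.

```csharp
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;

// Assumed shape of the message; the real sample class may differ.
public class CreateProductCommand
{
    [Required]
    public string ProductId { get; set; }

    // Custom message, chosen to match the text in the console output.
    [StringLength(20, ErrorMessage = "The Product Name value cannot exceed 20 characters.")]
    public string ProductName { get; set; }

    // Without a custom message, RangeAttribute falls back to its default text.
    [Range(1, 5)]
    public int ListPrice { get; set; }

    public byte[] Image { get; set; }
}

public static class DataAnnotationsSketch
{
    // Runs the same Validator call the mutator uses and returns the error messages.
    public static List<string> Validate(object message)
    {
        ValidationContext context = new ValidationContext(message, null, null);
        List<ValidationResult> results = new List<ValidationResult>();

        // 'true' asks the validator to check every property attribute, not only [Required].
        Validator.TryValidateObject(message, context, results, true);

        List<string> errors = new List<string>();
        foreach (ValidationResult result in results)
        {
            errors.Add(result.ErrorMessage);
        }
        return errors;
    }
}
```

With these assumed attributes, a command whose ProductName is longer than 20 characters and whose ListPrice is 15 yields two errors, one per violated attribute.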

TransportMessageCompressionMutator

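This transport mutator compresses the whole transport message. As with the validation mutator, the snippets below show the same class written against successive versions of the NServiceBus API.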

public class TransportMessageCompressionMutator : IMutateTransportMessages
{
    static ILog logger = LogManager.GetLogger("TransportMessageCompressionMutator");

    public void MutateOutgoing(object[] messages, TransportMessage transportMessage)
    {
        logger.Info(string.Format("transportMessage.Body size before compression: {0}", transportMessage.Body.Length));

        MemoryStream mStream = new MemoryStream(transportMessage.Body);
        MemoryStream outStream = new MemoryStream();

        using (GZipStream tinyStream = new GZipStream(outStream, CompressionMode.Compress))
        {
            mStream.CopyTo(tinyStream);
        }
        // copy the compressed buffer only after the GZipStream is disposed, 
        // otherwise, not all the compressed message will be copied.
        transportMessage.Body = outStream.ToArray();
        transportMessage.Headers["IWasCompressed"] = "true";
        logger.Info(string.Format("transportMessage.Body size after compression: {0}", transportMessage.Body.Length));
    }

    public void MutateIncoming(TransportMessage transportMessage)
    {
        if (!transportMessage.Headers.ContainsKey("IWasCompressed"))
        {
            return;
        }
        using (GZipStream bigStream = new GZipStream(new MemoryStream(transportMessage.Body), CompressionMode.Decompress))
        {
            MemoryStream bigStreamOut = new MemoryStream();
            bigStream.CopyTo(bigStreamOut);
            transportMessage.Body = bigStreamOut.ToArray();
        }
    }
}
public class TransportMessageCompressionMutator : IMutateTransportMessages
{
    static ILog logger = LogManager.GetLogger("TransportMessageCompressionMutator");

    public void MutateOutgoing(object[] messages, TransportMessage transportMessage)
    {
        logger.InfoFormat("transportMessage.Body size before compression: {0}", transportMessage.Body.Length);

        MemoryStream mStream = new MemoryStream(transportMessage.Body);
        MemoryStream outStream = new MemoryStream();

        using (GZipStream tinyStream = new GZipStream(outStream, CompressionMode.Compress))
        {
            mStream.CopyTo(tinyStream);
        }
        // copy the compressed buffer only after the GZipStream is disposed, 
        // otherwise, not all the compressed message will be copied.
        transportMessage.Body = outStream.ToArray();
        transportMessage.Headers["IWasCompressed"] = "true";
        logger.InfoFormat("transportMessage.Body size after compression: {0}", transportMessage.Body.Length);
    }

    public void MutateIncoming(TransportMessage transportMessage)
    {
        if (!transportMessage.Headers.ContainsKey("IWasCompressed"))
        {
            return;
        }
        using (GZipStream bigStream = new GZipStream(new MemoryStream(transportMessage.Body), CompressionMode.Decompress))
        {
            MemoryStream bigStreamOut = new MemoryStream();
            bigStream.CopyTo(bigStreamOut);
            transportMessage.Body = bigStreamOut.ToArray();
        }
    }
}
public class TransportMessageCompressionMutator : IMutateTransportMessages
{
    static ILog logger = LogManager.GetLogger("TransportMessageCompressionMutator");

    public void MutateOutgoing(LogicalMessage message, TransportMessage transportMessage)
    {
        logger.InfoFormat("transportMessage.Body size before compression: {0}", transportMessage.Body.Length);

        MemoryStream mStream = new MemoryStream(transportMessage.Body);
        MemoryStream outStream = new MemoryStream();

        using (GZipStream tinyStream = new GZipStream(outStream, CompressionMode.Compress))
        {
            mStream.CopyTo(tinyStream);
        }
        // copy the compressed buffer only after the GZipStream is disposed, 
        // otherwise, not all the compressed message will be copied.
        transportMessage.Body = outStream.ToArray();
        transportMessage.Headers["IWasCompressed"] = "true";
        logger.InfoFormat("transportMessage.Body size after compression: {0}", transportMessage.Body.Length);
    }

    public void MutateIncoming(TransportMessage transportMessage)
    {
        if (!transportMessage.Headers.ContainsKey("IWasCompressed"))
        {
            return;
        }
        using (GZipStream bigStream = new GZipStream(new MemoryStream(transportMessage.Body), CompressionMode.Decompress))
        {
            MemoryStream bigStreamOut = new MemoryStream();
            bigStream.CopyTo(bigStreamOut);
            transportMessage.Body = bigStreamOut.ToArray();
        }
    }
}
public class TransportMessageCompressionMutator : IMutateIncomingTransportMessages, IMutateOutgoingTransportMessages
{
    static ILog logger = LogManager.GetLogger("TransportMessageCompressionMutator");

    public Task MutateOutgoing(MutateOutgoingTransportMessageContext context)
    {
        logger.InfoFormat("transportMessage.Body size before compression: {0}", context.OutgoingBody.Length);

        MemoryStream mStream = new MemoryStream(context.OutgoingBody);
        MemoryStream outStream = new MemoryStream();

        using (GZipStream tinyStream = new GZipStream(outStream, CompressionMode.Compress))
        {
            mStream.CopyTo(tinyStream);
        }
        // copy the compressed buffer only after the GZipStream is disposed, 
        // otherwise, not all the compressed message will be copied.
        context.OutgoingBody = outStream.ToArray();
        context.OutgoingHeaders["IWasCompressed"]= "true";
        logger.InfoFormat("transportMessage.Body size after compression: {0}", context.OutgoingBody.Length);
        return Task.FromResult(0);
    }

    public Task MutateIncoming(MutateIncomingTransportMessageContext context)
    {
        if (!context.Headers.ContainsKey("IWasCompressed"))
        {
            return Task.FromResult(0);
        }
        using (GZipStream bigStream = new GZipStream(new MemoryStream(context.Body), CompressionMode.Decompress))
        {
            MemoryStream bigStreamOut = new MemoryStream();
            bigStream.CopyTo(bigStreamOut);
            context.Body = bigStreamOut.ToArray();
        }
        return Task.FromResult(0);
    }
}

The TransportMessageCompressionMutator is a transport message mutator, meaning that NServiceBus allows you to mutate the outgoing and/or incoming transport message.

In the TransportMessageCompressionMutator class, both the incoming and outgoing methods are implemented.

This mutator acts on all transport messages, regardless of what message types the transport message carries.

The compression code is straightforward and utilizes the .NET framework GZipStream class to do the compression.

After the compression is done, the compressed array is placed back in the transport message Body property.

This sample signals to the receiving end that this transport message was mutated (compressed) by placing a "true" string in the header key IWasCompressed.

Decompression is done in the incoming method if the key IWasCompressed exists.

If the key is missing, the message is returned, unmutated.

Otherwise, the incoming method replaces the compressed content of the transport message Body with the uncompressed content.
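
The compress, flag, and decompress steps described above can be tried in isolation from NServiceBus. The following is a minimal sketch that uses only the .NET GZipStream class and a plain dictionary standing in for the message headers; the CompressionSketch class and its method names are hypothetical, and only the IWasCompressed key is taken from the sample.

```csharp
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;

public static class CompressionSketch
{
    const string HeaderKey = "IWasCompressed";

    // Compresses the body and marks the headers so the receiver knows to decompress.
    public static byte[] Compress(byte[] body, IDictionary<string, string> headers)
    {
        MemoryStream output = new MemoryStream();
        using (GZipStream gzip = new GZipStream(output, CompressionMode.Compress))
        {
            gzip.Write(body, 0, body.Length);
        }
        // Read the buffer only after the GZipStream is disposed,
        // otherwise the final compressed bytes may not have been flushed.
        headers[HeaderKey] = "true";
        return output.ToArray();
    }

    // Decompresses the body if the header is present; otherwise returns it unchanged.
    public static byte[] Decompress(byte[] body, IDictionary<string, string> headers)
    {
        if (!headers.ContainsKey(HeaderKey))
        {
            return body;
        }
        using (GZipStream gzip = new GZipStream(new MemoryStream(body), CompressionMode.Decompress))
        using (MemoryStream output = new MemoryStream())
        {
            gzip.CopyTo(output);
            return output.ToArray();
        }
    }
}
```

Passing the result of Compress to Decompress with the same header dictionary gives back the original bytes, while a body that arrives without the header is returned as-is.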

Now all we have to do is hook those two mutators into the NServiceBus message flow.

Configuring NServiceBus to use the message mutators

To hook the sample message mutators into the NServiceBus messaging flow, register them as components (the registration API differs between NServiceBus versions):

configure.Configurer.ConfigureComponent<ValidationMessageMutator>(DependencyLifecycle.InstancePerCall);
configure.Configurer.ConfigureComponent<TransportMessageCompressionMutator>(DependencyLifecycle.InstancePerCall);
busConfiguration.RegisterComponents(components =>
{
    components.ConfigureComponent<ValidationMessageMutator>(DependencyLifecycle.InstancePerCall);
    components.ConfigureComponent<TransportMessageCompressionMutator>(DependencyLifecycle.InstancePerCall);
});

The Sending code

bus.SendLocal(new CreateProductCommand
{
    ProductId = "XJ128",
    ProductName = "Milk",
    ListPrice = 4,
    // 7MB. MSMQ should throw an exception, but it will not since the buffer will be compressed 
    // before it reaches MSMQ.
    Image = new byte[1024 * 1024 * 7]
});
await busSession.SendLocal(new CreateProductCommand
{
    ProductId = "XJ128",
    ProductName = "Milk",
    ListPrice = 4,
    // 7MB. MSMQ should throw an exception, but it will not since the buffer will be compressed 
    // before it reaches MSMQ.
    Image = new byte[1024 * 1024 * 7]
});

Since the Image buffer contains only zeros, the GZipStream in the outgoing transport message mutator easily compresses it to a size far below the MSMQ limit of 4MB, so the message reaches the server.
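
How well the body compresses depends on its content. As a rough sketch, assuming only the .NET GZipStream class, the helper below measures the compressed size of a 7MB zero-filled buffer and of a 7MB buffer of pseudo-random bytes. The CompressionRatioSketch class, its method names, and the seed value are hypothetical and are not part of the sample.

```csharp
using System;
using System.IO;
using System.IO.Compression;

public static class CompressionRatioSketch
{
    // The MSMQ message size limit quoted in the text above.
    public const int MsmqLimit = 4 * 1024 * 1024;

    // Returns the GZip-compressed size of the buffer, in bytes.
    public static int CompressedSize(byte[] buffer)
    {
        MemoryStream output = new MemoryStream();
        using (GZipStream gzip = new GZipStream(output, CompressionMode.Compress))
        {
            gzip.Write(buffer, 0, buffer.Length);
        }
        return output.ToArray().Length;
    }

    // A buffer of zeros, like the Image field in the sample.
    public static byte[] ZeroBuffer(int size)
    {
        return new byte[size];
    }

    // A buffer of pseudo-random bytes, standing in for data that is already dense
    // (for example, an image that is already compressed).
    public static byte[] RandomBuffer(int size, int seed)
    {
        byte[] buffer = new byte[size];
        new Random(seed).NextBytes(buffer);
        return buffer;
    }
}
```

The zero-filled buffer shrinks to a small fraction of MsmqLimit, whereas the random buffer stays roughly at its original size. A real 7MB image is therefore not guaranteed to fit under the limit after compression the way the sample's empty buffer does.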

The following code sends an invalid message. It will never be received, because the outgoing message mutator throws an exception:

bus.SendLocal(new CreateProductCommand
{
    ProductId = "XJ128",
    ProductName = "Really long product name",
    ListPrice = 15,
    // 7MB. MSMQ should throw an exception, but it will not since the buffer will be compressed 
    // before it reaches MSMQ.
    Image = new byte[1024 * 1024 * 7]
});
await busSession.SendLocal(new CreateProductCommand
{
    ProductId = "XJ128",
    ProductName = "Really long product name",
    ListPrice = 15,
    // 7MB. MSMQ should throw an exception, but it will not since the buffer will be compressed 
    // before it reaches MSMQ.
    Image = new byte[1024 * 1024 * 7]
});

The message is invalid for two reasons, matching the console output above: the product name is over the 20 character limit and the list price is too high. The thrown exception lists the failed validations.

The server code is simple and straightforward; the variants below target successive versions of the NServiceBus API:

public class Handler : IHandleMessages<CreateProductCommand>
{
    static ILog logger = LogManager.GetLogger(typeof(Handler));

    public void Handle(CreateProductCommand createProductCommand)
    {
        logger.Info(string.Format("Received a CreateProductCommand message: {0}", createProductCommand));
    }
}
public class Handler : IHandleMessages<CreateProductCommand>
{
    static ILog logger = LogManager.GetLogger(typeof(Handler));

    public void Handle(CreateProductCommand createProductCommand)
    {
        logger.InfoFormat("Received a CreateProductCommand message: {0}", createProductCommand);
    }
}
public class Handler : IHandleMessages<CreateProductCommand>
{
    static ILog logger = LogManager.GetLogger<Handler>();

    public void Handle(CreateProductCommand createProductCommand)
    {
        logger.InfoFormat("Received a CreateProductCommand message: {0}", createProductCommand);
    }
}
public class Handler : IHandleMessages<CreateProductCommand>
{
    static ILog logger = LogManager.GetLogger<Handler>();

    public Task Handle(CreateProductCommand message, IMessageHandlerContext context)
    {
        logger.InfoFormat("Received a CreateProductCommand message: {0}", message);
        return Task.FromResult(0);
    }
}

The handler code does not need to change on account of the message mutation.


Last modified 2015-11-10 11:50:35Z