Message Mutators

Component: NServiceBus
NuGet Package NServiceBus (5.x)

This sample shows how to create a custom message mutator.

Executing the sample

  1. Run the solution.
  2. Press 's' in the window to send a valid message. Then press 'e' to send an invalid message. (The exception is expected.) The console output will look something like this:
Press 's' to send a valid message, press 'e' to send a failed message. To exit, 'q'

s
INFO  ValidationMessageMutator Validation succeeded for message: CreateProductCommand: ProductId=XJ128, ProductName=Milk, ListPrice=4 Image (length)=7340032
INFO  TransportMessageCompressionMutator transportMessage.Body size before compression: 9787013
INFO  TransportMessageCompressionMutator transportMessage.Body size after compression: 9761
INFO  ValidationMessageMutator Validation succeeded for message: CreateProductCommand: ProductId=XJ128, ProductName=Milk, ListPrice=4 Image (length)=7340032
INFO  Handler Received a CreateProductCommand message: CreateProductCommand: ProductId=XJ128, ProductName=Milk, ListPrice=4 Image (length)=7340032

e
ERROR ValidationMessageMutator Validation failed for message CreateProductCommand: ProductId=XJ128, ProductName=Milk Milk Milk Milk Milk, ListPrice=15 Image (length)=7340032, with the following error/s:
The Product Name value cannot exceed 20 characters.
The field ListPrice must be between 1 and 5.

Code walk-through

Logical message mutators

The IMutateIncomingMessages and IMutateOutgoingMessages interfaces give access to the message so that the inbound and/or outbound message can be modified.

This sample implements both interfaces in a single class, ValidationMessageMutator, which validates the DataAnnotations attributes on incoming and outgoing messages and throws an exception if validation fails.

public class ValidationMessageMutator :
    IMutateIncomingMessages,
    IMutateOutgoingMessages
{
    static ILog log = LogManager.GetLogger("ValidationMessageMutator");

    public object MutateOutgoing(object message)
    {
        ValidateDataAnnotations(message);
        return message;
    }

    public object MutateIncoming(object message)
    {
        ValidateDataAnnotations(message);
        return message;
    }

    static void ValidateDataAnnotations(object message)
    {
        var context = new ValidationContext(message, null, null);
        var results = new List<ValidationResult>();

        var isValid = Validator.TryValidateObject(message, context, results, true);

        if (isValid)
        {
            log.Info($"Validation succeeded for message: {message}");
            return;
        }

        var errorMessage = new StringBuilder();
        var error = $"Validation failed for message {message}, with the following error/s:";
        errorMessage.AppendLine(error);

        foreach (var validationResult in results)
        {
            errorMessage.AppendLine(validationResult.ErrorMessage);
        }

        log.Error(errorMessage.ToString());
        throw new Exception(errorMessage.ToString());
    }
}
IMessageMutator could be used in place of IMutateIncomingMessages and IMutateOutgoingMessages. The IMessageMutator interface is simply a shorthand that combines the other two.

Although not shown here, it is possible to mutate only certain types of messages by checking the type of the message object received as a parameter to the method.
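As a minimal sketch of that type check, the mutator below only touches CreateProductCommand and passes every other message through unchanged. The class name ProductNameTrimmingMutator and the trimming behavior are hypothetical and not part of the sample. The sketch assumes the NServiceBus 5.x package the sample already references, and that ProductName is a settable string property.

```csharp
using NServiceBus.MessageMutator;

// Hypothetical mutator, not part of the sample.
public class ProductNameTrimmingMutator :
    IMutateOutgoingMessages
{
    public object MutateOutgoing(object message)
    {
        // Only act on the message type this mutator cares about.
        var command = message as CreateProductCommand;
        if (command == null)
        {
            // Any other message type is returned unchanged.
            return message;
        }

        // Assumption: ProductName is a settable string property.
        if (command.ProductName != null)
        {
            command.ProductName = command.ProductName.Trim();
        }
        return command;
    }
}
```

A mutator like this would presumably be registered the same way as the sample's mutators, through ConfigureComponent.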

Transport message mutators

Similar interfaces exist for transport messages: IMutateIncomingTransportMessages and IMutateOutgoingTransportMessages. The main difference from the logical message mutators is that a single transport message may contain several logical messages.

This transport mutator compresses the outgoing transport message and decompresses the incoming message.

public class TransportMessageCompressionMutator :
    IMutateIncomingTransportMessages,
    IMutateOutgoingTransportMessages
{
    static ILog log = LogManager.GetLogger("TransportMessageCompressionMutator");

    public void MutateOutgoing(LogicalMessage message, TransportMessage transportMessage)
    {
        log.Info($"transportMessage.Body size before compression: {transportMessage.Body.Length}");

        var mStream = new MemoryStream(transportMessage.Body);
        var outStream = new MemoryStream();

        using (var tinyStream = new GZipStream(outStream, CompressionMode.Compress))
        {
            mStream.CopyTo(tinyStream);
        }
        // copy the compressed buffer only after the GZipStream is disposed,
        // otherwise, not all the compressed message will be copied.
        transportMessage.Body = outStream.ToArray();
        transportMessage.Headers["IWasCompressed"] = "true";
        log.Info($"transportMessage.Body size after compression: {transportMessage.Body.Length}");
    }

    public void MutateIncoming(TransportMessage transportMessage)
    {
        if (!transportMessage.Headers.ContainsKey("IWasCompressed"))
        {
            return;
        }
        var memoryStream = new MemoryStream(transportMessage.Body);
        using (var bigStream = new GZipStream(memoryStream, CompressionMode.Decompress))
        {
            var bigStreamOut = new MemoryStream();
            bigStream.CopyTo(bigStreamOut);
            transportMessage.Body = bigStreamOut.ToArray();
        }
    }
}
IMutateTransportMessages could be used in place of IMutateIncomingTransportMessages and IMutateOutgoingTransportMessages. The IMutateTransportMessages interface is simply a shorthand that combines the other two.

TransportMessageCompressionMutator is a transport message mutator: it operates on the serialized transport message (its Body and Headers) rather than on the logical message object, for both outgoing and incoming messages.

The compression code uses the .NET framework GZipStream class to do the compression. After the compression is done, the compressed array is placed back in the transport message Body property. The sample then signals to the receiver that the transport message was mutated (compressed) by setting the header key IWasCompressed to "true". The incoming mutator uses this key to determine whether or not to mutate (decompress) the message.
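The compress, flag, and decompress steps described above can be tried outside NServiceBus with a sketch like the following. It is not the sample's code: the CompressionSketch class is hypothetical, and a plain dictionary stands in for the transport message headers. Only standard .NET types are used.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;

// Hypothetical helper; a dictionary stands in for TransportMessage.Headers.
public static class CompressionSketch
{
    const string HeaderKey = "IWasCompressed";

    public static byte[] Compress(byte[] body, IDictionary<string, string> headers)
    {
        var output = new MemoryStream();
        using (var gzip = new GZipStream(output, CompressionMode.Compress))
        {
            gzip.Write(body, 0, body.Length);
        }
        // Read the buffer only after the GZipStream is disposed,
        // so that all compressed data has been flushed to it.
        headers[HeaderKey] = "true";
        return output.ToArray();
    }

    public static byte[] Decompress(byte[] body, IDictionary<string, string> headers)
    {
        // Without the marker header the body is assumed to be uncompressed.
        if (!headers.ContainsKey(HeaderKey))
        {
            return body;
        }
        using (var input = new MemoryStream(body))
        using (var gzip = new GZipStream(input, CompressionMode.Decompress))
        using (var output = new MemoryStream())
        {
            gzip.CopyTo(output);
            return output.ToArray();
        }
    }

    // Round-trips 1 MB of zeros and prints the three sizes.
    public static void Demo()
    {
        var headers = new Dictionary<string, string>();
        var original = new byte[1024 * 1024];
        var compressed = Compress(original, headers);
        var restored = Decompress(compressed, headers);
        Console.WriteLine($"original: {original.Length}, compressed: {compressed.Length}, restored: {restored.Length}");
    }
}
```

Using a header as the marker means the incoming side never has to guess from the bytes whether a body was compressed, so messages from senders without the mutator pass through untouched.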

Configuring NServiceBus to use the message mutators

To hook the sample message mutators into the NServiceBus messaging flow, register them as components:

busConfiguration.RegisterComponents(
    registration: components =>
    {
        components.ConfigureComponent<ValidationMessageMutator>(DependencyLifecycle.InstancePerCall);
        components.ConfigureComponent<TransportMessageCompressionMutator>(DependencyLifecycle.InstancePerCall);
    });

Sending a valid message

The message sent in the sample includes a 7MB image:

var smallMessage = new CreateProductCommand
{
    ProductId = "XJ128",
    ProductName = "Milk",
    ListPrice = 4,
    // 7MB. MSMQ should throw an exception, but it will not since the buffer will be compressed
    // before it reaches MSMQ.
    Image = new byte[1024 * 1024 * 7]
};
bus.SendLocal(smallMessage);

Since the sample uses the default MSMQ transport, this message would fail without the mutator because it exceeds MSMQ's message size limit of 4MB. The outgoing transport message mutator compresses the body to a size well within that limit, so the message reaches the receiver.

Sending an invalid message

The sample sends a similar message but with data that fails the logical message mutator's validation:

var largeMessage = new CreateProductCommand
{
    ProductId = "XJ128",
    ProductName = "Really long product name",
    ListPrice = 15,
    // 7MB. MSMQ should throw an exception, but it will not since the buffer will be compressed
    // before it reaches MSMQ.
    Image = new byte[1024 * 1024 * 7]
};
bus.SendLocal(largeMessage);

The message is invalid for two reasons: the product name exceeds the 20 character limit and the list price is outside the allowed range of 1 to 5. The exception message and the log entry list both validation errors.
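The sample's message class is not shown here, so the following is only a sketch of how such constraints could be declared and checked with DataAnnotations. ProductSketch and ValidationSketch are hypothetical names, the int type of ListPrice is an assumption, and the two attributes are inferred from the errors in the console output above rather than taken from the sample's source.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;

// Hypothetical stand-in for the sample's message class.
public class ProductSketch
{
    [StringLength(20, ErrorMessage = "The Product Name value cannot exceed 20 characters.")]
    public string ProductName { get; set; }

    // Assumption: an integer price limited to the range 1 to 5.
    [Range(1, 5)]
    public int ListPrice { get; set; }
}

public static class ValidationSketch
{
    // Returns the validation failures; an empty list means the object is valid.
    public static List<ValidationResult> Validate(object message)
    {
        var context = new ValidationContext(message, null, null);
        var results = new List<ValidationResult>();
        // The last argument (validateAllProperties) must be true,
        // otherwise only [Required] attributes are checked.
        Validator.TryValidateObject(message, context, results, true);
        return results;
    }

    // Validates an object shaped like the invalid message and prints each error.
    public static void Demo()
    {
        var invalid = new ProductSketch
        {
            ProductName = "Really long product name",
            ListPrice = 15
        };
        foreach (var result in Validate(invalid))
        {
            Console.WriteLine(result.ErrorMessage);
        }
    }
}
```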

Receiving a message

The receiver is straightforward. There is no special code to handle validation or compression since this is done by the logical and transport message mutators.

public class Handler :
    IHandleMessages<CreateProductCommand>
{
    static ILog log = LogManager.GetLogger<Handler>();

    public void Handle(CreateProductCommand createProductCommand)
    {
        log.Info($"Received a CreateProductCommand message: {createProductCommand}");
    }
}
