Message Mutators

Component: NServiceBus | Nuget: NServiceBus (Version: 6.x)

This sample shows how to create a custom message mutator.

  1. Run the solution.
  2. In the console window, press 's' followed by 'Enter', then press 'e' followed by 'Enter'. The console output will look similar to the following (the exception message is expected):
Press 's' to send a valid message, press 'e' to send a failed message. To exit, 'q'

s
INFO  ValidationMessageMutator Validation succeeded for message: CreateProductCommand: ProductId=XJ128, ProductName=Milk, ListPrice=4 Image (length)=7340032
INFO  TransportMessageCompressionMutator transportMessage.Body size before compression: 9787013
INFO  TransportMessageCompressionMutator transportMessage.Body size after compression: 9761
INFO  ValidationMessageMutator Validation succeeded for message: CreateProductCommand: ProductId=XJ128, ProductName=Milk, ListPrice=4 Image (length)=7340032
INFO  Handler Received a CreateProductCommand message: CreateProductCommand: ProductId=XJ128, ProductName=Milk, ListPrice=4 Image (length)=7340032

e
ERROR ValidationMessageMutator Validation failed for message CreateProductCommand: ProductId=XJ128, ProductName=Milk Milk Milk Milk Milk, ListPrice=15 Image (length)=7340032, with the following error/s:
The Product Name value cannot exceed 20 characters.
The field ListPrice must be between 1 and 5.

Code walk-through

The IMutateTransportMessages and IMessageMutator interfaces give access to the message so that the inbound and/or outbound message can be modified.

As a consumer, implement the desired interface and load it into the NServiceBus container.

IMutateTransportMessages is the transport-level counterpart of IMessageMutator: it mutates transport messages rather than individual messages. The main difference from IMessageMutator is that a single transport message may carry several messages.

This sample implements two mutators:

ValidationMessageMutator

This message mutator validates all DataAnnotations attributes that exist in the message.

public class ValidationMessageMutator :
    IMutateIncomingMessages,
    IMutateOutgoingMessages
{
    static ILog log = LogManager.GetLogger("ValidationMessageMutator");

    public Task MutateOutgoing(MutateOutgoingMessageContext context)
    {
        ValidateDataAnnotations(context.OutgoingMessage);
        return Task.CompletedTask;
    }

    public Task MutateIncoming(MutateIncomingMessageContext context)
    {
        ValidateDataAnnotations(context.Message);
        return Task.CompletedTask;
    }

    static void ValidateDataAnnotations(object message)
    {
        var context = new ValidationContext(message, null, null);
        var results = new List<ValidationResult>();

        var isValid = Validator.TryValidateObject(message, context, results, true);

        if (isValid)
        {
            log.Info($"Validation succeeded for message: {message}");
            return;
        }

        var errorMessage = new StringBuilder();
        var error = $"Validation failed for message {message}, with the following error/s:";
        errorMessage.AppendLine(error);

        foreach (var validationResult in results)
        {
            errorMessage.AppendLine(validationResult.ErrorMessage);
        }

        log.Error(errorMessage.ToString());
        throw new Exception(errorMessage.ToString());
    }
}

ValidationMessageMutator implements both interface methods, MutateOutgoing and MutateIncoming. The two methods contain the same logic, so the mutation is symmetrical.

Both call a private static method called ValidateDataAnnotations.

This means that both the outgoing message and the incoming message are validated. The mutator applies to all incoming and outgoing message types.

It is possible to examine the message type and mutate only certain types of messages by checking the type of the message object received as a parameter to the method.
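The type check described above could look roughly like the following. This is a minimal sketch that uses only the base class library: the marker interface IValidatedMessage and the two message classes are hypothetical names introduced for illustration and are not part of the sample or of NServiceBus.

```csharp
// Hypothetical marker interface: messages that opt in to validation implement it.
public interface IValidatedMessage { }

// Hypothetical message types, used only for this illustration.
public class SampleProductCommand : IValidatedMessage { }
public class SampleNotification { }

public static class ValidationFilter
{
    // Returns true only for messages that should be validated.
    // The 'is' pattern checks the runtime type and never matches null.
    public static bool ShouldValidate(object message)
    {
        return message is IValidatedMessage;
    }
}
```

Inside a mutator, MutateIncoming could call ValidationFilter.ShouldValidate(context.Message) and return Task.CompletedTask without validating when the result is false. Checking a concrete type directly (for example, message is CreateProductCommand) works the same way when only one message type is of interest.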

Browse the code. In this sample, if any validation fails, an exception is thrown that details the failed validation.
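The message class itself is not shown in this walk-through. As a rough illustration of how DataAnnotations attributes drive the validation, the sketch below uses a hypothetical ProductCommandSketch class. The attribute values are assumptions inferred from the console output above, not taken from the sample source.

```csharp
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;

// Hypothetical message contract; attribute values are guessed from the
// error messages in the console output, not copied from the sample.
public class ProductCommandSketch
{
    public string ProductId { get; set; } = "";

    [StringLength(20, ErrorMessage = "The Product Name value cannot exceed 20 characters.")]
    public string ProductName { get; set; } = "";

    [Range(1, 5)]
    public int ListPrice { get; set; }
}

public static class AnnotationValidation
{
    // Collects every failed DataAnnotations rule on the message.
    // Passing true for validateAllProperties checks all property
    // attributes, not only [Required].
    public static List<ValidationResult> Validate(object message)
    {
        var context = new ValidationContext(message, null, null);
        var results = new List<ValidationResult>();
        Validator.TryValidateObject(message, context, results, true);
        return results;
    }
}
```

With these assumed attributes, a message with ProductName "Milk" and ListPrice 4 produces no results, while a 24-character product name combined with a ListPrice of 15 produces one result per broken rule.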

TransportMessageCompressionMutator

This transport mutator compresses the whole transport message.

public class TransportMessageCompressionMutator :
    IMutateIncomingTransportMessages,
    IMutateOutgoingTransportMessages
{
    static ILog log = LogManager.GetLogger("TransportMessageCompressionMutator");

    public Task MutateOutgoing(MutateOutgoingTransportMessageContext context)
    {
        log.Info($"transportMessage.Body size before compression: {context.OutgoingBody.Length}");

        var mStream = new MemoryStream(context.OutgoingBody);
        var outStream = new MemoryStream();

        using (var tinyStream = new GZipStream(outStream, CompressionMode.Compress))
        {
            mStream.CopyTo(tinyStream);
        }
        // copy the compressed buffer only after the GZipStream is disposed,
        // otherwise, not all the compressed message will be copied.
        context.OutgoingBody = outStream.ToArray();
        context.OutgoingHeaders["IWasCompressed"] = "true";
        log.Info($"transportMessage.Body size after compression: {context.OutgoingBody.Length}");
        return Task.CompletedTask;
    }

    public async Task MutateIncoming(MutateIncomingTransportMessageContext context)
    {
        if (!context.Headers.ContainsKey("IWasCompressed"))
        {
            return;
        }
        var memoryStream = new MemoryStream(context.Body);
        using (var bigStream = new GZipStream(memoryStream, CompressionMode.Decompress))
        {
            var bigStreamOut = new MemoryStream();
            await bigStream.CopyToAsync(bigStreamOut)
                .ConfigureAwait(false);
            context.Body = bigStreamOut.ToArray();
        }
    }
}

The TransportMessageCompressionMutator is a transport message mutator, meaning that NServiceBus allows the mutation of the outgoing and/or incoming transport message.

In the TransportMessageCompressionMutator class, both the incoming and outgoing methods are implemented.

This mutator acts on all transport messages, regardless of what message types the transport message carries.

The compression code is straightforward and utilizes the .NET framework GZipStream class to do the compression.

After the compression is done, the compressed array is placed back in the transport message Body property.

This sample signals to the receiving end that this transport message was mutated (compressed) by placing a "true" string in the header key IWasCompressed.

Decompression is done in the incoming method if the key IWasCompressed exists.

If the key is missing, the message is returned, unmutated.

Otherwise, the incoming method replaces the compressed content of the transport message Body with the uncompressed content.
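The compress, flag, and decompress flow described above can be tried outside NServiceBus. The following is a minimal sketch using only the base class library: the CompressionSketch class and the plain dictionary standing in for the message headers are illustrative assumptions, not the sample's code.

```csharp
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;

public static class CompressionSketch
{
    // Same header key the sample uses to flag a compressed body.
    public const string HeaderKey = "IWasCompressed";

    // Compresses the body and records the flag in the headers.
    public static byte[] Compress(byte[] body, IDictionary<string, string> headers)
    {
        using (var output = new MemoryStream())
        {
            using (var gzip = new GZipStream(output, CompressionMode.Compress))
            {
                gzip.Write(body, 0, body.Length);
            }
            // Read the buffer only after the GZipStream is disposed,
            // so the final compressed block has been flushed.
            headers[HeaderKey] = "true";
            return output.ToArray();
        }
    }

    // Returns the body untouched when the flag is missing,
    // otherwise returns the decompressed content.
    public static byte[] Decompress(byte[] body, IDictionary<string, string> headers)
    {
        if (!headers.ContainsKey(HeaderKey))
        {
            return body;
        }
        using (var input = new MemoryStream(body))
        using (var gzip = new GZipStream(input, CompressionMode.Decompress))
        using (var output = new MemoryStream())
        {
            gzip.CopyTo(output);
            return output.ToArray();
        }
    }
}
```

Calling Compress on a 7 MB zero-filled array and then Decompress with the same headers dictionary should return an array of the original length, and the intermediate compressed array should be far smaller than the input. Calling Decompress with an empty headers dictionary returns the input array as-is.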

Configuring NServiceBus to use the message mutators

To hook the sample message mutators into the NServiceBus messaging flow:

endpointConfiguration.RegisterComponents(
    registration: components =>
    {
        components.ConfigureComponent<ValidationMessageMutator>(DependencyLifecycle.InstancePerCall);
        components.ConfigureComponent<TransportMessageCompressionMutator>(DependencyLifecycle.InstancePerCall);
    });

The Sending code

var smallMessage = new CreateProductCommand
{
    ProductId = "XJ128",
    ProductName = "Milk",
    ListPrice = 4,
    // 7MB. MSMQ should throw an exception, but it will not since the buffer will be compressed
    // before it reaches MSMQ.
    Image = new byte[1024 * 1024 * 7]
};
await endpointInstance.SendLocal(smallMessage)
    .ConfigureAwait(false);

Since the Image buffer contains only zero bytes, the GZipStream in the outgoing transport message mutator easily compresses it to a size within the MSMQ limit of 4MB, and the message reaches the server.

The following invalid message is sent but never received, since an exception is thrown in the outgoing message mutator:

var largeMessage = new CreateProductCommand
{
    ProductId = "XJ128",
    ProductName = "Really long product name",
    ListPrice = 15,
    // 7MB. MSMQ should throw an exception, but it will not since the buffer will be compressed
    // before it reaches MSMQ.
    Image = new byte[1024 * 1024 * 7]
};
await endpointInstance.SendLocal(largeMessage)
    .ConfigureAwait(false);

The message is invalid for the reasons reported in the console output: the product name is over the 20 character limit and the list price is outside the allowed range of 1 to 5. The thrown exception reports those validation errors.

The server code is simple and straightforward:

public class Handler :
    IHandleMessages<CreateProductCommand>
{
    static ILog log = LogManager.GetLogger<Handler>();

    public Task Handle(CreateProductCommand message, IMessageHandlerContext context)
    {
        log.Info($"Received a CreateProductCommand message: {message}");
        return Task.CompletedTask;
    }
}

The handler code does not need to change on account of the message mutation.
