Message Mutators

Component: NServiceBus
NuGet Package: NServiceBus (8.x)

This sample shows how to create a custom message mutator.

Executing the sample

  1. Run the solution.
  2. Press 's' in the window to send a valid message. Then press 'e' to send an invalid message. (The exception is expected.) The console output will look something like this:
Press 's' to send a valid message, press 'e' to send a failed message. To exit, 'q'

s
INFO  ValidationMessageMutator Validation succeeded for message: CreateProductCommand: ProductId=XJ128, ProductName=Milk, ListPrice=4 Image (length)=7340032
INFO  TransportMessageCompressionMutator transportMessage.Body size before compression: 9787013
INFO  TransportMessageCompressionMutator transportMessage.Body size after compression: 9761
INFO  ValidationMessageMutator Validation succeeded for message: CreateProductCommand: ProductId=XJ128, ProductName=Milk, ListPrice=4 Image (length)=7340032
INFO  Handler Received a CreateProductCommand message: CreateProductCommand: ProductId=XJ128, ProductName=Milk, ListPrice=4 Image (length)=7340032

e
ERROR ValidationMessageMutator Validation failed for message CreateProductCommand: ProductId=XJ128, ProductName=Milk Milk Milk Milk Milk, ListPrice=15 Image (length)=7340032, with the following error/s:
The Product Name value cannot exceed 20 characters.
The field ListPrice must be between 1 and 5.

Code walk-through

Logical message mutators

The IMutateIncomingMessages and IMutateOutgoingMessages interfaces give access to the message so that the inbound and/or outbound message can be modified.

This sample implements both interfaces in a single class, ValidationMessageMutator, which validates all DataAnnotations attributes on both incoming and outgoing messages and throws an exception if validation fails.

public class ValidationMessageMutator :
    IMutateIncomingMessages,
    IMutateOutgoingMessages
{
    static ILog log = LogManager.GetLogger("ValidationMessageMutator");

    public Task MutateOutgoing(MutateOutgoingMessageContext context)
    {
        ValidateDataAnnotations(context.OutgoingMessage);
        return Task.CompletedTask;
    }

    public Task MutateIncoming(MutateIncomingMessageContext context)
    {
        ValidateDataAnnotations(context.Message);
        return Task.CompletedTask;
    }

    static void ValidateDataAnnotations(object message)
    {
        var context = new ValidationContext(message, null, null);
        var results = new List<ValidationResult>();

        var isValid = Validator.TryValidateObject(message, context, results, true);

        if (isValid)
        {
            log.Info($"Validation succeeded for message: {message}");
            return;
        }

        var errorMessage = new StringBuilder();
        var error = $"Validation failed for message {message}, with the following error/s:";
        errorMessage.AppendLine(error);

        foreach (var validationResult in results)
        {
            errorMessage.AppendLine(validationResult.ErrorMessage);
        }

        log.Error(errorMessage.ToString());
        throw new Exception(errorMessage.ToString());
    }
}

Although not shown here, it is possible to mutate only certain types of messages by checking the type of the message object received as a parameter to the method.
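A minimal sketch of such a type check follows. It is deliberately independent of NServiceBus so it can run on its own. The message classes and the TrimProductName helper are hypothetical stand-ins for illustration and are not part of the sample.

```csharp
using System;

// Hypothetical stand-in message types, for illustration only.
public class CreateProductCommand
{
    public string ProductName { get; set; } = "";
}

public class UnrelatedMessage
{
}

public static class SelectiveMutation
{
    // Mutates only CreateProductCommand instances and leaves every other
    // message type untouched. Returns true when a mutation was applied.
    public static bool TrimProductName(object message)
    {
        if (message is CreateProductCommand command)
        {
            command.ProductName = command.ProductName.Trim();
            return true;
        }

        return false;
    }
}
```

Inside a logical message mutator, the same pattern-matching check would be applied to context.Message (incoming) or context.OutgoingMessage (outgoing) before doing any work.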

Transport message mutators

Similar interfaces exist for transport messages: IMutateIncomingTransportMessages and IMutateOutgoingTransportMessages. The main difference from the logical message mutators is that a single transport message may contain several logical messages.

This transport mutator compresses the outgoing transport message and decompresses the incoming message.

public class TransportMessageCompressionMutator :
    IMutateIncomingTransportMessages,
    IMutateOutgoingTransportMessages
{
    static ILog log = LogManager.GetLogger("TransportMessageCompressionMutator");

    public Task MutateOutgoing(MutateOutgoingTransportMessageContext context)
    {
        log.Info($"transportMessage.Body size before compression: {context.OutgoingBody.Length}");

        var mStream = new MemoryStream(context.OutgoingBody.ToArray(), false);
        var outStream = new MemoryStream();

        using (var tinyStream = new GZipStream(outStream, CompressionMode.Compress))
        {
            mStream.CopyTo(tinyStream);
        }
        // copy the compressed buffer only after the GZipStream is disposed,
        // otherwise, not all the compressed message will be copied.
        context.OutgoingBody = outStream.ToArray();
        context.OutgoingHeaders["IWasCompressed"] = "true";
        log.Info($"transportMessage.Body size after compression: {context.OutgoingBody.Length}");
        return Task.CompletedTask;
    }

    public async Task MutateIncoming(MutateIncomingTransportMessageContext context)
    {
        if (!context.Headers.ContainsKey("IWasCompressed"))
        {
            return;
        }
        var memoryStream = new MemoryStream(context.Body.ToArray());
        using (var bigStream = new GZipStream(memoryStream, CompressionMode.Decompress))
        {
            var bigStreamOut = new MemoryStream();
            await bigStream.CopyToAsync(bigStreamOut, 81920, context.CancellationToken);
            context.Body = bigStreamOut.ToArray();
        }
    }
}

The TransportMessageCompressionMutator is a transport message mutator, meaning that it operates on the body and headers of the outgoing and incoming transport message rather than on the logical message object.

The compression code uses the GZipStream class. Once compression is complete, the compressed array is assigned back to the transport message body (context.OutgoingBody). The sample then signals to the receiver that the transport message was mutated (compressed) by setting the header key IWasCompressed to "true". The incoming mutator checks for this key to decide whether to mutate (decompress) the message.
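The compress and decompress steps can be tried in isolation with a small round trip. This is a sketch using only System.IO.Compression. The GZipBody class and its method names are hypothetical and are not part of the sample.

```csharp
using System;
using System.IO;
using System.IO.Compression;

// Hypothetical helper, for illustration only.
public static class GZipBody
{
    public static byte[] Compress(byte[] body)
    {
        using var output = new MemoryStream();
        using (var gzip = new GZipStream(output, CompressionMode.Compress))
        {
            gzip.Write(body, 0, body.Length);
        }

        // Read the buffer only after the GZipStream is disposed, so that
        // all compressed data has been flushed to the underlying stream.
        // MemoryStream.ToArray still works after the stream is closed.
        return output.ToArray();
    }

    public static byte[] Decompress(byte[] compressed)
    {
        using var input = new MemoryStream(compressed);
        using var gzip = new GZipStream(input, CompressionMode.Decompress);
        using var output = new MemoryStream();
        gzip.CopyTo(output);
        return output.ToArray();
    }
}
```

A body consisting mostly of zeros, such as the empty image array in this sample, compresses to a small fraction of its original size, which is consistent with the sizes logged in the console output.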

Configuring NServiceBus to use the message mutators

To hook the sample message mutators into the NServiceBus messaging flow, register them on the endpoint configuration:

endpointConfiguration.RegisterMessageMutator(new ValidationMessageMutator());
endpointConfiguration.RegisterMessageMutator(new TransportMessageCompressionMutator());

Sending a valid message

The message sent in the sample includes a 7 MB image:

var smallMessage = new CreateProductCommand
{
    ProductId = "XJ128",
    ProductName = "Milk",
    ListPrice = 4,
    Image = new byte[1024 * 1024 * 7]
};
await endpointInstance.SendLocal(smallMessage);

Sending an invalid message

The sample sends a similar message but with data that fails the logical message mutator's validation:

var largeMessage = new CreateProductCommand
{
    ProductId = "XJ128",
    ProductName = "Really long product name",
    ListPrice = 15,
    Image = new byte[1024 * 1024 * 7]
};
await endpointInstance.SendLocal(largeMessage);

The message is invalid for two reasons: the product name exceeds the 20-character limit, and the list price is above the allowed maximum of 5. The exception message lists both validation errors.
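The definition of CreateProductCommand is not shown on this page. The sketch below is an assumption, reconstructed from the error messages in the console output, of how DataAnnotations attributes could produce those two errors. The attribute choices and the MessageValidation helper are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;

// Assumed message shape; the real sample class may differ.
public class CreateProductCommand
{
    public string ProductId { get; set; } = "";

    [StringLength(20, ErrorMessage = "The Product Name value cannot exceed 20 characters.")]
    public string ProductName { get; set; } = "";

    [Range(1, 5)]
    public int ListPrice { get; set; }

    public byte[] Image { get; set; } = Array.Empty<byte>();
}

// Hypothetical helper that mirrors the validation call used by the mutator.
public static class MessageValidation
{
    public static List<ValidationResult> Validate(object message)
    {
        var results = new List<ValidationResult>();
        var context = new ValidationContext(message);

        // validateAllProperties: true also checks StringLength and Range,
        // not only Required attributes.
        Validator.TryValidateObject(message, context, results, true);
        return results;
    }
}
```

With these assumed attributes, a message with ProductName set to "Really long product name" and ListPrice set to 15 yields two validation results, while the valid message from the previous section yields none.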

Receiving a message

The receiver is straightforward. There is no special code to handle validation or compression since this is done by the logical and transport message mutators.

public class Handler :
    IHandleMessages<CreateProductCommand>
{
    static ILog log = LogManager.GetLogger<Handler>();

    public Task Handle(CreateProductCommand message, IMessageHandlerContext context)
    {
        log.Info($"Received a CreateProductCommand message: {message}");
        return Task.CompletedTask;
    }
}
