
Message Mutators

Component: NServiceBus
NuGet Package: NServiceBus (7.x)

Message mutators inspect or change messages as they pass through the pipeline.

NServiceBus supports two categories of message mutators: logical message mutators and transport message mutators.

Logical message mutators

Logical message mutators change or react to individual messages as they are sent or received. Implement the IMutateOutgoingMessages interface to hook into the sending side and the IMutateIncomingMessages interface to hook into the receiving side.

Typical uses include validating outgoing or incoming messages.

IMutateIncomingMessages

public class MutateIncomingMessages :
    IMutateIncomingMessages
{
    public Task MutateIncoming(MutateIncomingMessageContext context)
    {
        // the incoming headers
        var headers = context.Headers;

        // the incoming message
        // optionally replace the message instance by setting context.Message
        var message = context.Message;

        return Task.CompletedTask;
    }
}
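
As an illustration of the validation use case mentioned above, the following is a minimal sketch of an incoming mutator that validates each message with the .NET DataAnnotations API. The class name ValidatingIncomingMutator is hypothetical, and the sketch assumes that message classes carry DataAnnotations attributes such as [Required].

```csharp
using System.ComponentModel.DataAnnotations;
using System.Threading.Tasks;
using NServiceBus.MessageMutator;

// Hypothetical example: validates incoming messages using DataAnnotations.
public class ValidatingIncomingMutator :
    IMutateIncomingMessages
{
    public Task MutateIncoming(MutateIncomingMessageContext context)
    {
        var message = context.Message;

        // Throws a ValidationException if any attribute-based rule fails.
        // The exception aborts processing and recoverability is applied.
        var validationContext = new ValidationContext(message);
        Validator.ValidateObject(message, validationContext, validateAllProperties: true);

        return Task.CompletedTask;
    }
}
```

Because the mutator throws on invalid input, an invalid message never reaches its handler. The same check could be placed in an outgoing mutator to reject invalid messages before they are sent.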

IMutateOutgoingMessages

public class MutateOutgoingMessages :
    IMutateOutgoingMessages
{
    public Task MutateOutgoing(MutateOutgoingMessageContext context)
    {
        // the outgoing headers
        var outgoingHeaders = context.OutgoingHeaders;

        if (context.TryGetIncomingMessage(out var incomingMessage))
        {
            // do something with the incoming message
        }

        if (context.TryGetIncomingHeaders(out var incomingHeaders))
        {
            // do something with the incoming headers
        }

        // the outgoing message
        // optionally replace the message instance by setting context.OutgoingMessage
        var outgoingMessage = context.OutgoingMessage;

        return Task.CompletedTask;
    }
}

Transport message mutators

Transport message mutators work on the serialized transport message and are useful for compression, header manipulation, etc. Create transport message mutators by implementing the IMutateIncomingTransportMessages or IMutateOutgoingTransportMessages interfaces.

IMutateIncomingTransportMessages

public class MutateIncomingTransportMessages :
    IMutateIncomingTransportMessages
{
    public Task MutateIncoming(MutateIncomingTransportMessageContext context)
    {
        // the bytes of the incoming message
        var bytes = context.Body;

        // optionally replace the Body
        context.Body = ServiceThatChangesBody.Mutate(context.Body);

        // the incoming headers
        var headers = context.Headers;

        // optionally manipulate headers

        // add a header
        headers.Add("MyHeaderKey1", "MyHeaderValue");

        // remove a header
        headers.Remove("MyHeaderKey2");

        return Task.CompletedTask;
    }
}

IMutateOutgoingTransportMessages

public class MutateOutgoingTransportMessages :
    IMutateOutgoingTransportMessages
{
    public Task MutateOutgoing(MutateOutgoingTransportMessageContext context)
    {
        if (context.TryGetIncomingMessage(out var incomingMessage))
        {
            // do something with the incoming message
        }

        if (context.TryGetIncomingHeaders(out var incomingHeaders))
        {
            // do something with the incoming headers
        }

        // the outgoing message
        var outgoingMessage = context.OutgoingMessage;

        // the bytes containing the serialized outgoing message
        var bytes = context.OutgoingBody;

        // optionally replace the Body.
        // this can be done using any information from the context
        context.OutgoingBody = ServiceThatChangesBody.Mutate(context.OutgoingMessage);

        // the outgoing headers
        var headers = context.OutgoingHeaders;

        // optionally manipulate headers

        // add a header
        headers.Add("MyHeaderKey1", "MyHeaderValue");

        // remove a header
        headers.Remove("MyHeaderKey2");

        return Task.CompletedTask;
    }
}
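
To make the compression use case concrete, the following sketch implements both transport mutator interfaces in one class and compresses the body with GZip. The class name and the header key Example.IsCompressed are assumptions made for illustration; they are not defined by NServiceBus.

```csharp
using System.IO;
using System.IO.Compression;
using System.Threading.Tasks;
using NServiceBus.MessageMutator;

// Hypothetical example: GZip-compresses outgoing bodies and
// decompresses incoming bodies that carry the marker header.
public class CompressionMutator :
    IMutateIncomingTransportMessages,
    IMutateOutgoingTransportMessages
{
    // Assumed header key, chosen for this sketch only.
    const string HeaderKey = "Example.IsCompressed";

    public Task MutateOutgoing(MutateOutgoingTransportMessageContext context)
    {
        var body = context.OutgoingBody;
        using (var output = new MemoryStream())
        {
            // leaveOpen keeps the MemoryStream usable after the GZipStream
            // is disposed, which is what flushes the compressed data.
            using (var gzip = new GZipStream(output, CompressionMode.Compress, leaveOpen: true))
            {
                gzip.Write(body, 0, body.Length);
            }
            context.OutgoingBody = output.ToArray();
        }

        // Mark the message so the receiver knows to decompress it.
        context.OutgoingHeaders[HeaderKey] = "true";
        return Task.CompletedTask;
    }

    public Task MutateIncoming(MutateIncomingTransportMessageContext context)
    {
        // Leave messages without the marker header untouched.
        if (!context.Headers.ContainsKey(HeaderKey))
        {
            return Task.CompletedTask;
        }

        using (var input = new MemoryStream(context.Body))
        using (var gzip = new GZipStream(input, CompressionMode.Decompress))
        using (var output = new MemoryStream())
        {
            gzip.CopyTo(output);
            context.Body = output.ToArray();
        }
        return Task.CompletedTask;
    }
}
```

The marker header lets an endpoint receive both compressed and uncompressed messages, which matters while senders are being upgraded one at a time.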

Registering a mutator

Mutators are registered using:

endpointConfiguration.RegisterMessageMutator(new MyIncomingMessageMutator());
endpointConfiguration.RegisterMessageMutator(new MyOutgoingTransportMessageMutator());

The order in which mutators execute is not deterministic. If finer-grained control over the pipeline is required, see Pipeline Introduction.
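
For context, the sketch below shows where registration might sit in endpoint startup code. The endpoint name Samples.Mutators, the HeaderStampingMutator class, and its header key are placeholders, and the learning transport is used only to keep the example short.

```csharp
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.MessageMutator;

// Hypothetical mutator that stamps every outgoing message with a header.
public class HeaderStampingMutator :
    IMutateOutgoingTransportMessages
{
    public Task MutateOutgoing(MutateOutgoingTransportMessageContext context)
    {
        // The indexer overwrites an existing value instead of throwing.
        context.OutgoingHeaders["Example.StampedBy"] = "HeaderStampingMutator";
        return Task.CompletedTask;
    }
}

public static class Program
{
    public static async Task Main()
    {
        // Placeholder endpoint name and transport for this sketch.
        var endpointConfiguration = new EndpointConfiguration("Samples.Mutators");
        endpointConfiguration.UseTransport<LearningTransport>();

        // Register the mutator before the endpoint is started.
        endpointConfiguration.RegisterMessageMutator(new HeaderStampingMutator());

        var endpointInstance = await Endpoint.Start(endpointConfiguration)
            .ConfigureAwait(false);

        // ... send and receive messages here ...

        await endpointInstance.Stop()
            .ConfigureAwait(false);
    }
}
```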

When a mutator throws an exception

If an incoming mutator throws an exception, processing of the message is aborted, the message is rolled back to the queue, and recoverability is applied.

If an outgoing mutator throws an exception, the exception bubbles up to the method performing the Send or Publish. If the operation is performed on a context in the pipeline, processing of the incoming message is aborted, the message is rolled back to the queue, and recoverability is applied. If the operation is performed on the message session, the exception bubbles up to the user code and, if not handled there, might tear down the application domain.
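
When sending through the message session, one way to keep a mutator failure from going unhandled is to catch it at the call site. This is a sketch only: the SafeSender helper is hypothetical, and writing to the console stands in for whatever logging or compensation the application needs.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical helper: wraps a send on the message session so that an
// exception thrown by an outgoing mutator does not go unhandled.
public static class SafeSender
{
    public static async Task<bool> TrySend(IMessageSession messageSession, object message)
    {
        try
        {
            // Outgoing mutators run as part of this call, so their
            // exceptions surface here. Assumes routing for the message
            // type has been configured on the endpoint.
            await messageSession.Send(message)
                .ConfigureAwait(false);
            return true;
        }
        catch (Exception exception)
        {
            // No recoverability applies outside the pipeline;
            // the caller decides what to do next.
            Console.Error.WriteLine($"Send failed: {exception.Message}");
            return false;
        }
    }
}
```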

In NServiceBus Versions 6 and above, and all integrations that target those versions, all extension points that return Task cannot return a null Task. These APIs must return an instance of a Task, i.e. a pending Task or a CompletedTask, or be marked async. For extension points that return a Task<T>, return the value directly (for async methods) or wrap the value in a Task.FromResult(value).
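
The rule about non-null tasks can be illustrated with two variants of the same mutator. The class names are hypothetical, and Task.Delay is a stand-in for real asynchronous work.

```csharp
using System.Threading.Tasks;
using NServiceBus.MessageMutator;

// Synchronous work: return a completed task, never null.
public class SynchronousMutator :
    IMutateIncomingMessages
{
    public Task MutateIncoming(MutateIncomingMessageContext context)
    {
        context.Headers["Example.Inspected"] = "true";

        // "return null;" would compile but violates the rule above.
        return Task.CompletedTask;
    }
}

// Asynchronous work: mark the method async and await the operation.
public class AsynchronousMutator :
    IMutateIncomingMessages
{
    public async Task MutateIncoming(MutateIncomingMessageContext context)
    {
        // Placeholder for real asynchronous work such as an I/O call.
        await Task.Delay(10)
            .ConfigureAwait(false);

        context.Headers["Example.Inspected"] = "true";
    }
}
```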

Mutators versus Behaviors

Shared concepts and functionality

Both mutators and behaviors:

  • Can manipulate pipeline state
  • Can be executed in the incoming or outgoing pipeline
  • Bubble exceptions up the pipeline, where they are handled by the recoverability mechanism

Differences

Note that these are relative differences. So, for example, a behavior is only "high complexity" in comparison to a mutator.

                                  Mutator  Behavior
Complexity to implement           Low      High
Flexibility                       Low      High
Location in pipeline              Fixed    Flexible
Complexity to test                Low      Medium*
Can control nested action         No       Yes
Affects call stack depth          No       Yes
Can replace an existing behavior  No       Yes
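
To show what "can control nested action" means in practice, here is a minimal sketch of a behavior that wraps the rest of the pipeline in a timer. A mutator cannot do this because it never receives the next delegate. The class name TimingBehavior is hypothetical.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using NServiceBus.Pipeline;

// Hypothetical behavior that times the remainder of the incoming pipeline.
public class TimingBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        var stopwatch = Stopwatch.StartNew();
        try
        {
            // Invoking next() runs the nested part of the pipeline.
            // This extra frame is why a behavior adds call stack depth.
            await next()
                .ConfigureAwait(false);
        }
        finally
        {
            stopwatch.Stop();
            var messageType = context.Message.MessageType.Name;
            Console.WriteLine($"{messageType} took {stopwatch.ElapsedMilliseconds} ms");
        }
    }
}
```

A behavior like this would typically be registered through endpointConfiguration.Pipeline.Register, passing the instance and a description. See Pipeline Introduction for the details.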

Samples

  • Message Mutators
    Change messages by plugging custom logic in to a couple of interfaces, encrypting as required.
