Message mutators allow mutation of messages in the pipeline.
NServiceBus supports two categories of message mutators:
Logical message mutators
Message mutators change or react to individual messages being sent or received. The IMutateOutgoingMessages or IMutateIncomingMessages interfaces allow the implementation of hooks for the sending and receiving sides.
Mutators can be used to perform actions such as validation of outgoing/incoming messages.
IMutateIncomingMessages
public class MutateIncomingMessages :
    IMutateIncomingMessages
{
    public Task MutateIncoming(MutateIncomingMessageContext context)
    {
        // the incoming headers
        var headers = context.Headers;

        // the incoming message
        // optionally replace the message instance by setting context.Message
        var message = context.Message;

        return Task.CompletedTask;
    }
}
IMutateOutgoingMessages
public class MutateOutgoingMessages :
    IMutateOutgoingMessages
{
    public Task MutateOutgoing(MutateOutgoingMessageContext context)
    {
        // the outgoing headers
        var outgoingHeaders = context.OutgoingHeaders;

        if (context.TryGetIncomingMessage(out var incomingMessage))
        {
            // do something with the incoming message
        }

        if (context.TryGetIncomingHeaders(out var incomingHeaders))
        {
            // do something with the incoming headers
        }

        // the outgoing message
        // optionally replace the message instance by setting context.OutgoingMessage
        var outgoingMessage = context.OutgoingMessage;

        return Task.CompletedTask;
    }
}
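As a rough illustration of the validation use case mentioned above, the following sketch validates messages with the DataAnnotations Validator from the .NET base class library. The names MessageValidator, ValidationMessageMutator, and PlaceOrder are hypothetical and exist only for this example; it is one possible approach, not a prescribed one.

```csharp
using System.ComponentModel.DataAnnotations;
using System.Threading.Tasks;
using NServiceBus.MessageMutator;

// Hypothetical message type used only for this example.
public class PlaceOrder
{
    [Required]
    public string OrderId { get; set; }
}

// Hypothetical helper: throws ValidationException when a
// DataAnnotations attribute on the message is violated.
public static class MessageValidator
{
    public static void Validate(object message)
    {
        var validationContext = new ValidationContext(message);
        Validator.ValidateObject(message, validationContext, validateAllProperties: true);
    }
}

// Hypothetical mutator that validates in both directions.
public class ValidationMessageMutator :
    IMutateIncomingMessages,
    IMutateOutgoingMessages
{
    public Task MutateIncoming(MutateIncomingMessageContext context)
    {
        MessageValidator.Validate(context.Message);
        return Task.CompletedTask;
    }

    public Task MutateOutgoing(MutateOutgoingMessageContext context)
    {
        MessageValidator.Validate(context.OutgoingMessage);
        return Task.CompletedTask;
    }
}
```

Because the mutator throws when validation fails, an invalid message is handled as described in the section on mutators that throw exceptions.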
Transport message mutators
Transport message mutators work on the serialized transport message and are useful for compression, header manipulation, etc. Create transport message mutators by implementing the IMutateIncomingTransportMessages or IMutateOutgoingTransportMessages interfaces.
IMutateIncomingTransportMessages
public class MutateIncomingTransportMessages :
    IMutateIncomingTransportMessages
{
    public Task MutateIncoming(MutateIncomingTransportMessageContext context)
    {
        // the bytes of the incoming message
        var bytes = context.Body;

        // optionally replace the Body
        context.Body = ServiceThatChangesBody.Mutate(context.Body);

        // the incoming headers
        var headers = context.Headers;

        // optionally manipulate headers

        // add a header
        headers.Add("MyHeaderKey1", "MyHeaderValue");

        // remove a header
        headers.Remove("MyHeaderKey2");

        return Task.CompletedTask;
    }
}
IMutateOutgoingTransportMessages
public class MutateOutgoingTransportMessages :
    IMutateOutgoingTransportMessages
{
    public Task MutateOutgoing(MutateOutgoingTransportMessageContext context)
    {
        if (context.TryGetIncomingMessage(out var incomingMessage))
        {
            // do something with the incoming message
        }

        if (context.TryGetIncomingHeaders(out var incomingHeaders))
        {
            // do something with the incoming headers
        }

        // the outgoing message
        var outgoingMessage = context.OutgoingMessage;

        // the bytes containing the serialized outgoing message
        var bytes = context.OutgoingBody;

        // optionally replace the Body
        // this can be done using any information from the context
        context.OutgoingBody = ServiceThatChangesBody.Mutate(context.OutgoingMessage);

        // the outgoing headers
        var headers = context.OutgoingHeaders;

        // optionally manipulate headers

        // add a header
        headers.Add("MyHeaderKey1", "MyHeaderValue");

        // remove a header
        headers.Remove("MyHeaderKey2");

        return Task.CompletedTask;
    }
}
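To make the compression use case more concrete, here is a minimal sketch of a transport message mutator that gzips outgoing bodies and decompresses incoming ones. It assumes the message body is exposed as a byte array (in versions where the body has a different type, a conversion would be needed). The GzipBody helper, the CompressionMutator class, and the "Example.Compressed" header key are hypothetical names invented for this example.

```csharp
using System.IO;
using System.IO.Compression;
using System.Threading.Tasks;
using NServiceBus.MessageMutator;

// Hypothetical helper built only on the .NET base class library.
public static class GzipBody
{
    public static byte[] Compress(byte[] body)
    {
        using (var output = new MemoryStream())
        {
            // The GZipStream must be disposed before reading the output,
            // otherwise the gzip footer is not written.
            using (var gzip = new GZipStream(output, CompressionMode.Compress, leaveOpen: true))
            {
                gzip.Write(body, 0, body.Length);
            }
            return output.ToArray();
        }
    }

    public static byte[] Decompress(byte[] body)
    {
        using (var input = new MemoryStream(body))
        using (var gzip = new GZipStream(input, CompressionMode.Decompress))
        using (var output = new MemoryStream())
        {
            gzip.CopyTo(output);
            return output.ToArray();
        }
    }
}

// Assumption: context.Body and context.OutgoingBody are byte arrays.
public class CompressionMutator :
    IMutateIncomingTransportMessages,
    IMutateOutgoingTransportMessages
{
    // Hypothetical header key marking a compressed body.
    const string CompressedHeader = "Example.Compressed";

    public Task MutateOutgoing(MutateOutgoingTransportMessageContext context)
    {
        context.OutgoingBody = GzipBody.Compress(context.OutgoingBody);
        context.OutgoingHeaders[CompressedHeader] = "gzip";
        return Task.CompletedTask;
    }

    public Task MutateIncoming(MutateIncomingTransportMessageContext context)
    {
        // Only decompress messages that the outgoing side marked.
        if (context.Headers.ContainsKey(CompressedHeader))
        {
            context.Body = GzipBody.Decompress(context.Body);
        }
        return Task.CompletedTask;
    }
}
```

Marking compressed messages with a header lets the receiving side skip messages sent by endpoints that do not use the mutator.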
Registering a mutator
Mutators are registered using:
endpointConfiguration.RegisterMessageMutator(new MyIncomingMessageMutator());
endpointConfiguration.RegisterMessageMutator(new MyOutgoingTransportMessageMutator());
The order in which mutators execute is not deterministic. If finer-grained control over the pipeline is required, see Pipeline Introduction.
When a mutator throws an exception
If an incoming mutator throws an exception, processing of the message is aborted, the message is rolled back to the queue, and recoverability is applied.
If an outgoing mutator throws an exception, the exception bubbles up to the method performing the Send or Publish. If the operation is performed on a context in the pipeline, processing of the incoming message is aborted, the message is rolled back to the queue, and recoverability is applied. If the operation is performed on the message session, the exception might bubble up to the user code or, if not properly handled, tear down the application domain.
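The message session case can be sketched as follows: code that sends outside the pipeline wraps the call so that an exception thrown by an outgoing mutator does not go unhandled. SafeSender and TrySend are hypothetical names, and catching every exception type is a simplification for the example; real code would likely log through the application's logger and decide per exception type.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical helper for sends performed on the message session.
public static class SafeSender
{
    public static async Task<bool> TrySend(IMessageSession messageSession, object message)
    {
        try
        {
            // Outgoing mutators run as part of this call, so an exception
            // thrown by one of them surfaces here.
            await messageSession.Send(message).ConfigureAwait(false);
            return true;
        }
        catch (Exception exception)
        {
            Console.Error.WriteLine($"Send failed: {exception.Message}");
            return false;
        }
    }
}
```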
In NServiceBus versions 6 and above, and all integrations that target those versions, all extension points that return Task cannot return a null Task. These APIs must return an instance of a Task, i.e. a pending Task or a CompletedTask, or be marked async. For extension points that return a Task<T>, return the value directly (for async methods) or wrap the value using Task.FromResult(value).
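The rule about non-null tasks can be illustrated with plain .NET code. The class and method names below are hypothetical and independent of NServiceBus; they only show the valid shapes a Task-returning method can take.

```csharp
using System.Threading.Tasks;

public static class TaskReturnExamples
{
    // Invalid for an extension point: awaiting the result throws
    // a NullReferenceException.
    // public static Task ReturnsNull() => null;

    // Synchronous work: return an already completed task.
    public static Task Synchronous()
    {
        return Task.CompletedTask;
    }

    // Asynchronous work: mark the method async and await.
    public static async Task Asynchronous()
    {
        await Task.Delay(10).ConfigureAwait(false);
    }

    // A value computed synchronously: wrap it with Task.FromResult.
    public static Task<int> WrappedValue()
    {
        return Task.FromResult(42);
    }

    // A value computed in an async method: return it directly.
    public static async Task<int> AsyncValue()
    {
        await Task.Yield();
        return 42;
    }
}
```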
Mutators versus Behaviors
Shared concepts and functionality
Both mutators and behaviors:
- Can manipulate pipeline state
- Can be executed in the incoming or outgoing pipeline
- Bubble exceptions up the pipeline, where they are handled by the recoverability mechanism
Differences
Note that these are relative differences. So, for example, a behavior is only "high complexity" in comparison to a mutator.
Characteristic | Mutator | Behavior |
---|---|---|
Complexity to implement | Low | High |
Flexibility | Low | High |
Location in pipeline | Fixed | Flexible |
Complexity to test | Low | Medium* |
Can control nested action | No | Yes |
Affects call stack depth | No | Yes |
Can replace an existing behavior | No | Yes |
* For more information, refer to the unit testing sample on testing behaviors.
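For contrast with the mutators shown earlier, the following sketch shows what "can control nested action" and "affects call stack depth" look like in a behavior: the behavior receives a next delegate and decides when, or whether, to invoke the rest of the pipeline. TimingBehavior is a hypothetical example, and the registration call in the comment assumes an endpointConfiguration variable as used in the registration section.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using NServiceBus.Pipeline;

// Hypothetical behavior that measures how long the rest of the
// incoming pipeline takes for each logical message.
public class TimingBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        var stopwatch = Stopwatch.StartNew();
        try
        {
            // The nested action: everything after this behavior runs
            // inside this call, which is why behaviors add stack depth.
            await next().ConfigureAwait(false);
        }
        finally
        {
            stopwatch.Stop();
            Console.WriteLine(
                $"Handled {context.Message.MessageType.Name} in {stopwatch.ElapsedMilliseconds} ms");
        }
    }
}

// Registration sketch (assumes an existing endpointConfiguration):
// endpointConfiguration.Pipeline.Register(
//     new TimingBehavior(),
//     "Logs how long each incoming message takes to handle");
```

A mutator cannot express this, because it runs at a fixed point and has no access to the continuation of the pipeline.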