Pipelines are made up of a group of steps acting on the same level of abstraction. This allows a number of scenarios, such as:
- Defining a step that works with the "incoming physical" message before it has been deserialized.
- Defining a step that is executed before and after each handler invocation (keeping in mind that there can be multiple message handlers per message).
Extending the pipeline is done with a custom behavior implementing `Behavior<TContext>`. `TContext` is the context of the stage that the behavior belongs to. A list of the pipeline stages to which a behavior can be attached can be found in the Steps, Stages, and Connectors article.
Custom behaviors should not rely on ordering within the same stage. If a custom behavior requires information that is generated by a previous stage, place the behavior in a later stage of the pipeline. If a custom behavior creates information that other behaviors rely on, place it in a stage that runs before those behaviors.

    public class SampleBehavior :
        Behavior<IIncomingLogicalMessageContext>
    {
        public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
        {
            // custom logic before calling the next step in the pipeline.
            await next().ConfigureAwait(false);
            // custom logic after all inner steps in the pipeline completed.
        }
    }

In the above code snippet, the `SampleBehavior` class derives from the `Behavior` contract and targets the incoming logical message context. This tells the framework to execute this behavior after the incoming raw message has been deserialized and a matching message type has been found. At runtime, the pipeline calls the `Invoke` method of each registered behavior, passing in the current message context and a delegate that invokes the next behavior in the pipeline.
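The same pattern applies to the handler-invocation scenario listed at the start of this article. The following is a minimal sketch, assuming a behavior that targets `IInvokeHandlerContext` runs once per handler invocation; the class name `HandlerInvocationBehavior` and the log messages are hypothetical and chosen only for illustration.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus.Logging;
using NServiceBus.Pipeline;

// Hypothetical behavior: runs custom logic around every handler invocation.
public class HandlerInvocationBehavior :
    Behavior<IInvokeHandlerContext>
{
    static readonly ILog log = LogManager.GetLogger<HandlerInvocationBehavior>();

    public override async Task Invoke(IInvokeHandlerContext context, Func<Task> next)
    {
        // The handler that is about to be invoked for the current message.
        var handlerType = context.MessageHandler.HandlerType;

        log.Debug($"Invoking handler {handlerType.FullName}");
        await next().ConfigureAwait(false);
        log.Debug($"Handler {handlerType.FullName} completed");
    }
}
```

Because a message can have multiple handlers, this behavior may run several times for a single incoming message.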
Each behavior is responsible for calling the next step in the pipeline by invoking `next()`.
Add a new step
To add a custom behavior to the pipeline, register it from the endpoint configuration:

    var pipeline = endpointConfiguration.Pipeline;
    pipeline.Register(
        behavior: new SampleBehavior(),
        description: "Logs a warning when processing takes too long");

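The description passed to `Register` above suggests a behavior that measures processing time. The following is a minimal sketch of what such a behavior might look like, not the implementation of `SampleBehavior`; the class name `SlowProcessingWarningBehavior` and the five-second threshold are assumptions made for illustration.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using NServiceBus.Logging;
using NServiceBus.Pipeline;

// Hypothetical behavior: warns when the inner pipeline takes longer than a threshold.
public class SlowProcessingWarningBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    static readonly ILog log = LogManager.GetLogger<SlowProcessingWarningBehavior>();

    // Assumed threshold; pick a value that suits the endpoint.
    static readonly TimeSpan threshold = TimeSpan.FromSeconds(5);

    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        var stopwatch = Stopwatch.StartNew();
        try
        {
            // Run the rest of the pipeline, including the message handlers.
            await next().ConfigureAwait(false);
        }
        finally
        {
            stopwatch.Stop();
            if (stopwatch.Elapsed > threshold)
            {
                var messageType = context.Message.MessageType.FullName;
                log.Warn($"Processing {messageType} took {stopwatch.Elapsed}.");
            }
        }
    }
}
```

The `finally` block ensures the elapsed time is checked even when an inner step throws, and the exception still bubbles up unchanged.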
Behaviors can also be registered from a `Feature`, as shown below:

    protected override void Setup(FeatureConfigurationContext context)
    {
        var pipeline = context.Pipeline;
        pipeline.Register(
            behavior: new SampleBehavior(),
            description: "Logs a warning when processing takes too long");
    }

Replace an existing step
To replace the implementation of an existing step, substitute it with a custom behavior:

    public class ReplaceExistingStep :
        INeedInitialization
    {
        public void Customize(EndpointConfiguration endpointConfiguration)
        {
            var pipeline = endpointConfiguration.Pipeline;
            pipeline.Replace(
                stepId: "Id of the step to replace",
                newBehavior: typeof(SampleBehavior),
                description: "Description");
        }
    }

In order to replace the existing step, it is necessary to provide a step ID. The most reliable way of determining the step ID is to find the step definition in the NServiceBus source code.
Note that step IDs are hard-coded strings and may change in future versions, resulting in unexpected behavior changes. When replacing built-in steps, create automated tests that detect ID changes or step removal.
Exception handling
Exceptions thrown from a behavior's `Invoke` method bubble up the chain. If no behavior handles the exception, the message is considered faulty. Depending on the endpoint configuration, the message is then either put back in the queue (rolling back the transaction, when configured) or moved to the error queue.
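To illustrate the two outcomes, here is a minimal sketch of a behavior that handles one exception type and lets every other exception bubble up. `IgnorableException` and `ExceptionObservingBehavior` are hypothetical names introduced only for this example.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus.Logging;
using NServiceBus.Pipeline;

// Hypothetical exception type used only for this illustration.
public class IgnorableException : Exception
{
}

public class ExceptionObservingBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    static readonly ILog log = LogManager.GetLogger<ExceptionObservingBehavior>();

    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        try
        {
            await next().ConfigureAwait(false);
        }
        catch (IgnorableException ex)
        {
            // Handled here: the exception does not bubble up any further,
            // so the message is not treated as faulty.
            log.Warn("Ignoring a failure raised by an inner step.", ex);
        }
        catch (Exception ex)
        {
            // Not handled: log and rethrow so the exception keeps bubbling up
            // and the message is treated as faulty.
            log.Error("An inner step failed.", ex);
            throw;
        }
    }
}
```

Swallowing an exception means the message is consumed as if processing succeeded, so this should be reserved for failures that are safe to ignore.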
MessageDeserializationException
If a message fails to deserialize, a `MessageDeserializationException` is thrown by the `DeserializeLogicalMessagesBehavior` behavior. In this case, the message is moved directly to the error queue to avoid blocking the system with poison messages.
Skip serialization
When writing extensions to the pipeline it may be necessary to either take control of the serialization or to skip it entirely. One example of this is with the callbacks feature. Callbacks skip serialization for integers and enums and instead embed them in the message headers.
To skip serialization, implement a behavior that targets `IOutgoingLogicalMessageContext`. For example, the following behavior skips serialization if the message is an integer, placing it in a header instead.

    class SkipSerializationForInts :
        Behavior<IOutgoingLogicalMessageContext>
    {
        public override Task Invoke(IOutgoingLogicalMessageContext context, Func<Task> next)
        {
            var outgoingLogicalMessage = context.Message;
            if (outgoingLogicalMessage.MessageType == typeof(int))
            {
                var headers = context.Headers;
                headers["MyCustomHeader"] = outgoingLogicalMessage.Instance.ToString();
                context.SkipSerialization();
            }
            return next();
        }

        public class Registration :
            RegisterStep
        {
            public Registration()
                : base(
                    stepId: "SkipSerializationForInts",
                    behavior: typeof(SkipSerializationForInts),
                    description: "Skips serialization for integers")
            {
            }
        }
    }

On the receiving side, this header can then be extracted and decisions on the incoming message processing pipeline can be made based on it.
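As a rough sketch of the receiving side, a behavior targeting the incoming physical message context could read the header before deserialization takes place. The class name `ReadCustomHeaderBehavior` and the key `"MyCustomHeaderValue"` are hypothetical. How the rest of the incoming pipeline treats a message whose body was never serialized is outside the scope of this example.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus.Pipeline;

// Hypothetical behavior: extracts the integer that the sender placed in a header.
class ReadCustomHeaderBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        var headers = context.Message.Headers;

        if (headers.TryGetValue("MyCustomHeader", out var rawValue) &&
            int.TryParse(rawValue, out var value))
        {
            // Store the parsed value under an assumed key so that
            // later behaviors can base decisions on it.
            context.Extensions.Set("MyCustomHeaderValue", value);
        }

        return next();
    }
}
```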
Sharing data between behaviors
Sometimes a parent behavior might need to pass information to a child behavior and vice versa. The `context` parameter of a behavior's `Invoke` method facilitates passing data between behaviors. The context is similar to a shared dictionary which allows adding and retrieving information from different behaviors.
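A minimal sketch of this pattern follows, assuming a parent behavior in the physical stage and a child behavior in the logical stage. `SharedData`, `ParentBehavior`, and `ChildBehavior` are hypothetical names used only for illustration.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus.Pipeline;

// Hypothetical data holder passed from the parent to the child behavior.
public class SharedData
{
    public DateTime ReceivedAt { get; set; }
}

public class ParentBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        // Store shared information on the context before calling the inner steps.
        context.Extensions.Set(new SharedData { ReceivedAt = DateTime.UtcNow });
        return next();
    }
}

public class ChildBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // TryGet avoids an exception if the parent behavior is not registered.
        if (context.Extensions.TryGet(out SharedData data))
        {
            // use data.ReceivedAt here
        }
        return next();
    }
}
```

Placing the two behaviors in different stages keeps the example consistent with the earlier advice not to rely on ordering within a single stage.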
Injecting dependencies into behaviors

    public class BehaviorUsingDependencyInjection :
        Behavior<IIncomingLogicalMessageContext>
    {
        // Dependencies injected into the constructor are singletons and cached for the lifetime
        // of the endpoint
        public BehaviorUsingDependencyInjection(SingletonDependency singletonDependency)
        {
            this.singletonDependency = singletonDependency;
        }

        public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
        {
            var scopedDependency = context.Builder.Build<ScopedDependency>();
            // do something with the scoped dependency before
            await next();
            // do something with the scoped dependency after
        }

        SingletonDependency singletonDependency;
    }

Dependencies injected into the constructor of a behavior become singletons, regardless of the scope they are registered with in the dependency injection container. To resolve per-request or scoped dependencies, use the builder that is available on the context.
Accessing options from Behaviors
Settings configured on message operation options (e.g., `SendOptions`, `PublishOptions`, etc.) are internally stored in a dedicated `ContextBag`. These settings are accessible within the pipeline via the `context.GetOperationProperties()` extension:

    class MyMessageHandler : IHandleMessages<MyIncomingMessage>
    {
        public Task Handle(MyIncomingMessage message, IMessageHandlerContext context)
        {
            var sendOptions = new SendOptions();
            // Configure a custom setting for the outgoing message:
            sendOptions.GetExtensions().Set("MySettingsKey", true);
            return context.Send(new MyOutgoingMessage(), sendOptions);
        }
    }

    class MyCustomBehavior : Behavior<IOutgoingSendContext>
    {
        public override Task Invoke(IOutgoingSendContext context, Func<Task> next)
        {
            // Retrieve the custom setting in the outgoing pipeline:
            if (context.GetOperationProperties().TryGet("MySettingsKey", out bool settingValue))
            {
                // ...
            }
            return next();
        }
    }

Mutators versus Behaviors
Shared concepts and functionality
Both mutators and behaviors:
- Can manipulate pipeline state
- Can be executed in the incoming or outgoing pipeline
- Let exceptions bubble up the pipeline, where they are handled by the recoverability mechanism
Differences
Note that these are relative differences. So, for example, a behavior is only "high complexity" in comparison to a mutator.
| | Mutator | Behavior |
|---|---|---|
| Complexity to implement | Low | High |
| Flexibility | Low | High |
| Location in pipeline | Fixed | Flexible |
| Complexity to test | Low | Medium* |
| Can control nested action | No | Yes |
| Affects call stack depth | No | Yes |
| Can replace an existing behavior | No | Yes |

\* For more information, refer to the testing behaviors unit testing sample.