Manipulate Pipeline with Behaviors

Component: NServiceBus | Nuget: NServiceBus (Version: 6.x)

Pipelines are made up of a group of steps acting on the same level of abstraction. This allows scenarios such as:

  • Defining a step that works with the "incoming physical" message before it has been deserialized.
  • Defining a step that is executed before and after each handler invocation (remember: there can be multiple message handlers per message).

Extending the pipeline is done with a custom behavior implementing Behavior<TContext>. TContext is the context of the stage that the behavior belongs to.

public class SampleBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // custom logic before calling the next step in the pipeline.

        await next().ConfigureAwait(false);

        // custom logic after all inner steps in the pipeline completed.
    }
}

In the above code snippet, the SampleBehavior class derives from the Behavior<TContext> base class and targets the incoming logical message context. This tells the framework to execute the behavior after the incoming raw message has been deserialized and a matching message type has been found. At runtime, the pipeline calls the Invoke method of each registered behavior, passing in the current message context and a delegate that invokes the next behavior in the pipeline.

Each behavior is responsible for calling the next step in the pipeline chain by invoking next().
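As an illustration of running logic on both sides of next(), the following is a minimal sketch of a behavior that measures how long the rest of the pipeline takes. The class name SlowProcessingWarningBehavior and the five-second threshold are hypothetical and chosen only for this example.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using NServiceBus.Logging;
using NServiceBus.Pipeline;

// Hypothetical example: the class name and threshold are assumptions, not part of NServiceBus.
public class SlowProcessingWarningBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    static readonly ILog log = LogManager.GetLogger<SlowProcessingWarningBehavior>();
    static readonly TimeSpan threshold = TimeSpan.FromSeconds(5);

    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // Runs before the inner steps. The stopwatch is a local variable,
        // so concurrent invocations do not share it.
        var stopwatch = Stopwatch.StartNew();

        await next().ConfigureAwait(false);

        // Runs only if the inner steps completed without throwing.
        stopwatch.Stop();
        if (stopwatch.Elapsed > threshold)
        {
            log.WarnFormat("Processing {0} took {1}", context.Message.MessageType, stopwatch.Elapsed);
        }
    }
}
```

If an inner step throws, the code after next() is skipped and the exception continues up the chain.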

Add a new step

To add a custom behavior to the pipeline, register it from the endpoint configuration:

var pipeline = endpointConfiguration.Pipeline;
pipeline.Register(
    behavior: new SampleBehavior(),
    description: "Logs a warning when processing takes too long");

Behaviors can also be registered from a Feature as shown below:

protected override void Setup(FeatureConfigurationContext context)
{
    var pipeline = context.Pipeline;
    pipeline.Register(
        behavior: new SampleBehavior(),
        description: "Logs a warning when processing takes too long");
}

Behaviors are only created once and the same instance is reused on every invocation of the pipeline. Consequently, every behavior dependency will also behave as a singleton, even if a different option was specified when registering it in the DI container. Furthermore, the behavior, and all dependencies called during the invocation phase, need to be concurrency safe.
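Because a single instance serves all concurrent invocations, any state kept in fields has to be updated atomically or otherwise synchronized. The sketch below shows one way to do that for a simple counter; CountingBehavior and its Processed property are hypothetical names used for illustration only.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus.Pipeline;

// Hypothetical example: a behavior holding shared state across invocations.
public class CountingBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    // Shared by every invocation because the behavior instance is reused.
    long processed;

    // Atomic read so callers never observe a torn value.
    public long Processed => Interlocked.Read(ref processed);

    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        await next().ConfigureAwait(false);

        // A plain "processed++" could lose updates when messages are handled in parallel.
        Interlocked.Increment(ref processed);
    }
}
```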

Replace an existing step

To replace the implementation of an existing step, register a custom behavior under the ID of that step:

public class ReplaceExistingStep :
    INeedInitialization
{
    public void Customize(EndpointConfiguration endpointConfiguration)
    {
        var pipeline = endpointConfiguration.Pipeline;
        pipeline.Replace(
            stepId: "Id of the step to replace",
            newBehavior: typeof(SampleBehavior),
            description: "Description");
    }
}

Steps can also be registered from a Feature.

Exception Handling

Exceptions thrown from a behavior's Invoke method bubble up the chain. If no behavior handles the exception, the message is considered faulted. Depending on the endpoint configuration, the message is then either put back in the queue (rolling back the transaction) or moved to the error queue.
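Since exceptions travel back up through every outer behavior, a behavior can observe a failure by wrapping next() in a try/catch block. The following is a minimal sketch, assuming the goal is only to log the failure and let the normal fault handling proceed; LogFailuresBehavior is a hypothetical name.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus.Logging;
using NServiceBus.Pipeline;

// Hypothetical example: observes failures of inner steps without swallowing them.
public class LogFailuresBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    static readonly ILog log = LogManager.GetLogger<LogFailuresBehavior>();

    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        try
        {
            await next().ConfigureAwait(false);
        }
        catch (Exception exception)
        {
            log.Error($"Processing of message {context.MessageId} failed", exception);

            // Rethrow so the message is still treated as faulted.
            // Swallowing the exception here would mark the message as successfully processed.
            throw;
        }
    }
}
```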

MessageDeserializationException

If a message fails to deserialize, a MessageDeserializationException will be thrown by the DeserializeLogicalMessagesBehavior. In this case, the message is moved directly to the error queue so that poison messages do not block the system.

Skip Serialization

When writing extensions to the pipeline it may be necessary to either take control of the serialization or to skip it entirely. One example of this is Callbacks, which skips serialization for integers and enums and instead embeds them in the message headers.

To skip serialization, implement a behavior that targets IOutgoingLogicalMessageContext. For example, the following behavior skips serialization when an integer is sent and places its value in a header instead.

class SkipSerializationForInts :
    Behavior<IOutgoingLogicalMessageContext>
{
    public override Task Invoke(IOutgoingLogicalMessageContext context, Func<Task> next)
    {
        var outgoingLogicalMessage = context.Message;
        if (outgoingLogicalMessage.MessageType == typeof(int))
        {
            var headers = context.Headers;
            headers["MyCustomHeader"] = outgoingLogicalMessage.Instance.ToString();
            context.SkipSerialization();
        }
        return next();
    }

    public class Registration :
        RegisterStep
    {
        public Registration()
            : base(
                stepId: "SkipSerializationForInts",
                behavior: typeof(SkipSerializationForInts),
                description: "Skips serialization for integers")
        {
        }
    }
}

On the receiving side this header can then be extracted and decisions on the incoming message processing pipeline can be made based on it.
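A receiving-side counterpart could look like the sketch below. It reads the "MyCustomHeader" header set by the sending behavior and stores the parsed value in the context for later behaviors. ReadCustomHeaderBehavior and ReceivedInt are hypothetical names, and what to do with the value afterwards is left open.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus.Pipeline;

// Hypothetical carrier type for the value extracted from the header.
public class ReceivedInt
{
    public ReceivedInt(int value)
    {
        Value = value;
    }

    public int Value { get; }
}

// Hypothetical example: extracts the header written by SkipSerializationForInts.
public class ReadCustomHeaderBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        string headerValue;
        int number;

        // The header is absent on ordinary messages, so look it up defensively.
        if (context.Message.Headers.TryGetValue("MyCustomHeader", out headerValue) &&
            int.TryParse(headerValue, out number))
        {
            // Make the value available to behaviors later in the pipeline.
            context.Extensions.Set(new ReceivedInt(number));
        }

        return next();
    }
}
```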

Sharing data between Behaviors

Sometimes a parent behavior might need to pass some information to a child behavior and vice versa. The context parameter of a behavior's Invoke method facilitates passing data between behaviors. The context is very similar to a shared dictionary which allows adding and retrieving information from different behaviors.

public class ParentBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        // set some shared information on the context
        context.Extensions.Set(new SharedData());
        return next();
    }
}

public class ChildBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // access the shared data
        var sharedData = context.Extensions.Get<SharedData>();

        return next();
    }
}

Contexts are not concurrency safe.

In versions 6 and above, the context respects the stage hierarchy and only allows adding new entries in the scope of the current context. A child behavior (later in the pipeline chain) can read and even modify entries set by a parent behavior (earlier in the pipeline chain), but entries added by the child cannot be accessed from the parent.
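When a child behavior cannot be sure that a parent has stored an entry, it can use TryGet instead of Get. The sketch below assumes that Get<T> throws when no entry of the requested type exists; TolerantChildBehavior is a hypothetical name, and SharedData is declared here only to keep the sample complete.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus.Pipeline;

// Declared for completeness; stands in for whatever data the parent behavior shares.
public class SharedData
{
}

// Hypothetical example: reads shared data only if a parent behavior provided it.
public class TolerantChildBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        SharedData sharedData;
        if (context.Extensions.TryGet(out sharedData))
        {
            // The parent behavior ran and stored the entry; use sharedData here.
        }

        // Continue regardless of whether the entry was present.
        return next();
    }
}
```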

Mutators versus Behavior

Shared concepts and functionality

  • Can manipulate pipeline state
  • Can be executed in the incoming or outgoing pipeline
  • Exceptions bubble up the pipeline and are handled by Recoverability

Differences

Note that these are relative differences. So, for example, a Behavior is only "high complexity" in comparison to a Mutator.

                                 Mutator   Behavior
Complexity to implement          Low       High
Flexibility                      Low       High
Location in pipeline             Fixed     Flexible
Complexity to test               Low       Medium*
Can control nested action        No        Yes
Affects call stack depth         No        Yes
Can replace existing Behavior    No        Yes
