Manipulate Pipeline with Behaviors

Component: NServiceBus
NuGet Package NServiceBus (5.x)

Pipelines are made up of a group of steps acting on the same level of abstraction. This allows scenarios such as:

  • Defining a step that works with the "incoming physical" message before it has been deserialized.
  • Defining a step that is executed before and after each handler invocation (remember: there can be multiple message handlers per message).

Extending the pipeline is done with a custom behavior implementing IBehavior<TContext>. TContext is the context of the stage that the behavior belongs to.

public class SampleBehavior :
    IBehavior<IncomingContext>
{
    public void Invoke(IncomingContext context, Action next)
    {
        // custom logic before calling the next step in the pipeline.
        next();
        // custom logic after all inner steps in the pipeline completed.
    }
}

In the above code snippet, the SampleBehavior class implements the IBehavior contract and targets the incoming context. This tells the framework to execute this behavior after the incoming raw message has been deserialized and a matching message type has been found. At runtime, the pipeline calls the Invoke method of each registered behavior, passing in the current message context and an action that invokes the next behavior in the pipeline.

Each behavior is responsible for calling the next step in the pipeline chain by invoking next(). If a behavior does not invoke next(), the remaining steps in the chain are not executed.
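The nesting that results from each behavior calling next() can be illustrated with a small, framework-free sketch. Everything in it (IToyBehavior, NamedBehavior, ToyPipeline) is a hypothetical stand-in written for this illustration, not an NServiceBus type, and the real pipeline is built differently; the sketch only shows why code before next() runs on the way in and code after next() runs on the way out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a behavior contract; not part of NServiceBus.
public interface IToyBehavior
{
    void Invoke(List<string> trace, Action next);
}

// Records when it is entered and when control returns to it.
public class NamedBehavior :
    IToyBehavior
{
    readonly string name;

    public NamedBehavior(string name)
    {
        this.name = name;
    }

    public void Invoke(List<string> trace, Action next)
    {
        trace.Add(name + " before");
        next();
        trace.Add(name + " after");
    }
}

public static class ToyPipeline
{
    // Runs the behaviors in order; the innermost "next" stands in for the handler.
    public static List<string> Run(IList<IToyBehavior> behaviors)
    {
        var trace = new List<string>();
        InvokeFrom(0, behaviors, trace);
        return trace;
    }

    static void InvokeFrom(int index, IList<IToyBehavior> behaviors, List<string> trace)
    {
        if (index == behaviors.Count)
        {
            trace.Add("handler");
            return;
        }
        // "next" for the current behavior invokes the one after it.
        behaviors[index].Invoke(trace, () => InvokeFrom(index + 1, behaviors, trace));
    }
}
```

Running two behaviors named "outer" and "inner" through ToyPipeline.Run yields the trace: outer before, inner before, handler, inner after, outer after. A behavior that returns without calling next() would cut the trace short at that point.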

Add a new step

To add a custom behavior to the pipeline define a step for it:

class NewPipelineStep :
    RegisterStep
{
    public NewPipelineStep()
        : base(
            stepId: "NewPipelineStep",
            behavior: typeof(SampleBehavior),
            description: "Logs a warning when processing takes too long")
    {
        // Optional: Specify where it needs to be invoked in the pipeline,
        // for example InsertBefore or InsertAfter
        InsertBefore(WellKnownStep.InvokeHandlers);
    }
}

Then register the new step in the pipeline settings:

class NewPipelineStepRegistration :
    INeedInitialization
{
    public void Customize(BusConfiguration busConfiguration)
    {
        // Register the new step in the pipeline
        var pipeline = busConfiguration.Pipeline;
        pipeline.Register<NewPipelineStep>();
    }
}

Replace an existing step

To replace the implementation of an existing step, register a custom behavior under that step's id:

public class ReplaceExistingStep :
    INeedInitialization
{
    public void Customize(BusConfiguration busConfiguration)
    {
        var pipeline = busConfiguration.Pipeline;
        pipeline.Replace(
            stepId: "Id of the step to replace",
            newBehavior: typeof(SampleBehavior),
            description: "Description");
    }
}

Note: Steps can also be registered from a Feature.
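
As a minimal sketch of registration from a Feature, assuming the NewPipelineStep class defined earlier and the Feature base class from NServiceBus 5.x. The feature name is hypothetical, and enabling it by default is an assumption made for the example; a feature can also be enabled explicitly from the endpoint configuration.

```csharp
using NServiceBus.Features;

// Hypothetical feature name; NewPipelineStep is the RegisterStep subclass shown earlier.
public class NewPipelineStepFeature :
    Feature
{
    public NewPipelineStepFeature()
    {
        // Assumption for this sketch: activate the feature without explicit opt-in
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        // Register the new step in the pipeline
        context.Pipeline.Register<NewPipelineStep>();
    }
}
```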

Exception Handling

Exceptions thrown from a behavior's Invoke method bubble up the chain. If no behavior handles the exception, the message is considered faulted. Depending on the endpoint configuration, the transaction is then rolled back and the message is returned to the queue, or the message is moved to the error queue.
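
Because exceptions travel back up through each behavior's call to next(), a behavior can observe failures of the inner steps by wrapping that call. The following is a sketch only: LogFailuresBehavior is a hypothetical name, and logging the physical message id is just one possible reaction.

```csharp
using System;
using NServiceBus.Logging;
using NServiceBus.Pipeline;
using NServiceBus.Pipeline.Contexts;

// Hypothetical behavior, for illustration only.
public class LogFailuresBehavior :
    IBehavior<IncomingContext>
{
    static ILog log = LogManager.GetLogger(typeof(LogFailuresBehavior));

    public void Invoke(IncomingContext context, Action next)
    {
        try
        {
            next();
        }
        catch (Exception exception)
        {
            // Observe the failure raised by an inner step
            log.Error("Processing failed for message " + context.PhysicalMessage.Id, exception);
            // Rethrow so the message is still treated as faulted
            throw;
        }
    }
}
```

Swallowing the exception instead of rethrowing it would make the outer steps see a successful invocation, so a behavior should only do that deliberately.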

MessageDeserializationException

If a message fails to deserialize, a MessageDeserializationException is thrown by the DeserializeLogicalMessagesBehavior. In this case, the message is moved directly to the error queue to avoid blocking the system with poison messages.

Sharing data between Behaviors

Sometimes a parent behavior might need to pass some information to a child behavior and vice versa. The context parameter of a behavior's Invoke method facilitates passing data between behaviors. The context is very similar to a shared dictionary which allows adding and retrieving information from different behaviors.

public class ParentBehavior :
    IBehavior<IncomingContext>
{
    public void Invoke(IncomingContext context, Action next)
    {
        // set some shared information on the context
        context.Set(new SharedData());
        next();
    }
}

public class ChildBehavior :
    IBehavior<IncomingContext>
{
    public void Invoke(IncomingContext context, Action next)
    {
        // access the shared data
        var data = context.Get<SharedData>();
        next();
    }
}

Note: Contexts are not concurrency safe. Do not read from or write to the same context from multiple threads.
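
When a child behavior cannot be sure that the parent behavior ran, reading the value defensively may be preferable. This sketch assumes the TryGet method on the NServiceBus 5.x behavior context; the SharedData definition and the DefensiveChildBehavior name are hypothetical, since the snippets above use SharedData without defining it.

```csharp
using System;
using NServiceBus.Pipeline;
using NServiceBus.Pipeline.Contexts;

// Hypothetical payload type; the property is made up for this example.
public class SharedData
{
    public DateTime StartedAt { get; set; }
}

// Hypothetical behavior that tolerates missing shared data.
public class DefensiveChildBehavior :
    IBehavior<IncomingContext>
{
    public void Invoke(IncomingContext context, Action next)
    {
        SharedData data;
        if (context.TryGet(out data))
        {
            // a behavior earlier in the chain stored SharedData; use it here
        }
        else
        {
            // nothing was stored; continue without the shared data
        }
        next();
    }
}
```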

Mutators versus Behaviors

Shared concepts and functionality

  • Can manipulate pipeline state
  • Can be executed in the incoming or outgoing pipeline
  • Exceptions bubble up the pipeline and are handled by Recoverability

Differences

Note that these are relative differences. So, for example, a Behavior is only "high complexity" in comparison to a Mutator.

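|                               | Mutator | Behavior |
|-------------------------------|---------|----------|
| Complexity to implement       | Low     | High     |
| Flexibility                   | Low     | High     |
| Location in pipeline          | Fixed   | Flexible |
| Complexity to test            | Low     | Medium*  |
| Can control nested action     | No      | Yes      |
| Affects call stack depth      | No      | Yes      |
| Can replace existing Behavior | No      | Yes      |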
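
For comparison with the behaviors shown earlier, here is a sketch of an incoming message mutator, assuming the NServiceBus 5.x IMutateIncomingMessages interface and container registration through RegisterComponents. The class names are hypothetical. Note that a mutator receives no next delegate, which is why it cannot control the nested action or change where it runs in the pipeline.

```csharp
using NServiceBus;
using NServiceBus.MessageMutator;

// Hypothetical mutator, for illustration only.
public class SampleIncomingMutator :
    IMutateIncomingMessages
{
    public object MutateIncoming(object message)
    {
        // inspect or change the deserialized message, then hand it back
        return message;
    }
}

// Hypothetical registration class; mutators are resolved from the container.
public class SampleIncomingMutatorRegistration :
    INeedInitialization
{
    public void Customize(BusConfiguration busConfiguration)
    {
        busConfiguration.RegisterComponents(
            components => components.ConfigureComponent<SampleIncomingMutator>(DependencyLifecycle.InstancePerCall));
    }
}
```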
