Manipulate pipeline with behaviors

Component: NServiceBus
NuGet Package: NServiceBus (8.1)

Pipelines are made up of a group of steps acting on the same level of abstraction. This allows a number of scenarios, such as:

  • Defining a step that works with the "incoming physical" message before it has been deserialized.
  • Defining a step that is executed before and after each handler invocation (keeping in mind that there can be multiple message handlers per message).

Extending the pipeline is done with a custom behavior implementing Behavior<TContext>. TContext is the context of the stage that the behavior belongs to. A list of possible pipeline stages to which a behavior can be attached can be found in the Steps, Stages, and Connectors article.

Custom behaviors should not rely on their ordering within a stage. If a custom behavior requires information that is generated by a previous stage, put the behavior into the next stage of the pipeline. If a custom behavior must create information that other behaviors rely on, place it in a stage that runs before those behaviors.

public class SampleBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // custom logic before calling the next step in the pipeline.

        await next();

        // custom logic after all inner steps in the pipeline completed.
    }
}

In the above code snippet, the SampleBehavior class derives from the Behavior<TContext> contract and targets the incoming logical message context (IIncomingLogicalMessageContext). This tells the framework to execute the behavior after the incoming raw message has been deserialized and a matching message type has been found. At runtime, the pipeline calls the Invoke method of each registered behavior, passing in the current message context and a delegate (next) that invokes the next behavior in the pipeline.
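The nesting that results from each behavior calling next() can be illustrated without any NServiceBus types. The following is a minimal sketch only: MiniPipeline, Run, and Demo are hypothetical names and do not reflect how NServiceBus builds its pipeline internally. Each step receives a delegate that runs the remainder of the chain, so code before await next() runs on the way in and code after it runs on the way out.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a pipeline; not the NServiceBus implementation.
public static class MiniPipeline
{
    // Invokes the step at 'index', handing it a delegate that runs the remaining steps.
    public static Task Run(IReadOnlyList<Func<Func<Task>, Task>> steps, int index = 0)
    {
        if (index == steps.Count)
        {
            // End of the chain: nothing left to invoke.
            return Task.CompletedTask;
        }

        return steps[index](() => Run(steps, index + 1));
    }

    // Runs two nested steps and returns the order in which their code executed.
    public static async Task<List<string>> Demo()
    {
        var log = new List<string>();
        var steps = new List<Func<Func<Task>, Task>>
        {
            async next => { log.Add("outer before"); await next(); log.Add("outer after"); },
            async next => { log.Add("inner before"); await next(); log.Add("inner after"); }
        };

        await Run(steps);
        return log;
    }
}
```

Calling MiniPipeline.Demo() yields the entries "outer before", "inner before", "inner after", "outer after", in that order. The same shape explains why a behavior that never calls next() stops all inner steps from running.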

Add a new step

To add a custom behavior to the pipeline, register it from the endpoint configuration:

var pipeline = endpointConfiguration.Pipeline;
pipeline.Register(
    behavior: new SampleBehavior(),
    description: "Logs a warning when processing takes too long");

Behaviors can also be registered from a Feature as shown below:

protected override void Setup(FeatureConfigurationContext context)
{
    var pipeline = context.Pipeline;
    pipeline.Register(
        behavior: new SampleBehavior(),
        description: "Logs a warning when processing takes too long");
}
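The description used in the registrations above suggests a behavior that times message processing. A minimal sketch of such a behavior follows. The class name LogSlowProcessingBehavior and the five-second threshold are assumptions made for illustration; only the Behavior<TContext> contract and the NServiceBus logging API are taken from the framework.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using NServiceBus.Logging;
using NServiceBus.Pipeline;

// Hypothetical behavior: warns when the inner steps of the pipeline take too long.
public class LogSlowProcessingBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    // Assumed threshold, chosen only for this example.
    static readonly TimeSpan Threshold = TimeSpan.FromSeconds(5);
    static readonly ILog log = LogManager.GetLogger<LogSlowProcessingBehavior>();

    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        var stopwatch = Stopwatch.StartNew();
        try
        {
            // Run all inner steps, including the message handlers.
            await next();
        }
        finally
        {
            // Measure even when an inner step throws; the exception still propagates.
            stopwatch.Stop();
            if (stopwatch.Elapsed > Threshold)
            {
                log.WarnFormat("Processing {0} took {1}", context.Message.MessageType, stopwatch.Elapsed);
            }
        }
    }
}
```

Because the measurement wraps next(), it covers every step nested inside this behavior, not only a single handler.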

Replace an existing step

To replace the implementation of an existing step, substitute it with a custom behavior:

public class ReplaceExistingStep :
    INeedInitialization
{
    public void Customize(EndpointConfiguration endpointConfiguration)
    {
        var pipeline = endpointConfiguration.Pipeline;
        pipeline.Replace(
            stepId: "Id of the step to replace",
            newBehavior: typeof(SampleBehavior),
            description: "Description");
    }
}

In order to replace the existing step, it is necessary to provide a step ID. The most reliable way of determining the step ID is to find the step definition in the NServiceBus source code.

Note that step IDs are hard-coded strings and may change in future versions, which can silently change how the endpoint behaves. When replacing built-in steps, create automated tests that detect ID changes or the removal of a step.

Add or replace a step

The Register API will throw an exception when a behavior is registered with an ID that is already present in the pipeline. On the other hand, the Replace API will throw an exception when a behavior is registered with an ID that is not found in the pipeline.

Sometimes it's impossible to know up front what the content of the pipeline is when registering a behavior.

To ensure that the behavior is either created or replaced in the pipeline, use the RegisterOrReplace API:

public class RegisterOrReplaceStep :
    INeedInitialization
{
    public void Customize(EndpointConfiguration endpointConfiguration)
    {
        var pipeline = endpointConfiguration.Pipeline;
        pipeline.RegisterOrReplace(
            stepId: "StepIdThatMayOrMayNotExist",
            behavior: typeof(SampleBehavior),
            description: "Description");
    }
}

Disable an existing step

To disable the implementation of an existing step, substitute it with a behavior with no action:

public class NoActionBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // No custom logic: pass straight through to the next step.
        await next();
    }
}

The behavior does nothing and calls the next step in the pipeline chain by invoking next().

public class NoActionExistingStep :
    INeedInitialization
{
    public void Customize(EndpointConfiguration endpointConfiguration)
    {
        var pipeline = endpointConfiguration.Pipeline;
        pipeline.Replace(
            stepId: "Id of the step to replace",
            newBehavior: typeof(NoActionBehavior),
            description: "Description");
    }
}

Exception handling

Exceptions thrown from a behavior's Invoke method bubble up the chain. If no behavior handles the exception, the message is considered faulty. Depending on the endpoint configuration, the message is then either put back in the queue (rolling back the transaction, when configured) or moved to the error queue.
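A behavior can observe an exception on its way up the chain without swallowing it. The sketch below logs the failure and rethrows, so the message is still treated as faulty by the endpoint. The class name LogFailureBehavior is hypothetical, and the choice of the incoming physical stage is an assumption made so that the behavior wraps as much of the pipeline as possible.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus.Logging;
using NServiceBus.Pipeline;

// Hypothetical behavior: logs failures from inner steps, then lets them bubble up.
public class LogFailureBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    static readonly ILog log = LogManager.GetLogger<LogFailureBehavior>();

    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        try
        {
            await next();
        }
        catch (Exception exception)
        {
            log.Error($"Processing of message '{context.MessageId}' failed", exception);

            // Rethrow without resetting the stack trace so the failure keeps bubbling up.
            throw;
        }
    }
}
```

Catching the exception without rethrowing would mark the message as successfully processed, which is rarely the intent.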

MessageDeserializationException

If a message fails to deserialize, the DeserializeLogicalMessagesBehavior behavior throws a MessageDeserializationException. In this case, the message is moved directly to the error queue to avoid blocking the system with poison messages.

Skip serialization

When writing extensions to the pipeline it may be necessary to either take control of the serialization or to skip it entirely. One example of this is with the callbacks feature. Callbacks skip serialization for integers and enums and instead embed them in the message headers.

To skip serialization, implement a behavior that targets IOutgoingLogicalMessageContext. For example, the following behavior skips serialization if the message is an integer, placing it in a header instead.

class SkipSerializationForInts :
    Behavior<IOutgoingLogicalMessageContext>
{
    public override Task Invoke(IOutgoingLogicalMessageContext context, Func<Task> next)
    {
        var outgoingLogicalMessage = context.Message;
        if (outgoingLogicalMessage.MessageType == typeof(int))
        {
            var headers = context.Headers;
            headers["MyCustomHeader"] = outgoingLogicalMessage.Instance.ToString();
            context.SkipSerialization();
        }
        return next();
    }

    public class Registration :
        RegisterStep
    {
        public Registration()
            : base(
                stepId: "SkipSerializationForInts",
                behavior: typeof(SkipSerializationForInts),
                description: "Skips serialization for integers")
        {
        }
    }
}

On the receiving side, this header can then be extracted and decisions on the incoming message processing pipeline can be made based on it.
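A receiving-side counterpart might look like the following sketch. It assumes the sender wrote the "MyCustomHeader" header as in the snippet above; the names ReadIntegerHeaderBehavior and IntegerFromHeader are hypothetical. The behavior parses the header and stores the value on the context so that later stages can base decisions on it.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus.Pipeline;

// Hypothetical holder for the value that was carried in the header.
public class IntegerFromHeader
{
    public int Value { get; set; }
}

// Hypothetical behavior: extracts the integer written by SkipSerializationForInts.
public class ReadIntegerHeaderBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        var headers = context.Message.Headers;

        // Only act when the header is present and contains a valid integer.
        if (headers.TryGetValue("MyCustomHeader", out var raw) && int.TryParse(raw, out var value))
        {
            // Make the value available to behaviors in later stages.
            context.Extensions.Set(new IntegerFromHeader { Value = value });
        }

        return next();
    }
}
```

Messages without the header pass through unchanged, so the behavior is safe to register on endpoints that also receive regular serialized messages.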

Sharing data between behaviors

Sometimes a parent behavior might need to pass information to a child behavior and vice versa. The context parameter of a behavior's Invoke method facilitates passing data between behaviors. The context is similar to a shared dictionary which allows adding and retrieving information from different behaviors.

public class ParentBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        // set some shared information on the context
        context.Extensions.Set(new SharedData());
        return next();
    }
}

public class ChildBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // access the shared data
        var sharedData = context.Extensions.Get<SharedData>();

        return next();
    }
}
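The snippets above do not show SharedData itself, and Get<T> is expected to throw when nothing has been stored under that type. The following sketch assumes a simple shape for SharedData (the ReceivedAtUtc property is hypothetical) and uses TryGet so that the child behavior tolerates a missing value, for example when the parent behavior is not registered on a given endpoint.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus.Pipeline;

// Assumed shape of the shared payload; the original snippets do not define it.
public class SharedData
{
    public DateTime ReceivedAtUtc { get; set; }
}

// Hypothetical variant of ChildBehavior that does not fail when no data was set.
public class TolerantChildBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        if (context.Extensions.TryGet<SharedData>(out var sharedData))
        {
            // The parent behavior ran earlier in the pipeline; use its data here.
            var age = DateTime.UtcNow - sharedData.ReceivedAtUtc;
        }

        return next();
    }
}
```

Values are keyed by type in this usage, so a dedicated class per piece of shared information avoids collisions between unrelated behaviors.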

Injecting dependencies into behaviors

public class BehaviorUsingDependencyInjection :
    Behavior<IIncomingLogicalMessageContext>
{
    // Dependencies injected into the constructor are singletons and cached for the lifetime
    // of the endpoint
    public BehaviorUsingDependencyInjection(SingletonDependency singletonDependency)
    {
        this.singletonDependency = singletonDependency;
    }

    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        var scopedDependency = context.Builder.GetRequiredService<ScopedDependency>();
        // do something with the scoped dependency before
        await next();
        // do something with the scoped dependency after
    }

    SingletonDependency singletonDependency;
}

Dependencies injected into the constructor of a behavior become singletons regardless of the lifetime they are registered with in the dependency injection container. To resolve a new instance per request, or a scoped dependency, use the builder that is available on the context (context.Builder).

The service provider available via the context varies depending on the pipeline stage. Pipeline stages used within the context of an incoming message expose a new child service provider created for each incoming message. All other use cases expose the root service provider.

Behaviors in the outgoing pipeline stages see a different service provider depending on how they are invoked. When an outgoing behavior is invoked within the context of a message session, the root service provider is exposed. When the same outgoing behavior is invoked within the scope of an incoming message, the child service provider for that incoming message is used.
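For the behavior above to receive its dependencies, both the dependencies and the behavior have to be registered. The fragment below is a sketch under a few assumptions: SingletonDependency, ScopedDependency, and BehaviorUsingDependencyInjection are the types from the snippet above, the class and method names BehaviorDependencyRegistration and Apply are hypothetical, and the behavior is registered by type so that the container constructs it.

```csharp
using Microsoft.Extensions.DependencyInjection;
using NServiceBus;

// Hypothetical helper that wires up the behavior and the services it depends on.
public static class BehaviorDependencyRegistration
{
    public static void Apply(EndpointConfiguration endpointConfiguration)
    {
        endpointConfiguration.RegisterComponents(services =>
        {
            // Injected once into the behavior's constructor.
            services.AddSingleton<SingletonDependency>();

            // Resolved per incoming message through context.Builder.
            services.AddScoped<ScopedDependency>();
        });

        // Registering the type (not an instance) lets the container supply constructor arguments.
        endpointConfiguration.Pipeline.Register(
            typeof(BehaviorUsingDependencyInjection),
            "Demonstrates constructor and scoped dependency resolution");
    }
}
```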

Accessing options from Behaviors

Settings configured on message operation options (e.g., SendOptions, PublishOptions, etc.) are stored internally in a dedicated ContextBag. These settings are accessible within the pipeline via the context.GetOperationProperties() extension:

class MyMessageHandler : IHandleMessages<MyIncomingMessage>
{
    public Task Handle(MyIncomingMessage message, IMessageHandlerContext context)
    {
        var sendOptions = new SendOptions();

        // Configure a custom setting for the outgoing message:
        sendOptions.GetExtensions().Set("MySettingsKey", true);

        return context.Send(new MyOutgoingMessage(), sendOptions);
    }
}

class MyCustomBehavior : Behavior<IOutgoingSendContext>
{
    public override Task Invoke(IOutgoingSendContext context, Func<Task> next)
    {
        // Retrieve the custom setting in the outgoing pipeline:
        if (context.GetOperationProperties().TryGet("MySettingsKey", out bool settingValue))
        {
            // ...
        }

        return next();
    }
}

Mutators versus Behaviors

Shared concepts and functionality

Both mutators and behaviors:

  • Can manipulate pipeline state
  • Can be executed in the incoming or outgoing pipeline
  • Bubble exceptions up the pipeline, where they are handled by the recoverability mechanism

Differences

Note that these are relative differences. So, for example, a behavior is only "high complexity" in comparison to a mutator.

                                  Mutator   Behavior
Complexity to implement           Low       High
Flexibility                       Low       High
Location in pipeline              Fixed     Flexible
Complexity to test                Low       Medium*
Can control nested action         No        Yes
Affects call stack depth          No        Yes
Can replace an existing behavior  No        Yes
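The differences are easier to see with the same task written both ways. The sketch below adds a header to every incoming message, first as a transport message mutator and then as a behavior. The class names and the "MyMutatorHeader" key are hypothetical. Only the behavior receives next, which is what allows it to wrap or skip the nested steps.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus.MessageMutator;
using NServiceBus.Pipeline;

// Mutator: a single method with no access to the rest of the pipeline.
public class StampHeaderMutator :
    IMutateIncomingTransportMessages
{
    public Task MutateIncoming(MutateIncomingTransportMessageContext context)
    {
        context.Headers["MyMutatorHeader"] = "seen";
        return Task.CompletedTask;
    }
}

// Behavior: the same change, with control over when (and whether) inner steps run.
public class StampHeaderBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        context.Message.Headers["MyMutatorHeader"] = "seen";

        await next();

        // Code placed here runs after all nested steps have completed.
    }
}
```

Assuming the standard registration APIs, the mutator would be registered with endpointConfiguration.RegisterMessageMutator(new StampHeaderMutator()), and the behavior with pipeline.Register as shown earlier.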
