Pipelines are made up of a group of steps acting on the same level of abstraction. This allows a number of scenarios, such as:
- Defining a step that works with the "incoming physical" message before it has been deserialized.
- Defining a step that is executed before and after each handler invocation (keeping in mind that there can be multiple message handlers per message).
Extending the pipeline is done with a custom behavior implementing Behavior
. TContext
is the context of the stage that the behavior belongs to. A list of possible pipeline stages to which a behavior can be attached can be found in the Steps, Stages, and Connectors article.
Custom behavior should not rely on ordering within the same stage. If a custom behavior requires access to information that is generated by a previous stage, put the behavior into the next stage of the pipeline. If a custom behavior must create information that other behaviors rely on, place it in appropriate stages before those behaviors.
public class SampleBehavior :
Behavior<IIncomingLogicalMessageContext>
{
public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
{
// custom logic before calling the next step in the pipeline.
await next();
// custom logic after all inner steps in the pipeline completed.
}
}
In the above code snippet the SampleBehavior
class derives from the Behavior contract and targets the incoming context. This tells the framework to execute this behavior after the incoming raw message has been deserialized and a matching message type has been found. At runtime, the pipeline will call the Invoke
method of each registered behavior passing in as arguments the current message context and an action to invoke the next behavior in the pipeline.
Each behavior is responsible to call the next step in the pipeline chain by invoking next()
.
Add a new step
To add a custom behavior to the pipeline, register it from the endpoint configuration:
var pipeline = endpointConfiguration.Pipeline;
pipeline.Register(
behavior: new SampleBehavior(),
description: "Logs a warning when processing takes too long");
Behaviors can also be registered from a Feature
as shown below:
protected override void Setup(FeatureConfigurationContext context)
{
var pipeline = context.Pipeline;
pipeline.Register(
behavior: new SampleBehavior(),
description: "Logs a warning when processing takes too long");
}
Behaviors are only created once and the same instance is reused on every invocation of the pipeline. Consequently, every behavior dependency behave as a singleton, even if a different option was specified when registering it in dependency injection. Furthermore, the behavior and all dependencies must be thread-safe. Storing state in a behavior instance should be avoided since it will cause the state to be shared across all message handling sessions. This could lead to unwanted side effects.
Replace an existing step
To replace the implementation of an existing step, substitute it with a custom behavior:
public class ReplaceExistingStep :
INeedInitialization
{
public void Customize(EndpointConfiguration endpointConfiguration)
{
var pipeline = endpointConfiguration.Pipeline;
pipeline.Replace(
stepId: "Id of the step to replace",
newBehavior: typeof(SampleBehavior),
description: "Description");
}
}
In order to replace the existing step, it is necessary to provide a step ID. The most reliable way of determining the step ID is to find the step definition in the NServiceBus source code.
Note that step IDs are hard-coded strings and may change in the future resulting in an unexpected behavior change. When replacing built-in steps, create automatic tests that will detect potential ID changes or step removal.
Steps can also be registered from a feature.
Adding or replacing behaviors is only possible before the endpoint has started.
Add or replace a step
The Register
API will throw an exception when a behavior is registered with an ID that is already present in the pipeline. On the other hand, the Replace
API will throw an exception when a behavior is registered with an ID that is not found in the pipeline.
Sometimes it's impossible to know up front what the content of the pipeline is when registering a behavior.
To ensure the creation or replacement of the behavior in the pipeline, the RegisterOrReplace
-API can be used.
public class RegisterOrReplaceStep :
INeedInitialization
{
public void Customize(EndpointConfiguration endpointConfiguration)
{
var pipeline = endpointConfiguration.Pipeline;
pipeline.RegisterOrReplace(
stepId: "StepIdThatMayOrMayNotExist",
behavior: typeof(SampleBehavior),
description: "Description");
}
}
Disable an existing step
To disable the implementation of an existing step, substitute it with a behavior with no action:
public class NoActionBehavior :
Behavior<IIncomingLogicalMessageContext>
{
public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
{
//no action taken. empty behavior
await next();
}
}
The behavior does nothing and calls the next step in the pipeline chain by invoking next()
.
public class NoActionExistingStep :
INeedInitialization
{
public void Customize(EndpointConfiguration endpointConfiguration)
{
var pipeline = endpointConfiguration.Pipeline;
pipeline.Replace(
stepId: "Id of the step to replace",
newBehavior: typeof(NoActionBehavior),
description: "Description");
}
}
Exception handling
Exceptions thrown from a behavior's Invoke
method bubble up the chain. If the exception is not handled by any behavior, the message is considered faulty which results in putting the message back in the queue (and rolling back the transaction, when configured) or moving it to the error queue (depending on the endpoint's recoverability configuration).
MessageDeserializationException
If a message fails to deserialize, a MessageDeserializationException
exception is thrown by the DeserializeLogicalMessagesBehavior
behavior. In this case, the message is immediately moved to the error queue to avoid blocking the system with poison messages.
Skip serialization
When writing extensions to the pipeline it may be necessary to either take control of the serialization or to skip it entirely. One example of this is with the callbacks feature. Callbacks skip serialization for integers and enums and instead embed them in the message headers.
To skip serialization, implement a behavior that targets IOutgoingLogicalMessageContext
. For example, the following behavior skips serialization if the message is an integer, placing it in a header instead.
class SkipSerializationForInts :
Behavior<IOutgoingLogicalMessageContext>
{
public override Task Invoke(IOutgoingLogicalMessageContext context, Func<Task> next)
{
var outgoingLogicalMessage = context.Message;
if (outgoingLogicalMessage.MessageType == typeof(int))
{
var headers = context.Headers;
headers["MyCustomHeader"] = outgoingLogicalMessage.Instance.ToString();
context.SkipSerialization();
}
return next();
}
public class Registration :
RegisterStep
{
public Registration()
: base(
stepId: "SkipSerializationForInts",
behavior: typeof(SkipSerializationForInts),
description: "Skips serialization for integers")
{
}
}
}
On the receiving side, this header can then be extracted and decisions on the incoming message processing pipeline can be made based on it.
Sharing data between behaviors
Sometimes a parent behavior might need to pass information to a child behavior and vice versa. The context
parameter of a behavior's Invoke
method facilitates passing data between behaviors. The context is similar to a shared dictionary which allows adding and retrieving information from different behaviors.
Contexts are not thread-safe.
In NServiceBus version 6 and above, the context respects the stage hierarchy and only allows adding new entries in the scope of the current context. A child behavior (later in the pipeline chain) can read and even modify entries set by a parent behavior (earlier in the pipeline chain) but entries added by the child cannot be accessed from the parent.
Injecting dependencies into behaviors
public class BehaviorUsingDependencyInjection :
Behavior<IIncomingLogicalMessageContext>
{
// Dependencies injected into the constructor are singletons and cached for the lifetime
// of the endpoint
public BehaviorUsingDependencyInjection(SingletonDependency singletonDependency)
{
this.singletonDependency = singletonDependency;
}
public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
{
var scopedDependency = context.Builder.GetRequiredService<ScopedDependency>();
// do something with the scoped dependency before
await next();
// do something with the scoped dependency after
}
SingletonDependency singletonDependency;
}
Dependencies injected into the constructor of a behavior become singletons regardless of their actual scope on the dependency injection container. In order to create instances per request or scoped dependencies it is required to use the IServiceProvider
that is available as Builder
property on the context.
The service provider available via the context varies depending on the pipeline stage. Pipeline stages used within the context of an incoming message exposes a new child service provider created for each incoming message. All other use cases exposes the root service provider.
Behaviors in the outgoing pipeline stage exhibit different behaviors depending on how they are invoked. For example, when an outgoing behavior is invoked within the context of a message session the root service provider is exposed while when the same outgoing behavior is invoked within the scope of an incoming message, the child service provider for the incoming message will be used.
Accessing options from Behaviors
Settings configured on message operation options (e.g., SendOptions
, PublishOptions
, etc.) are internally stored in a dedicated ContextBag
. These settings are accessibe within the pipeline via the context.
extension:
class MyMessageHandler : IHandleMessages<MyIncomingMessage>
{
public Task Handle(MyIncomingMessage message, IMessageHandlerContext context)
{
var sendOptions = new SendOptions();
// Configure a custom setting for the outgoing message:
sendOptions.GetExtensions().Set("MySettingsKey", true);
return context.Send(new MyOutgoingMessage(), sendOptions);
}
}
class MyCustomBehavior : Behavior<IOutgoingSendContext>
{
public override Task Invoke(IOutgoingSendContext context, Func<Task> next)
{
// Retrieve the custom setting in the outgoing pipeline:
if (context.GetOperationProperties().TryGet("MySettingsKey", out bool settingValue))
{
// ...
}
return next();
}
}
Mutators versus Behaviors
Shared concepts and functionality
Both mutators and behaviors:
- Can manipulate pipeline state
- Can be executed in the incoming or outgoing pipeline
- Bubble exceptions up the pipeline and handle them by the recoverability mechanism
Differences
Note that these are relative differences. So, for example, a behavior is only "high complexity" in comparison to a mutator.
Mutator | Behavior | |
---|---|---|
Complexity to implement | Low | High |
Flexibility | Low | High |
Location in pipeline | Fixed | Flexible |
Complexity to test | Low | Medium* |
Can control nested action | No | Yes |
Affects call stack depth | No | Yes |
Can replace an existing behavior | No | Yes |
- For more information refer to the testing behaviors unit testing sample.