Each pipeline is composed of steps. A step is an identifiable value in the pipeline used to programmatically define the order of execution. Each step represents a behavior which will be executed at the given place within the pipeline. To add behavior to the pipeline, register a new step; to change existing behavior, replace the behavior of an existing step with a custom implementation.
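Registering a new step could look like the following minimal sketch, which assumes NServiceBus version 6 or later. The behavior enlists in the Incoming Logical Message stage and is registered through the endpoint's pipeline settings. The names LogMessageTypeBehavior and PipelineRegistration, as well as the description text, are hypothetical and exist only for illustration.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Pipeline;

// Hypothetical behavior: writes the type of every incoming message to the console
public class LogMessageTypeBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        Console.WriteLine($"Processing {context.Message.MessageType.FullName}");
        // Continue with the remaining steps of the pipeline
        await next();
    }
}

// Hypothetical helper that adds the behavior to an endpoint's pipeline
public static class PipelineRegistration
{
    public static void Apply(EndpointConfiguration endpointConfiguration)
    {
        // Register the behavior as a new step; the second argument describes the step
        endpointConfiguration.Pipeline.Register(
            new LogMessageTypeBehavior(),
            "Logs the type of each incoming message");
    }
}
```

Calling PipelineRegistration.Apply(endpointConfiguration) before the endpoint is started adds the step to the pipeline.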
The pipeline is broken down into smaller composable units called stages and connectors. A stage is a group of steps acting on the same level of abstraction.
There is currently no way to provide custom pipelines, stages, fork connectors, or stage connectors. Existing stages, fork connectors, and stage connectors can be replaced, but only with extreme caution, because they define the inner workings of NServiceBus core behavior. Incorrectly replaced stages or connectors could lead to message loss. The examples below are included for completeness.
Stages
The pipeline consists of three main parts: incoming, outgoing, and recoverability. Each part is composed of a number of stages.
The following lists describe some of the common stages that behaviors can be built for. Each stage has a context associated with it (which is used when implementing a custom behavior).
Data can be added to, and retrieved from, the context with the Extensions property of type ContextBag. Each following stage has access to data set in a previous stage but data set in a later stage is not available in a prior stage. The context bag is cloned at each stage transition and is not thread-safe.
In the diagram User Code can refer to a handler or a saga. If the handler or saga sends a message, publishes an event, or replies to a message, then the details from the incoming message will be added to the outgoing context.
Incoming pipeline stages
- Transport Receive: Behaviors in this stage are responsible for signaling failure or success to the transport. No outgoing operations are supported in this stage. This stage provides ITransportReceiveContext to its behaviors.
- Incoming Physical Message: Behaviors in this stage have access to the raw message body before it is deserialized. This stage provides IIncomingPhysicalMessageContext to its behaviors.
- Incoming Logical Message: This stage provides information about the received message type and its deserialized instance. It provides IIncomingLogicalMessageContext to its behaviors.
- Invoke Handlers: Each received message can be handled by multiple handlers. This stage is executed once for every associated handler and provides IInvokeHandlerContext to its behaviors.
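As an illustration of enlisting in one of the incoming stages, the following sketch targets the Invoke Handlers stage and measures how long each handler invocation takes. The class name HandlerTimingBehavior and the console output are hypothetical; the sketch assumes the MessageHandler property of IInvokeHandlerContext is used to identify the handler.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using NServiceBus.Pipeline;

// Hypothetical behavior: runs once per handler because it enlists in the Invoke Handlers stage
public class HandlerTimingBehavior :
    Behavior<IInvokeHandlerContext>
{
    public override async Task Invoke(IInvokeHandlerContext context, Func<Task> next)
    {
        var stopwatch = Stopwatch.StartNew();
        try
        {
            // Runs the remaining behaviors and the handler for this invocation
            await next();
        }
        finally
        {
            // Report the elapsed time even if the handler throws
            var handlerType = context.MessageHandler.HandlerType;
            Console.WriteLine($"{handlerType.Name} took {stopwatch.ElapsedMilliseconds} ms");
        }
    }
}
```

Because the generic argument of Behavior selects the stage, changing it to another context interface moves the behavior to that stage without any other registration changes.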
The connection between the Incoming Physical Message stage and the audit stages is an example of a fork. The message will flow down the main path and then down the fork path. The fork paths are only followed if the audit feature has been enabled.
Outgoing pipeline stages
- Operation-specific processing: There is a dedicated stage for each context operation (e.g. Send, Publish, Subscribe, ...). Behaviors can use one of the following contexts: IOutgoingSendContext, IOutgoingPublishContext, IOutgoingReplyContext, ISubscribeContext, IUnsubscribeContext. Subscribe and Unsubscribe are not shown on the diagram below.
- Outgoing Logical Message: Behaviors in this stage have access to the message that is about to be sent. Use IOutgoingLogicalMessageContext in a behavior to enlist in this stage.
- Outgoing Physical Message: Provides access to the serialized message. This stage provides an IOutgoingPhysicalMessageContext instance to its behaviors.
- Routing: Allows the selected routing strategies for outgoing messages to be manipulated. If the outgoing pipeline was initiated by the incoming pipeline, this stage collects all outgoing operations (except for immediate dispatch messages). This stage provides an IRoutingContext instance to its behaviors.
- Batch Dispatch: Forwards all collected outgoing operations to the dispatch stage once message processing has been completed. This stage provides access to the collection of transport operations that are to be dispatched. This stage provides an IBatchDispatchContext instance to its behaviors.
- Dispatch: Provides access to outgoing dispatch operations before they are handed off to the transport. This stage provides an IDispatchContext instance to its behaviors.
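A behavior for the Outgoing Logical Message stage might look like the following sketch, which adds a header to every outgoing message before it is serialized. The class name StampOutgoingMessageBehavior and the header key Sample.OutgoingMessageType are hypothetical choices made for this example.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus.Pipeline;

// Hypothetical behavior: stamps each outgoing message with a custom header
public class StampOutgoingMessageBehavior :
    Behavior<IOutgoingLogicalMessageContext>
{
    public override Task Invoke(IOutgoingLogicalMessageContext context, Func<Task> next)
    {
        // The message has not been serialized yet, so its type and instance are still available
        var messageType = context.Message.MessageType;
        context.Headers["Sample.OutgoingMessageType"] = messageType.FullName;
        // Continue with the remaining steps of the outgoing pipeline
        return next();
    }
}
```

The same change made in the Outgoing Physical Message stage would only have access to the serialized body, so work that depends on the message instance belongs in the logical stage.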
Recoverability pipeline stages
- Recoverability: Behaviors in this stage have access to the raw message that failed. This stage provides IRecoverabilityContext to its behaviors, which exposes the endpoint's recoverability settings, the recoverability action (e.g. MoveToErrorQueue), as well as the ErrorContext and metadata.
- Routing: Provides access to the routing strategies that have been selected for the failed message. This stage provides IRoutingContext to its behaviors.
- Dispatch: Provides access to outgoing dispatch operations before they are handed off to the transport. This stage provides IDispatchContext to its behaviors.
Optional pipeline stages
Behaviors in the audit stage have access to the message that is to be sent to the audit queue and to the audit address. Behaviors should use IAuditContext to enlist in this stage. This stage is only entered if message auditing is enabled.
Extension bag
Pipeline contexts have an extension bag which can be used to create, read, update, or delete custom state identified by a key. For example, this can be used to set metadata or pipeline-specific state in an incoming behavior that can be used in later pipeline stages if needed. State stored via the extension bag is no longer available once the extension bag goes out of scope at the end of the pipeline.
State set during a forked pipeline will not be available to the forking pipeline. For example, state changes during the outgoing pipeline will not be available in the incoming pipeline. If state has to be propagated, have the forking pipeline set a context object that the forked pipeline later can get and modify as needed.
public class SetContextBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        // set the state
        context.Extensions.Set(new SharedData());
        await next();
    }
}

public class GetContextBehavior :
    Behavior<IOutgoingPhysicalMessageContext>
{
    public override async Task Invoke(IOutgoingPhysicalMessageContext context, Func<Task> next)
    {
        if (context.Extensions.TryGet<SharedData>(out var state))
        {
            // work with the state
        }
        await next();
    }
}
Stage connectors
Stage connectors connect from the current stage (e.g. IOutgoingLogicalMessageContext) to another stage (e.g. IOutgoingPhysicalMessageContext). In order to override an existing stage connector, inherit from StageConnector and then replace the existing stage connector. Most pipeline extensions can be done by inheriting from Behavior. It is rarely necessary to replace existing stage connectors. When implementing a stage connector, ensure that all required data is passed along to the next stage.
public class CustomStageConnector :
    StageConnector<IOutgoingLogicalMessageContext, IOutgoingPhysicalMessageContext>
{
    public override Task Invoke(IOutgoingLogicalMessageContext context, Func<IOutgoingPhysicalMessageContext, Task> stage)
    {
        // Finalize the work in the current stage
        byte[] body = { };
        RoutingStrategy[] routingStrategies = { };
        // Start the next stage
        return stage(this.CreateOutgoingPhysicalMessageContext(body, routingStrategies, context));
    }
}

public class FeatureReplacingExistingStage :
    Feature
{
    internal FeatureReplacingExistingStage()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        var pipeline = context.Pipeline;
        pipeline.Replace("SerializeMessageConnector", new CustomStageConnector());
    }
}
Fork connectors
Fork connectors fork from a current stage (e.g. IIncomingPhysicalMessageContext) to another independent pipeline (e.g. IAuditContext). In order to override an existing fork connector, inherit from ForkConnector and then replace the existing fork connector.
public class CustomForkConnector :
    ForkConnector<IIncomingPhysicalMessageContext, IAuditContext>
{
    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next, Func<IAuditContext, Task> fork)
    {
        // Finalize the work in the current stage
        await next();
        // Placeholder values: a real connector builds the message to audit here
        OutgoingMessage message = null;
        var auditAddress = "AuditAddress";
        // Fork into new pipeline
        await fork(this.CreateAuditContext(message, auditAddress, TimeSpan.FromHours(1), context));
    }
}

public class FeatureReplacingExistingForkConnector :
    Feature
{
    internal FeatureReplacingExistingForkConnector()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        var pipeline = context.Pipeline;
        pipeline.Replace("AuditProcessedMessage", new CustomForkConnector());
    }
}
Stage fork connectors
Stage fork connectors are essentially a combination of a stage connector and a fork connector. They can connect from the current stage (e.g. ITransportReceiveContext) to another stage (e.g. IIncomingPhysicalMessageContext) and fork to another independent pipeline (e.g. IBatchDispatchContext). In order to override an existing stage fork connector, inherit from StageForkConnector and then replace the existing stage fork connector.
public class CustomStageForkConnector :
    StageForkConnector<ITransportReceiveContext,
        IIncomingPhysicalMessageContext,
        IBatchDispatchContext>
{
    public override async Task Invoke(ITransportReceiveContext context, Func<IIncomingPhysicalMessageContext, Task> stage, Func<IBatchDispatchContext, Task> fork)
    {
        // Finalize the work in the current stage
        var physicalMessageContext = this.CreateIncomingPhysicalMessageContext(context.Message, context);
        // Start the next stage
        await stage(physicalMessageContext);
        TransportOperation[] operations =
        {
        };
        var batchDispatchContext = this.CreateBatchDispatchContext(operations, physicalMessageContext);
        // Fork into new pipeline
        await fork(batchDispatchContext);
    }
}

public class FeatureReplacingExistingStageForkConnector :
    Feature
{
    internal FeatureReplacingExistingStageForkConnector()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        var pipeline = context.Pipeline;
        pipeline.Replace("TransportReceiveToPhysicalMessageProcessingConnector", new CustomStageForkConnector());
    }
}