Steps, Stages and Connectors

Component: NServiceBus
NuGet Package: NServiceBus (8.1)

Each pipeline is composed of steps. A step is an identifiable value in the pipeline used to programmatically define order of execution. Each step represents a behavior which will be executed at the given place within the pipeline. Add additional behavior to the pipeline by registering a new step or replace the behavior of an existing step with the custom implementation.
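The two options described above can be sketched roughly as follows: a behavior is either registered as a new step or swapped in for the behavior of an existing step through the endpoint's pipeline settings. The names `SampleLoggingBehavior`, `SampleLoggingStep`, `PipelineCustomization`, and the step IDs `SampleLogging` and `ExistingStepId` are hypothetical and used only for illustration; when replacing, the ID has to match a step that is actually registered.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Pipeline;

// Hypothetical behavior that logs the type of every incoming logical message.
public class SampleLoggingBehavior : Behavior<IIncomingLogicalMessageContext>
{
    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        Console.WriteLine($"Processing {context.Message.MessageType.FullName}");

        // Continue with the rest of the pipeline
        await next();
    }
}

// Wraps the behavior in a step with an explicit, identifiable step ID.
public class SampleLoggingStep : RegisterStep
{
    public SampleLoggingStep()
        : base("SampleLogging", typeof(SampleLoggingBehavior), "Logs incoming message types")
    {
    }
}

public static class PipelineCustomization
{
    public static void Apply(EndpointConfiguration endpointConfiguration)
    {
        var pipeline = endpointConfiguration.Pipeline;

        // Option 1: add a new step to the pipeline
        pipeline.Register(new SampleLoggingStep());

        // Option 2: replace the behavior of an existing step.
        // "ExistingStepId" is a placeholder, so the call is left commented out.
        // pipeline.Replace("ExistingStepId", new SampleLoggingBehavior(), "Replaces the default behavior");
    }
}
```

Which stage a behavior runs in follows from the context type it is declared for, here `IIncomingLogicalMessageContext`.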

The pipeline is broken down into smaller composable units called stages and connectors. A stage is a group of steps acting on the same level of abstraction.

There is currently no way to provide custom pipelines, stages, fork connectors, or stage connectors. Existing stages, fork connectors, and stage connectors can be replaced, but only with extreme caution, because they define the inner workings of NServiceBus core behavior. An incorrectly replaced stage or connector could lead to message loss. The examples below are included for completeness.

Stages

The pipeline consists of three main parts: incoming, outgoing and recoverability. Each part is composed of a number of stages.

The following lists describe some of the common stages that behaviors can be built for. Each stage has a context associated with it (which is used when implementing a custom behavior).

Data can be added to, and retrieved from, the context through the Extensions property of type ContextBag. Each stage has access to data set in a previous stage, but data set in a later stage is not available in a prior stage, because the bag of a later stage is a child of the previous stage's bag rather than a shared instance. The context bag is not thread-safe.

In the diagrams, User Code refers to a handler or a saga. If the handler or saga sends a message, publishes an event, or replies to a message, then the details from the incoming message will be added to the outgoing context.

Incoming pipeline stages

  • Transport Receive: Behaviors in this stage are responsible for signaling failure or success to the transport. In this stage no outgoing operations are supported. This stage provides ITransportReceiveContext to its behaviors.
  • Incoming Physical Message: Behaviors on this stage have access to the raw message body before it is deserialized. This stage provides IIncomingPhysicalMessageContext to its behaviors.
  • Incoming Logical Message: This stage provides information about the received message type and its deserialized instance. It provides IIncomingLogicalMessageContext to its behaviors.
  • Invoke Handlers: Each received message can be handled by multiple handlers. This stage is executed once for every associated handler and provides IInvokeHandlerContext to its behaviors.
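As an illustration of how the context type selects the stage, the following minimal sketch targets the Invoke Handlers stage and therefore runs once per handler. The class name `HandlerTimingBehavior` and the console output are assumptions made for this example, not part of NServiceBus.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using NServiceBus.Pipeline;

// Hypothetical behavior: measures how long each handler invocation takes.
public class HandlerTimingBehavior : Behavior<IInvokeHandlerContext>
{
    public override async Task Invoke(IInvokeHandlerContext context, Func<Task> next)
    {
        var stopwatch = Stopwatch.StartNew();
        try
        {
            // Invokes the remaining behaviors and, eventually, the handler itself
            await next();
        }
        finally
        {
            stopwatch.Stop();

            // The context of this stage exposes the handler being invoked
            var handlerType = context.MessageHandler.HandlerType;
            Console.WriteLine($"{handlerType.Name} took {stopwatch.ElapsedMilliseconds} ms");
        }
    }
}
```

A behavior like this could be registered with `endpointConfiguration.Pipeline.Register(new HandlerTimingBehavior(), "Measures handler duration")`. Declaring it against `IIncomingLogicalMessageContext` instead would make it run once per message rather than once per handler.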

The connection between the Incoming Physical Message stage and the audit stages is an example of a fork. The message will flow down the main path and then down the fork path. The fork paths are only followed if the audit feature has been enabled.

graph LR
  Transport((Transport))
  subgraph Incoming Pipeline
    TR[Transport<br>Receive]
    IH[Invoke<br>Handler]
    ILM[Incoming<br>Logical<br>Message]
    IPM[Incoming<br>Physical<br>Message]
    RUC((Receiving User Code))
    subgraph Ancillary Actions
      Audit[Audit]
      Routing
      Dispatch
    end
  end
  AncillaryTransport[Transport]
  Transport -- onMessage --> TR
  IPM -- Step 1 --> ILM
  IPM -. Step 2 .-> Audit
  ILM --> IH
  TR --> IPM
  IH --> RUC
  Dispatch --> AncillaryTransport
  Audit --> Routing
  Routing --> Dispatch
  click IH "/nservicebus/handlers-and-sagas"
  click Transport "/transports/"
  click Audit "/nservicebus/operations/auditing"

Outgoing pipeline stages

  • Operation specific processing: There is a dedicated stage for each context operation (e.g. Send, Publish, Subscribe, ...). Behaviors can use one of the following contexts: IOutgoingSendContext, IOutgoingPublishContext, IOutgoingReplyContext, ISubscribeContext, IUnsubscribeContext. Subscribe and Unsubscribe are not shown on the diagram below.
  • Outgoing Logical Message: Behaviors on this stage have access to the message which should be sent. Use IOutgoingLogicalMessageContext in a behavior to enlist in this stage.
  • Outgoing Physical Message: Provides access to the serialized message. This stage provides an IOutgoingPhysicalMessageContext instance to its behaviors.
  • Routing: Allows the selected routing strategies for outgoing messages to be manipulated. If the outgoing pipeline was initiated by the incoming pipeline, this stage collects all outgoing operations (except for immediate dispatch messages). This stage provides an IRoutingContext instance to its behaviors.
  • Batch Dispatch: Forwards all collected outgoing operations to the dispatch stage once message processing has been completed. This stage provides access to the collection of transport operations that are to be dispatched. This stage provides an IBatchDispatchContext instance to its behaviors.
  • Dispatch: Provides access to outgoing dispatch operations before they are handed off to the transport. This stage provides an IDispatchContext instance to its behaviors.

graph LR
  UC((Initiating User Code))
  subgraph Outgoing Pipeline
    Outgoing{Outgoing}
    OP[Outgoing<br>Publish]
    OS[Outgoing<br>Send]
    OR[Outgoing<br>Reply]
    OLM[Outgoing<br>Logical<br>Message]
    OPM[Outgoing<br>Physical<br>Message]
    Routing[Routing]
  end
  subgraph Dispatch Phase
    Dispatch{Dispatch}
    Transport((Transport))
    BD[Batch<br>Dispatch]
    ID[Immediate<br>Dispatch]
  end
  UC --> Outgoing
  Outgoing --> OP
  Outgoing --> OS
  Outgoing --> OR
  OP --> OLM
  OS --> OLM
  OR --> OLM
  OLM --> OPM
  OPM --> Routing
  Routing --> Dispatch
  ID --> Transport
  Dispatch --> BD
  Dispatch --> ID
  BD --> Transport
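A behavior for the Outgoing Logical Message stage might look like the following minimal sketch. It runs for sends, publishes, and replies alike, because all three operation-specific stages lead into this stage. The class name `StampMessageTypeBehavior` and the header key `Sample.OriginalMessageType` are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus.Pipeline;

// Hypothetical behavior: adds a custom header to every outgoing message
// while the message is still available as a deserialized instance.
public class StampMessageTypeBehavior : Behavior<IOutgoingLogicalMessageContext>
{
    public override Task Invoke(IOutgoingLogicalMessageContext context, Func<Task> next)
    {
        // "Sample.OriginalMessageType" is an illustrative header key
        context.Headers["Sample.OriginalMessageType"] = context.Message.MessageType.FullName;

        // No work is needed after the rest of the pipeline, so the task is returned directly
        return next();
    }
}
```

To act only on one kind of operation, the same logic could instead be declared against `IOutgoingSendContext`, `IOutgoingPublishContext`, or `IOutgoingReplyContext`.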

Recoverability pipeline stages

  • Recoverability: Behaviors on this stage have access to the raw message that failed. This stage provides IRecoverabilityContext to its behaviors, which exposes the endpoint's recoverability settings, the recoverability action (e.g. MoveToErrorQueue), as well as the ErrorContext and metadata.
  • Routing: Provides access to the routing strategies that have been selected for the failed message. This stage provides IRoutingContext to its behaviors.
  • Dispatch: Provides access to outgoing dispatch operations before they are handed off to the transport. This stage provides IDispatchContext to its behaviors.

graph LR
  Transport((Transport))
  Transport2((Transport))
  subgraph Recoverability Pipeline
    TR[Recoverability]
    RR[Routing]
    Dispatch[Immediate<br>Dispatch]
  end
  Transport -- onError --> TR
  TR -. 0-to-n .-> RR
  RR --> Dispatch
  Dispatch --> Transport2
  click RR "/nservicebus/recoverability"
  click Transport "/transports/"
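The following is a hedged sketch of a behavior in the Recoverability stage. It assumes that the action which moves a message to the error queue is represented by the `MoveToError` type with an `ErrorQueue` property, and that `Metadata` is a mutable string dictionary; both should be checked against the NServiceBus version in use. The class name and the metadata key `Sample.FailureLoggedAt` are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Pipeline;

// Hypothetical behavior: logs messages that are about to be moved to the error queue
// and attaches an extra piece of metadata to them.
public class LogMovesToErrorQueueBehavior : Behavior<IRecoverabilityContext>
{
    public override Task Invoke(IRecoverabilityContext context, Func<Task> next)
    {
        // Assumption: MoveToError is the action type used for error-queue moves
        if (context.RecoverabilityAction is MoveToError moveToError)
        {
            Console.WriteLine(
                $"Message {context.FailedMessage.MessageId} will be moved to " +
                $"'{moveToError.ErrorQueue}': {context.Exception.Message}");

            // Assumption: Metadata is writable; the key is illustrative only
            context.Metadata["Sample.FailureLoggedAt"] = DateTimeOffset.UtcNow.ToString("O");
        }

        return next();
    }
}
```

Retries pass through this stage as well, so the type check keeps the logging limited to messages that have exhausted their retries.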

Optional pipeline stages

Audit: Behaviors in the audit stage have access to the message that will be sent to the audit queue and to the audit address. Behaviors should use IAuditContext to enlist in this stage. This stage is only entered if message auditing is enabled.
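A minimal sketch of a behavior that enlists in the audit stage is shown below. It only reads from the context. The class name `LogAuditedMessageBehavior` and the console output are assumptions for illustration.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus.Pipeline;

// Hypothetical behavior: logs every message that is forwarded to the audit queue.
public class LogAuditedMessageBehavior : Behavior<IAuditContext>
{
    public override Task Invoke(IAuditContext context, Func<Task> next)
    {
        // The audit context exposes the message to be audited and its destination
        Console.WriteLine($"Auditing message {context.Message.MessageId} to '{context.AuditAddress}'");

        return next();
    }
}
```

Because the stage is only entered when auditing is enabled, a behavior like this stays inactive on endpoints that do not audit.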

Extension bag

Pipeline contexts have an extension bag which can be used to create, read, update or delete custom state with a key identifier. For example, this can be used to set metadata- or pipeline-specific state in an incoming behavior that can be used in later pipeline stages if needed. State stored via the extension bag will not be available once the extension bag runs out of scope at the end of the pipeline.

State set during a forked pipeline will not be available to the forking pipeline. For example, state changes during the outgoing pipeline will not be available in the incoming pipeline. If state has to be propagated, have the forking pipeline set a context object that the forked pipeline later can get and modify as needed.

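using System;
using System.Threading.Tasks;
using NServiceBus.Pipeline;

// Placeholder type for the custom state; any class can be stored in the bag
public class SharedData
{
}

public class SetContextBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        // set the state so that later stages and forked pipelines can read it
        context.Extensions.Set(new SharedData());

        await next();
    }
}

public class GetContextBehavior :
    Behavior<IOutgoingPhysicalMessageContext>
{
    public override async Task Invoke(IOutgoingPhysicalMessageContext context, Func<Task> next)
    {
        // read the state set by the forking (incoming) pipeline, if present
        if (context.Extensions.TryGet<SharedData>(out var state))
        {
            // work with the state
        }

        await next();
    }
}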

Stage connectors

Stage connectors connect the current stage (e.g. IOutgoingLogicalMessageContext) to another stage (e.g. IOutgoingPhysicalMessageContext). To override an existing stage connector, inherit from StageConnector<TFromContext, TToContext> and then replace the existing stage connector. Most pipeline extensions can be done by inheriting from Behavior<TContext>, so it is rarely necessary to replace existing stage connectors. When implementing a stage connector, ensure that all required data is passed along to the next stage.

public class CustomStageConnector :
    StageConnector<IOutgoingLogicalMessageContext, IOutgoingPhysicalMessageContext>
{
    public override Task Invoke(IOutgoingLogicalMessageContext context, Func<IOutgoingPhysicalMessageContext, Task> stage)
    {
        // Finalize the work in the current stage

        byte[] body = { };
        RoutingStrategy[] routingStrategies = { };

        // Start the next stage
        return stage(this.CreateOutgoingPhysicalMessageContext(body, routingStrategies, context));
    }
}

public class FeatureReplacingExistingStage :
    Feature
{
    internal FeatureReplacingExistingStage()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        var pipeline = context.Pipeline;
        pipeline.Replace("NServiceBus.SerializeMessageConnector", new CustomStageConnector());
    }
}

Fork connectors

graph LR
  subgraph Root pipeline
    A[TFromContext] --- B{Fork Connector}
    B --- C[TFromContext]
  end
  subgraph Forked pipeline
    B --> D[TForkContext]
  end

Fork connectors fork from the current stage (e.g. IIncomingPhysicalMessageContext) to another independent pipeline (e.g. IAuditContext). To override an existing fork connector, inherit from ForkConnector<TFromContext, TForkContext> and then replace the existing fork connector.

public class CustomForkConnector :
    ForkConnector<IIncomingPhysicalMessageContext, IAuditContext>
{
    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next, Func<IAuditContext, Task> fork)
    {
        // Finalize the work in the current stage
        await next();

        OutgoingMessage message = null;
        var auditAddress = "AuditAddress";

        // Fork into new pipeline
        await fork(this.CreateAuditContext(message, auditAddress, TimeSpan.FromHours(1), context));
    }
}

public class FeatureReplacingExistingForkConnector :
    Feature
{
    internal FeatureReplacingExistingForkConnector()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        var pipeline = context.Pipeline;
        pipeline.Replace("AuditProcessedMessage", new CustomForkConnector());
    }
}

Stage fork connectors

graph LR
  subgraph Root stage
    A[TFromContext] --- B{StageFork<br/>Connector}
    B --- C[TToContext]
  end
  subgraph Forked pipeline
    B --> D[TForkContext]
  end

Stage fork connectors are essentially a combination of a stage connector and a fork connector. They can connect the current stage (e.g. ITransportReceiveContext) to another stage (e.g. IIncomingPhysicalMessageContext) and also fork to another independent pipeline (e.g. IBatchDispatchContext). To override an existing stage fork connector, inherit from StageForkConnector<TFromContext, TToContext, TForkContext> and then replace the existing stage fork connector.

public class CustomStageForkConnector :
    StageForkConnector<ITransportReceiveContext,
        IIncomingPhysicalMessageContext,
        IBatchDispatchContext>
{
    public override async Task Invoke(ITransportReceiveContext context, Func<IIncomingPhysicalMessageContext, Task> stage, Func<IBatchDispatchContext, Task> fork)
    {
        // Finalize the work in the current stage
        var physicalMessageContext = this.CreateIncomingPhysicalMessageContext(context.Message, context);

        // Start the next stage
        await stage(physicalMessageContext);

        TransportOperation[] operations =
        {
        };
        var batchDispatchContext = this.CreateBatchDispatchContext(operations, physicalMessageContext);

        // Fork into new pipeline
        await fork(batchDispatchContext);
    }
}

public class FeatureReplacingExistingStageForkConnector :
    Feature
{
    internal FeatureReplacingExistingStageForkConnector()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        var pipeline = context.Pipeline;
        pipeline.Replace("TransportReceiveToPhysicalMessageProcessingConnector", new CustomStageForkConnector());
    }
}
