Steps, Stages and Connectors

Component: NServiceBus
NuGet Package: NServiceBus (7.x)

Each pipeline is composed of steps. A step is an identifiable value in the pipeline used to programmatically define the order of execution. Each step represents a behavior which will be executed at the given place within the pipeline. To add behavior to the pipeline, register a new step. To change existing behavior, replace the behavior of an existing step with a custom implementation.
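
The following is a minimal sketch of both options, not a definitive implementation. SampleBehavior, PipelineRegistration and the stepId parameter are hypothetical names introduced for illustration. The step id passed to Replace must match a step that actually exists in the pipeline.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Pipeline;

// Hypothetical behavior, used only for illustration
public class SampleBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // custom logic would go here, before the rest of the pipeline runs
        return next();
    }
}

public static class PipelineRegistration
{
    // Adds the behavior to the pipeline as a new step
    public static void RegisterNewStep(EndpointConfiguration endpointConfiguration)
    {
        var pipeline = endpointConfiguration.Pipeline;
        pipeline.Register(new SampleBehavior(), "Illustrative behavior");
    }

    // Swaps the behavior of an existing step for a custom implementation.
    // stepId is an assumption: it has to be the id of a registered step.
    public static void ReplaceExistingStep(EndpointConfiguration endpointConfiguration, string stepId)
    {
        var pipeline = endpointConfiguration.Pipeline;
        pipeline.Replace(stepId, new SampleBehavior(), "Illustrative replacement");
    }
}
```

Registering adds to what the pipeline already does, while replacing removes the original behavior of that step, so a replacement has to take over everything the original step was responsible for.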

The pipeline is broken down into smaller composable units called stages and connectors. A stage is a group of steps acting on the same level of abstraction.

Stages

The pipeline consists of two main parts: incoming and outgoing. Both parts are composed of a number of stages.

The following lists describe some of the common stages that behaviors can be built for. Each stage has a context associated with it which is used when implementing a custom behavior.

In the diagrams, User Code can refer to a handler or a saga. If the handler or saga sends a message, publishes an event, or replies to a message, the details from the incoming message will be added to the outgoing context.

Incoming pipeline stages

  • Transport Receive: Behaviors in this stage are responsible for signaling failure or success to the transport. In this stage no outgoing operations are supported. This stage provides ITransportReceiveContext to its behaviors.
  • Incoming Physical Message: Behaviors on this stage have access to the raw message body before it is deserialized. This stage provides IIncomingPhysicalMessageContext to its behaviors.
  • Incoming Logical Message: This stage provides information about the received message type and its deserialized instance. It provides IIncomingLogicalMessageContext to its behaviors.
  • Invoke Handlers: Each received message can be handled by multiple handlers. This stage is executed once for every associated handler and provides IInvokeHandlerContext to its behaviors.
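
As an illustration of how a behavior targets one of the stages listed above, the sketch below enlists in the Incoming Logical Message stage by using IIncomingLogicalMessageContext as its context type. The class name LogMessageTypeBehavior is hypothetical, and logging the message type is only an example of what such a behavior might do.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus.Logging;
using NServiceBus.Pipeline;

// Hypothetical behavior, used only for illustration
public class LogMessageTypeBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    static readonly ILog log = LogManager.GetLogger<LogMessageTypeBehavior>();

    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // This stage exposes the received message type and its deserialized instance
        var messageType = context.Message.MessageType;
        log.Info($"Processing message of type {messageType.FullName}");

        // Continue with the remaining steps, including the Invoke Handlers stage
        await next();
    }
}
```

Choosing a different context interface as the type argument moves the behavior to the corresponding stage.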

The connection between the incoming physical message stage and the Forwarding/Audit stages is an example of a fork. In both cases, the message will flow down the main path and then down the fork path. The fork paths are only followed if the corresponding feature (auditing, message forwarding) has been enabled.

graph LR
    Transport((Transport))
    subgraph Incoming Pipeline
        TR[Transport<br>Receive]
        IH[Invoke<br>Handler]
        ILM[Incoming<br>Logical<br>Message]
        IPM[Incoming<br>Physical<br>Message]
        RUC((Receiving User Code))
        subgraph Ancillary Actions
            Forwarding[Forwarding]
            Audit[Audit]
            Routing
            Dispatch
        end
    end
    AncillaryTransport[Transport]
    Transport -- onMessage --> TR
    IPM -- Step 1 --> ILM
    IPM -. Step 2 .-> Forwarding
    IPM -. Step 2 .-> Audit
    ILM --> IH
    TR --> IPM
    IH --> RUC
    Dispatch --> AncillaryTransport
    Audit --> Routing
    Forwarding --> Routing
    Routing --> Dispatch
    click IH "/nservicebus/handlers-and-sagas"
    click Transport "/transports/"
    click Audit "/nservicebus/operations/auditing"
    click Forwarding "/nservicebus/messaging/forwarding"

Outgoing pipeline stages

  • Operation specific processing: There is a dedicated stage for each context operation (e.g. Send, Publish, Subscribe, ...). Behaviors can use one of the following contexts: IOutgoingSendContext, IOutgoingPublishContext, IOutgoingReplyContext, ISubscribeContext, IUnsubscribeContext. Subscribe and Unsubscribe are not shown on the diagram below.
  • Outgoing Logical Message: Behaviors on this stage have access to the message which should be sent. Use IOutgoingLogicalMessageContext in a behavior to enlist in this stage.
  • Outgoing Physical Message: Behaviors in this stage have access to the serialized message. This stage provides an IOutgoingPhysicalMessageContext instance to its behaviors.
  • Routing: Allows the selected routing strategies for outgoing messages to be manipulated. If the outgoing pipeline was initiated by the incoming pipeline, this stage collects all outgoing operations (except for immediate dispatch messages). This stage provides an IRoutingContext instance to its behaviors.
  • Batch Dispatch: Forwards all collected outgoing operations to the dispatch stage once message processing has been completed. This stage provides access to the collection of transport operations that are to be dispatched. This stage provides an IBatchDispatchContext instance to its behaviors.
  • Dispatch: Provides access to outgoing dispatch operations before they are handed off to the transport. This stage provides an IDispatchContext instance to its behaviors.

graph LR
    UC((Initiating User Code))
    subgraph Outgoing Pipeline
        Outgoing{Outgoing}
        OP[Outgoing<br>Publish]
        OS[Outgoing<br>Send]
        OR[Outgoing<br>Reply]
        OLM[Outgoing<br>Logical<br>Message]
        OPM[Outgoing<br>Physical<br>Message]
        Routing[Routing]
    end
    subgraph Dispatch Phase
        Dispatch{Dispatch}
        Transport((Transport))
        BD[Batch<br>Dispatch]
        ID[Immediate<br>Dispatch]
    end
    UC --> Outgoing
    Outgoing --> OP
    Outgoing --> OS
    Outgoing --> OR
    OP --> OLM
    OS --> OLM
    OR --> OLM
    OLM --> OPM
    OPM --> Routing
    Routing --> Dispatch
    ID --> Transport
    Dispatch --> BD
    Dispatch --> ID
    BD --> Transport
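
As an illustration of the outgoing stages listed above, the sketch below enlists in the Outgoing Logical Message stage and adds a header to every outgoing message. The class name AddCustomHeaderBehavior and the header key and value are hypothetical and chosen only for this example.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus.Pipeline;

// Hypothetical behavior, used only for illustration
public class AddCustomHeaderBehavior :
    Behavior<IOutgoingLogicalMessageContext>
{
    public override Task Invoke(IOutgoingLogicalMessageContext context, Func<Task> next)
    {
        // The header key and value are placeholders
        context.Headers["Sample.CustomHeader"] = "custom value";

        // Continue with the Outgoing Physical Message stage and beyond
        return next();
    }
}
```

Because this behavior runs before the message is serialized, it applies to sends, publishes, and replies alike. A behavior built on IOutgoingSendContext would instead run only for send operations.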

Optional pipeline stages

  • Audit: Behaviors in the audit stage have access to the message to be sent to the audit queue and the address of the audit queue. Behaviors should use IAuditContext to enlist in this stage. This stage is only entered if message auditing is enabled.
  • Forwarding: Behaviors in the forwarding stage have access to the message to be sent to the forwarding queue and the address of the forwarding queue. Behaviors should use IForwardingContext to enlist in this stage. This stage is only entered if message forwarding is enabled.

Extension bag

Pipeline contexts have an extension bag which can be used to create, read, update or delete custom state with a key identifier. For example, this can be used to set metadata- or pipeline-specific state in an incoming behavior that can be used in later pipeline stages if needed. State stored via the extension bag is no longer available once the extension bag goes out of scope at the end of the pipeline.
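
The create, read, update and delete operations can be sketched as follows, assuming the extension bag overloads that take a string key (Set, TryGet and Remove). The class name KeyedStateBehavior, the key, and the stored value are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus.Pipeline;

// Hypothetical behavior, used only for illustration
public class KeyedStateBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    // Hypothetical key identifying the custom state
    const string Key = "Sample.TenantId";

    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        var extensions = context.Extensions;

        // Create: store a value under the key
        extensions.Set(Key, "tenant-a");

        // Read: TryGet returns false when nothing is stored under the key
        if (extensions.TryGet(Key, out string tenant))
        {
            // Update: setting the same key again overwrites the value
            extensions.Set(Key, tenant.ToUpperInvariant());
        }

        // Later steps in this pipeline can read the state
        await next();

        // Delete: remove the state once it is no longer needed
        extensions.Remove(Key);
    }
}
```

A string key is useful when several values of the same type need to be stored side by side. When only one instance of a type is stored, the type itself can act as the key.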

State set during a forked pipeline will not be available to the forking pipeline. For example, state changes during the outgoing pipeline will not be available in the incoming pipeline. If state has to be propagated, have the forking pipeline set a context object that the forked pipeline can later get and modify as needed.

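using System;
using System.Threading.Tasks;
using NServiceBus.Pipeline;

// Placeholder state class; add whatever properties need to be shared
public class SharedData
{
}

// Runs in the incoming (forking) pipeline and stores the shared state
public class SetContextBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        // set the state
        context.Extensions.Set(new SharedData());

        await next();
    }
}

// Runs in the outgoing (forked) pipeline and reads the shared state
public class GetContextBehavior :
    Behavior<IOutgoingPhysicalMessageContext>
{
    public override async Task Invoke(IOutgoingPhysicalMessageContext context, Func<Task> next)
    {
        // the state is only present if the incoming pipeline has set it
        if (context.Extensions.TryGet<SharedData>(out var state))
        {
            // work with the state
        }

        await next();
    }
}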

Stage connectors

Stage connectors connect the current stage (e.g. IOutgoingLogicalMessageContext) to another stage (e.g. IOutgoingPhysicalMessageContext). To override an existing stage connector, inherit from StageConnector<TFromContext, TToContext> and then replace the existing stage connector. Most pipeline extensions can be done by inheriting from Behavior<TContext>, so it is rarely necessary to replace existing stage connectors. When implementing a stage connector, ensure that all required data is passed along to the next stage.

public class CustomStageConnector :
    StageConnector<IOutgoingLogicalMessageContext, IOutgoingPhysicalMessageContext>
{
    public override Task Invoke(IOutgoingLogicalMessageContext context, Func<IOutgoingPhysicalMessageContext, Task> stage)
    {
        // Finalize the work in the current stage

        byte[] body = { };
        RoutingStrategy[] routingStrategies = { };

        // Start the next stage
        return stage(this.CreateOutgoingPhysicalMessageContext(body, routingStrategies, context));
    }
}

public class FeatureReplacingExistingStage :
    Feature
{
    internal FeatureReplacingExistingStage()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        var pipeline = context.Pipeline;
        pipeline.Replace("SerializeMessageConnector", new CustomStageConnector());
    }
}

Fork connectors

graph LR
    subgraph Root pipeline
        A[TFromContext] --- B{Fork Connector}
        B --- C[TFromContext]
    end
    subgraph Forked pipeline
        B --> D[TForkContext]
    end

Fork connectors fork from the current stage (e.g. IIncomingPhysicalMessageContext) to another independent pipeline (e.g. IAuditContext). To override an existing fork connector, inherit from ForkConnector<TFromContext, TForkContext> and then replace the existing fork connector.

public class CustomForkConnector :
    ForkConnector<IIncomingPhysicalMessageContext, IAuditContext>
{
    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next, Func<IAuditContext, Task> fork)
    {
        // Finalize the work in the current stage
        await next();

        OutgoingMessage message = null;
        var auditAddress = "AuditAddress";

        // Fork into new pipeline
        await fork(this.CreateAuditContext(message, auditAddress, context));
    }
}

public class FeatureReplacingExistingForkConnector :
    Feature
{
    internal FeatureReplacingExistingForkConnector()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        var pipeline = context.Pipeline;
        pipeline.Replace("AuditProcessedMessage", new CustomForkConnector());
    }
}

Stage fork connectors

graph LR
    subgraph Root stage
        A[TFromContext] --- B{StageFork<br/>Connector}
        B --- C[TToContext]
    end
    subgraph Forked pipeline
        B --> D[TForkContext]
    end

Stage fork connectors are essentially a combination of a stage connector and a fork connector. They can connect the current stage (e.g. ITransportReceiveContext) to another stage (e.g. IIncomingPhysicalMessageContext) and fork to another independent pipeline (e.g. IBatchDispatchContext). To override an existing stage fork connector, inherit from StageForkConnector<TFromContext, TToContext, TForkContext> and then replace the existing stage fork connector.

public class CustomStageForkConnector :
    StageForkConnector<ITransportReceiveContext,
        IIncomingPhysicalMessageContext,
        IBatchDispatchContext>
{
    public override async Task Invoke(ITransportReceiveContext context, Func<IIncomingPhysicalMessageContext, Task> stage, Func<IBatchDispatchContext, Task> fork)
    {
        // Finalize the work in the current stage
        var physicalMessageContext = this.CreateIncomingPhysicalMessageContext(context.Message, context);

        // Start the next stage
        await stage(physicalMessageContext);

        TransportOperation[] operations =
        {
        };
        var batchDispatchContext = this.CreateBatchDispatchContext(operations, physicalMessageContext);

        // Fork into new pipeline
        await fork(batchDispatchContext);
    }
}

public class FeatureReplacingExistingStageForkConnector :
    Feature
{
    internal FeatureReplacingExistingStageForkConnector()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        var pipeline = context.Pipeline;
        pipeline.Replace("TransportReceiveToPhysicalMessageProcessingConnector", new CustomStageForkConnector());
    }
}
