Getting Started

Audit Filter Pipeline Extension

Component: NServiceBus
NuGet Package: NServiceBus (8-pre)
This page targets a pre-release version. Pre-releases are subject to change and samples are not guaranteed to be fully functional.

This sample shows how to extend the NServiceBus message-processing pipeline with custom behaviors to add filters which prevent certain message types from being forwarded to the audit queue.

Code walk-through

The solution contains a single endpoint with auditing enabled. When it starts, the endpoint sends one AuditThisMessage message and one DoNotAuditThisMessage message to itself. Both messages are handled by message handlers; however only the AuditThisMessage message will be moved to the audit queue, and the DoNotAuditThisMessage message is filtered out.

Three behaviors are added to the message processing pipeline to implement the desired filtering logic:


This behavior adds a class to the pipeline contexts Extensions bag. This class can later be accessed by the other behaviors to share state across behaviors. The state needs to be added early in the pipeline because anything added to the Extensions after the IIncomingPhysicalMessageContext is invisible to the IAuditContext.

public class AuditFilterContextBehavior :
    public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
        var auditFilterContext = new AuditFilterContext
            SkipAudit = false

        return next();


The AuditRulesBehavior uses the IIncomingLogicalMessageContext to inspect the incoming message type and applies its rules to determine whether that message should be audited or not. If the message should not be audited, it retrieves the shared state from the context's Extensions and marks the message.

public class AuditRulesBehavior :
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
        if (context.Message.Instance is DoNotAuditThisMessage)
            var auditFilterContext = context.Extensions.Get<AuditFilterContext>();
            auditFilterContext.SkipAudit = true;

        return next();


This behavior is invoked for every message which is sent to the audit queue. By retrieving the shared state and checking its value, this behavior can stop the auditing pipeline by not invoking the next step.

public class AuditFilterBehavior :
    public override Task Invoke(IAuditContext context, Func<Task> next)
        if (context.Extensions.TryGet(out AuditFilterContext auditFilterContext) &&
            return Task.CompletedTask;

        return next();

The filtering logic must be registered in the pipeline:


var pipeline = endpointConfiguration.Pipeline;
    stepId: "AuditFilter.Filter",
    behavior: typeof(AuditFilterBehavior),
    description: "prevents marked messages from being forwarded to the audit queue");
    stepId: "AuditFilter.Rules",
    behavior: typeof(AuditRulesBehavior),
    description: "checks whether a message should be forwarded to the audit queue");
    stepId: "AuditFilter.Context",
    behavior: typeof(AuditFilterContextBehavior),
    description: "adds a shared state for the rules and filter behaviors");

var endpointInstance = await Endpoint.Start(endpointConfiguration)

Running the code

  • Run the solution.
  • Wait until both messages are handled by their message handlers.
  • Verify the configured audit queue (Samples.AuditFilter.Audit) does not contain the DoNotAuditThisMessage message.

Related Articles

Last modified