Getting Started
Architecture
NServiceBus
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

Audit Filter Pipeline Extension

Component: NServiceBus
NuGet Package: NServiceBus (8.x)

This sample demonstrates how to extend the NServiceBus message-processing pipeline with custom behaviors to add filters which prevent certain message types from being forwarded to the audit queue.

Code walk-through

The solution contains a single endpoint with auditing enabled. When it starts, the endpoint sends one AuditThisMessage message and one DoNotAuditThisMessage message to itself. Both messages are handled by message handlers. However only the AuditThisMessage message will be moved to the audit queue, and the DoNotAuditThisMessage message is filtered out.

Three behaviors are added to the message processing pipeline to implement the desired filtering logic:

AuditFilterContextBehavior

This behavior adds a class to the pipeline contexts Extensions bag. This class can later be accessed by the other behaviors to share state across behaviors. The state must be added early in the pipeline because anything added to the Extensions after IIncomingPhysicalMessageContext is invisible to the IAuditContext instance.

public class AuditFilterContextBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        var auditFilterContext = new AuditFilterContext
        {
            SkipAudit = false
        };
        context.Extensions.Set(auditFilterContext);

        return next();
    }
}

AuditRulesBehavior

AuditRulesBehavior uses the IIncomingLogicalMessageContext to inspect the incoming message type and it applies rules to determine whether that message should be audited or not. If the message should not be audited, it retrieves the shared state from the context's Extensions and marks the message.

public class AuditRulesBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        if (context.Message.Instance is DoNotAuditThisMessage)
        {
            var auditFilterContext = context.Extensions.Get<AuditFilterContext>();
            auditFilterContext.SkipAudit = true;
        }

        return next();
    }
}

AuditFilterBehavior

This behavior is invoked for every message which is sent to the audit queue. By retrieving the shared state and checking its value, this behavior can stop the auditing pipeline by not invoking the next step.

public class AuditFilterBehavior :
    Behavior<IAuditContext>
{
    public override Task Invoke(IAuditContext context, Func<Task> next)
    {
        if (context.Extensions.TryGet(out AuditFilterContext auditFilterContext) &&
            auditFilterContext.SkipAudit)
        {
            return Task.CompletedTask;
        }

        return next();
    }
}

The filtering logic must be registered in the pipeline:

endpointConfiguration.AuditProcessedMessagesTo("audit");

var pipeline = endpointConfiguration.Pipeline;
pipeline.Register(
    stepId: "AuditFilter.Filter",
    behavior: typeof(AuditFilterBehavior),
    description: "prevents marked messages from being forwarded to the audit queue");
pipeline.Register(
    stepId: "AuditFilter.Rules",
    behavior: typeof(AuditRulesBehavior),
    description: "checks whether a message should be forwarded to the audit queue");
pipeline.Register(
    stepId: "AuditFilter.Context",
    behavior: typeof(AuditFilterContextBehavior),
    description: "adds a shared state for the rules and filter behaviors");

var endpointInstance = await Endpoint.Start(endpointConfiguration);

Running the code

  • Run the solution.
  • Wait until both messages are handled by their message handlers.
  • Verify the configured audit queue (Samples.AuditFilter.Audit) does not contain the DoNotAuditThisMessage message.

Related Articles