Audit filter pipeline extension

Component: NServiceBus | Nuget: NServiceBus (Version: 6.x)

Introduction

This sample shows how to extend the NServiceBus message processing pipeline with custom behaviors to add filters which prevent certain message types from being forwarded to the audit queue.

Code Walk Through

The solution contains a single endpoint with auditing enabled. The endpoint sends one AuditThisMessage and one DoNotAuditThisMessage to itself on start up. Both messages are handled by message handlers however only the AuditThisMessage will be moved to the audit queue, and the DoNotAuditThisMessage is filtered out.

Three behaviors are added to the message processing pipeline to implement the desired filtering logic:

AuditFilterContextBehavior

This behavior adds a class to the pipeline contexts Extensions bag. This class can later be accessed by the other behaviors to share state across behaviors. The state needs to be added early in the pipeline because anything added to the Extensions after the IIncomingPhysicalMessageContext is invisible to the IAuditContext.

Edit
public class AuditFilterContextBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        var auditFilterContext = new AuditFilterContext
        {
            SkipAudit = false
        };
        context.Extensions.Set(auditFilterContext);

        return next();
    }
}

AuditRulesBehavior

The AuditRulesBehavior uses the IIncomingLogicalMessageContext to inspect the incoming message type and applies its rules to determine whether that message should be audited or not. If the message should not be audited, it retrieves the shared state from the context's Extensions and marks the message.

Edit
public class AuditRulesBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        if (context.Message.MessageType == typeof(DoNotAuditThisMessage))
        {
            var auditFilterContext = context.Extensions.Get<AuditFilterContext>();
            auditFilterContext.SkipAudit = true;
        }

        return next();
    }
}

AuditFilterBehavior

This behavior is invoked for every message which is sent to the audit queue. By retrieving the shared state and checking its value, this behavior can stop the auditing pipeline by not invoking the next step.

Edit
public class AuditFilterBehavior :
    Behavior<IAuditContext>
{
    public override Task Invoke(IAuditContext context, Func<Task> next)
    {
        AuditFilterContext auditFilterContext;
        if (context.Extensions.TryGet(out auditFilterContext) &&
            auditFilterContext.SkipAudit)
        {
            return Task.CompletedTask;
        }

        return next();
    }
}

The filtering logic then needs to be registered in the pipeline:

Edit
endpointConfiguration.AuditProcessedMessagesTo("audit");

var pipeline = endpointConfiguration.Pipeline;
pipeline.Register(
    stepId: "AuditFilter.Filter",
    behavior: typeof(AuditFilterBehavior),
    description: "prevents marked messages from being forwarded to the audit queue");
pipeline.Register(
    stepId: "AuditFilter.Rules",
    behavior: typeof(AuditRulesBehavior),
    description: "checks whether a message should be forwarded to the audit queue");
pipeline.Register(
    stepId: "AuditFilter.Context",
    behavior: typeof(AuditFilterContextBehavior),
    description: "adds a shared state for the rules and filter behaviors");

var endpointInstance = await Endpoint.Start(endpointConfiguration)
    .ConfigureAwait(false);

Running the Code

  • Run the solution.
  • Wait until both messages are handled by their message handlers.
  • Verify the configured audit queue (Samples.AuditFilter.Audit) does not contain the DoNotAuditThisMessage.

Related Articles


Last modified