Audit filter pipeline extension

Component: NServiceBus | NuGet: NServiceBus (Version: 5.x)

Introduction

This sample shows how to extend the NServiceBus message processing pipeline with a custom behavior that filters out certain message types so they are not forwarded to the audit queue.

Code Walk Through

The solution contains a single endpoint with auditing enabled. On startup, the endpoint sends one AuditThisMessage and one DoNotAuditThisMessage to itself. Both messages are processed by their message handlers; however, only the AuditThisMessage is forwarded to the audit queue. The DoNotAuditThisMessage is filtered out.

To implement the desired filtering logic, a custom audit behavior is required:

public class AuditRulesBehavior :
    IBehavior<IncomingContext>
{
    IAuditMessages messageAuditer;

    public AuditRulesBehavior(IAuditMessages messageAuditer)
    {
        this.messageAuditer = messageAuditer;
    }

    public void Invoke(IncomingContext context, Action next)
    {
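        // Run the rest of the pipeline (including the message handlers) first.
        next();

        // Send options targeting the audit queue.
        var sendOptions = new SendOptions("audit");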

        //set audit related headers
        var headers = context.PhysicalMessage.Headers;

        var processingStarted = context.Get<DateTime>("IncomingMessage.ProcessingStarted");
        headers[Headers.ProcessingStarted] = DateTimeExtensions.ToWireFormattedString(processingStarted);

        var processingEnded = context.Get<DateTime>("IncomingMessage.ProcessingEnded");
        headers[Headers.ProcessingEnded] = DateTimeExtensions.ToWireFormattedString(processingEnded);

        // Don't audit control messages
        if (headers.ContainsKey(Headers.ControlMessageHeader))
        {
            return;
        }

        // Audit every message except those of type DoNotAuditThisMessage.
        if (context.IncomingLogicalMessage.MessageType != typeof(DoNotAuditThisMessage))
        {
            messageAuditer.Audit(sendOptions, context.PhysicalMessage);
        }
    }
}

Because this behavior replaces the existing audit behavior, it must handle the audit logic completely on its own. This is done by copying the logic from the core's AuditBehavior into the custom behavior and adding the message type check.
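
If more than one message type needs to be excluded, the type check can be pulled out of the behavior into a small helper. The following is only a sketch under stated assumptions: AuditFilter and ShouldAudit are hypothetical names that are not part of NServiceBus, and the two empty message classes stand in for the sample's real message types so that the snippet compiles on its own.

```csharp
using System;
using System.Collections.Generic;

// Stand-ins for the sample's message types (assumption: the real
// classes live in the sample solution and may carry properties).
public class AuditThisMessage { }
public class DoNotAuditThisMessage { }

// Hypothetical helper, not part of NServiceBus.
public static class AuditFilter
{
    // Message types that must never be forwarded to the audit queue.
    static readonly HashSet<Type> excludedTypes = new HashSet<Type>
    {
        typeof(DoNotAuditThisMessage)
    };

    // Returns true when a message of the given type should be audited.
    public static bool ShouldAudit(Type messageType)
    {
        return !excludedTypes.Contains(messageType);
    }
}
```

With such a helper in place, the condition in the behavior could read `if (AuditFilter.ShouldAudit(context.IncomingLogicalMessage.MessageType))`, and excluding another message type would only require adding it to the set.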

The new behavior then needs to be registered in the pipeline, replacing the built-in audit step:

class InitializeNewAuditBehavior : 
    INeedInitialization
{
    public void Customize(BusConfiguration configuration)
    {
        // Replace the existing auditing step with our new one in the Pipeline
        configuration.Pipeline.Replace(
            wellKnownStep: WellKnownStep.AuditProcessedMessage,
            newBehavior: typeof(AuditRulesBehavior),
            description: "Replaces existing audit behavior with the new behavior that filters certain message types.");
    }
}

Running the Code

  • Run the solution.
  • Wait until both messages are handled by their message handlers.
  • Verify the configured audit queue (Samples.AuditFilter.Audit) does not contain the DoNotAuditThisMessage.
