This sample demonstrates how to extend the NServiceBus message processing pipeline with custom behaviors to add filters which prevent certain message types from being forwarded to the audit queue.
Code walk-through
The solution contains a single endpoint with auditing enabled. When it starts, the endpoint sends one AuditThisMessage message and one DoNotAuditThisMessage message to itself. Both messages are handled by message handlers. However only the AuditThisMessage message will be sent to the audit queue. The DoNotAuditThisMessage message is filtered out and not sent.
Three behaviors are added to the message processing pipeline to implement the desired filtering logic:
AuditFilterContextBehavior
This behavior adds a class to the pipeline context's Extensions bag. This class can later be accessed by the other behaviors to share state across behaviors. The state must be added early in the pipeline because anything added to Extensions after IIncomingPhysicalMessageContext is invisible to the IAuditContext instance.
public class AuditFilterContextBehavior : Behavior<IIncomingPhysicalMessageContext>
{
public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
{
var auditFilterContext = new AuditFilterContext
{
SkipAudit = false
};
context.Extensions.Set(auditFilterContext);
return next();
}
}
AuditRulesBehavior
AuditRulesBehavior uses the IIncomingLogicalMessageContext to inspect the incoming message type and it applies rules to determine whether that message should be audited or not. If the message should not be audited, it retrieves the shared state from the context's Extensions and marks the message.
public class AuditRulesBehavior : Behavior<IIncomingLogicalMessageContext>
{
public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
{
if (context.Message.Instance is DoNotAuditThisMessage)
{
var auditFilterContext = context.Extensions.Get<AuditFilterContext>();
auditFilterContext.SkipAudit = true;
}
return next();
}
}
AuditFilterBehavior
This behavior is invoked for every message which is sent to the audit queue. By retrieving the shared state and checking its value, this behavior can stop the auditing pipeline by not invoking the next step.
public class AuditFilterBehavior : Behavior<IAuditContext>
{
public override Task Invoke(IAuditContext context, Func<Task> next)
{
if (context.Extensions.TryGet(out AuditFilterContext auditFilterContext) && auditFilterContext.SkipAudit)
{
return Task.CompletedTask;
}
return next();
}
}
The filtering logic must be registered in the pipeline:
endpointConfiguration.AuditProcessedMessagesTo("audit");
var pipeline = endpointConfiguration.Pipeline;
pipeline.Register(
stepId: "AuditFilter.Filter",
behavior: typeof(AuditFilterBehavior),
description: "prevents marked messages from being forwarded to the audit queue");
pipeline.Register(
stepId: "AuditFilter.Rules",
behavior: typeof(AuditRulesBehavior),
description: "checks whether a message should be forwarded to the audit queue");
pipeline.Register(
stepId: "AuditFilter.Context",
behavior: typeof(AuditFilterContextBehavior),
description: "adds a shared state for the rules and filter behaviors");
Running the code
- Run the solution.
- Wait until both messages are handled by their message handlers.
- Verify the configured audit queue (named
audit) does not contain theDoNotAuditThisMessagemessage. For the default learning transport, this can be found in the.folder.learningtransport