This sample demonstrates how to extend the NServiceBus message-processing pipeline with custom behaviors to add filters which prevent certain message types from being forwarded to the audit queue.
Code walk-through
The solution contains a single endpoint with auditing enabled. When it starts, the endpoint sends one AuditThisMessage message and one DoNotAuditThisMessage message to itself. Both messages are handled by message handlers. However, only the AuditThisMessage message is moved to the audit queue; the DoNotAuditThisMessage message is filtered out.
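The message types, handlers, and startup code are not shown in this walk-through. As a rough sketch of what they might look like: the IMessage marker interface, the handler class names, the Console output, and the SampleMessages.SendBoth helper below are assumptions made for illustration and are not taken from the sample.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

// Assumption: the message types are plain classes marked with IMessage.
public class AuditThisMessage : IMessage
{
}

public class DoNotAuditThisMessage : IMessage
{
}

// Both message types get a handler; handling does not depend on auditing.
public class AuditThisMessageHandler : IHandleMessages<AuditThisMessage>
{
    public Task Handle(AuditThisMessage message, IMessageHandlerContext context)
    {
        Console.WriteLine("Handling AuditThisMessage");
        return Task.CompletedTask;
    }
}

public class DoNotAuditThisMessageHandler : IHandleMessages<DoNotAuditThisMessage>
{
    public Task Handle(DoNotAuditThisMessage message, IMessageHandlerContext context)
    {
        Console.WriteLine("Handling DoNotAuditThisMessage");
        return Task.CompletedTask;
    }
}

public static class SampleMessages
{
    // Hypothetical helper: sends one message of each type to the local endpoint.
    public static async Task SendBoth(IEndpointInstance endpointInstance)
    {
        await endpointInstance.SendLocal(new AuditThisMessage());
        await endpointInstance.SendLocal(new DoNotAuditThisMessage());
    }
}
```

Because both messages have handlers, the only observable difference between them is whether a copy arrives in the audit queue.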
Three behaviors are added to the message processing pipeline to implement the desired filtering logic:
AuditFilterContextBehavior
This behavior adds an instance of the AuditFilterContext class to the pipeline context's Extensions bag. The other behaviors later retrieve this instance to share state with each other. The state must be added this early in the pipeline because anything added to Extensions after the IIncomingPhysicalMessageContext stage is invisible to the IAuditContext instance.
public class AuditFilterContextBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        // Messages are audited by default; a later behavior may flip this flag.
        var auditFilterContext = new AuditFilterContext
        {
            SkipAudit = false
        };
        // Store the shared state so that later pipeline stages can retrieve it.
        context.Extensions.Set(auditFilterContext);
        return next();
    }
}
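The AuditFilterContext class itself is not listed in this walk-through. Judging only from how the behaviors use it, a minimal sketch could look like the following; the exact shape in the sample may differ.

```csharp
// Assumed shape of the shared state: one mutable flag that defaults to false,
// meaning "audit this message" unless a rule says otherwise.
public class AuditFilterContext
{
    public bool SkipAudit { get; set; }
}
```

The state is a reference type because the rules behavior changes the instance it retrieves from the bag instead of storing a new one, so the change has to be visible to whoever reads the same instance later.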
AuditRulesBehavior
AuditRulesBehavior uses the IIncomingLogicalMessageContext to inspect the incoming message type and applies rules to determine whether the message should be audited. If the message should not be audited, the behavior retrieves the shared state from the context's Extensions bag and sets its SkipAudit flag.
public class AuditRulesBehavior :
    Behavior<IIncomingLogicalMessageContext>
{
    public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // Rule: messages of type DoNotAuditThisMessage are excluded from auditing.
        if (context.Message.Instance is DoNotAuditThisMessage)
        {
            var auditFilterContext = context.Extensions.Get<AuditFilterContext>();
            auditFilterContext.SkipAudit = true;
        }
        return next();
    }
}
AuditFilterBehavior
This behavior is invoked for every message that is about to be sent to the audit queue. It retrieves the shared state and, if the message has been marked, stops the audit pipeline by not invoking the next step.
public class AuditFilterBehavior :
    Behavior<IAuditContext>
{
    public override Task Invoke(IAuditContext context, Func<Task> next)
    {
        if (context.Extensions.TryGet(out AuditFilterContext auditFilterContext) &&
            auditFilterContext.SkipAudit)
        {
            // Do not call next(): the message is not forwarded to the audit queue.
            return Task.CompletedTask;
        }
        return next();
    }
}
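The short-circuit technique used by AuditFilterBehavior can be tried in isolation. The following dependency-free sketch models it with a plain Func<Task> continuation; SkipFlag, ShortCircuitDemo, Filter, and Run are hypothetical names for illustration and are not NServiceBus types.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in for the shared state class; not an NServiceBus type.
public class SkipFlag
{
    public bool SkipAudit { get; set; }
}

public static class ShortCircuitDemo
{
    // Hypothetical filter step: completes without calling next when the flag is set.
    public static Task Filter(SkipFlag flag, Func<Task> next)
    {
        if (flag.SkipAudit)
        {
            // The remaining steps never run.
            return Task.CompletedTask;
        }
        return next();
    }

    // Runs the filter in front of a step that records a "forwarded" message.
    public static async Task<List<string>> Run(bool skipAudit)
    {
        var forwarded = new List<string>();
        var flag = new SkipFlag { SkipAudit = skipAudit };
        await Filter(flag, () =>
        {
            forwarded.Add("sent to audit queue");
            return Task.CompletedTask;
        });
        return forwarded;
    }
}
```

Calling ShortCircuitDemo.Run(true) yields an empty list because the continuation is never invoked, while Run(false) yields a single entry.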
The filtering logic must be registered in the pipeline:
// Enable auditing: processed messages are forwarded to the "audit" queue.
endpointConfiguration.AuditProcessedMessagesTo("audit");

var pipeline = endpointConfiguration.Pipeline;
pipeline.Register(
    stepId: "AuditFilter.Filter",
    behavior: typeof(AuditFilterBehavior),
    description: "prevents marked messages from being forwarded to the audit queue");
pipeline.Register(
    stepId: "AuditFilter.Rules",
    behavior: typeof(AuditRulesBehavior),
    description: "checks whether a message should be forwarded to the audit queue");
pipeline.Register(
    stepId: "AuditFilter.Context",
    behavior: typeof(AuditFilterContextBehavior),
    description: "adds a shared state for the rules and filter behaviors");

var endpointInstance = await Endpoint.Start(endpointConfiguration);
Running the code
- Run the solution.
- Wait until both messages are handled by their message handlers.
- Verify that the configured audit queue (audit in the configuration above) contains the AuditThisMessage message but not the DoNotAuditThisMessage message.