Custom exception handling can be implemented using pipeline behaviors. To learn more about the pipeline and behaviors, refer to the documentation on how to manipulate the pipeline with behaviors.
Create a new behavior
Implement a new behavior that extends Behavior<ITransportReceiveContext>. The ITransportReceiveContext context provides details about the message at the transport level. Calling next() in the pipeline invokes the subsequent pipeline processing steps.
class CustomErrorHandlingBehavior :
    Behavior<ITransportReceiveContext>
{
    public override Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        return next();
    }
}
Handling deserialization errors
To handle deserialization errors, wrap the next() operation in a try-catch block and handle the MessageDeserializationException as shown:
try
{
    await next()
        .ConfigureAwait(false);
}
catch (MessageDeserializationException deserializationException)
{
    // Custom processing that needs to occur when a deserialization failure occurs.
    log.Error("Message deserialization failed", deserializationException);
    throw;
}
Rethrowing the MessageDeserializationException in the catch block immediately forwards the message to the error queue. A message that fails due to a MessageDeserializationException will not be retried. If the message must be consumed and removed from the queue, remove the throw from the catch block to indicate that the message has been successfully processed.
Handling other errors
To handle other errors, wrap the next() operation in a try-catch block and handle the Exception as shown:
try
{
    await next()
        .ConfigureAwait(false);
}
catch (Exception exception)
{
    // Custom processing that needs to occur when a message always fails.
    log.Error("Message processing failed", exception);
    throw;
}
Remove the throw from the catch block to indicate that the message has been successfully processed.
Rolling back
To roll back the receive operation instead of handling the message or forwarding it to the error queue, invoke AbortReceiveOperation as shown below:
try
{
    await next()
        .ConfigureAwait(false);
}
catch (Exception)
{
    // Custom processing that needs to occur when a message always fails.
    // To roll back the receive operation instead of marking the message as processed:
    context.AbortReceiveOperation();
}
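The fragments above use await, so the Invoke method that contains them has to be declared async. As a minimal sketch of how the pieces might fit together in one behavior, the class below combines them. It assumes the log field comes from the NServiceBus.Logging API (the fragments above do not show where log is defined), and the policy of rethrowing deserialization failures while rolling back on every other exception is only one possible combination chosen for illustration.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Logging;
using NServiceBus.Pipeline;

class CustomErrorHandlingBehavior :
    Behavior<ITransportReceiveContext>
{
    // Assumption: the logger is obtained through NServiceBus.Logging.
    static readonly ILog log = LogManager.GetLogger<CustomErrorHandlingBehavior>();

    // async is required because the body awaits next().
    public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        try
        {
            // Run the rest of the pipeline.
            await next()
                .ConfigureAwait(false);
        }
        catch (MessageDeserializationException deserializationException)
        {
            // The more specific catch must come first.
            // Rethrowing leaves the failure to be handled further up the pipeline.
            log.Error("Message deserialization failed", deserializationException);
            throw;
        }
        catch (Exception exception)
        {
            // Illustrative policy only: roll back the receive operation
            // instead of marking the message as processed.
            log.Error("Message processing failed", exception);
            context.AbortReceiveOperation();
        }
    }
}
```

Which branch rethrows, swallows, or aborts is a design decision for each endpoint. The ordering of the catch blocks matters, because a catch for Exception placed first would also capture MessageDeserializationException.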
Registering the behavior
In the example below, the behavior CustomErrorHandlingBehavior is registered to be part of the message handling pipeline. This new behavior is placed at the very beginning of the pipeline. This placement allows inserting code right after a message has been received from the transport and right before the recoverability policy is invoked.
class NewMessageProcessingPipelineStep :
    RegisterStep
{
    public NewMessageProcessingPipelineStep()
        : base(
            stepId: "CustomErrorHandlingBehavior",
            behavior: typeof(CustomErrorHandlingBehavior),
            description: "Adds custom error behavior to pipeline")
    {
        // Within a stage it is sometimes necessary to configure a specific
        // step order. This can be achieved by invoking one of the following methods:
        //  - InsertAfter,
        //  - InsertAfterIfExists,
        //  - InsertBefore,
        //  - InsertBeforeIfExists
    }
}
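The step definition above only describes the behavior; it still has to be added to the pipeline of an endpoint. A minimal sketch of that wiring, assuming the endpoint is configured in code through EndpointConfiguration, could look like the following. The EndpointSetup class and the endpoint name "Samples.CustomErrorHandling" are hypothetical and exist only for this illustration.

```csharp
using NServiceBus;

static class EndpointSetup
{
    public static EndpointConfiguration Create()
    {
        // Hypothetical endpoint name, used only for this sketch.
        var endpointConfiguration = new EndpointConfiguration("Samples.CustomErrorHandling");

        // Add the step defined above to the endpoint's pipeline.
        var pipeline = endpointConfiguration.Pipeline;
        pipeline.Register(new NewMessageProcessingPipelineStep());

        return endpointConfiguration;
    }
}
```

Transport, persistence, and other endpoint settings are omitted here and would still need to be configured before the endpoint is started.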