Recoverability pipeline

Component: NServiceBus
NuGet Package: NServiceBus (8-pre)
This page targets a pre-release version. Pre-releases are subject to change and samples are not guaranteed to be fully functional.

The recoverability pipeline allows for advanced customization of the metadata captured during message failures as well as full control over the recoverability action taken.

The example below shows how to extend the pipeline with a behavior that:

  • Stores the message body in external storage
  • Excludes the body from the message sent to the error queue
  • Adds a metadata entry that links to the stored body
/// <summary>
/// Recoverability pipeline behavior that, when a failed message is about to be moved to the
/// error queue, hands the message body to an <see cref="IExternalBodyStorage"/>, records the
/// value returned by the storage under the "body-url" metadata key, and swaps the
/// recoverability action for one that forwards the message with an empty body.
/// </summary>
public class EnableExternalBodyStorageBehavior : Behavior<IRecoverabilityContext>
{
    private readonly IExternalBodyStorage bodyStorage;

    /// <summary>
    /// Creates the behavior.
    /// </summary>
    /// <param name="storage">Storage that receives the bodies of failed messages.</param>
    public EnableExternalBodyStorageBehavior(IExternalBodyStorage storage)
    {
        bodyStorage = storage;
    }

    /// <summary>
    /// Offloads the body when the current action is a <see cref="MoveToError"/>, then
    /// continues with the rest of the pipeline. Any other action passes through untouched.
    /// </summary>
    /// <param name="context">The recoverability context of the failed message.</param>
    /// <param name="next">Delegate invoking the remainder of the pipeline.</param>
    public override async Task Invoke(IRecoverabilityContext context, Func<Task> next)
    {
        if (context.RecoverabilityAction is MoveToError moveToError)
        {
            await OffloadBody(context, moveToError);
        }

        await next();
    }

    /// <summary>
    /// Stores the failed message's body externally, links to it from the metadata, and
    /// replaces the action with one that strips the body before routing to the error queue.
    /// </summary>
    private async Task OffloadBody(IRecoverabilityContext context, MoveToError moveToError)
    {
        var failedMessage = context.FailedMessage;
        var storedBodyUrl = await bodyStorage.StoreBody(failedMessage.MessageId, failedMessage.Body);

        context.Metadata["body-url"] = storedBodyUrl;

        // Keep the original error queue; only the outgoing body handling changes.
        context.RecoverabilityAction = new SkipFailedMessageBody(moveToError.ErrorQueue);
    }

    /// <summary>
    /// A <see cref="MoveToError"/> variant whose routed messages carry an empty body.
    /// </summary>
    class SkipFailedMessageBody : MoveToError
    {
        public SkipFailedMessageBody(string errorQueue) : base(errorQueue)
        {
        }

        /// <summary>
        /// Builds the routing contexts exactly as the base action does, then blanks the
        /// body of every outgoing message before returning them.
        /// </summary>
        public override IReadOnlyCollection<IRoutingContext> GetRoutingContexts(IRecoverabilityActionContext context)
        {
            var outgoing = base.GetRoutingContexts(context);

            foreach (var route in outgoing)
            {
                // The body has already been stored externally, so send an empty payload.
                route.Message.UpdateBody(ReadOnlyMemory<byte>.Empty);
            }

            return outgoing;
        }
    }
}

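In addition, the behavior must be registered in the pipeline, for example by passing an instance of it to `endpointConfiguration.Pipeline.Register` when configuring the endpoint.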


Last modified