Repair malformed messages using pipeline behavior

Component: NServiceBus
NuGet Package: NServiceBus (7.x)

This sample shows how to repair a malformed message by implementing a pipeline behavior. The sample uses the Learning Transport and a portable version of the Particular Service Platform tools. Installing ServiceControl is not required.

Running the project

Running the project opens four console windows. Wait until ServicePulse opens in a web browser before continuing.

  1. Press Enter in the Sender console window.
  2. Observe the error log output in the Receiver console window: the endpoint cannot process the message because the Id field is malformed (it contains lowercase characters).
  3. Switch to ServicePulse. It shows one failed message. Press the retry button and wait until the message is retried.
  4. Observe the error log output in the Receiver console window. The endpoint still can't process the message.
  5. Stop the Receiver project.
  6. Update the Receiver configuration code to register the FixMessageIdBehavior behavior in the pipeline by uncommenting the code in the RegisterFixBehavior region.
  7. Start the updated Receiver endpoint.
  8. Go back to the Failed Messages tab in ServicePulse, select the failed message, and press the retry button again.
  9. Switch to the Receiver console window and observe the successful processing notification.

Code walk-through

Sender

The Sender endpoint sends messages of type SimpleMessage and emulates a bug by setting the Id field to a malformed value, that is, one containing lowercase characters.
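
The sample's Sender code is not reproduced here. As a rough illustration only, the following sketch shows one way such a malformed message could be produced. The shape of SimpleMessage (a single string Id), the MalformedMessageFactory helper, and the "msg-" prefix are assumptions made for this sketch; the actual sample may build the Id differently and would pass the message to the endpoint's Send method.

```csharp
using System;

// Assumed shape of the sample's message type: a single string Id.
public class SimpleMessage
{
    public string Id { get; set; } = "";
}

// Hypothetical helper, not part of the sample.
public static class MalformedMessageFactory
{
    // Emulates the Sender bug: the Id is lowercased before it is sent,
    // so the Receiver's "no lowercase characters" check will reject it.
    public static SimpleMessage Create()
    {
        // The fixed "msg-" prefix guarantees at least one lowercase character.
        var id = "msg-" + Guid.NewGuid().ToString("D");
        return new SimpleMessage { Id = id.ToLowerInvariant() };
    }
}
```

A message created this way always differs from its uppercase form, which is the condition the Receiver treats as malformed.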

Receiver

Retries are disabled in the sample for simplicity; messages are immediately moved to the error queue after a processing failure:

var recoverability = endpointConfiguration.Recoverability();

recoverability.Delayed(
    customizations: retriesSettings =>
    {
        retriesSettings.NumberOfRetries(0);
    });
recoverability.Immediate(
    customizations: retriesSettings =>
    {
        retriesSettings.NumberOfRetries(0);
    });

This endpoint processes messages of type SimpleMessage and expects the Id field not to contain any lowercase characters. Messages with lowercase characters are rejected (and sent to the error queue).

log.Info($"Received message with Id = {message.Id}.");
if (message.Id.Any(char.IsLower))
{
    throw new Exception("Lowercase characters are not allowed in message Id.");
}
log.Info($"Successfully processed message with Id = {message.Id}.");
return Task.CompletedTask;

To fix failing messages, the endpoint defines a pipeline behavior that converts lowercase message identifiers to uppercase. The registration is commented out initially; uncomment it as described in step 6 above:

//var pipeline = endpointConfiguration.Pipeline;
//
//pipeline.Register(
//    behavior: new FixMessageIdBehavior(),
//    description: "Fix message Id");

FixMessageIdBehavior is an incoming pipeline behavior that runs on the logical message before the handler is invoked:

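// Requires: using System; using System.Threading.Tasks; using NServiceBus.Pipeline;
public class FixMessageIdBehavior : Behavior<IIncomingLogicalMessageContext>
{
    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // Only repair messages of the affected type; everything else passes through unchanged.
        if (context.Message.Instance is SimpleMessage message)
        {
            // Culture-invariant conversion, so the result does not depend on the host's locale.
            message.Id = message.Id.ToUpperInvariant();

            context.UpdateMessageInstance(message);
        }

        // Continue with the rest of the pipeline, which eventually invokes the handler.
        await next();
    }
}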
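
To see why registering the behavior makes the retry succeed, the following self-contained sketch mimics the idea without NServiceBus. The Step delegate, MiniPipeline, and TryProcess are hypothetical stand-ins invented for this illustration; they are not NServiceBus APIs, and SimpleMessage is assumed to carry a single string Id. The repair step and the handler check mirror the logic shown above.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Assumed shape of the sample's message type.
public class SimpleMessage
{
    public string Id { get; set; } = "";
}

// Hypothetical, simplified pipeline step: receives the message and a delegate
// that invokes the rest of the chain, similar in spirit to Behavior<T>.Invoke.
public delegate Task Step(SimpleMessage message, Func<Task> next);

public static class MiniPipeline
{
    // Same repair as FixMessageIdBehavior: upper-case the Id, then continue.
    public static async Task FixMessageId(SimpleMessage message, Func<Task> next)
    {
        message.Id = message.Id.ToUpperInvariant();
        await next();
    }

    // Same check as the Receiver handler: reject lowercase characters.
    public static Task Handle(SimpleMessage message, Func<Task> next)
    {
        if (message.Id.Any(char.IsLower))
        {
            throw new Exception("Lowercase characters are not allowed in message Id.");
        }
        return Task.CompletedTask;
    }

    // Runs the steps in order; returns true when no step throws.
    public static async Task<bool> TryProcess(SimpleMessage message, params Step[] steps)
    {
        Func<Task> chain = () => Task.CompletedTask;
        // Build the chain from the last step backwards so the first step runs first.
        for (var i = steps.Length - 1; i >= 0; i--)
        {
            var step = steps[i];
            var next = chain; // capture the current tail before wrapping it
            chain = () => step(message, next);
        }
        try
        {
            await chain();
            return true;
        }
        catch (Exception)
        {
            return false; // in the real sample the message would go to the error queue
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        var withoutFix = await MiniPipeline.TryProcess(
            new SimpleMessage { Id = "abc-123" }, MiniPipeline.Handle);
        var withFix = await MiniPipeline.TryProcess(
            new SimpleMessage { Id = "abc-123" }, MiniPipeline.FixMessageId, MiniPipeline.Handle);

        Console.WriteLine($"Without fix: {withoutFix}"); // prints "Without fix: False"
        Console.WriteLine($"With fix: {withFix}");       // prints "With fix: True"
    }
}
```

Because the repair step runs before the handler, the handler only ever sees the corrected Id. The same ordering is what allows the failed message to be processed when it is retried from ServicePulse.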