This sample shows how to repair a malformed message by implementing a pipeline behavior. The sample uses the Learning Transport and includes a portable version of the Particular Service Platform tools, so installing ServiceControl is not required.
The platform tools are available only on Windows, but they are not necessary to run the sample.
Running the project
Running the project opens four console windows. Wait until the ServicePulse window opens in a web browser.
- Press Enter in the Sender console window.
- Observe the error log output in the Receiver console window: the endpoint cannot process the message because the Id field is malformed (it contains lowercase characters).
- Switch to ServicePulse. It shows one failed message. Press the retry button and wait until the message is retried.
- Observe the error log output in the Receiver console window. The endpoint still can't process the message.
- Stop the Receiver project.
- Update the Receiver configuration code to register the FixMessageIdBehavior behavior in the pipeline by uncommenting the code in the RegisterFixBehavior region.
- Start the updated Receiver endpoint.
- Go back to the Failed Messages tab, select the failed message, and press the retry button again.
- Switch to the Receiver console window and observe the successful processing notification.
Code walk-through
Sender
Sends messages of type SimpleMessage and emulates a bug by sending malformed Id field values.
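The emulated bug can be sketched as follows. This is a minimal illustration, not the sample's actual Sender code: the shape of SimpleMessage (a single string Id property), the MalformedSender helper, the "msg-" prefix, and the send delegate (which stands in for the endpoint's send operation) are all assumptions made so the sketch runs without a transport.

```csharp
using System;
using System.Threading.Tasks;

// Assumed shape: the sample only states that SimpleMessage has an Id field.
public class SimpleMessage
{
    public string Id { get; set; } = string.Empty;
}

// Hypothetical helper, not part of the sample.
public static class MalformedSender
{
    // 'send' stands in for the endpoint's send operation (an assumption),
    // so the sketch does not depend on a transport.
    public static async Task<SimpleMessage> SendMalformed(Func<object, Task> send)
    {
        var message = new SimpleMessage
        {
            // The lowercase "msg-" prefix guarantees at least one lowercase
            // character, which is what the Receiver treats as malformed.
            Id = "msg-" + Guid.NewGuid().ToString("N")
        };

        await send(message);
        return message;
    }
}
```

Passing a delegate keeps the sketch independent of any particular messaging API; in a real endpoint the delegate would wrap whatever send call the Sender uses.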
Receiver
Retries are disabled in the sample for simplicity; messages are immediately moved to the error queue after a processing failure:
var recoverability = endpointConfiguration.Recoverability();
recoverability.Delayed(
    customizations: retriesSettings =>
    {
        retriesSettings.NumberOfRetries(0);
    });
recoverability.Immediate(
    customizations: retriesSettings =>
    {
        retriesSettings.NumberOfRetries(0);
    });
This endpoint processes messages of type SimpleMessage and expects the Id field not to contain any lowercase characters. Messages whose Id contains lowercase characters are rejected and moved to the error queue:
log.Info($"Received message with Id = {message.Id}.");
if (message.Id.Any(char.IsLower))
{
    throw new Exception("Lowercase characters are not allowed in message Id.");
}
log.Info($"Successfully processed message with Id = {message.Id}.");
return Task.CompletedTask;
To fix failing messages, the endpoint defines a pipeline behavior that converts lowercase message Ids to uppercase. Its registration is commented out until the fix is enabled:
//var pipeline = endpointConfiguration.Pipeline;
//
//pipeline.Register(
// behavior: new FixMessageIdBehavior(),
// description: "Fix message Id");
where FixMessageIdBehavior is an incoming pipeline behavior:
public class FixMessageIdBehavior : Behavior<IIncomingLogicalMessageContext>
{
    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        // Only SimpleMessage instances are repaired; other messages pass through unchanged.
        if (context.Message.Instance is SimpleMessage message)
        {
            message.Id = message.Id.ToUpper();
            context.UpdateMessageInstance(message);
        }
        // Continue with the rest of the pipeline, including the message handler.
        await next();
    }
}
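The effect of the repair can be checked in isolation, without running the endpoints. The sketch below is illustrative only: MessageIdRules and the literal Id value are hypothetical names and data, and it uses ToUpperInvariant rather than the ToUpper call in the behavior, on the assumption that culture-independent casing is preferable for identifiers. For plain ASCII Ids such as the one shown, the repaired value passes the Receiver's check.

```csharp
using System;
using System.Linq;

// Hypothetical helper that mirrors the rules described above; not part of the sample.
public static class MessageIdRules
{
    // Same check as the Receiver: any lowercase character makes the Id invalid.
    public static bool IsValid(string id) => !id.Any(char.IsLower);

    // Same repair as the behavior, using culture-independent casing (an assumption).
    public static string Repair(string id) => id.ToUpperInvariant();
}

public static class Program
{
    public static void Main()
    {
        var malformed = "3f2a-9bc1"; // made-up Id for illustration

        Console.WriteLine(MessageIdRules.IsValid(malformed)); // False

        var repaired = MessageIdRules.Repair(malformed);
        Console.WriteLine(repaired);                          // 3F2A-9BC1
        Console.WriteLine(MessageIdRules.IsValid(repaired));  // True
    }
}
```

Repairing an already valid Id leaves it unchanged, so retrying a message through the behavior more than once should be harmless.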