This sample shows how to repair a malformed message by implementing a pipeline behavior. The sample uses the Learning Transport and a portable version of the Particular Service Platform tools. Installing ServiceControl is not required.
Running the project
Running the project opens four console windows. Wait a moment until the ServicePulse window opens in the browser.
- Press Enter in the Sender console window.
- Observe error log output in the Receiver console window: the endpoint cannot process the message because the Id field is malformed (it contains lowercase characters).
- Switch to ServicePulse. It should show one failed message. Hit the retry button and wait until the message is retried.
- Observe error log output in the Receiver console window. The endpoint still can't process the message.
- Stop the Receiver project.
- Update the Receiver configuration code to register the FixMessageIdBehavior behavior in the pipeline by uncommenting the code in the RegisterFixBehavior region.
- Start the updated Receiver endpoint.
- Go back to the Failed Messages tab, select the failed message and hit the retry button again.
- Switch to the Receiver console window and observe the successful processing notification.
Code walk-through
Sender
Sends messages of SimpleMessage type and emulates a bug by sending malformed Id field values.
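For illustration, a malformed value could be produced along the following lines. This is a minimal sketch, not the sample's actual Sender code: the shape of SimpleMessage (a single string Id property) is inferred from the handler shown in the Receiver section, and SenderSketch and CreateMalformedMessage are hypothetical names. In a real endpoint the type would also have to be recognized as an NServiceBus message and be sent through the endpoint instance rather than merely constructed.

```csharp
// Assumed shape of the message: inferred from the handler, not copied from the sample.
public class SimpleMessage
{
    public string Id { get; set; }
}

// Hypothetical helper, used only for this illustration.
public static class SenderSketch
{
    // Builds a message whose Id is deliberately lowercased to emulate the bug.
    public static SimpleMessage CreateMalformedMessage(string rawId)
    {
        return new SimpleMessage { Id = rawId.ToLowerInvariant() };
    }
}
```

Any Id that contains at least one letter becomes invalid for the Receiver once it has been lowercased this way.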
Receiver
Retries are disabled in the sample for simplicity; messages are immediately moved to the error queue after a processing failure:
var recoverability = endpointConfiguration.Recoverability();
recoverability.Delayed(
    customizations: retriesSettings =>
    {
        retriesSettings.NumberOfRetries(0);
    });
recoverability.Immediate(
    customizations: retriesSettings =>
    {
        retriesSettings.NumberOfRetries(0);
    });
This endpoint processes messages of type SimpleMessage and expects the Id field to not contain any lowercase characters. Messages with lowercase characters are rejected (and sent to the error queue).
log.Info($"Received message with Id = {message.Id}.");
if (message.Id.Any(char.IsLower))
{
    throw new Exception("Lowercase characters are not allowed in message Id.");
}
log.Info($"Successfully processed message with Id = {message.Id}.");
return Task.CompletedTask;
To fix failing messages, the endpoint defines a pipeline behavior that converts lowercase message identifiers to uppercase.
// var pipeline = endpointConfiguration.Pipeline;
//
// pipeline.Register(
//     behavior: new FixMessageIdBehavior(),
//     description: "Fix message Id");
where FixMessageIdBehavior is an incoming pipeline behavior:
public class FixMessageIdBehavior : Behavior<IIncomingLogicalMessageContext>
{
    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        if (context.Message.Instance is SimpleMessage message)
        {
            message.Id = message.Id.ToUpper();
            context.UpdateMessageInstance(message);
        }
        await next().ConfigureAwait(false);
    }
}
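The effect of the behavior can be checked in isolation, without running an endpoint. The following is a minimal sketch under stated assumptions: SimpleMessage is reduced to a plain class with a string Id, and IdRules, IsValid, and Normalize are hypothetical helper names that mirror the handler's check and the behavior's fix. As a deliberate variation, it uses ToUpperInvariant instead of ToUpper, because ToUpper depends on the current culture.

```csharp
using System.Linq;

// Assumed shape of the message: a plain class standing in for the sample's type.
public class SimpleMessage
{
    public string Id { get; set; }
}

// Hypothetical helpers mirroring the handler check and the behavior's fix.
public static class IdRules
{
    // Same check as the handler: any lowercase character makes the Id invalid.
    public static bool IsValid(SimpleMessage message)
    {
        return !message.Id.Any(char.IsLower);
    }

    // Same idea as FixMessageIdBehavior, but culture-independent.
    public static void Normalize(SimpleMessage message)
    {
        message.Id = message.Id.ToUpperInvariant();
    }
}
```

For example, a message with Id "abc-123" fails IsValid, and passes it once Normalize has changed the Id to "ABC-123".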