Introduction
Headers can be read and manipulated at many extension points. This sample shows a minimal example of header manipulation at each of those points.
Reading incoming headers inside the outgoing context
All of the samples below that mutate the outgoing pipeline can also, optionally, read from the incoming context. Reading is optional because an incoming context exists only when the message being sent was triggered from inside a saga or a handler. In all other scenarios it will be null.
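As a sketch of the optional read described above, the mutator below copies one incoming header onto the outgoing message when an incoming context exists. It assumes an NServiceBus version in which MutateOutgoingMessageContext exposes TryGetIncomingHeaders; the class name and the "OriginatingHeader" and "CopiedFromIncoming" keys are hypothetical and are not part of this sample.

```csharp
using System.Threading.Tasks;
using NServiceBus.MessageMutator;

// Hypothetical mutator, shown only to illustrate the optional incoming read.
public class CopyIncomingHeaderMutator :
    IMutateOutgoingMessages
{
    public Task MutateOutgoing(MutateOutgoingMessageContext context)
    {
        // Assumption: TryGetIncomingHeaders returns false when the send
        // did not originate from inside a handler or a saga.
        if (context.TryGetIncomingHeaders(out var incomingHeaders) &&
            incomingHeaders.TryGetValue("OriginatingHeader", out var value))
        {
            // "OriginatingHeader" and "CopiedFromIncoming" are illustrative keys.
            context.OutgoingHeaders["CopiedFromIncoming"] = value;
        }
        return Task.CompletedTask;
    }
}
```

When the message is sent from outside a handler or saga, the condition is false and the outgoing headers are left unchanged.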
Adding headers when sending a Message
When performing the standard messaging actions (Send, Publish, Reply, etc.), headers can be added to the message being dispatched.
var myMessage = new MyMessage();
await endpointInstance.SendLocal(myMessage);
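The snippet above sends the message without setting a header on it. A minimal sketch of attaching a header to a single send, assuming the SendOptions API with the SetHeader and RouteToThisEndpoint extension methods, might look like the following. The helper class and the "SendingMessage" key and value are hypothetical names chosen for illustration.

```csharp
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical helper, not part of the sample.
public static class SendWithHeaderSketch
{
    public static Task Send(IMessageSession session, object message)
    {
        var sendOptions = new SendOptions();
        // The header applies to this one outgoing message only.
        // The key and value are illustrative.
        sendOptions.SetHeader("SendingMessage", "ValueSendingMessage");
        // Route to the local endpoint, mirroring the SendLocal call above.
        sendOptions.RouteToThisEndpoint();
        return session.Send(message, sendOptions);
    }
}
```

An endpoint instance can be passed as the session argument, since it is also a message session.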
Using Mutators
Headers can be manipulated by implementing any of the message mutation interfaces.
IMutateIncomingMessages
public class MutateIncomingMessages :
    IMutateIncomingMessages
{
    public Task MutateIncoming(MutateIncomingMessageContext context)
    {
        var headers = context.Headers;
        headers.Add("MutateIncomingMessages", "ValueMutateIncomingMessages");
        return Task.CompletedTask;
    }
}
IMutateIncomingTransportMessages
public class MutateIncomingTransportMessages :
    IMutateIncomingTransportMessages
{
    public Task MutateIncoming(MutateIncomingTransportMessageContext context)
    {
        var headers = context.Headers;
        headers.Add("MutateIncomingTransportMessages", "ValueMutateIncomingTransportMessages");
        return Task.CompletedTask;
    }
}
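The two incoming mutators above write with Add, while the outgoing mutators in this sample assign through the indexer. Assuming the header collection behaves like a standard Dictionary<string, string>, the difference matters when a key is already present: Add throws, whereas the indexer overwrites. The following self-contained sketch uses a plain dictionary and hypothetical names to show both behaviors.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical demonstration class; it uses a plain dictionary,
// not an actual NServiceBus context.
public static class HeaderWriteSketch
{
    // Returns true when Add rejects a key that is already present.
    public static bool AddThrowsOnDuplicate()
    {
        var headers = new Dictionary<string, string> { ["MyHeader"] = "first" };
        try
        {
            headers.Add("MyHeader", "second");
            return false;
        }
        catch (ArgumentException)
        {
            return true;
        }
    }

    // Returns the stored value after assigning an existing key via the indexer.
    public static string IndexerOverwrites()
    {
        var headers = new Dictionary<string, string> { ["MyHeader"] = "first" };
        headers["MyHeader"] = "second";
        return headers["MyHeader"];
    }
}
```

If a header key might already exist on an incoming message, the indexer form is the safer choice.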
IMutateOutgoingMessages
public class MutateOutgoingMessages :
    IMutateOutgoingMessages
{
    public Task MutateOutgoing(MutateOutgoingMessageContext context)
    {
        var headers = context.OutgoingHeaders;
        headers["MutateOutgoingMessages"] = "ValueMutateOutgoingMessages";
        return Task.CompletedTask;
    }
}
IMutateOutgoingTransportMessages
public class MutateOutgoingTransportMessages :
    IMutateOutgoingTransportMessages
{
    public Task MutateOutgoing(MutateOutgoingTransportMessageContext context)
    {
        var headers = context.OutgoingHeaders;
        headers["MutateOutgoingTransportMessages"] = "ValueMutateOutgoingTransportMessages";
        return Task.CompletedTask;
    }
}
Using the Pipeline
Headers can be manipulated at any step in the pipeline.
Configuring the Pipeline
Configure the pipeline changes as follows.
endpointConfiguration.Pipeline.Register(typeof(IncomingHeaderBehavior), "Manipulates incoming headers");
endpointConfiguration.Pipeline.Register(typeof(OutgoingHeaderBehavior), "Manipulates outgoing headers");
Note that the point at which a behavior is injected is relative to the other existing steps in the pipeline. In this case the injection happens after transport message mutation has occurred.
The outgoing Behavior
class OutgoingHeaderBehavior :
    Behavior<IOutgoingPhysicalMessageContext>
{
    public override Task Invoke(IOutgoingPhysicalMessageContext context, Func<Task> next)
    {
        var headers = context.Headers;
        headers["OutgoingHeaderBehavior"] = "ValueOutgoingHeaderBehavior";
        return next();
    }
}
The incoming Behavior
class IncomingHeaderBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        var headers = context.Message.Headers;
        headers.Add("IncomingHeaderBehavior", "ValueIncomingHeaderBehavior");
        return next();
    }
}
Globally for all outgoing messages
A list of headers can be defined that is automatically appended to all messages sent through a given instance of the endpoint configuration.
endpointConfiguration.AddHeaderToAllOutgoingMessages("AllOutgoing", "ValueAllOutgoing");
The Handler
While the current contextual headers can be read in any of the above scenarios, in this sample all headers are written to the log by the receiving handler.
public class MyHandler :
    IHandleMessages<MyMessage>
{
    static ILog log = LogManager.GetLogger<MyHandler>();
    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        log.Info("Hello from MyHandler");
        var headers = context.MessageHeaders;
        foreach (var line in headers.OrderBy(x => x.Key)
            .Select(x => $"Key={x.Key}, Value={x.Value}"))
        {
            log.Info(line);
        }
        return Task.CompletedTask;
    }
}