Header Manipulation

Component: NServiceBus | Nuget: NServiceBus (Version: 5.x)

Introduction

Headers can be read and manipulated at many extension points. This Sample shows a minimal usage of that manipulation at each of those points.

Reading incoming headers inside the outgoing context

For all the below samples that mutate the outgoing pipeline they also (optionally) read from the incoming context. The reason it is "optionally" is that an incoming context will only exist when the current message being sent was triggered from inside a Saga or a handler. For all other scenarios it will be null.

Adding headers when sending a Message

When performing the standard messaging actions (Send, Publish, Reply etc) headers can be appended to the message being dispatched.

Edit
var myMessage = new MyMessage();
bus.SetMessageHeader(myMessage, "SendingMessage", "ValueSendingMessage");
bus.SendLocal(myMessage);

Using Mutators

Headers can be manipulated by implementing any of the message mutation interfaces.

IMessageMutator

Edit
public class MessageMutator :
    IMessageMutator
{
    IBus bus;

    public MessageMutator(IBus bus)
    {
        this.bus = bus;
    }
    public object MutateOutgoing(object message)
    {
        var incomingContext = bus.CurrentMessageContext;
        var incomingMessageId = incomingContext?.Headers["NServiceBus.MessageId"];

        bus.SetMessageHeader(
            msg: message,
            key: "MessageMutater_Outgoing",
            value: "ValueMessageMutater_Outgoing");
        return message;
    }

    public object MutateIncoming(object message)
    {
        var headers = bus.CurrentMessageContext.Headers;
        headers.Add("MessageMutator_Incoming", "ValueMessageMutator_Incoming");
        return message;
    }
}

IMutateTransportMessages

Edit
public class MutateTransportMessages :
    IMutateTransportMessages
{
    public void MutateIncoming(TransportMessage transportMessage)
    {
        var headers = transportMessage.Headers;
        headers.Add("MutateTransportMessages_Incoming", "ValueMutateTransportMessages_Incoming");
    }

    public void MutateOutgoing(LogicalMessage logicalMessage, TransportMessage transportMessage)
    {
        var headers = transportMessage.Headers;
        headers.Add("MutateTransportMessages_Outgoing", "ValueMutateTransportMessages_Outgoing");
    }
}

IMutateIncomingMessages

Edit
public class MutateIncomingMessages :
    IMutateIncomingMessages
{
    IBus bus;

    public MutateIncomingMessages(IBus bus)
    {
        this.bus = bus;
    }

    public object MutateIncoming(object message)
    {
        bus.CurrentMessageContext
            .Headers
            .Add("MutateIncomingMessages", "ValueMutateIncomingMessages");
        return message;
    }
}

IMutateIncomingTransportMessages

Edit
public class MutateIncomingTransportMessages :
    IMutateIncomingTransportMessages
{
    public void MutateIncoming(TransportMessage transportMessage)
    {
        transportMessage.Headers
            .Add("MutateIncomingTransportMessages", "ValueMutateIncomingTransportMessages");
    }
}

IMutateOutgoingMessages

Edit
public class MutateOutgoingMessages :
    IMutateOutgoingMessages
{
    IBus bus;

    public MutateOutgoingMessages(IBus bus)
    {
        this.bus = bus;
    }
    public object MutateOutgoing(object message)
    {
        var incomingContext = bus.CurrentMessageContext;
        var incomingMessageId = incomingContext?.Headers["NServiceBus.MessageId"];

        bus.SetMessageHeader(
            msg: message,
            key: "MutateOutgoingMessages",
            value: "ValueMutateOutgoingMessages");
        return message;
    }
}

IMutateOutgoingTransportMessages

Edit
public class MutateOutgoingTransportMessages :
    IMutateOutgoingTransportMessages
{
    IBus bus;

    public MutateOutgoingTransportMessages(IBus bus)
    {
        this.bus = bus;
    }

    public void MutateOutgoing(LogicalMessage logicalMessage, TransportMessage transportMessage)
    {
        var incomingContext = bus.CurrentMessageContext;
        var incomingMessageId = incomingContext?.Headers["NServiceBus.MessageId"];

        transportMessage.Headers
            .Add("MutateOutgoingTransportMessages", "ValueMutateOutgoingTransportMessages");
    }
}

Using the Pipeline

Headers can be manipulated at any step in the pipeline.

Configuring the Pipeline

Configure the pipeline changes as follows.

Edit
public class HeaderFeature :
    Feature
{
    internal HeaderFeature()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        var pipeline = context.Pipeline;
        pipeline.Register<IncomingHeaderRegistration>();
        pipeline.Register<OutgoingHeaderRegistration>();
    }
}

public class IncomingHeaderRegistration :
    RegisterStep
{
    public IncomingHeaderRegistration()
        : base(
            stepId: "IncomingHeaderManipulation",
            behavior: typeof(IncomingHeaderBehavior),
            description: "Manipulates incoming headers")
    {
        InsertAfter(WellKnownStep.MutateIncomingTransportMessage);
        InsertBefore(WellKnownStep.InvokeHandlers);
    }
}

public class OutgoingHeaderRegistration :
    RegisterStep
{
    public OutgoingHeaderRegistration()
        : base(
            stepId: "OutgoingHeaderManipulation",
            behavior: typeof(OutgoingHeaderBehavior),
            description: "Manipulates outgoing headers")
    {
        InsertAfter(WellKnownStep.MutateOutgoingTransportMessage);
        InsertBefore(WellKnownStep.DispatchMessageToTransport);
    }
}

Note that the injection is contextual to the other exiting steps in the pipeline. Do in this case the injection is happening after Transport message mutation has occurred.

The outgoing Behavior

Edit
class OutgoingHeaderBehavior :
    IBehavior<OutgoingContext>
{
    IBus bus;

    public OutgoingHeaderBehavior(IBus bus)
    {
        this.bus = bus;
    }

    public void Invoke(OutgoingContext context, Action next)
    {
        var incomingContext = bus.CurrentMessageContext;
        var incomingMessageId = incomingContext?.Headers["NServiceBus.MessageId"];

        var headers = context.OutgoingMessage.Headers;
        headers.Add("OutgoingHeaderBehavior", "ValueOutgoingHeaderBehavior");
        next();
    }
}

The incoming Behavior

Edit
class IncomingHeaderBehavior :
    IBehavior<IncomingContext>
{
    public void Invoke(IncomingContext context, Action next)
    {
        var headers = context.PhysicalMessage.Headers;
        headers.Add("IncomingHeaderBehavior", "ValueIncomingHeaderBehavior");
        next();
    }
}

Globally for all outgoing messages

A list of headers can be defined that are automatically appended to all messages sent though a give instance of the Bus.

Edit
var startableBus = Bus.Create(busConfiguration);
startableBus.OutgoingHeaders.Add("AllOutgoing", "ValueAllOutgoing");
using (var bus = startableBus.Start())
{

The Handler

While the current contextual headers can be read in any of the above scenarios in this sample all headers will be written from the receiving Handler.

Edit
public class MyHandler :
    IHandleMessages<MyMessage>
{
    IBus bus;
    static ILog log = LogManager.GetLogger<MyHandler>();

    public MyHandler(IBus bus)
    {
        this.bus = bus;
    }

    public void Handle(MyMessage message)
    {
        log.Info("Hello from MyHandler");
        var headers = bus.CurrentMessageContext.Headers;
        foreach (var line in headers.OrderBy(x => x.Key)
            .Select(x => $"Key={x.Key}, Value={x.Value}"))
        {
            log.Info(line);
        }
    }
}

Samples

Related Articles


Last modified