Header Manipulation

Component: NServiceBus
NuGet NServiceBus (5.x)

Introduction

Headers can be read and manipulated at many extension points. This sample shows a minimal example of header manipulation at each of those points.

Reading incoming headers inside the outgoing context

All of the samples below that mutate the outgoing pipeline also (optionally) read from the incoming context. The read is optional because an incoming context exists only when the message being sent was triggered from inside a saga or a handler. In all other scenarios the incoming context is null.
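The null check can be combined with a missing-key check. The following is a minimal sketch, not part of the sample: `IncomingHeaderReader` and `ReadOrNull` are hypothetical names, and the helper assumes the headers are exposed as an `IDictionary<string, string>`.

```csharp
using System.Collections.Generic;

// Hypothetical helper, not part of NServiceBus.
public static class IncomingHeaderReader
{
    // Returns null when there is no incoming context (headers == null)
    // or when the requested header is not present.
    public static string ReadOrNull(IDictionary<string, string> headers, string key)
    {
        if (headers == null)
        {
            return null;
        }
        string value;
        return headers.TryGetValue(key, out value) ? value : null;
    }
}
```

Under those assumptions it could be called as `IncomingHeaderReader.ReadOrNull(bus.CurrentMessageContext?.Headers, "NServiceBus.MessageId")`, which yields null both outside a handler and when the header is absent.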

Adding headers when sending a Message

When performing the standard messaging actions (Send, Publish, Reply, etc.), headers can be appended to the message being dispatched.

var myMessage = new MyMessage();
bus.SetMessageHeader(myMessage, "SendingMessage", "ValueSendingMessage");
bus.SendLocal(myMessage);

Using Mutators

Headers can be manipulated by implementing any of the message mutation interfaces.

IMessageMutator

public class MessageMutator :
    IMessageMutator
{
    IBus bus;

    public MessageMutator(IBus bus)
    {
        this.bus = bus;
    }
    public object MutateOutgoing(object message)
    {
        var incomingContext = bus.CurrentMessageContext;
        var incomingMessageId = incomingContext?.Headers["NServiceBus.MessageId"];

        bus.SetMessageHeader(
            msg: message,
            key: "MessageMutator_Outgoing",
            value: "ValueMessageMutator_Outgoing");
        return message;
    }

    public object MutateIncoming(object message)
    {
        var headers = bus.CurrentMessageContext.Headers;
        headers.Add("MessageMutator_Incoming", "ValueMessageMutator_Incoming");
        return message;
    }
}

IMutateTransportMessages

public class MutateTransportMessages :
    IMutateTransportMessages
{
    public void MutateIncoming(TransportMessage transportMessage)
    {
        var headers = transportMessage.Headers;
        headers.Add("MutateTransportMessages_Incoming", "ValueMutateTransportMessages_Incoming");
    }

    public void MutateOutgoing(LogicalMessage logicalMessage, TransportMessage transportMessage)
    {
        var headers = transportMessage.Headers;
        headers.Add("MutateTransportMessages_Outgoing", "ValueMutateTransportMessages_Outgoing");
    }
}

IMutateIncomingMessages

public class MutateIncomingMessages :
    IMutateIncomingMessages
{
    IBus bus;

    public MutateIncomingMessages(IBus bus)
    {
        this.bus = bus;
    }

    public object MutateIncoming(object message)
    {
        bus.CurrentMessageContext
            .Headers
            .Add("MutateIncomingMessages", "ValueMutateIncomingMessages");
        return message;
    }
}

IMutateIncomingTransportMessages

public class MutateIncomingTransportMessages :
    IMutateIncomingTransportMessages
{
    public void MutateIncoming(TransportMessage transportMessage)
    {
        transportMessage.Headers
            .Add("MutateIncomingTransportMessages", "ValueMutateIncomingTransportMessages");
    }
}

IMutateOutgoingMessages

public class MutateOutgoingMessages :
    IMutateOutgoingMessages
{
    IBus bus;

    public MutateOutgoingMessages(IBus bus)
    {
        this.bus = bus;
    }
    public object MutateOutgoing(object message)
    {
        var incomingContext = bus.CurrentMessageContext;
        var incomingMessageId = incomingContext?.Headers["NServiceBus.MessageId"];

        bus.SetMessageHeader(
            msg: message,
            key: "MutateOutgoingMessages",
            value: "ValueMutateOutgoingMessages");
        return message;
    }
}

IMutateOutgoingTransportMessages

public class MutateOutgoingTransportMessages :
    IMutateOutgoingTransportMessages
{
    IBus bus;

    public MutateOutgoingTransportMessages(IBus bus)
    {
        this.bus = bus;
    }

    public void MutateOutgoing(LogicalMessage logicalMessage, TransportMessage transportMessage)
    {
        var incomingContext = bus.CurrentMessageContext;
        var incomingMessageId = incomingContext?.Headers["NServiceBus.MessageId"];

        transportMessage.Headers
            .Add("MutateOutgoingTransportMessages", "ValueMutateOutgoingTransportMessages");
    }
}
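The mutators above call `Headers.Add`, which on a standard .NET dictionary throws an `ArgumentException` when the key is already present. Where overwriting an existing value is acceptable, the indexer can be used instead. This is a minimal sketch under that assumption. `HeaderWriter` and `Set` are hypothetical names, not part of NServiceBus.

```csharp
using System.Collections.Generic;

// Hypothetical helper, not part of NServiceBus.
public static class HeaderWriter
{
    // Unlike Add, the indexer overwrites an existing key instead of throwing.
    public static void Set(IDictionary<string, string> headers, string key, string value)
    {
        headers[key] = value;
    }
}
```

For example, `HeaderWriter.Set(transportMessage.Headers, "MutateIncomingTransportMessages", "ValueMutateIncomingTransportMessages")` would succeed even if that header had already been written.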

Using the Pipeline

Headers can be manipulated at any step in the pipeline.

Configuring the Pipeline

Configure the pipeline changes as follows.

public class HeaderFeature :
    Feature
{
    internal HeaderFeature()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        var pipeline = context.Pipeline;
        pipeline.Register<IncomingHeaderRegistration>();
        pipeline.Register<OutgoingHeaderRegistration>();
    }
}

public class IncomingHeaderRegistration :
    RegisterStep
{
    public IncomingHeaderRegistration()
        : base(
            stepId: "IncomingHeaderManipulation",
            behavior: typeof(IncomingHeaderBehavior),
            description: "Manipulates incoming headers")
    {
        InsertAfter(WellKnownStep.MutateIncomingTransportMessage);
        InsertBefore(WellKnownStep.InvokeHandlers);
    }
}

public class OutgoingHeaderRegistration :
    RegisterStep
{
    public OutgoingHeaderRegistration()
        : base(
            stepId: "OutgoingHeaderManipulation",
            behavior: typeof(OutgoingHeaderBehavior),
            description: "Manipulates outgoing headers")
    {
        InsertAfter(WellKnownStep.MutateOutgoingTransportMessage);
        InsertBefore(WellKnownStep.DispatchMessageToTransport);
    }
}

Note that the injection is contextual to the other existing steps in the pipeline. In this case the injection happens after transport message mutation has occurred.

The outgoing Behavior

class OutgoingHeaderBehavior :
    IBehavior<OutgoingContext>
{
    IBus bus;

    public OutgoingHeaderBehavior(IBus bus)
    {
        this.bus = bus;
    }

    public void Invoke(OutgoingContext context, Action next)
    {
        var incomingContext = bus.CurrentMessageContext;
        var incomingMessageId = incomingContext?.Headers["NServiceBus.MessageId"];

        var headers = context.OutgoingMessage.Headers;
        headers.Add("OutgoingHeaderBehavior", "ValueOutgoingHeaderBehavior");
        next();
    }
}

The incoming Behavior

class IncomingHeaderBehavior :
    IBehavior<IncomingContext>
{
    public void Invoke(IncomingContext context, Action next)
    {
        var headers = context.PhysicalMessage.Headers;
        headers.Add("IncomingHeaderBehavior", "ValueIncomingHeaderBehavior");
        next();
    }
}

Globally for all outgoing messages

A list of headers can be defined that is automatically appended to all messages sent through a given instance of the Bus.

var startableBus = Bus.Create(busConfiguration);
startableBus.OutgoingHeaders.Add("AllOutgoing", "ValueAllOutgoing");
using (var bus = startableBus.Start())
{
    // Messages sent through this bus instance carry the "AllOutgoing" header.
}

The Handler

Although the current contextual headers can be read in any of the above scenarios, in this sample all headers are written to the log from the receiving handler.

using System.Linq;
using NServiceBus;
using NServiceBus.Logging;

public class MyHandler :
    IHandleMessages<MyMessage>
{
    IBus bus;
    static ILog log = LogManager.GetLogger<MyHandler>();

    public MyHandler(IBus bus)
    {
        this.bus = bus;
    }

    public void Handle(MyMessage message)
    {
        log.Info("Hello from MyHandler");
        var headers = bus.CurrentMessageContext.Headers;
        foreach (var line in headers.OrderBy(x => x.Key)
            .Select(x => $"Key={x.Key}, Value={x.Value}"))
        {
            log.Info(line);
        }
    }
}
