Header Manipulation

Component: NServiceBus | Nuget: NServiceBus (Version: 6.x)

Introduction

Headers can be read and manipulated at many extension points. This Sample shows a minimal usage of that manipulation at each of those points.

Reading incoming headers inside the outgoing context

For all the below samples that mutate the outgoing pipeline they also (optionally) read from the incoming context. The reason it is "optionally" is that an incoming context will only exist when the current message being sent was triggered from inside a Saga or a handler. For all other scenarios it will be null.

Adding headers when sending a Message

When performing the standard messaging actions (Send, Publish, Reply etc) headers can be appended to the message being dispatched.

Edit
var myMessage = new MyMessage();
await endpointInstance.SendLocal(myMessage)
    .ConfigureAwait(false);

Using Mutators

Headers can be manipulated by implementing any of the message mutation interfaces.

IMutateIncomingMessages

Edit
public class MutateIncomingMessages :
    IMutateIncomingMessages
{
    public Task MutateIncoming(MutateIncomingMessageContext context)
    {
        var headers = context.Headers;
        headers.Add("MutateIncomingMessages", "ValueMutateIncomingMessages");
        return Task.CompletedTask;
    }
}

IMutateIncomingTransportMessages

Edit
public class MutateIncomingTransportMessages :
    IMutateIncomingTransportMessages
{
    public Task MutateIncoming(MutateIncomingTransportMessageContext context)
    {
        var headers = context.Headers;
        headers.Add("MutateIncomingTransportMessages", "ValueMutateIncomingTransportMessages");
        return Task.CompletedTask;
    }
}

IMutateOutgoingMessages

Edit
public class MutateOutgoingMessages :
    IMutateOutgoingMessages
{
    public Task MutateOutgoing(MutateOutgoingMessageContext context)
    {
        var headers = context.OutgoingHeaders;
        headers["MutateOutgoingMessages"] = "ValueMutateOutgoingMessages";
        return Task.CompletedTask;
    }
}

IMutateOutgoingTransportMessages

Edit
public class MutateOutgoingTransportMessages :
    IMutateOutgoingTransportMessages
{
    public Task MutateOutgoing(MutateOutgoingTransportMessageContext context)
    {
        var headers = context.OutgoingHeaders;
        headers["MutateOutgoingTransportMessages"] = "ValueMutateOutgoingTransportMessages";
        return Task.CompletedTask;
    }
}

Using the Pipeline

Headers can be manipulated at any step in the pipeline.

Configuring the Pipeline

Configure the pipeline changes as follows.

Edit
public class HeaderFeature :
    Feature
{
    internal HeaderFeature()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        var pipeline = context.Pipeline;
        pipeline.Register<IncomingHeaderRegistration>();
        pipeline.Register<OutgoingHeaderRegistration>();
    }
}

public class IncomingHeaderRegistration :
    RegisterStep
{
    public IncomingHeaderRegistration()
        : base(
            stepId: "IncomingHeaderManipulation",
            behavior: typeof(IncomingHeaderBehavior),
            description: "Manipulates incoming headers")
    {
    }
}

public class OutgoingHeaderRegistration :
    RegisterStep
{
    public OutgoingHeaderRegistration()
        : base(
            stepId: "OutgoingHeaderManipulation",
            behavior: typeof(OutgoingHeaderBehavior),
            description: "Manipulates outgoing headers")
    {
    }
}

Note that the injection is contextual to the other exiting steps in the pipeline. Do in this case the injection is happening after Transport message mutation has occurred.

The outgoing Behavior

Edit
class OutgoingHeaderBehavior :
    Behavior<IOutgoingPhysicalMessageContext>
{
    public override Task Invoke(IOutgoingPhysicalMessageContext context, Func<Task> next)
    {
        var headers = context.Headers;
        headers["OutgoingHeaderBehavior"] = "ValueOutgoingHeaderBehavior";
        return next();
    }
}

The incoming Behavior

Edit
class IncomingHeaderBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        var headers = context.Message.Headers;
        headers.Add("IncomingHeaderBehavior", "ValueIncomingHeaderBehavior");
        return next();
    }
}

Globally for all outgoing messages

A list of headers can be defined that are automatically appended to all messages sent though a give instance of the Bus.

Edit
endpointConfiguration.AddHeaderToAllOutgoingMessages("AllOutgoing", "ValueAllOutgoing");

The Handler

While the current contextual headers can be read in any of the above scenarios in this sample all headers will be written from the receiving Handler.

Edit
public class MyHandler :
    IHandleMessages<MyMessage>
{
    static ILog log = LogManager.GetLogger<MyHandler>();

    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        log.Info("Hello from MyHandler");
        var headers = context.MessageHeaders;
        foreach (var line in headers.OrderBy(x => x.Key)
            .Select(x => $"Key={x.Key}, Value={x.Value}"))
        {
            log.Info(line);
        }
        return Task.CompletedTask;
    }
}

Samples

Related Articles


Last modified