Header Manipulation

Component: NServiceBus
NuGet Package NServiceBus (6.x)

Introduction

Headers can be read and manipulated at many extension points. This sample shows minimal header manipulation at each of those points.

Reading incoming headers inside the outgoing context

Each of the samples below that mutates the outgoing pipeline can also (optionally) read from the incoming context. This is optional because an incoming context exists only when the message being sent was triggered from inside a saga or a handler. In all other scenarios it will be null.
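As a minimal sketch of that optional read, the mutator below copies one header from the incoming message onto the outgoing one when an incoming context is present. It assumes the `TryGetIncomingHeaders` method on `MutateOutgoingMessageContext` in NServiceBus 6. The class name and the `MyCustomHeader` key are hypothetical and used only for illustration.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using NServiceBus.MessageMutator;

// Hypothetical mutator, not part of the sample itself.
public class CopyIncomingHeaderMutator :
    IMutateOutgoingMessages
{
    public Task MutateOutgoing(MutateOutgoingMessageContext context)
    {
        // Succeeds only when the send was triggered from inside a handler or saga.
        IReadOnlyDictionary<string, string> incomingHeaders;
        if (context.TryGetIncomingHeaders(out incomingHeaders))
        {
            // "MyCustomHeader" is an assumed key chosen for this sketch.
            string value;
            if (incomingHeaders.TryGetValue("MyCustomHeader", out value))
            {
                context.OutgoingHeaders["MyCustomHeader"] = value;
            }
        }
        return Task.CompletedTask;
    }
}
```

When the message is sent from outside a handler or saga, the check fails and the outgoing headers are left untouched.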

Adding headers when sending a Message

When performing the standard messaging actions (Send, Publish, Reply, etc.), headers can be appended to the message being dispatched.

var myMessage = new MyMessage();
await endpointInstance.SendLocal(myMessage)
    .ConfigureAwait(false);
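The snippet above dispatches the message without setting a header explicitly. One way to append a header at the point of sending, sketched here on the assumption that the `SetHeader` and `RouteToThisEndpoint` extension methods on `SendOptions` are available in NServiceBus 6, is to pass options to `Send`. The header key and value, the `SendWithHeader` class, and the stand-in `MyMessage` definition are illustrative only.

```csharp
using System.Threading.Tasks;
using NServiceBus;

// Stand-in for the sample's message type, declared so the sketch is self-contained.
public class MyMessage :
    IMessage
{
}

// Hypothetical helper, not part of the sample itself.
public static class SendWithHeader
{
    public static Task Send(IEndpointInstance endpointInstance)
    {
        var myMessage = new MyMessage();
        var sendOptions = new SendOptions();
        // Route to the local endpoint, mirroring SendLocal.
        sendOptions.RouteToThisEndpoint();
        // Illustrative header key and value.
        sendOptions.SetHeader("SendingMessage", "ValueSendingMessage");
        return endpointInstance.Send(myMessage, sendOptions);
    }
}
```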

Using Mutators

Headers can be manipulated by implementing any of the message mutation interfaces.

IMutateIncomingMessages

public class MutateIncomingMessages :
    IMutateIncomingMessages
{
    public Task MutateIncoming(MutateIncomingMessageContext context)
    {
        var headers = context.Headers;
        headers.Add("MutateIncomingMessages", "ValueMutateIncomingMessages");
        return Task.CompletedTask;
    }
}

IMutateIncomingTransportMessages

public class MutateIncomingTransportMessages :
    IMutateIncomingTransportMessages
{
    public Task MutateIncoming(MutateIncomingTransportMessageContext context)
    {
        var headers = context.Headers;
        headers.Add("MutateIncomingTransportMessages", "ValueMutateIncomingTransportMessages");
        return Task.CompletedTask;
    }
}

IMutateOutgoingMessages

public class MutateOutgoingMessages :
    IMutateOutgoingMessages
{
    public Task MutateOutgoing(MutateOutgoingMessageContext context)
    {
        var headers = context.OutgoingHeaders;
        headers["MutateOutgoingMessages"] = "ValueMutateOutgoingMessages";
        return Task.CompletedTask;
    }
}

IMutateOutgoingTransportMessages

public class MutateOutgoingTransportMessages :
    IMutateOutgoingTransportMessages
{
    public Task MutateOutgoing(MutateOutgoingTransportMessageContext context)
    {
        var headers = context.OutgoingHeaders;
        headers["MutateOutgoingTransportMessages"] = "ValueMutateOutgoingTransportMessages";
        return Task.CompletedTask;
    }
}

Using the Pipeline

Headers can be manipulated at any step in the pipeline.

Configuring the Pipeline

Configure the pipeline changes as follows.

public class HeaderFeature :
    Feature
{
    internal HeaderFeature()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        var pipeline = context.Pipeline;
        pipeline.Register<IncomingHeaderRegistration>();
        pipeline.Register<OutgoingHeaderRegistration>();
    }
}

public class IncomingHeaderRegistration :
    RegisterStep
{
    public IncomingHeaderRegistration()
        : base(
            stepId: "IncomingHeaderManipulation",
            behavior: typeof(IncomingHeaderBehavior),
            description: "Manipulates incoming headers")
    {
    }
}

public class OutgoingHeaderRegistration :
    RegisterStep
{
    public OutgoingHeaderRegistration()
        : base(
            stepId: "OutgoingHeaderManipulation",
            behavior: typeof(OutgoingHeaderBehavior),
            description: "Manipulates outgoing headers")
    {
    }
}

Note that the position at which a step is injected is relative to the other steps already in the pipeline. In this case the injection happens after transport message mutation has occurred.

The outgoing Behavior

class OutgoingHeaderBehavior :
    Behavior<IOutgoingPhysicalMessageContext>
{
    public override Task Invoke(IOutgoingPhysicalMessageContext context, Func<Task> next)
    {
        var headers = context.Headers;
        headers["OutgoingHeaderBehavior"] = "ValueOutgoingHeaderBehavior";
        return next();
    }
}

The incoming Behavior

class IncomingHeaderBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        var headers = context.Message.Headers;
        headers.Add("IncomingHeaderBehavior", "ValueIncomingHeaderBehavior");
        return next();
    }
}

Globally for all outgoing messages

A list of headers can be defined that is automatically appended to all messages sent through a given instance of the endpoint configuration.

endpointConfiguration.AddHeaderToAllOutgoingMessages("AllOutgoing", "ValueAllOutgoing");

The Handler

While the current contextual headers can be read in any of the above scenarios, in this sample all headers are written to the log from the receiving handler.

public class MyHandler :
    IHandleMessages<MyMessage>
{
    static ILog log = LogManager.GetLogger<MyHandler>();

    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        log.Info("Hello from MyHandler");
        var headers = context.MessageHeaders;
        foreach (var line in headers.OrderBy(x => x.Key)
            .Select(x => $"Key={x.Key}, Value={x.Value}"))
        {
            log.Info(line);
        }
        return Task.CompletedTask;
    }
}
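To check for one specific header rather than logging all of them, a handler can query the header collection directly. The sketch below assumes `context.MessageHeaders` is exposed as a read-only dictionary, as in NServiceBus 6, and looks up the `AllOutgoing` key set by the global configuration above. The handler name is hypothetical, and `MyMessage` is the sample's message type.

```csharp
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Logging;

// Hypothetical handler, not part of the sample itself.
public class SingleHeaderHandler :
    IHandleMessages<MyMessage>
{
    static ILog log = LogManager.GetLogger<SingleHeaderHandler>();

    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        string value;
        // TryGetValue avoids an exception when the header is absent.
        if (context.MessageHeaders.TryGetValue("AllOutgoing", out value))
        {
            log.Info($"AllOutgoing={value}");
        }
        else
        {
            log.Info("AllOutgoing header not present");
        }
        return Task.CompletedTask;
    }
}
```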
