Manipulating message headers

Component: NServiceBus
NuGet Package NServiceBus (6.x)

Headers are communicated either as native transport headers, if the transport supports that feature, or as a serialized collection of key-value pairs. This article covers the various ways of manipulating message headers.

Reading incoming Headers

Headers can be read from an incoming message.

From a Behavior

/// <summary>
/// Pipeline behavior that reads headers from the incoming physical message
/// and then hands control to the rest of the pipeline.
/// </summary>
public class IncomingBehavior :
    Behavior<IIncomingPhysicalMessageContext>
{
    /// <summary>
    /// Reads the NServiceBus version header and a custom header from the incoming message.
    /// </summary>
    /// <param name="context">Context exposing the incoming message and its headers.</param>
    /// <param name="next">Delegate that continues the pipeline.</param>
    /// <returns>The task returned by <paramref name="next"/>.</returns>
    public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        var headers = context.Message.Headers;

        // TryGetValue instead of the indexer: the indexer throws when a header
        // is absent, whereas here the local is simply left null.
        string nsbVersion;
        headers.TryGetValue(Headers.NServiceBusVersion, out nsbVersion);

        string customHeader;
        headers.TryGetValue("MyCustomHeader", out customHeader);

        return next();
    }
}

From a Mutator

/// <summary>
/// Transport message mutator that reads headers from an incoming message.
/// </summary>
public class MutateIncomingTransportMessages :
    IMutateIncomingTransportMessages
{
    /// <summary>
    /// Reads the NServiceBus version header and a custom header from the mutation context.
    /// </summary>
    /// <param name="context">Context exposing the headers of the incoming message.</param>
    /// <returns>An already-completed task; no asynchronous work is performed.</returns>
    public Task MutateIncoming(MutateIncomingTransportMessageContext context)
    {
        var headers = context.Headers;

        // TryGetValue instead of the indexer: the indexer throws when a header
        // is absent, whereas here the local is simply left null.
        string nsbVersion;
        headers.TryGetValue(Headers.NServiceBusVersion, out nsbVersion);

        string customHeader;
        headers.TryGetValue("MyCustomHeader", out customHeader);

        return Task.CompletedTask;
    }
}

From a Handler

/// <summary>
/// Message handler that reads headers of the message currently being handled.
/// </summary>
public class ReadHandler :
    IHandleMessages<MyMessage>
{
    /// <summary>
    /// Reads the NServiceBus version header and a custom header via the handler context.
    /// </summary>
    /// <param name="message">The message being handled (not inspected here).</param>
    /// <param name="context">Handler context exposing the incoming message headers.</param>
    /// <returns>An already-completed task; no asynchronous work is performed.</returns>
    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var headers = context.MessageHeaders;

        // TryGetValue instead of the indexer: the indexer throws when a header
        // is absent, whereas here the local is simply left null.
        string nsbVersion;
        headers.TryGetValue(Headers.NServiceBusVersion, out nsbVersion);

        string customHeader;
        headers.TryGetValue("MyCustomHeader", out customHeader);

        return Task.CompletedTask;
    }
}

From a Saga

/// <summary>
/// Saga that reads headers of the message currently being handled.
/// </summary>
public class ReadSaga :
    Saga<ReadSagaData>,
    IHandleMessages<MyMessage>
{
    /// <summary>
    /// Reads the NServiceBus version header and a custom header via the handler context.
    /// </summary>
    /// <param name="message">The message being handled (not inspected here).</param>
    /// <param name="context">Handler context exposing the incoming message headers.</param>
    /// <returns>An already-completed task; no asynchronous work is performed.</returns>
    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var headers = context.MessageHeaders;

        // TryGetValue instead of the indexer: the indexer throws when a header
        // is absent, whereas here the local is simply left null.
        string nsbVersion;
        headers.TryGetValue(Headers.NServiceBusVersion, out nsbVersion);

        string customHeader;
        headers.TryGetValue("MyCustomHeader", out customHeader);

        return Task.CompletedTask;
    }

    // Other members a saga needs (such as its message-to-saga mapping) are omitted from this snippet.
}

Writing outgoing Headers

Headers can be set on an outgoing message.

From a Behavior

/// <summary>
/// Pipeline behavior that stamps a custom header onto the outgoing physical message.
/// </summary>
public class OutgoingBehavior :
    Behavior<IOutgoingPhysicalMessageContext>
{
    /// <summary>
    /// Sets the custom header on the outgoing message and continues the pipeline.
    /// </summary>
    /// <param name="context">Context exposing the outgoing message headers.</param>
    /// <param name="next">Delegate that continues the pipeline.</param>
    /// <returns>The task returned by <paramref name="next"/>.</returns>
    public override Task Invoke(IOutgoingPhysicalMessageContext context, Func<Task> next)
    {
        // Assigning through the indexer adds the header or overwrites an existing value.
        context.Headers["MyCustomHeader"] = "My custom value";

        var continuation = next();
        return continuation;
    }
}

From a Mutator

/// <summary>
/// Transport message mutator that stamps a custom header onto an outgoing message.
/// </summary>
public class MutateOutgoingTransportMessages :
    IMutateOutgoingTransportMessages
{
    /// <summary>
    /// Sets the custom header on the outgoing message.
    /// </summary>
    /// <param name="context">Context exposing the headers of the outgoing message.</param>
    /// <returns>An already-completed task; no asynchronous work is performed.</returns>
    public Task MutateOutgoing(MutateOutgoingTransportMessageContext context)
    {
        var outgoing = context.OutgoingHeaders;
        outgoing["MyCustomHeader"] = "My custom value";

        return Task.CompletedTask;
    }
}

From a Handler

/// <summary>
/// Message handler that attaches a custom header to messages it sends,
/// replies with, and publishes.
/// </summary>
public class WriteHandler :
    IHandleMessages<MyMessage>
{
    /// <summary>
    /// Sends, replies with, and publishes a <c>SomeOtherMessage</c>, each carrying the custom header.
    /// </summary>
    /// <param name="message">The message being handled (not inspected here).</param>
    /// <param name="context">Handler context used to dispatch the outgoing messages.</param>
    /// <returns>A task that completes once all three operations have been awaited in order.</returns>
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        const string headerKey = "MyCustomHeader";
        const string headerValue = "My custom value";

        // Send
        var optionsForSend = new SendOptions();
        optionsForSend.SetHeader(headerKey, headerValue);
        var sending = context.Send(new SomeOtherMessage(), optionsForSend);
        await sending.ConfigureAwait(false);

        // Reply
        var optionsForReply = new ReplyOptions();
        optionsForReply.SetHeader(headerKey, headerValue);
        var replying = context.Reply(new SomeOtherMessage(), optionsForReply);
        await replying.ConfigureAwait(false);

        // Publish
        var optionsForPublish = new PublishOptions();
        optionsForPublish.SetHeader(headerKey, headerValue);
        var publishing = context.Publish(new SomeOtherMessage(), optionsForPublish);
        await publishing.ConfigureAwait(false);
    }
}

From a Saga

/// <summary>
/// Saga that attaches a custom header to messages it sends, replies with, and publishes.
/// </summary>
public class WriteSaga :
    Saga<WriteSagaData>,
    IHandleMessages<MyMessage>
{
    /// <summary>
    /// Sends, replies with, and publishes a <c>SomeOtherMessage</c>, each carrying the custom header.
    /// </summary>
    /// <param name="message">The message being handled (not inspected here).</param>
    /// <param name="context">Handler context used to dispatch the outgoing messages.</param>
    /// <returns>A task that completes once all three operations have been awaited in order.</returns>
    public async Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        // Each operation gets its own options instance carrying the header.
        var sendOptions = new SendOptions();
        sendOptions.SetHeader("MyCustomHeader", "My custom value");
        await context.Send(new SomeOtherMessage(), sendOptions)
            .ConfigureAwait(false);

        var replyOptions = new ReplyOptions();
        replyOptions.SetHeader("MyCustomHeader", "My custom value");
        await context.Reply(new SomeOtherMessage(), replyOptions)
            .ConfigureAwait(false);

        var publishOptions = new PublishOptions();
        publishOptions.SetHeader("MyCustomHeader", "My custom value");
        await context.Publish(new SomeOtherMessage(), publishOptions)
            .ConfigureAwait(false);
    }

    // Other members a saga needs (such as its message-to-saga mapping) are omitted from this snippet.
}

For all outgoing messages

NServiceBus supports registering headers at configuration time that are then added to all outgoing messages for the endpoint.

// Registers a static header at configuration time; it is then added to all outgoing messages for the endpoint.
endpointConfiguration.AddHeaderToAllOutgoingMessages("MyGlobalHeader", "some static value");

Samples

Related Articles

  • Handlers
    Write a class to handle messages in NServiceBus.
  • Message Handling Pipeline
    Overview of the message handling pipeline.
  • Message Headers
    List of built-in NServiceBus message headers.
  • Message Mutators
    Message Mutators allow mutation of messages in the pipeline.
  • Sagas
    NServiceBus uses event-driven architecture to include fault-tolerance and scalability in long-term business processes.

Last modified