The mechanism for header communication is either native headers, if the transport supports that feature, or via a serialized collection of key value pairs. This article covers the various ways of manipulating the message headers.
Reading incoming headers
Headers can be read for an incoming message.
From a behavior
public class IncomingBehavior :
Behavior<IIncomingPhysicalMessageContext>
{
public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
{
var headers = context.Message.Headers;
var nsbVersion = headers[Headers.NServiceBusVersion];
var customHeader = headers["MyCustomHeader"];
return next();
}
}
From a mutator
public class MutateIncomingTransportMessages :
IMutateIncomingTransportMessages
{
public Task MutateIncoming(MutateIncomingTransportMessageContext context)
{
var headers = context.Headers;
var nsbVersion = headers[Headers.NServiceBusVersion];
var customHeader = headers["MyCustomHeader"];
return Task.CompletedTask;
}
}
From a handler
public class ReadHandler :
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, IMessageHandlerContext context)
{
var headers = context.MessageHeaders;
var nsbVersion = headers[Headers.NServiceBusVersion];
var customHeader = headers["MyCustomHeader"];
return Task.CompletedTask;
}
}
From a saga
public class ReadSaga :
Saga<ReadSagaData>,
IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, IMessageHandlerContext context)
{
var headers = context.MessageHeaders;
var nsbVersion = headers[Headers.NServiceBusVersion];
var customHeader = headers["MyCustomHeader"];
return Task.CompletedTask;
}
Writing outgoing headers
Headers can be written for an outgoing message.
From a behavior
public class OutgoingBehavior :
Behavior<IOutgoingPhysicalMessageContext>
{
public override Task Invoke(IOutgoingPhysicalMessageContext context, Func<Task> next)
{
var headers = context.Headers;
headers["MyCustomHeader"] = "My custom value";
return next();
}
}
From a mutator
public class MutateOutgoingTransportMessages :
IMutateOutgoingTransportMessages
{
public Task MutateOutgoing(MutateOutgoingTransportMessageContext context)
{
context.OutgoingHeaders["MyCustomHeader"] = "My custom value";
return Task.CompletedTask;
}
}
From a handler
public class WriteHandler :
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, IMessageHandlerContext context)
{
var sendOptions = new SendOptions();
sendOptions.SetHeader("MyCustomHeader", "My custom value");
await context.Send(new SomeOtherMessage(), sendOptions)
.ConfigureAwait(false);
var replyOptions = new ReplyOptions();
replyOptions.SetHeader("MyCustomHeader", "My custom value");
await context.Reply(new SomeOtherMessage(), replyOptions)
.ConfigureAwait(false);
var publishOptions = new PublishOptions();
publishOptions.SetHeader("MyCustomHeader", "My custom value");
await context.Publish(new SomeOtherMessage(), publishOptions)
.ConfigureAwait(false);
}
}
From a saga
public class WriteSaga :
Saga<WriteSagaData>,
IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, IMessageHandlerContext context)
{
var sendOptions = new SendOptions();
sendOptions.SetHeader("MyCustomHeader", "My custom value");
await context.Send(new SomeOtherMessage(), sendOptions)
.ConfigureAwait(false);
var replyOptions = new ReplyOptions();
replyOptions.SetHeader("MyCustomHeader", "My custom value");
await context.Reply(new SomeOtherMessage(), replyOptions)
.ConfigureAwait(false);
var publishOptions = new PublishOptions();
publishOptions.SetHeader("MyCustomHeader", "My custom value");
await context.Publish(new SomeOtherMessage(), publishOptions)
.ConfigureAwait(false);
}
For all outgoing messages
NServiceBus supports registering headers at configuration time that are then added to all outgoing messages for the endpoint.
endpointConfiguration.AddHeaderToAllOutgoingMessages("MyGlobalHeader", "some static value");