This document describes how to access native message information with the Azure Service Bus transport.
- Incoming message access is available from version 1.4.0 and above.
- Outgoing message access is available from version 1.7.0 and above.
Access to the native Azure Service Bus incoming message
It can sometimes be useful to access the native Service Bus incoming message from behaviors and handlers. When a message is received, the transport adds the native Service Bus Message
to the message processing context. Use the code below to access the message details from a pipeline behavior:
class DoNotAttemptMessageProcessingIfMessageIsNotLocked : Behavior<ITransportReceiveContext>
{
public override Task Invoke(ITransportReceiveContext context, Func<Task> next)
{
var lockedUntilUtc = context.Extensions.Get<Message>().SystemProperties.LockedUntilUtc;
if (lockedUntilUtc <= DateTime.UtcNow)
{
return next();
}
throw new Exception($"Message lock lost for MessageId {context.Message.MessageId} and it cannot be processed.");
}
}
The behavior above uses the native message's LockedUntilUtc
system property to determine where the message lost its lock as a result of aggressive prefetching and slow processing. If desired, a custom recoverability policy can be used so that the message will skip attempted retry processing that otherwise would be guaranteed to fail due to the message's lost lock.
Access to the native Azure Service Bus outgoing message
It can also be useful to access the native Service Bus outgoing message from behaviors and handlers for customizations.
Customize an outgoing message from a message handler:
// send a command
var sendOptions = new SendOptions();
sendOptions.CustomizeNativeMessage(context, m => m.Label = "custom-label");
await context.Send(new MyCommand(), sendOptions);
// publish an event
var publishOptions = new PublishOptions();
publishOptions.CustomizeNativeMessage(context, m => m.Label = "custom-label");
await context.Publish(new MyEvent(), publishOptions);
Customize an outgoing message using IMessageSession
:
// send a command
var sendOptions = new SendOptions();
sendOptions.CustomizeNativeMessage(m => m.Label = "custom-label");
await messageSession.Send(new MyCommand(), sendOptions);
// publish an event
var publishOptions = new PublishOptions();
publishOptions.CustomizeNativeMessage(m => m.Label = "custom-label");
await messageSession.Publish(new MyEvent(), publishOptions);
Customize an outgoing message from a physical behavior:
public class PhysicalBehavior : Behavior<IIncomingPhysicalMessageContext>
{
public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
{
var sendOptions = new SendOptions();
sendOptions.CustomizeNativeMessage(context, m => m.Label = "custom-label");
await context.Send(new MyCommand(), sendOptions);
await next();
}
}
Customize an outgoing message from a logical behavior:
public class LogicalBehavior : Behavior<IIncomingLogicalMessageContext>
{
public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
{
var publishOptions = new PublishOptions();
publishOptions.CustomizeNativeMessage(context, m => m.Label = "custom-label");
await context.Publish(new MyEvent(), publishOptions);
await next();
}
}
Testing
Test the customization of an outgoing message using IMessageSession
:
[Test]
public async Task CustomizationSession()
{
var testableMessageSession = new TestableMessageSession();
var someCodeUsingTheSession = new SomeCodeUsingTheSession(testableMessageSession);
await someCodeUsingTheSession.Execute();
var publishedMessage = testableMessageSession.PublishedMessages.Single();
var customization = publishedMessage.Options.GetNativeMessageCustomization();
var nativeMessage = new Message();
customization(nativeMessage);
Assert.AreEqual("abc", nativeMessage.Label);
}
class SomeCodeUsingTheSession
{
readonly IMessageSession session;
public SomeCodeUsingTheSession(IMessageSession session)
{
this.session = session;
}
public async Task Execute()
{
var options = new PublishOptions();
options.CustomizeNativeMessage(m => m.Label = "abc");
await session.Publish(new MyEvent(), options);
}
}
Test the customization of an outgoing message using a handler or a behavior:
[Test]
public async Task CustomizationHandler()
{
var testableContext = new TestableMessageHandlerContext();
var handler = new MyHandlerUsingCustomizations();
await handler.Handle(new MyEvent(), testableContext);
var publishedMessage = testableContext.PublishedMessages.Single();
var customization = publishedMessage.Options.GetNativeMessageCustomization(testableContext);
var nativeMessage = new Message();
customization(nativeMessage);
Assert.AreEqual("abc", nativeMessage.Label);
}
class MyHandlerUsingCustomizations : IHandleMessages<MyEvent>
{
public async Task Handle(MyEvent message, IMessageHandlerContext context)
{
var options = new PublishOptions();
options.CustomizeNativeMessage(context, m => m.Label = "abc");
await context.Publish(message, options);
}
}
Native outgoing messages cannot be customized when using the outbox as customizations are not persistent.