Native message access

Component: Azure Service Bus Transport
NuGet Package: NServiceBus.Transport.AzureServiceBus (1.9)
Target NServiceBus Version: 7.x

This document describes how to access native message information with the Azure Service Bus transport.

  • Incoming message access is available in versions 1.4.0 and above.
  • Outgoing message access is available in versions 1.7.0 and above.

Access to the native Azure Service Bus incoming message

It can sometimes be useful to access the native Service Bus incoming message from behaviors and handlers. When a message is received, the transport adds the native Service Bus Message to the message processing context. Use the code below to access the message details from a pipeline behavior:

class DoNotAttemptMessageProcessingIfMessageIsNotLocked : Behavior<ITransportReceiveContext>
{
    public override Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        // The transport stores the native message in the context extensions.
        var lockedUntilUtc = context.Extensions.Get<Message>().SystemProperties.LockedUntilUtc;

        // Continue only while the lock is still held.
        if (lockedUntilUtc > DateTime.UtcNow)
        {
            return next();
        }

        throw new Exception($"Message lock lost for MessageId {context.Message.MessageId} and it cannot be processed.");
    }
}

The behavior above uses the native message's LockedUntilUtc system property to determine whether the message has lost its lock, for example as a result of aggressive prefetching combined with slow processing. If desired, a custom recoverability policy can be used so that such a message skips retry attempts that would otherwise be guaranteed to fail because of the lost lock.
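A custom recoverability policy along these lines could look like the sketch below. It is illustrative only: LockExpiredException and LockLostRecoverability are hypothetical names, and the behavior shown earlier would have to throw that exception type instead of a plain Exception for the policy to recognize it. Moving the message straight to the error queue is just one possible action; RecoverabilityAction.Discard is another, and the right choice depends on the system.

```csharp
using System;
using NServiceBus;
using NServiceBus.Transport;

// Hypothetical exception type (not part of the transport) that marks
// failures caused by an expired message lock.
public class LockExpiredException : Exception
{
    public LockExpiredException(string message) : base(message) { }
}

public static class LockLostRecoverability
{
    // Assumed policy: bypass immediate and delayed retries for lock-related
    // failures and fall back to the default policy for everything else.
    public static RecoverabilityAction Policy(RecoverabilityConfig config, ErrorContext context)
    {
        if (context.Exception is LockExpiredException)
        {
            return RecoverabilityAction.MoveToError(config.Failed.ErrorQueue);
        }

        return DefaultRecoverabilityPolicy.Invoke(config, context);
    }

    // Registers the policy on an endpoint configuration.
    public static void Apply(EndpointConfiguration endpointConfiguration)
    {
        endpointConfiguration.Recoverability().CustomPolicy(Policy);
    }
}
```

Calling LockLostRecoverability.Apply(endpointConfiguration) during endpoint setup would activate the policy; all other exceptions keep the default retry behavior.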

Access to the native Azure Service Bus outgoing message

It can also be useful to access the native Service Bus outgoing message from behaviors and handlers for customizations.

Customize an outgoing message from a message handler:

// send a command
var sendOptions = new SendOptions();
sendOptions.CustomizeNativeMessage(context, m => m.Label = "custom-label");
await context.Send(new MyCommand(), sendOptions).ConfigureAwait(false);

// publish an event
var publishOptions = new PublishOptions();
publishOptions.CustomizeNativeMessage(context, m => m.Label = "custom-label");
await context.Publish(new MyEvent(), publishOptions).ConfigureAwait(false);

Customize an outgoing message using IMessageSession:

// send a command
var sendOptions = new SendOptions();
sendOptions.CustomizeNativeMessage(m => m.Label = "custom-label");
await messageSession.Send(new MyCommand(), sendOptions).ConfigureAwait(false);

// publish an event
var publishOptions = new PublishOptions();
publishOptions.CustomizeNativeMessage(m => m.Label = "custom-label");
await messageSession.Publish(new MyEvent(), publishOptions).ConfigureAwait(false);
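
The customization callback receives the native Message, so several properties can be set in one call. The sketch below assumes the Microsoft.Azure.ServiceBus Message type used elsewhere on this page; OrderAccepted, OrderNotifier, the label and the "tenant" property are hypothetical names chosen for illustration.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using NServiceBus;

// Hypothetical event type used only for this example.
public class OrderAccepted : IEvent
{
}

// Hypothetical class that publishes through IMessageSession.
public class OrderNotifier
{
    readonly IMessageSession session;

    public OrderNotifier(IMessageSession session)
    {
        this.session = session;
    }

    public Task Notify()
    {
        var publishOptions = new PublishOptions();

        // A statement lambda allows more than one native property to be set.
        publishOptions.CustomizeNativeMessage(m =>
        {
            m.Label = "order-accepted";
            // Custom application property; the key is an assumption.
            m.UserProperties["tenant"] = "contoso";
        });

        return session.Publish(new OrderAccepted(), publishOptions);
    }
}
```

Keeping all native customizations in a single callback makes them easy to assert on in tests, since the callback can be retrieved and applied to a fresh Message instance.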

Customize an outgoing message from a physical behavior:

public class PhysicalBehavior : Behavior<IIncomingPhysicalMessageContext>
{
    public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
    {
        var sendOptions = new SendOptions();
        sendOptions.CustomizeNativeMessage(context, m => m.Label = "custom-label");

        await context.Send(new MyCommand(), sendOptions);

        await next();
    }
}

Customize an outgoing message from a logical behavior:

public class LogicalBehavior : Behavior<IIncomingLogicalMessageContext>
{
    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        var publishOptions = new PublishOptions();
        publishOptions.CustomizeNativeMessage(context, m => m.Label = "custom-label");

        await context.Publish(new MyEvent(), publishOptions);

        await next();
    }
}

Testing

Test the customization of an outgoing message using IMessageSession:

[Test]
public async Task CustomizationSession()
{
    var testableMessageSession = new TestableMessageSession();
    var someCodeUsingTheSession = new SomeCodeUsingTheSession(testableMessageSession);

    await someCodeUsingTheSession.Execute();

    var publishedMessage = testableMessageSession.PublishedMessages.Single();
    var customization = publishedMessage.Options.GetNativeMessageCustomization();

    var nativeMessage = new Message();
    customization(nativeMessage);

    Assert.AreEqual("abc", nativeMessage.Label);
}

class SomeCodeUsingTheSession
{
    readonly IMessageSession session;

    public SomeCodeUsingTheSession(IMessageSession session)
    {
        this.session = session;
    }

    public async Task Execute()
    {
        var options = new PublishOptions();
        options.CustomizeNativeMessage(m => m.Label = "abc");
        await session.Publish(new MyEvent(), options);
    }
}

Test the customization of an outgoing message using a handler or a behavior:

[Test]
public async Task CustomizationHandler()
{
    var testableContext = new TestableMessageHandlerContext();

    var handler = new MyHandlerUsingCustomizations();

    await handler.Handle(new MyEvent(), testableContext);

    var publishedMessage = testableContext.PublishedMessages.Single();
    var customization = publishedMessage.Options.GetNativeMessageCustomization(testableContext);

    var nativeMessage = new Message();
    customization(nativeMessage);

    Assert.AreEqual("abc", nativeMessage.Label);
}

class MyHandlerUsingCustomizations : IHandleMessages<MyEvent>
{
    public async Task Handle(MyEvent message, IMessageHandlerContext context)
    {
        var options = new PublishOptions();
        options.CustomizeNativeMessage(context, m => m.Label = "abc");
        await context.Publish(message, options);
    }
}

Native outgoing messages cannot be customized when using the outbox because the customizations are not persisted.
