Notifications

Component: NServiceBus | NuGet: NServiceBus (Version: 5.x)

Code walk-through

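This sample shows how to use the notification API to capture the following:

  • A message failing a First Level Retry (Immediate Retries) attempt.
  • A message being sent to Second Level Retries (Delayed Retries).
  • A message being sent to the error queue.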

Custom Settings in this sample

This sample uses several non-standard settings.

Logging

All log entries below the Fatal level are suppressed to reduce the noise caused by the multiple exceptions this sample raises.

var defaultFactory = LogManager.Use<DefaultFactory>();
defaultFactory.Level(LogLevel.Fatal);

Delayed Retry Time increase

The time increase between delayed retries is set to 1 second, which shortens the wait for all retries to complete.

class ProvideDelayedRetriesConfiguration :
    IProvideConfiguration<SecondLevelRetriesConfig>
{
    public SecondLevelRetriesConfig GetConfiguration()
    {
        return new SecondLevelRetriesConfig
        {
            TimeIncrease = TimeSpan.FromSeconds(1)
        };
    }
}
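
To illustrate why a smaller TimeIncrease shortens the run, the total wait can be estimated. This is a minimal sketch under two assumptions that are not stated in this sample: a linear back-off (attempt n waits n times TimeIncrease) and three delayed retries. RetrySchedule is a hypothetical helper, not part of NServiceBus.

```csharp
using System;
using System.Linq;

// Hypothetical helper for illustration; not part of NServiceBus.
static class RetrySchedule
{
    // Assumes a linear back-off: attempt n waits n * timeIncrease.
    public static TimeSpan TotalWait(TimeSpan timeIncrease, int numberOfRetries)
    {
        var ticks = Enumerable.Range(1, numberOfRetries)
            .Sum(attempt => timeIncrease.Ticks * attempt);
        return TimeSpan.FromTicks(ticks);
    }
}

static class Program
{
    static void Main()
    {
        // The retry count of 3 is an assumption for illustration only.
        Console.WriteLine(RetrySchedule.TotalWait(TimeSpan.FromSeconds(1), 3));  // 00:00:06
        Console.WriteLine(RetrySchedule.TotalWait(TimeSpan.FromSeconds(10), 3)); // 00:01:00
    }
}
```

Under these assumptions, a 1 second increase means roughly 6 seconds of waiting in total, against a minute for a 10 second increase.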

Plugging into the API

The notifications API is exposed as follows.

var busConfiguration = new BusConfiguration();
busConfiguration.EndpointName("Samples.Notifications");

The subscriptions themselves are created in a class that runs when the bus starts:

public class SubscribeToNotifications :
    IWantToRunWhenBusStartsAndStops,
    IDisposable
{
    static ILog log = LogManager.GetLogger<SubscribeToNotifications>();
    BusNotifications busNotifications;
    List<IDisposable> unsubscribeStreams = new List<IDisposable>();

    public SubscribeToNotifications(BusNotifications busNotifications)
    {
        this.busNotifications = busNotifications;
    }

    public void Start()
    {
        var errors = busNotifications.Errors;
        var scheduler = Scheduler.Default;
        unsubscribeStreams.Add(
            errors.MessageSentToErrorQueue
                .ObserveOn(scheduler)
                .Subscribe(Log)
            );
        unsubscribeStreams.Add(
            errors.MessageHasBeenSentToSecondLevelRetries
                .ObserveOn(scheduler)
                .Subscribe(Log)
            );
        unsubscribeStreams.Add(
            errors.MessageHasFailedAFirstLevelRetryAttempt
                .ObserveOn(scheduler)
                .Subscribe(Log)
            );
    }

    static string GetMessageString(byte[] body)
    {
        return Encoding.UTF8.GetString(body);
    }

    void Log(FailedMessage failed)
    {
        log.Fatal($@"Message sent to error queue.
        Body:
        {GetMessageString(failed.Body)}");
    }

    void Log(SecondLevelRetry retry)
    {
        log.Fatal($@"Message sent to Delayed Retries.
        RetryAttempt: {retry.RetryAttempt}
        Body:
        {GetMessageString(retry.Body)}");
    }

    void Log(FirstLevelRetry retry)
    {
        log.Fatal($@"Message sent to Immediate Retries.
        RetryAttempt: {retry.RetryAttempt}
        Body:
        {GetMessageString(retry.Body)}");
    }

    public void Stop()
    {
        foreach (var unsubscribeStream in unsubscribeStreams)
        {
            unsubscribeStream.Dispose();
        }
    }

    public void Dispose()
    {
        Stop();
    }
}

Notification subscriptions are created at startup by combining IWantToRunWhenBusStartsAndStops with an instance of BusNotifications injected via the container. The subscriptions are disposed when the bus stops.

Performance Impact

Notifications are executed on the same thread as the NServiceBus pipeline. If long-running work needs to be done, it should be executed on another thread; otherwise message processing performance can suffer.
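
The sample moves its callbacks off the pipeline thread with ObserveOn. As an alternative that needs no Reactive Extensions, the sketch below hands work to a background task through a queue, so a notification callback only pays for an enqueue. BackgroundNotificationWorker is a hypothetical helper written for illustration; it is not part of NServiceBus.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of NServiceBus.
// Callbacks only enqueue; a single background task does the slow work.
class BackgroundNotificationWorker : IDisposable
{
    readonly BlockingCollection<string> queue = new BlockingCollection<string>();
    readonly Task consumer;

    public BackgroundNotificationWorker(Action<string> slowHandler)
    {
        consumer = Task.Factory.StartNew(() =>
        {
            // Blocks until items arrive; ends once CompleteAdding is called
            // and the queue has been drained.
            foreach (var item in queue.GetConsumingEnumerable())
            {
                slowHandler(item);
            }
        }, TaskCreationOptions.LongRunning);
    }

    // Cheap enough to call from a notification callback.
    public void Enqueue(string description)
    {
        queue.Add(description);
    }

    // Stops accepting items and waits for the queued work to finish.
    public void Dispose()
    {
        queue.CompleteAdding();
        consumer.Wait();
        queue.Dispose();
    }
}

static class Program
{
    static void Main()
    {
        var handled = new ConcurrentQueue<string>();
        using (var worker = new BackgroundNotificationWorker(handled.Enqueue))
        {
            worker.Enqueue("first failure");
            worker.Enqueue("second failure");
        }
        Console.WriteLine(handled.Count); // 2
    }
}
```

In a subscriber like the one above, such a worker would presumably be created in Start and disposed in Stop, alongside the subscriptions.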

Message Body

Since the message could have failed due to a deserialization exception, it is not possible for the API to provide an instance of the message. For the same reason, it is recommended that consumers of this API do not attempt to deserialize the message body. If the message contents are required for debugging purposes, it is recommended to convert the body to a string using the .NET encoding APIs.
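
The conversion to a string can be made a little more defensive than the GetMessageString method in the sample. The sketch below assumes the body is UTF-8 text (for example XML or JSON) and caps how much of it is logged. BodyPreview and its maxBytes parameter are hypothetical names introduced for illustration.

```csharp
using System;
using System.Text;

// Hypothetical helper for illustration; not part of NServiceBus.
static class BodyPreview
{
    // Decodes at most maxBytes of the raw body as UTF-8 for logging.
    // Invalid or truncated byte sequences are replaced with U+FFFD
    // instead of throwing, so this is safe for unreadable bodies.
    public static string ToLogString(byte[] body, int maxBytes = 1024)
    {
        if (body == null || body.Length == 0)
        {
            return "<empty>";
        }
        var count = Math.Min(body.Length, maxBytes);
        var text = Encoding.UTF8.GetString(body, 0, count);
        return count < body.Length ? text + "..." : text;
    }
}

static class Program
{
    static void Main()
    {
        var body = Encoding.UTF8.GetBytes("<MyMessage><Id>1</Id></MyMessage>");
        Console.WriteLine(BodyPreview.ToLogString(body));     // <MyMessage><Id>1</Id></MyMessage>
        Console.WriteLine(BodyPreview.ToLogString(body, 10)); // <MyMessage...
        Console.WriteLine(BodyPreview.ToLogString(null));     // <empty>
    }
}
```

Capping the length keeps a large or binary body from flooding the log, while still giving enough text to identify the message.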

Related Articles

  • Recoverability
    Explains how exceptions are handled, and actions retried, during message processing.
