Satellites

Component: NServiceBus
NuGet Package NServiceBus (6.x)

Satellites are lightweight message processors that run in the same process as their owning endpoint. While they are mostly used by NServiceBus to implement infrastructure features like the TimeoutManager and Gateway, they can be used in scenarios where messages from additional queues other than the main input queue need to be processed. This is useful when the robustness of a separate queue is needed without having to create and setup a new endpoint and configure any message mappings.

Implementing a satellite

The satellite infrastructure allows the handling of messages as they become available on the input queue. To create a satellite, place the following code in the Setup method of a feature:

public class MySatelliteFeature :
    Feature
{
    public MySatelliteFeature()
    {
        EnableByDefault();
    }
    protected override void Setup(FeatureConfigurationContext context)
    {
        context.AddSatelliteReceiver(
            name: "MyCustomSatellite",
            transportAddress: "targetQueue",
            runtimeSettings: PushRuntimeSettings.Default,
            recoverabilityPolicy: (config, errorContext) =>
            {
                return RecoverabilityAction.MoveToError(config.Failed.ErrorQueue);
            },
            onMessage: OnMessage);
    }

    Task OnMessage(IBuilder builder, MessageContext context)
    {
        // Implement what this satellite needs to do once it receives a message
        var messageId = context.MessageId;
        return Task.CompletedTask;
    }
}

The call to AddSatelliteReceiver registers the action to take when a message is received. In the above example, the satellite will watch a queue named targetQueue.

The MessageContext parameter provides details about the incoming message such as the body and headers. When migrating from a Satellite extension from Versions 5 and below, the implementation steps that were in the Handle method would go into the func provided to the AddSatelliteReceiver method.

Managing errors

Problems processing the message need to be handled by the satellite implementation. The available options are: immediate retry, delayed retry or moving the message to the error queue.

To request an immediate retry use:

context.AddSatelliteReceiver(
    name: "CustomSatellite",
    transportAddress: "targetQueue",
    runtimeSettings: PushRuntimeSettings.Default,
    recoverabilityPolicy: (config, errorContext) =>
    {
        return RecoverabilityAction.ImmediateRetry();
    },
    onMessage: OnMessage);

To request a delayed retry use:

context.AddSatelliteReceiver(
    name: "CustomSatellite",
    transportAddress: "targetQueue",
    runtimeSettings: PushRuntimeSettings.Default,
    recoverabilityPolicy: (config, errorContext) =>
    {
        return RecoverabilityAction.DelayedRetry(
            timeSpan: TimeSpan.FromMinutes(2));
    },
    onMessage: OnMessage);

To request the message to be moved to the error queue use:

context.AddSatelliteReceiver(
    name: "CustomSatellite",
    transportAddress: "targetQueue",
    runtimeSettings: PushRuntimeSettings.Default,
    recoverabilityPolicy: (config, errorContext) =>
    {
        return RecoverabilityAction.MoveToError(config.Failed.ErrorQueue);
    },
    onMessage: OnMessage);

Injecting CriticalError

In Versions 5 and below, the IAdvancedSatellite offered additional customization when receiving messages in the satellite queue by implementing the GetReceiverCustomization() method. For example, critical error processing or error handling customization, etc. In Versions 6 and above, CriticalError can be resolved via the builder and be used to instruct the endpoint to shutdown if a critical error occurs during the processing of a message.

protected override void Setup(FeatureConfigurationContext context)
{
    context.AddSatelliteReceiver(
        name: "CustomSatellite",
        transportAddress: "targetQueue",
        runtimeSettings: PushRuntimeSettings.Default,
        recoverabilityPolicy: (config, errorContext) =>
        {
            return RecoverabilityAction.MoveToError(config.Failed.ErrorQueue);
        },
        onMessage: OnMessage);
}

Task OnMessage(IBuilder builder, MessageContext context)
{
    // To raise a critical error
    var exception = new Exception("CriticalError occurred");

    var criticalError = builder.Build<CriticalError>();
    criticalError.Raise("Something bad happened - trigger critical error", exception);

    return Task.CompletedTask;
}

Related Articles


Last modified