Satellites

Component: NServiceBus
NuGet Package: NServiceBus (9.x)

Satellites are lightweight message processors that run in the same process as their owning endpoint. NServiceBus mostly uses them to implement infrastructure features such as the Gateway, but they can also be used whenever messages from a queue other than the endpoint's main input queue need to be processed. This is useful when the robustness of a separate queue is needed without having to create and set up a new endpoint or configure any message mappings.

Implementing a satellite

The satellite infrastructure handles messages as they become available on the satellite's input queue. To create a satellite, call AddSatelliteReceiver in the Setup method of a feature:

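using System;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Features;
using NServiceBus.Transport;

public class MySatelliteFeature :
    Feature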
{
    public MySatelliteFeature()
    {
        EnableByDefault();
    }
    protected override void Setup(FeatureConfigurationContext context)
    {
        context.AddSatelliteReceiver(
            name: "MyCustomSatellite",
            transportAddress: new QueueAddress("targetQueue"),
            runtimeSettings: PushRuntimeSettings.Default,
            recoverabilityPolicy: (config, errorContext) =>
            {
                return RecoverabilityAction.MoveToError(config.Failed.ErrorQueue);
            },
            onMessage: OnMessage);
    }

    Task OnMessage(IServiceProvider serviceProvider, MessageContext context, CancellationToken cancellationToken)
    {
        // Implement what this satellite needs to do once it receives a message
        var messageId = context.NativeMessageId;
        return Task.CompletedTask;
    }
}

The call to AddSatelliteReceiver registers the action to take when a message is received. In the above example, the satellite will watch a queue named targetQueue.

The MessageContext parameter provides details about the incoming message, such as the body and headers. When migrating a satellite extension from NServiceBus version 5 and below, the logic that was in the Handle method goes into the delegate passed to AddSatelliteReceiver as the onMessage argument.
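As a rough illustration of what the MessageContext exposes, the following sketch reads one header and the raw body inside an onMessage delegate. The class name SatelliteMessageInspector is hypothetical, and treating the body as UTF-8 text is an assumption that only holds if the sender serialized the message that way.

```csharp
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Transport;

// Hypothetical helper class, used here only for illustration
public static class SatelliteMessageInspector
{
    public static Task OnMessage(IServiceProvider serviceProvider, MessageContext context, CancellationToken cancellationToken)
    {
        // Headers are exposed as string key/value pairs;
        // the header may be absent on messages sent by non-NServiceBus senders
        context.Headers.TryGetValue(Headers.EnclosedMessageTypes, out var messageTypes);

        // The body is raw bytes; decoding it as UTF-8 is an assumption
        var bodyText = Encoding.UTF8.GetString(context.Body.Span);

        Console.WriteLine($"Id: {context.NativeMessageId}, types: {messageTypes ?? "(none)"}, body length: {bodyText.Length}");
        return Task.CompletedTask;
    }
}
```

Such a method could be passed as onMessage: SatelliteMessageInspector.OnMessage in the call to AddSatelliteReceiver.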

Managing errors

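The satellite implementation is responsible for deciding what happens when processing a message fails. The recoverabilityPolicy delegate passed to AddSatelliteReceiver returns the action to take. The available options are: immediate retry, delayed retry, or moving the message to the error queue.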

To request an immediate retry:

context.AddSatelliteReceiver(
    name: "CustomSatellite",
    transportAddress: new QueueAddress("targetQueue"),
    runtimeSettings: PushRuntimeSettings.Default,
    recoverabilityPolicy: (config, errorContext) =>
    {
        return RecoverabilityAction.ImmediateRetry();
    },
    onMessage: OnMessage);

To request a delayed retry:

context.AddSatelliteReceiver(
    name: "CustomSatellite",
    transportAddress: new QueueAddress("targetQueue"),
    runtimeSettings: PushRuntimeSettings.Default,
    recoverabilityPolicy: (config, errorContext) =>
    {
        return RecoverabilityAction.DelayedRetry(
            timeSpan: TimeSpan.FromMinutes(2));
    },
    onMessage: OnMessage);

To request that the message be moved to the error queue:

context.AddSatelliteReceiver(
    name: "CustomSatellite",
    transportAddress: new QueueAddress("targetQueue"),
    runtimeSettings: PushRuntimeSettings.Default,
    recoverabilityPolicy: (config, errorContext) =>
    {
        return RecoverabilityAction.MoveToError(config.Failed.ErrorQueue);
    },
    onMessage: OnMessage);
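The three actions can also be combined in a single policy that escalates as failures accumulate. The following is a minimal sketch, not the built-in default policy: the class name SatelliteRecoverability and the retry limits are hypothetical values chosen for illustration, and delayed retries are assumed to be supported by the transport in use.

```csharp
using System;
using NServiceBus;
using NServiceBus.Transport;

// Hypothetical helper class, used here only for illustration
public static class SatelliteRecoverability
{
    // Illustrative limits, not values taken from NServiceBus defaults
    const int MaxImmediateRetries = 3;
    const int MaxDelayedRetries = 2;

    public static RecoverabilityAction Policy(RecoverabilityConfig config, ErrorContext errorContext)
    {
        // First, retry right away a limited number of times
        if (errorContext.ImmediateProcessingFailures <= MaxImmediateRetries)
        {
            return RecoverabilityAction.ImmediateRetry();
        }

        // Then back off and retry after a delay
        if (errorContext.DelayedDeliveriesPerformed < MaxDelayedRetries)
        {
            return RecoverabilityAction.DelayedRetry(TimeSpan.FromMinutes(2));
        }

        // Finally, give up and move the message to the error queue
        return RecoverabilityAction.MoveToError(config.Failed.ErrorQueue);
    }
}
```

The method can be supplied as recoverabilityPolicy: SatelliteRecoverability.Policy in the call to AddSatelliteReceiver.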

Injecting CriticalError

In NServiceBus version 5 and below, IAdvancedSatellite offers additional customization when receiving messages from the satellite queue through the GetReceiverCustomization() method, for example critical error processing or error handling customization. In NServiceBus version 6 and above, CriticalError can instead be resolved from dependency injection (in the example below, from the IServiceProvider passed to the onMessage delegate) and used to instruct the endpoint to shut down if a critical error occurs while processing a message.

protected override void Setup(FeatureConfigurationContext context)
{
    context.AddSatelliteReceiver(
        name: "CustomSatellite",
        transportAddress: new QueueAddress("targetQueue"),
        runtimeSettings: PushRuntimeSettings.Default,
        recoverabilityPolicy: (config, errorContext) =>
        {
            return RecoverabilityAction.MoveToError(config.Failed.ErrorQueue);
        },
        onMessage: OnMessage);
}

Task OnMessage(IServiceProvider serviceProvider, MessageContext context, CancellationToken cancellationToken)
{
    // To raise a critical error
    var exception = new Exception("CriticalError occurred");

    // GetRequiredService is an extension method that requires
    // a using directive for Microsoft.Extensions.DependencyInjection
    var criticalError = serviceProvider.GetRequiredService<CriticalError>();
    criticalError.Raise("Something bad happened - trigger critical error", exception, cancellationToken);

    return Task.CompletedTask;
}
