Satellites are lightweight message processors that run in the same process as their owning endpoint. While they are mostly used by NServiceBus to implement infrastructure features like the Gateway, they can be used in scenarios where messages from additional queues other than the main input queue need to be processed. This is useful when the robustness of a separate queue is needed without having to create and setup a new endpoint and configure any message mappings.
Implementing a satellite
The satellite infrastructure allows the handling of messages as they become available on the input queue. To create a satellite, place the following code in the Setup
method of a feature:
public class MySatelliteFeature :
Feature
{
public MySatelliteFeature()
{
EnableByDefault();
}
protected override void Setup(FeatureConfigurationContext context)
{
context.AddSatelliteReceiver(
name: "MyCustomSatellite",
transportAddress: new QueueAddress("targetQueue"),
runtimeSettings: PushRuntimeSettings.Default,
recoverabilityPolicy: (config, errorContext) =>
{
return RecoverabilityAction.MoveToError(config.Failed.ErrorQueue);
},
onMessage: OnMessage);
}
Task OnMessage(IServiceProvider serviceProvider, MessageContext context, CancellationToken cancellationToken)
{
// Implement what this satellite needs to do once it receives a message
var messageId = context.NativeMessageId;
return Task.CompletedTask;
}
}
The call to AddSatelliteReceiver
registers the action to take when a message is received. In the above example, the satellite will watch a queue named targetQueue
.
The MessageContext
parameter provides details about the incoming message such as the body and headers. When migrating from a satellite extension from NServiceBus version 5 and below, the implementation steps that are in the Handle
method go into the func provided to the AddSatelliteReceiver
method.
Managing errors
Problems processing the message need to be handled by the satellite implementation. The available options are: immediate retry, delayed retry, or moving the message to the error queue.
To request an immediate retry:
context.AddSatelliteReceiver(
name: "CustomSatellite",
transportAddress: new QueueAddress("targetQueue"),
runtimeSettings: PushRuntimeSettings.Default,
recoverabilityPolicy: (config, errorContext) =>
{
return RecoverabilityAction.ImmediateRetry();
},
onMessage: OnMessage);
To request a delayed retry:
context.AddSatelliteReceiver(
name: "CustomSatellite",
transportAddress: new QueueAddress("targetQueue"),
runtimeSettings: PushRuntimeSettings.Default,
recoverabilityPolicy: (config, errorContext) =>
{
return RecoverabilityAction.DelayedRetry(
timeSpan: TimeSpan.FromMinutes(2));
},
onMessage: OnMessage);
To request the the message be moved to the error queue:
context.AddSatelliteReceiver(
name: "CustomSatellite",
transportAddress: new QueueAddress("targetQueue"),
runtimeSettings: PushRuntimeSettings.Default,
recoverabilityPolicy: (config, errorContext) =>
{
return RecoverabilityAction.MoveToError(config.Failed.ErrorQueue);
},
onMessage: OnMessage);
Injecting CriticalError
In NServiceBus version 5 and below, the IAdvancedSatellite
offers additional customization when receiving messages in the satellite queue by implementing the GetReceiverCustomization()
method. For example, critical error processing or error handling customization, etc. In NServiceBus version 6 and above, CriticalError
can be resolved via the builder and be used to instruct the endpoint to shutdown if a critical error occurs during the processing of a message.
protected override void Setup(FeatureConfigurationContext context)
{
context.AddSatelliteReceiver(
name: "CustomSatellite",
transportAddress: new QueueAddress("targetQueue"),
runtimeSettings: PushRuntimeSettings.Default,
recoverabilityPolicy: (config, errorContext) =>
{
return RecoverabilityAction.MoveToError(config.Failed.ErrorQueue);
},
onMessage: OnMessage);
}
Task OnMessage(IServiceProvider serviceProvider, MessageContext context, CancellationToken cancellationToken)
{
// To raise a critical error
var exception = new Exception("CriticalError occurred");
var criticalError = serviceProvider.GetRequiredService<CriticalError>();
criticalError.Raise("Something bad happened - trigger critical error", exception, cancellationToken);
return Task.CompletedTask;
}