This sample shows how to configure ServiceControl to monitor endpoints and retry messages when using the advanced features of the Azure Storage Queues transport not natively supported by ServiceControl.
The following diagram shows the topology of the solution:
Notice that ServiceControl is in a different namespace than the other endpoints, which means that it can't natively communicate with the Sales and Shipping endpoints. This sample shows how to create an adapter to bridge the gap.
The adapter also deals with advanced features of the Azure Storage Queues transport such as multi-storage accounts and secure connection strings.
Prerequisites
- An environment variable named AzureStorageQueue.ConnectionString.1 with the connection string for the Azure Storage Queues account to be used by the Sales and Shipping endpoints.
- An environment variable named AzureStorageQueue.ConnectionString.SC with the connection string for the Azure Storage Queues account to be used by ServiceControl and the adapter.
- Install ServiceControl.
- Using the ServiceControl Management tool, set up ServiceControl to monitor endpoints using the Azure Storage Queues transport:
  - Add a new ServiceControl instance:
    - Use Particular.ServiceControl.ASQ as the instance name (ensure there is no other instance of ServiceControl running with the same name).
    - Use the connection string supplied by the AzureStorageQueue.ConnectionString.SC environment variable.
- Ensure the ServiceControl process is running before running the sample.
- Install ServicePulse.
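The environment-variable prerequisites can be verified before starting any project. The following is a minimal sketch, not part of the sample: PrerequisiteCheck and FindMissing are hypothetical names, and the caller supplies the variable names the projects actually read.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the sample) that reports unset prerequisites.
public static class PrerequisiteCheck
{
    // Returns the names whose value is missing or blank.
    // readVariable is injected so the check can run without touching the real environment.
    public static List<string> FindMissing(IEnumerable<string> names, Func<string, string> readVariable)
    {
        var missing = new List<string>();
        foreach (var name in names)
        {
            if (string.IsNullOrWhiteSpace(readVariable(name)))
            {
                missing.Add(name);
            }
        }
        return missing;
    }
}
```

For example, PrerequisiteCheck.FindMissing(new[] { "AzureStorageQueue.ConnectionString.SC" }, Environment.GetEnvironmentVariable) returns an empty list when that variable is set.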
Running the project
- Start the projects: Adapter, Sales and Shipping. Ensure the adapter starts first because on start-up it creates a queue that is used for heartbeats.
- Open ServicePulse (by default it's available at http://localhost:9090/#/dashboard) and select the Endpoints Overview. The Shipping endpoint should be visible in the Active Endpoints tab as it has the Heartbeats plugin installed.
- Go to the Sales console and press o to create an order.
- Notice the Shipping endpoint receives the OrderAccepted event from Sales and publishes the OrderShipped event.
- Notice the Sales endpoint logs that it processed the OrderShipped event.
- Go to the Sales console and press f to simulate message processing failure.
- Press o to create another order. Notice the OrderShipped event fails processing in Sales and is moved to the error queue.
- Press f again to disable message processing failure simulation in Sales.
- Go to the Shipping console and press f to simulate message processing failure.
- Go back to Sales and press o to create yet another order. Notice the OrderAccepted event fails in Shipping and is moved to the error queue.
- Press f again to disable message processing failure simulation in Shipping.
- Open ServicePulse and select the Failed Messages view.
- Notice the existence of one failed message group with two messages. Open the group.
- One of the messages is OrderAccepted, which failed in Shipping; the other is OrderShipped, which failed in Sales.
- Press the "Retry all" button.
- Go to the Shipping console and verify that the OrderAccepted event has been successfully processed.
- Go to the Sales console and verify that both OrderShipped events have been successfully processed.
- Shut down the Shipping endpoint.
- Open ServicePulse and notice a red label next to the heart icon. Click that icon to open the Endpoints Overview. Notice that the Shipping endpoint is now displayed in the Inactive Endpoints tab.
Code walk-through
The code base consists of four projects.
Shared
The Shared project contains the message contracts.
Sales and Shipping
The Sales and Shipping projects contain endpoints that simulate the execution of a business process. The process consists of two messages: an OrderAccepted event published by Sales and an OrderShipped event published by Shipping.
The Sales and Shipping endpoints include a message processing failure simulation mode (toggled by pressing f) which can be used to generate failed messages for demonstrating message retry functionality.
The Shipping endpoint has the Heartbeats plugin installed to enable uptime monitoring via ServicePulse.
Both endpoints are configured to use secure connection strings.
Adapter
The Adapter project hosts the ServiceControl transport adapter. The adapter has two sides:
- endpoint facing
- ServiceControl facing
In this sample both sides use the Azure Storage Queues transport:
var transportAdapterConfig =
    new TransportAdapterConfig<AzureStorageQueueTransport, AzureStorageQueueTransport>("ServiceControl-ASQ-Adapter");
The Azure Storage Queues service doesn't support message headers. The NServiceBus transport implements headers by storing the headers and the body together in a MessageWrapper, which is serialized into the storage message. For the adapter to function, it has to know how to serialize and deserialize these wrapped messages.
var settings = transport.GetSettings();
// Register serializer used to serialize MessageWrapper (custom MessageWrapper serializer or endpoint's serializer different than JSON)
var serializer = Tuple.Create(new NewtonsoftJsonSerializer() as SerializationDefinition, new SettingsHolder());
settings.Set("MainSerializer", serializer);
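To illustrate why both sides must agree on a serializer, here is a minimal sketch of the envelope idea: headers and body are packed into one object, and that object is serialized into the text of the queue message. EnvelopeSketch and EnvelopeCodec are hypothetical names and do not reproduce the transport's actual MessageWrapper type. System.Text.Json is used here only to keep the sketch free of extra dependencies, whereas the sample itself registers NewtonsoftJsonSerializer.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical stand-in for the transport's wrapper type.
public sealed class EnvelopeSketch
{
    public Dictionary<string, string> Headers { get; set; } = new Dictionary<string, string>();
    public byte[] Body { get; set; } = Array.Empty<byte>();
}

public static class EnvelopeCodec
{
    // Packs headers and body into a single string suitable for a queue message.
    // System.Text.Json writes the byte[] body as a Base64 string.
    public static string Wrap(Dictionary<string, string> headers, byte[] body)
    {
        return JsonSerializer.Serialize(new EnvelopeSketch { Headers = headers, Body = body });
    }

    // Restores headers and body; fails if the text is not a serialized envelope.
    public static EnvelopeSketch Unwrap(string storageMessage)
    {
        return JsonSerializer.Deserialize<EnvelopeSketch>(storageMessage)
               ?? throw new InvalidOperationException("Storage message did not contain an envelope.");
    }
}
```

If the receiving side used a different serializer than the sending side, Unwrap would fail or lose the headers, which is the situation the adapter's serializer registration is meant to avoid.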
The following code configures the adapter to match advanced transport features enabled on the endpoints:
transportAdapterConfig.CustomizeEndpointTransport(
    customization: transport =>
    {
        var connectionString = Environment.GetEnvironmentVariable("AzureStorageQueue.ConnectionString.Endpoints");
        if (string.IsNullOrWhiteSpace(connectionString))
        {
            throw new Exception("Could not read 'AzureStorageQueue.ConnectionString.Endpoints' environment variable. Check sample prerequisites.");
        }
        transport.ConnectionString(connectionString);
        transport.DefaultAccountAlias("storage_account");
        // Required to address https://github.com/Particular/NServiceBus.AzureStorageQueues/issues/308
        transport.AccountRouting().AddAccount("storage_account", connectionString);
        var settings = transport.GetSettings();
        // Register serializer used to serialize MessageWrapper (custom MessageWrapper serializer or endpoint's serializer different than JSON)
        var serializer = Tuple.Create(new NewtonsoftJsonSerializer() as SerializationDefinition, new SettingsHolder());
        settings.Set("MainSerializer", serializer);
    });
The following code configures the adapter to communicate with ServiceControl:
transportAdapterConfig.CustomizeServiceControlTransport(
    customization: transport =>
    {
        var connectionString = Environment.GetEnvironmentVariable("AzureStorageQueue.ConnectionString.SC");
        if (string.IsNullOrWhiteSpace(connectionString))
        {
            throw new Exception("Could not read 'AzureStorageQueue.ConnectionString.SC' environment variable. Check sample prerequisites.");
        }
        transport.ConnectionString(connectionString);
        var settings = transport.GetSettings();
        // Register serializer used to serialize MessageWrapper (custom MessageWrapper serializer or endpoint's serializer different than JSON)
        var serializer = Tuple.Create(new NewtonsoftJsonSerializer() as SerializationDefinition, new SettingsHolder());
        settings.Set("MainSerializer", serializer);
    });
Since ServiceControl has been installed under a non-default instance name (Particular.ServiceControl.ASQ), the control queue name needs to be overridden in the adapter configuration:
transportAdapterConfig.ServiceControlSideControlQueue = "Particular-ServiceControl-ASQ";
Shipping and Sales use different namespaces, therefore the adapter has to be configured to properly route retried messages:
transportAdapterConfig.RedirectRetriedMessages((failedQ, headers) =>
{
    if (headers.TryGetValue(AdapterSpecificHeaders.OriginalStorageAccountAlias, out var storageAccountAlias))
    {
        return $"{failedQ}@{storageAccountAlias}";
    }
    return failedQ;
});
The destination address consists of the queue name and the storage account alias, which is included in the failed messages:
recoverability.Failed(
    customizations: settings =>
    {
        settings.HeaderCustomization(
            customization: headers =>
            {
                headers[AdapterSpecificHeaders.OriginalStorageAccountAlias] = "storage_account";
            });
    });
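The address resolution used for retries can be examined in isolation. The sketch below restates the redirect logic as a pure function so that it can be exercised without a running transport. RetryRouting, Resolve, and the AliasHeader value are hypothetical; the sample reads the key from AdapterSpecificHeaders.OriginalStorageAccountAlias, whose actual string value is not shown here. Treating a blank alias like a missing one is also an assumption of this sketch.

```csharp
using System.Collections.Generic;

public static class RetryRouting
{
    // Hypothetical header key used only in this sketch.
    public const string AliasHeader = "Sketch.OriginalStorageAccountAlias";

    // Builds "queue@alias" when the failed message carries an alias header,
    // otherwise falls back to the plain queue name.
    public static string Resolve(string failedQueue, IReadOnlyDictionary<string, string> headers)
    {
        if (headers.TryGetValue(AliasHeader, out var alias) && !string.IsNullOrWhiteSpace(alias))
        {
            return $"{failedQueue}@{alias}";
        }
        return failedQueue;
    }
}
```

With the header set to storage_account, a message that failed in the queue named sales resolves to sales@storage_account; without the header it resolves to sales, so the retry is sent to the adapter's default account.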