Monitor Azure Storage Queues endpoints with ServiceControl adapter

Component: ServiceControl Transport Adapter
NuGet Package: ServiceControl.TransportAdapter (2-pre)
Target NServiceBus Version: 7.x
This page targets a pre-release version and is subject to change prior to the final release.

This sample shows how to configure ServiceControl to monitor endpoints and retry messages when using the advanced features of the Azure Storage Queues transport not natively supported by ServiceControl.

The following diagram shows the topology of the solution:

graph RL
  subgraph Namespace 1
    sales["fa:fa-money Sales"]
    shipping["fa:fa-truck Shipping"]
  end
  subgraph Namespace 2
    sc["fa:fa-wrench Service Control"]
  end
  adapter{"fa:fa-exchange Adapter"}
  sales ==> adapter
  adapter .-> sales
  shipping ==> adapter
  adapter .-> shipping
  adapter ==> sc
  sc .-> adapter

Notice that ServiceControl is in a different namespace than the other endpoints, which means that it can't natively communicate with the Sales and Shipping endpoints. This sample shows how to create an adapter to bridge the gap.

The adapter also handles advanced features of the Azure Storage Queues transport, such as multiple storage accounts and secure connection strings.

Prerequisites

  1. An environment variable named AzureStorageQueue.ConnectionString.Endpoints with the connection string for the Azure Storage Queues account to be used by the Sales and Shipping endpoints.
  2. An environment variable named AzureStorageQueue.ConnectionString.SC with the connection string for the Azure Storage Queues account to be used by ServiceControl and the adapter.
  3. Install ServiceControl.
  4. Using ServiceControl Management, set up ServiceControl to monitor endpoints using the Azure Storage Queues transport:
     • Add a new ServiceControl instance.
     • Use Particular.ServiceControl.ASQ as the instance name (ensure there is no other instance of ServiceControl running with the same name).
     • Use the connection string supplied by the AzureStorageQueue.ConnectionString.SC environment variable.
     Note: If other ServiceControl instances have been running on this machine, it's necessary to specify a non-default instance name and port number. Adjust ServicePulse settings accordingly to point to this location.
  5. Ensure the ServiceControl process is running before running the sample.
  6. Install ServicePulse.
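
Before starting the projects, it can help to confirm that both connection string variables are visible to the process. The following is a minimal sketch, not part of the sample: the Preflight class and its Missing method are hypothetical names, and the variable names are the ones read by the adapter configuration code shown later on this page.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for illustration; the sample itself does not ship this class.
public static class Preflight
{
    // Variable names as read by the adapter configuration code in this sample.
    static readonly string[] Required =
    {
        "AzureStorageQueue.ConnectionString.Endpoints",
        "AzureStorageQueue.ConnectionString.SC"
    };

    // Returns the names of required variables that are missing or blank.
    // The lookup is injected so the check can run without touching the real environment.
    public static List<string> Missing(Func<string, string> lookup) =>
        Required.Where(name => string.IsNullOrWhiteSpace(lookup(name))).ToList();

    public static void Main()
    {
        var missing = Missing(Environment.GetEnvironmentVariable);
        if (missing.Count == 0)
        {
            Console.WriteLine("All connection string variables are set.");
            return;
        }

        foreach (var name in missing)
        {
            Console.WriteLine($"Missing environment variable: {name}");
        }
    }
}
```

Environment variables are read when a process starts, so a console or IDE opened before the variables were defined may need to be restarted.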

Running the project

  1. Start the projects: Adapter, Sales and Shipping. Ensure the adapter starts first because on start-up it creates a queue that is used for heartbeats.
  2. Open ServicePulse (by default it's available at http://localhost:9090/#/dashboard) and select the Endpoints Overview. The Shipping endpoint should be visible in the Active Endpoints tab as it has the Heartbeats plugin installed.
  3. Go to the Sales console and press o to create an order.
  4. Notice the Shipping endpoint receives the OrderAccepted event from Sales and publishes the OrderShipped event.
  5. Notice the Sales endpoint logs that it processed the OrderShipped event.
  6. Go to the Sales console and press f to simulate message processing failure.
  7. Press o to create another order. Notice the OrderShipped event fails processing in Sales and is moved to the error queue.
  8. Press f again to disable message processing failure simulation in Sales.
  9. Go to the Shipping console and press f to simulate message processing failure.
  10. Go back to Sales and press o to create yet another order. Notice the OrderAccepted event fails in Shipping and is moved to the error queue.
  11. Press f again to disable message processing failure simulation in Shipping.
  12. Open ServicePulse and select the Failed Messages view.
  13. Notice the existence of one failed message group with two messages. Open the group.
  14. One of the messages is OrderAccepted which failed in Shipping, the other is OrderShipped which failed in Sales.
  15. Press the "Retry all" button.
  16. Go to the Shipping console and verify that the OrderAccepted event has been successfully processed.
  17. Go to the Sales console and verify that both OrderShipped events have been successfully processed.
  18. Shut down the Shipping endpoint.
  19. Open ServicePulse and notice a red label next to the heart icon. Click that icon to open the Endpoints Overview. Notice that the Shipping endpoint is now displayed in the Inactive Endpoints tab.

Code walk-through

The code base consists of four projects.

Shared

The Shared project contains the message contracts.

Sales and Shipping

The Sales and Shipping projects contain endpoints that simulate the execution of a business process. The process consists of two events: OrderAccepted, published by Sales and handled by Shipping, and OrderShipped, published by Shipping and handled by Sales.

The Sales and Shipping endpoints include a message processing failure simulation mode (toggled by pressing f) which can be used to generate failed messages for demonstrating message retry functionality.
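
The toggle can be pictured as a flag that makes the handler throw, which is what causes a message to end up in the error queue. The following is a minimal sketch of that idea in plain C#, not the sample's actual code: FailureSimulator and its members are hypothetical names, and a real endpoint would throw from inside a message handler rather than from a wrapper.

```csharp
using System;

// Hypothetical illustration of a failure simulation toggle; not taken from the sample.
public class FailureSimulator
{
    // Written from the console key loop and read from message processing code.
    volatile bool enabled;

    public bool Enabled => enabled;

    // Flips the mode, as pressing f does in the sample consoles.
    public void Toggle() => enabled = !enabled;

    // Runs the handler, or throws instead when failure simulation is on.
    public void Invoke(Action handler)
    {
        if (enabled)
        {
            throw new InvalidOperationException("Simulated message processing failure.");
        }

        handler();
    }
}

public static class FailureSimulatorDemo
{
    public static void Main()
    {
        var simulator = new FailureSimulator();

        simulator.Invoke(() => Console.WriteLine("Message processed."));

        simulator.Toggle();
        try
        {
            simulator.Invoke(() => Console.WriteLine("Message processed."));
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```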

The Shipping endpoint has the Heartbeats plugin installed to enable uptime monitoring via ServicePulse.

Both endpoints are configured to use secure connection strings:

transport.UseAccountAliasesInsteadOfConnectionStrings();

Adapter

The Adapter project hosts the ServiceControl.TransportAdapter. The adapter has two sides:

  • endpoint facing
  • ServiceControl facing

In this sample both use Azure Storage Queues transport:

var transportAdapterConfig =
    new TransportAdapterConfig<AzureStorageQueueTransport, AzureStorageQueueTransport>("ServiceControl.ASQ.Adapter");

The Azure Storage Queues service doesn't support message headers. The NServiceBus transport implements headers by wrapping each message in a MessageWrapper, which stores the headers and body together as a single serialized storage message. For the adapter to function, it has to know how to serialize and deserialize these messages.

2-pre ServiceControl.TransportAdapter
var settings = transport.GetSettings();

// Register serializer used to serialize MessageWrapper (custom MessageWrapper serializer or endpoint's serializer different than JSON)
var serializer = Tuple.Create(new NewtonsoftSerializer() as SerializationDefinition, new SettingsHolder());
settings.Set("MainSerializer", serializer);
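
To illustrate why both sides must agree on a serializer, the following sketch wraps headers and a body into one JSON payload and unwraps it again. It is a simplified stand-in under stated assumptions: Envelope and EnvelopeDemo are hypothetical types, System.Text.Json is used only for brevity, and the actual MessageWrapper of the transport carries more fields than shown here.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

// Hypothetical envelope for illustration only; it is not the transport's MessageWrapper.
public class Envelope
{
    public Dictionary<string, string> Headers { get; set; } = new Dictionary<string, string>();
    public byte[] Body { get; set; } = Array.Empty<byte>();
}

public static class EnvelopeDemo
{
    // Serializes headers and body into the single string stored as the queue message.
    public static string Wrap(Dictionary<string, string> headers, byte[] body) =>
        JsonSerializer.Serialize(new Envelope { Headers = headers, Body = body });

    // Restores headers and body; this only works if the reader uses the same format as the writer.
    public static Envelope Unwrap(string queueMessage) =>
        JsonSerializer.Deserialize<Envelope>(queueMessage);

    public static void Main()
    {
        var headers = new Dictionary<string, string> { ["NServiceBus.MessageId"] = "42" };
        var payload = Wrap(headers, Encoding.UTF8.GetBytes("{\"OrderId\":\"abc\"}"));

        var restored = Unwrap(payload);
        Console.WriteLine(restored.Headers["NServiceBus.MessageId"]);
        Console.WriteLine(Encoding.UTF8.GetString(restored.Body));
    }
}
```

If the endpoints wrote the envelope with one serializer and the adapter read it with another, Unwrap would fail or return incomplete data, which is why the adapter registers the same serializer as the endpoints.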

The following code configures the adapter to match advanced transport features enabled on the endpoints:

transportAdapterConfig.CustomizeEndpointTransport(
    customization: transport =>
    {
        var connectionString = Environment.GetEnvironmentVariable("AzureStorageQueue.ConnectionString.Endpoints");
        if (string.IsNullOrWhiteSpace(connectionString))
        {
            throw new Exception("Could not read 'AzureStorageQueue.ConnectionString.Endpoints' environment variable. Check sample prerequisites.");
        }

        transport.ConnectionString(connectionString);
        transport.UseAccountAliasesInsteadOfConnectionStrings();
        transport.DefaultAccountAlias("storage_account");

        // Required to address https://github.com/Particular/NServiceBus.AzureStorageQueues/issues/308
        transport.AccountRouting().AddAccount("storage_account", connectionString);


        var settings = transport.GetSettings();

        // Register serializer used to serialize MessageWrapper (custom MessageWrapper serializer or endpoint's serializer different than JSON)
        var serializer = Tuple.Create(new NewtonsoftSerializer() as SerializationDefinition, new SettingsHolder());
        settings.Set("MainSerializer", serializer);

    });

The following code configures the adapter to communicate with ServiceControl:

transportAdapterConfig.CustomizeServiceControlTransport(
    customization: transport =>
    {
        var connectionString = Environment.GetEnvironmentVariable("AzureStorageQueue.ConnectionString.SC");
        if (string.IsNullOrWhiteSpace(connectionString))
        {
            throw new Exception("Could not read 'AzureStorageQueue.ConnectionString.SC' environment variable. Check sample prerequisites.");
        }

        transport.ConnectionString(connectionString);


        var settings = transport.GetSettings();

        // Register serializer used to serialize MessageWrapper (custom MessageWrapper serializer or endpoint's serializer different than JSON)
        var serializer = Tuple.Create(new NewtonsoftSerializer() as SerializationDefinition, new SettingsHolder());
        settings.Set("MainSerializer", serializer);

    });

Since ServiceControl has been installed under a non-default instance name (Particular.ServiceControl.ASQ), the control queue name needs to be overridden in the adapter configuration:

transportAdapterConfig.ServiceControlSideControlQueue = "Particular.ServiceControl.ASQ";

Shipping and Sales are in a different storage account than ServiceControl, so the adapter has to be configured to route retried messages back to the correct account:

transportAdapterConfig.RedirectRetriedMessages((failedQ, headers) =>
{
    if (headers.TryGetValue(AdapterSpecificHeaders.OriginalStorageAccountAlias, out var storageAccountAlias))
    {
        return $"{failedQ}@{storageAccountAlias}";
    }
    return failedQ;
});
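
The redirect logic above can be exercised in isolation. The following is a minimal sketch under stated assumptions: RetryAddress is a hypothetical class, the AliasHeader value is a placeholder for the sample's AdapterSpecificHeaders.OriginalStorageAccountAlias constant (whose actual value is not shown on this page), and "sales" is an illustrative queue name.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-alone version of the redirect callback, for illustration only.
public static class RetryAddress
{
    // Placeholder header key; the sample defines its own constant in AdapterSpecificHeaders.
    public const string AliasHeader = "Sample.OriginalStorageAccountAlias";

    // Appends the storage account alias to the queue name when the failed message carries one.
    public static string Resolve(string failedQueue, IReadOnlyDictionary<string, string> headers)
    {
        if (headers.TryGetValue(AliasHeader, out var storageAccountAlias))
        {
            return $"{failedQueue}@{storageAccountAlias}";
        }

        // No alias header: fall back to the plain queue name.
        return failedQueue;
    }

    public static void Main()
    {
        var withAlias = new Dictionary<string, string> { [AliasHeader] = "storage_account" };
        var withoutAlias = new Dictionary<string, string>();

        Console.WriteLine(Resolve("sales", withAlias));    // prints "sales@storage_account"
        Console.WriteLine(Resolve("sales", withoutAlias)); // prints "sales"
    }
}
```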

The destination address consists of the queue name and the storage account alias. The endpoints add the alias as a header to every message that fails processing:

recoverability.Failed(
    customizations: settings =>
    {
        settings.HeaderCustomization(
            customization: headers =>
            {
                headers[AdapterSpecificHeaders.OriginalStorageAccountAlias] = "storage_account";
            });
    });
