Monitor Azure Service Bus endpoints with the ServiceControl adapter

Component: ServiceControl Transport Adapter
NuGet Package ServiceControl.TransportAdapter (1.x)
Target NServiceBus Version: 6.x

This sample shows how to configure ServiceControl to monitor endpoints and retry messages when using the advanced features of the Azure Service Bus transport not natively supported by ServiceControl.

The following diagram shows the topology of the solution:

graph RL subgraph Namespace 2 shipping["fa:fa-truck Shipping"] end subgraph Namespace 1 sales["fa:fa-money Sales"] end subgraph Namespace 3 sc["fa:fa-wrench Service Control"] end adapter{"fa:fa-exchange Adapter"} sales ==> adapter adapter .-> sales shipping==>adapter adapter .-> shipping adapter==>sc sc .-> adapter

Notice that Sales and Shipping are in two different namespaces. This is done by using cross-namespace routing. The other important thing to note is that ServiceControl is in a different namespace from the other endpoints, which means that it can't natively communicate with them, which is why this sample shows how to create an adapter to bridge between everything.

The adapter also deals with other advanced features of the Azure Service Bus transport like secure connection strings and customized brokered message creation.

Prerequisistes

An environment variable named AzureServiceBus.ConnectionString with the connection string for the Azure Service Bus namespace.

  1. An environment variable named AzureServiceBus.ConnectionString.1 with the connection string for the Azure Service Bus namespace to be used by Sales endpoint.
  2. An environment variable named AzureServiceBus.ConnectionString.2 with the connection string for the Azure Service Bus namespace to be used by Shipping endpoint.
  3. An environment variable named AzureServiceBus.ConnectionString.SC with the connection string for the Azure Service Bus namespace to be used by ServiceControl and the adapter.
  4. Install ServiceControl.
  5. Using ServiceControl Management tool, set up ServiceControl to monitor endpoints using Azure Service Bus transport:
  • Add a new ServiceControl instance:
  • Use Particular.ServiceControl.ASB as the instance name (ensure there is no other instance of SC running with the same name).
  • Use connection string supplied with the AzureServiceBus.ConnectionString.SC environment variable.
If other ServiceControl instances have been running on this machine, it's necessary to specify a non-default port number for API. Adjust ServicePulse settings accordingly to point to this location.
  1. Ensure the ServiceControl process is running before running the sample.
  2. Install ServicePulse

Running the project

  1. Start the projects: Adapter, Sales and Shipping. Ensure the adapter starts first because on start-up it creates a queue that is used for heartbeats.
  2. Open ServicePulse (by default it's available at http://localhost:9090/#/dashboard) and select the Endpoints Overview. The Shipping endpoint should be visible in the Active Endpoints tab as it has the Heartbeats plugin installed.
  3. Go to the Sales console and press o to create an order.
  4. Notice the Shipping endpoint receives the OrderAccepted event from Sales and publishes OrderShipped event.
  5. Notice the Sales endpoint logs that it processed the OrderShipped event.
  6. Go to the Sales console and press f to simulate message processing failure.
  7. Press o to create another order. Notice the OrderShipped event fails processing in Sales and is moved to the error queue.
  8. Press f again to disable message processing failure simulation in Sales.
  9. Go to the Shipping console and press f to simulate message processing failure.
  10. Go back to Sales and press o to create yet another order. Notice the OrderAccepted event fails in Shipping and is moved to the error queue.
  11. Press f again to disable message processing failure simulation in Shipping.
  12. Open ServicePulse and select the Failed Messages view.
  13. Notice the existence of one failed message group with two messages. Open the group.
  14. One of the messages is OrderAccepted which failed in Shipping, the other is OrderShipped which failed in Sales.
  15. Press the "Retry all" button.
  16. Go to the Shipping console and verify that the OrderAccepted event has been successfully processed.
  17. Go to the Sales console and verify that both OrderShipped events have been successfully processed.
  18. Shut down the Shipping endpoint.
  19. Open ServicePulse and notice a red label next to the heart icon. Click on the that icon to open the Endpoints Overview. Notice that the Shipping is now displayed in the Inactive Endpoints tab.

Code walk-through

The code base consists of four projects.

Shared

The Shared project contains the message contracts.

Sales and Shipping

The Sales and Shipping projects contain endpoints that simulate the execution of a business process. The process consists of two messages: ShipOrder command sent by Sales and OrderShipped reply sent by Shipping.

The Sales and Shipping endpoints include a message processing failure simulation mode (toggled by pressing f) which can be used to generate failed messages for demonstrating message retry functionality.

The Shipping endpoint has the Heartbeats plugin installed to enable uptime monitoring via ServicePulse.

Both endpoints are configured to use:

transport.UseNamespaceAliasesInsteadOfConnectionStrings();
transport.DefaultNamespaceAlias("sales");
var routing = transport.NamespaceRouting();
routing.AddNamespace("shipping", shippingConnectionString);
transport.UseForwardingTopology();
transport.BrokeredMessageBodyType(SupportedBrokeredMessageBodyTypes.Stream);
var composition = transport.Composition();
composition.UseStrategy<HierarchyComposition>()
    .PathGenerator(path => "scadapter/");

Adapter

The Adapter project hosts the ServiceControl.TransportAdapter. The adapter has two sides:

  • endpoint facing
  • ServiceControl facing

In this sample both use Azure Service Bus transport:

var transportAdapterConfig = new TransportAdapterConfig<AzureServiceBusTransport, AzureServiceBusTransport>("ServiceControl.ASB.Adapter");

The following code configures the adapter to match advanced transport features enabled on the endpoints:

transportAdapterConfig.CustomizeEndpointTransport(
    customization: transport =>
    {
        var salesConnectionString = Environment.GetEnvironmentVariable("AzureServiceBus.ConnectionString.1");
        if (string.IsNullOrWhiteSpace(salesConnectionString))
        {
            throw new Exception("Could not read 'AzureServiceBus.ConnectionString.1' environment variable. Check sample prerequisites.");
        }
        var shippingConnectionString = Environment.GetEnvironmentVariable("AzureServiceBus.ConnectionString.2");
        if (string.IsNullOrWhiteSpace(shippingConnectionString))
        {
            throw new Exception("Could not read 'AzureServiceBus.ConnectionString.2' environment variable. Check sample prerequisites.");
        }

        transport.UseNamespaceAliasesInsteadOfConnectionStrings();
        var namespacePartitioning = transport.NamespacePartitioning();
        namespacePartitioning.AddNamespace("sales", salesConnectionString);
        namespacePartitioning.AddNamespace("shipping", shippingConnectionString);
        namespacePartitioning.UseStrategy<RoundRobinNamespacePartitioning>();
        transport.UseForwardingTopology();
        var composition = transport.Composition();
        composition.UseStrategy<HierarchyComposition>()
            .PathGenerator(path => "scadapter/");
    });

While the following code configures the adapter to communicate with ServiceControl:

transportAdapterConfig.CustomizeServiceControlTransport(
    customization: transport =>
    {
        var connectionString = Environment.GetEnvironmentVariable("AzureServiceBus.ConnectionString.SC");
        if (!string.IsNullOrWhiteSpace(connectionString))
        {
            transport.ConnectionString(connectionString);
            transport.UseEndpointOrientedTopology();
            return;
        }
        throw new Exception("Could not read 'AzureServiceBus.ConnectionString.SC' environment variable. Check sample prerequisites.");
    });

Since ServiceControl has been installed under a non-default instance name (Particular.ServiceControl.ASB) the control queue name needs to be overridden in the adapter configuration:

transportAdapterConfig.ServiceControlSideControlQueue = "Particular.ServiceControl.ASB";

Shipping and Sales use different namespaces, therefore the adapter has to be configured to properly route retried messages:

transportAdapterConfig.RedirectRetriedMessages((failedQ, headers) =>
{
    if (headers.TryGetValue(AdapterSpecificHeaders.OriginalNamespace, out var namespaceAlias))
    {
        return $"{failedQ}@{namespaceAlias}";
    }
    return failedQ;
});

The destination address consists of the queue name and the namespace alias which is included in the failed messages:

recoverability.Failed(
    customizations: settings =>
    {
        settings.HeaderCustomization(
            customization: headers =>
            {
                headers[AdapterSpecificHeaders.OriginalNamespace] = "sales";
            });
    });

Related Articles


Last modified