Custom ASB Namespace Partitioning

This page refers to the legacy Azure Service Bus transport, which is rendered obsolete by the Azure Service Bus transport built to target both .NET Framework and .NET Core. All new projects should use the new Azure Service Bus transport.

Prerequisites

Two environment variables named AzureServiceBus.ConnectionString1 and AzureServiceBus.ConnectionString2 with a different connection string to an Azure Service Bus namespace each.

Azure Service Bus Transport

This sample utilizes the Azure Service Bus Transport (Legacy).

Code walk-through

This sample has three endpoints

  • Publisher
  • Subscriber1
  • Subscriber2

Publisher

Publisher publishes SomeEvent.

public class SomeEvent :
    IEvent
{
    public Guid EventId { get; set; }
}

using a custom partitionining strategy registered at startup time. Events are published to the appropriate namespace(s) based on the registered strategy which decides that events should be published to multiple namespaces.

By default, the sample is configured to use the data distribution strategy. This strategy publishes events to all registered namespaces.

An alternate strategy is Round-Robin with failover. This strategy demonstrates how to achieve high availability and disaster recoverability.

Subscriber

There are 2 identical instances of Subscriber, each instance subscribes to only one of the namespaces using the default SingleNamespacePartitioningStrategy and handles SomeEvent.

Creating a custom partitioning strategy

Data distribution strategy

DataDistributionPartitioningStrategy is used which simply decides that all namespaces configured should be used for all intents. In other words: all sends, receives and create operations will be executed on all registered namespaces.

public class DataDistributionPartitioningStrategy :
    INamespacePartitioningStrategy
{
    static ILog log = LogManager.GetLogger<DataDistributionPartitioningStrategy>();
    NamespaceConfigurations namespaces;

    public DataDistributionPartitioningStrategy(ReadOnlySettings settings)
    {
        if (settings.TryGet("AzureServiceBus.Settings.Topology.Addressing.Namespaces", out namespaces) && namespaces.Count > 1)
        {
            return;
        }
        throw new Exception("The 'Replicated' namespace partitioning strategy requires more than one namespace. Configure additional connection strings");
    }

    public IEnumerable<RuntimeNamespaceInfo> GetNamespaces(PartitioningIntent partitioningIntent)
    {
        log.Info($"Determining namespace for {partitioningIntent}");
        return namespaces.Select(selector: namespaceInfo =>
            {
                log.Info($"Choosing namespace {namespaceInfo.Alias} ({namespaceInfo.Connection})");
                return new RuntimeNamespaceInfo(
                    alias: namespaceInfo.Alias,
                    connectionString: namespaceInfo.Connection,
                    purpose: NamespacePurpose.Partitioning,
                    mode: NamespaceMode.Active);
            });
    }

    public bool SendingNamespacesCanBeCached { get; } = true;
}

Round-Robin with failover strategy

RoundRobinWithFailoverPartitioningStrategy alternates between two namespaces to distribute load among namespaces and ensure no throttling is taking place on the Azure Service Bus service level. This achieves high availability. Should one of the namespace fail, the strategy will fail over to use the other namespace. This achieves disaster recovery.

public class RoundRobinWithFailoverPartitioningStrategy : 
    INamespacePartitioningStrategy
{
    readonly CircularBuffer<RuntimeNamespaceInfo[]> _namespaces;

    public RoundRobinWithFailoverPartitioningStrategy(ReadOnlySettings settings)
    {
        if (!settings.TryGet("AzureServiceBus.Settings.Topology.Addressing.Namespaces",
            out NamespaceConfigurations namespaces))
        {
            throw new Exception($"The '{nameof(RoundRobinWithFailoverPartitioningStrategy)}' strategy requires exactly two namespaces to be configured, please use {nameof(AzureServiceBusTransportExtensions.NamespacePartitioning)}().{nameof(AzureServiceBusNamespacePartitioningSettings.AddNamespace)}() to register the namespaces.");
        }
        var partitioningNamespaces = namespaces.Where(x => x.Purpose == NamespacePurpose.Partitioning).ToList();
        if (partitioningNamespaces.Count != 2)
        {
            throw new Exception($"The '{nameof(RoundRobinWithFailoverPartitioningStrategy)}' strategy requires exactly two namespaces to be configured, please use {nameof(AzureServiceBusTransportExtensions.NamespacePartitioning)}().{nameof(AzureServiceBusNamespacePartitioningSettings.AddNamespace)}() to register the namespaces.");
        }
        _namespaces = new CircularBuffer<RuntimeNamespaceInfo[]>(partitioningNamespaces.Count);
        var first = namespaces.First();
        var second = namespaces.Last();

        _namespaces.Put(new[]
        {
            new RuntimeNamespaceInfo(first.Alias, first.Connection, first.Purpose, NamespaceMode.Active),
            new RuntimeNamespaceInfo(second.Alias, second.Connection, second.Purpose, NamespaceMode.Passive),
        });
        _namespaces.Put(new[]
        {
            new RuntimeNamespaceInfo(first.Alias, first.Connection, first.Purpose, NamespaceMode.Passive),
            new RuntimeNamespaceInfo(second.Alias, second.Connection, second.Purpose, NamespaceMode.Active),
        });
    }
    public IEnumerable<RuntimeNamespaceInfo> GetNamespaces(PartitioningIntent partitioningIntent)
    {
        return _namespaces.Get();
    }

    public bool SendingNamespacesCanBeCached { get; } = false;
}

Registering custom strategy

A custom namespace strategy is registered using NamespacePartitioning().UseStrategy<T>. Note that multiple namespaces can be registered using the NamespacePartitioning().AddNamespace() API:

var namespacePartitioning = transport.NamespacePartitioning();
namespacePartitioning.AddNamespace("namespace1", connectionString1);
namespacePartitioning.AddNamespace("namespace2", connectionString2);
namespacePartitioning.UseStrategy<DataDistributionPartitioningStrategy>();

or

namespacePartitioning.UseStrategy<RoundRobinWithFailoverPartitioningStrategy>();

Running the sample

  • Start 2 instances of this subscriber each with a different namespace.
  • Start 1 instance of the publisher
  • Both subscribers will receive the event

Choosing partitioning strategy

DataDistributionPartitioningStrategy is the default strategy and can be replaced with RoundRobinWithFailoverPartitioningStrategy upon Publisher startup.

Emulating namespace failure

The namespace failure can be toggled to emulate an outage for one of the namespaces represented by AzureServiceBus.ConnectionString1 connection string. When the namespace is in failed state, events published to the namespace with connection string AzureServiceBus.ConnectionString1 should be delivered to the second namespace (namespace with connection string AzureServiceBus.ConnectionString2).

Samples

Related Articles


Last modified