Custom ASB Namespace Partitioning

Component: Azure Service Bus Transport
NuGet Package: NServiceBus.Azure.Transports.WindowsAzureServiceBus (7.x)
Target NServiceBus Version: 6.x

Prerequisites

Two environment variables named AzureServiceBus.ConnectionString1 and AzureServiceBus.ConnectionString2, each containing the connection string of a different Azure Service Bus namespace.
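As a minimal sketch of how an endpoint might read these variables at startup, the helper below fails fast when a variable is missing. The class name ConnectionStrings and the method Read are hypothetical and are not part of the sample or of NServiceBus.

```csharp
using System;

// Hypothetical helper (an assumption, not part of the sample):
// reads a connection string from an environment variable.
public static class ConnectionStrings
{
    public static string Read(string variableName)
    {
        var value = Environment.GetEnvironmentVariable(variableName);
        if (string.IsNullOrWhiteSpace(value))
        {
            // Fail early so a missing prerequisite is obvious at startup.
            throw new Exception($"Environment variable '{variableName}' is not set.");
        }
        return value;
    }
}
```

With this helper, ConnectionStrings.Read("AzureServiceBus.ConnectionString1") and ConnectionStrings.Read("AzureServiceBus.ConnectionString2") would yield the two connection strings used later when registering namespaces.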

Azure Service Bus Transport

This sample utilizes the Azure Service Bus Transport.

Code walk-through

This sample has two endpoints:

  • Publisher
  • Subscriber

Publisher

Publisher publishes SomeEvent.

public class SomeEvent :
    IEvent
{
    public Guid EventId { get; set; }
}

The event is published using a custom partitioning strategy, named the replicated partitioning strategy, which publishes each event to every configured namespace.
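The publish call itself might look like the sketch below. It assumes an already started IEndpointInstance and the SomeEvent class shown above; the class PublishSketch and the method PublishSomeEvent are hypothetical names introduced only for illustration.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical helper (an assumption, not part of the sample).
public static class PublishSketch
{
    // Publishes a single SomeEvent through a started endpoint instance.
    // Which namespaces receive the event is decided by the configured
    // partitioning strategy, not by this call.
    public static Task PublishSomeEvent(IEndpointInstance endpointInstance)
    {
        var someEvent = new SomeEvent
        {
            EventId = Guid.NewGuid()
        };
        return endpointInstance.Publish(someEvent);
    }
}
```

The publishing code does not mention namespaces at all; replication is a transport configuration concern.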

Subscriber

Two identical instances of Subscriber are run. Each instance subscribes to only one of the namespaces, using the default SingleNamespacePartitioningStrategy, and handles SomeEvent.
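A handler for SomeEvent on the subscriber side could be sketched as follows, using the NServiceBus 6 IHandleMessages<T> interface. The class name SomeEventHandler and the log message are assumptions; the handler in the actual sample may differ.

```csharp
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Logging;

// Hypothetical handler (an assumption, not copied from the sample).
public class SomeEventHandler :
    IHandleMessages<SomeEvent>
{
    static ILog log = LogManager.GetLogger<SomeEventHandler>();

    public Task Handle(SomeEvent message, IMessageHandlerContext context)
    {
        // Log the event id so each subscriber instance shows what it received.
        log.Info($"Received SomeEvent with id {message.EventId}");
        return Task.FromResult(0);
    }
}
```

Because both subscriber instances run the same code and differ only in the namespace they connect to, the same handler serves both.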

Creating a custom partitioning strategy

For the purpose of this sample, a custom partitioning strategy is used that returns every configured namespace for every partitioning intent. In other words, all send, receive, and create operations are executed on all registered namespaces.

public class ReplicatedNamespacePartitioningStrategy :
    INamespacePartitioningStrategy
{
    static ILog log = LogManager.GetLogger<ReplicatedNamespacePartitioningStrategy>();
    NamespaceConfigurations namespaces;

    public ReplicatedNamespacePartitioningStrategy(ReadOnlySettings settings)
    {
        if (
            settings.TryGet("AzureServiceBus.Settings.Topology.Addressing.Namespaces", out namespaces) &&
            namespaces.Count > 1
            )
        {
            return;
        }
        throw new Exception("The 'Replicated' namespace partitioning strategy requires more than one namespace. Configure additional connection strings");
    }

    public IEnumerable<RuntimeNamespaceInfo> GetNamespaces(PartitioningIntent partitioningIntent)
    {
        log.Info($"Determining namespace for {partitioningIntent}");
        return namespaces.Select(
            selector: namespaceInfo =>
            {
                log.Info($"Choosing namespace {namespaceInfo.Alias} ({namespaceInfo.ConnectionString})");
                return new RuntimeNamespaceInfo(
                    alias: namespaceInfo.Alias,
                    connectionString: namespaceInfo.ConnectionString,
                    purpose: NamespacePurpose.Partitioning,
                    mode: NamespaceMode.Active);
            });
    }
}

Registering custom strategy

A custom partitioning strategy is registered using NamespacePartitioning().UseStrategy<T>(). Multiple namespaces are registered using the NamespacePartitioning().AddNamespace() API:

var namespacePartitioning = transport.NamespacePartitioning();
namespacePartitioning.AddNamespace("namespace1", connectionString1);
namespacePartitioning.AddNamespace("namespace2", connectionString2);
namespacePartitioning.UseStrategy<ReplicatedNamespacePartitioningStrategy>();

Running the sample

  • Start two instances of the subscriber, each configured with a different namespace.
  • Start one instance of the publisher.
  • Both subscribers will receive the event.
