Manual unsubscribe

Component: NServiceBus
NuGet Package NServiceBus (6.x)

This sample shows how to manually remove subscriptions when subscribers are decommissioned. The solution comprises of 4 projects.

While this sample uses the MSMQ transport, the concepts shown are valid for all transports based on message driven subscriptions and that don't support native pub/sub. For more information see Publish-Subscribe.

Running the sample

Initial State

Run the sample and all three endpoints will start.

Subscriber initially subscribes to the SomethingHappened message in Publisher.

Publish event

Hit enter in Publisher and a SomethingHappened will be published via the following process:

sequenceDiagram Participant Subscriber Participant Publisher Participant Persistence Note over Publisher: Publish SomethingHappened Publisher ->> Persistence: Requests "who wants SomethingHappened" Persistence ->> Publisher: "Subscribe" Publisher ->> Subscriber: Send SomethingHappened

The SomethingHappened event will be received by Subscriber.

Unsubscribe

Hit enter on SubscriptionManager. A ManualUnsubscribe message for SomethingHappened will be sent to Publisher.

sequenceDiagram Participant SubscriptionManager AS SubscriptionManager Participant Publisher Participant Persistence SubscriptionManager ->> Publisher: Send unsubscribe "SomethingHappened" Publisher ->> Persistence: Store "unsubscribe from SomethingHappened" Note over Publisher: Publish SomethingHappened Publisher ->> Persistence: Requests "who wants SomethingHappened" Persistence ->> Publisher: "No Endpoints" Note over Publisher: No Send

Now hit enter in Publisher and no SomethingHappened will be published.

Solution structure

Subscriber

A sample endpoint subscribed to an event that will be published by Publisher:

class SomethingHappenedHandler :
    IHandleMessages<SomethingHappened>
{
    static ILog log = LogManager.GetLogger<SomethingHappenedHandler>();
    public Task Handle(SomethingHappened message, IMessageHandlerContext context)
    {
        log.Info("Subscriber has received SomethingHappened event.");
        return Task.CompletedTask;
    }
}

Publisher

The publisher configuration

var endpointConfiguration = new EndpointConfiguration("Samples.ManualUnsubscribe.Publisher");
endpointConfiguration.UseTransport<MsmqTransport>();

var persistence = endpointConfiguration.UsePersistence<MsmqPersistence, StorageType.Subscriptions>();
persistence.SubscriptionQueue("Samples.ManualUnsubscribe.Publisher.Subscriptions");

endpointConfiguration.DisableFeature<TimeoutManager>();

endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.AuditProcessedMessagesTo("audit");

This specific sample is configured to use MSMQ Subscription Persistence.

Subscriber decommissioning

In such a configuration, when a subscriber endpoint is decommissioned it may happen that subscriptions remain stored at the publisher. Publishers have no way to detect that a subscriber is no longer available. They will continue to publish events even for subscribers that no longer exist. This behavior will eventually lead to storage and quota issues.

To remove a subscription a message can be sent to the publisher:

public class ManualUnsubscribe :
    IMessage
{
    public string MessageTypeName { get; set; }
    public string SubscriberEndpoint { get; set; }
}

Unsubscribe process

The Publisher handles the ManualUnsubscribe message. This handling consists of three parts:

1. Handler

The actual IHandleMessages<ManualUnsubscribe> implementation.

class ManualUnsubscribeHandler :
    IHandleMessages<ManualUnsubscribe>
{
    ISubscriptionStorage subscriptionStorage;

    public ManualUnsubscribeHandler(ISubscriptionStorage subscriptionStorage)
    {
        this.subscriptionStorage = subscriptionStorage;
    }

    public async Task Handle(ManualUnsubscribe message, IMessageHandlerContext context)
    {
        var emptyContext = new ContextBag();
        var type = Type.GetType(message.MessageTypeName, true);
        var messageType = new MessageType(type);
        var addressesForEndpoint = await GetAddressesForEndpoint(message.SubscriberEndpoint, messageType, emptyContext)
            .ConfigureAwait(false);
        await UnsubscribeFromEndpoint(addressesForEndpoint, messageType, emptyContext)
            .ConfigureAwait(false);
    }

The message handler relies on the ISubscriptionStorage NServiceBus abstraction to perform the unsubscribe request regardless of the subscription storage configured for the publisher.

2. Query persistence for subscriptions

The handler then query the persistence for all subscriptions that match the message type and endpoint name.

async Task<IEnumerable<Subscriber>> GetAddressesForEndpoint(string endpoint, MessageType messageType, ContextBag emptyContext)
{
    var messageTypes = new List<MessageType>
    {
        messageType
    };
    var addressesForMessage = await subscriptionStorage.GetSubscriberAddressesForMessage(messageTypes, emptyContext)
        .ConfigureAwait(false);
    return addressesForMessage
        .Where(subscriber =>
        {
            return string.Equals(subscriber.Endpoint, endpoint, StringComparison.OrdinalIgnoreCase);
        });
}

3. Unsubscribe

The handler then communicates the unsubscribe actions with the persistence.

Task UnsubscribeFromEndpoint(IEnumerable<Subscriber> addressesForEndpoint, MessageType messageType, ContextBag emptyContext)
{
    var tasks = addressesForEndpoint
        .Select(address => subscriptionStorage.Unsubscribe(
            subscriber: address,
            messageType: messageType,
            context: emptyContext
        ));
    return Task.WhenAll(tasks);
}

SubscriptionManager

SubscriptionManager is a sample endpoint instance that can be used by operations or DevOps personnel to send unsubscribe requests to publishers whenever a subscriber is decommissioned:

Console.WriteLine("Press any key to unsubscribe 'Subscriber' from 'Publisher'");
Console.ReadKey();
var endpointConfiguration = new EndpointConfiguration("Samples.ManualUnsubscribe.SubscriptionManager");
endpointConfiguration.UseTransport<MsmqTransport>();
endpointConfiguration.UsePersistence<InMemoryPersistence>();
endpointConfiguration.SendOnly();

var endpointInstance = await Endpoint.Start(endpointConfiguration)
    .ConfigureAwait(false);
var typeToUnscubscribe = typeof(SomethingHappened);
var unsubscribeMessage = new ManualUnsubscribe
{
    MessageTypeName = typeToUnscubscribe.AssemblyQualifiedName,
    SubscriberEndpoint = "Samples.ManualUnsubscribe.Subscriber"
};
await endpointInstance.Send("Samples.ManualUnsubscribe.Publisher", unsubscribeMessage)
    .ConfigureAwait(false);

await endpointInstance.Stop()
    .ConfigureAwait(false);
Console.WriteLine("Unsubscribe message sent. Press any other key to exit");
Console.ReadKey();
This sample uses a stand alone send-only endpoint to send the unsubscribe message. However that message could also be sent from any endpoint, process, or script that has the knowledge of the endpoint and message to unsubscribe from.

Messages

The shared messages and events used by this sample.

Related Articles


Last modified