Manual unsubscribe

This sample shows how to manually remove subscriptions when using a message transport that does not have native publish/subscribe support, where the publisher keeps its list of subscribers in persistence.

Running the sample

Initial State

Run the sample and the three endpoints (SubscriptionManager, Subscriber, and Publisher) will start.

On startup, Subscriber subscribes to the SomethingHappened event published by Publisher.

Publish event

Hit enter in Publisher and a SomethingHappened event will be published via the following process:

sequenceDiagram
    Participant Subscriber
    Participant Publisher
    Participant Persistence
    Note over Publisher: Publish "SomethingHappened"
    Publisher ->> Persistence: Query subscribers for "SomethingHappened" event
    Persistence ->> Publisher: "Subscriber"
    Publisher ->> Subscriber: Send "SomethingHappened" event

The publisher queries the persistence to see if there are any subscribers for the event. The persistence returns the list of subscribers for the SomethingHappened event, in this case Subscriber. Publisher sends the SomethingHappened event to Subscriber.

Unsubscribe

Hit enter on SubscriptionManager. A ManualUnsubscribe command for SomethingHappened will be sent to Publisher.

sequenceDiagram
    Participant SubscriptionManager AS SubscriptionManager
    Participant Publisher
    Participant Persistence
    SubscriptionManager ->> Publisher: Send "ManualUnsubscribe" command for "SomethingHappened"
    Publisher ->> Persistence: Remove "SomethingHappened" subscription for "Subscriber"

Now hit enter in Publisher. The publisher queries the persistence to see if there are any subscribers for the event. The persistence returns no subscribers, so SomethingHappened is not sent to any endpoint.

sequenceDiagram
    Participant Publisher
    Participant Persistence
    Note over Publisher: Publish SomethingHappened
    Publisher ->> Persistence: Query subscribers for "SomethingHappened" event
    Persistence ->> Publisher: "No subscribers"
    Note over Publisher: No Send
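
The three steps above (publish, unsubscribe, publish again) can be modeled without any messaging infrastructure. The following is a minimal sketch, not the sample's code: InMemorySubscriptions is a hypothetical stand-in for the publisher's subscription persistence, and its Publish method only returns the endpoints that would receive the event.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for the publisher's subscription persistence.
// It is not an NServiceBus type; it only models the lookup shown in the diagrams.
public class InMemorySubscriptions
{
    // Event type name -> subscriber endpoint names (case-insensitive).
    readonly Dictionary<string, HashSet<string>> subscribers =
        new Dictionary<string, HashSet<string>>();

    public void Subscribe(string eventName, string endpoint)
    {
        if (!subscribers.TryGetValue(eventName, out var endpoints))
        {
            endpoints = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
            subscribers[eventName] = endpoints;
        }
        endpoints.Add(endpoint);
    }

    public void Unsubscribe(string eventName, string endpoint)
    {
        if (subscribers.TryGetValue(eventName, out var endpoints))
        {
            endpoints.Remove(endpoint);
        }
    }

    // "Publishing" is one send per stored subscriber. The destinations are
    // returned instead of sent so that the effect is easy to inspect.
    public IReadOnlyList<string> Publish(string eventName)
    {
        return subscribers.TryGetValue(eventName, out var endpoints)
            ? endpoints.ToList()
            : new List<string>();
    }
}
```

After calling Subscribe("SomethingHappened", "Subscriber"), Publish("SomethingHappened") returns a single destination. After a matching Unsubscribe call it returns an empty list, which corresponds to the "No Send" note in the last diagram.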

Sample solution structure

Subscriber

An endpoint that subscribes to the SomethingHappened event published by Publisher and logs each event it receives:

class SomethingHappenedHandler :
    IHandleMessages<SomethingHappened>
{
    static ILog log = LogManager.GetLogger<SomethingHappenedHandler>();
    public Task Handle(SomethingHappened message, IMessageHandlerContext context)
    {
        log.Info("Subscriber has received SomethingHappened event.");
        return Task.CompletedTask;
    }
}
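
The handler above only receives events. The subscription itself is set up when the endpoint is configured. This article does not show the Subscriber configuration, so the following is only a sketch of what it might look like on a message-driven transport, assuming NServiceBus 8 with the MSMQ transport package. The endpoint names are taken from the other snippets in this sample; the routing call is an assumption about how the sample is wired, not a copy of its code.

```csharp
// Sketch only: assumes NServiceBus 8 and the NServiceBus.Transport.Msmq package.
var endpointConfiguration = new EndpointConfiguration("Samples.ManualUnsubscribe.Subscriber");
var routing = endpointConfiguration.UseTransport(new MsmqTransport());

// On transports without native pub/sub, the subscriber must know which endpoint
// publishes the event so that it can send its subscribe request there.
routing.RegisterPublisher(
    eventType: typeof(SomethingHappened),
    publisherEndpoint: "Samples.ManualUnsubscribe.Publisher");
```

With a registration like this, the subscription ends up in the publisher's persistence, which is why removing it later also has to happen on the publisher side.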

Publisher

The publisher is configured as follows:

var endpointConfiguration = new EndpointConfiguration("Samples.ManualUnsubscribe.Publisher");
endpointConfiguration.UseTransport(new MsmqTransport());

var persistence = endpointConfiguration.UsePersistence<MsmqPersistence, StorageType.Subscriptions>();
persistence.SubscriptionQueue("Samples.ManualUnsubscribe.Publisher.Subscriptions");

endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.AuditProcessedMessagesTo("audit");

endpointConfiguration.EnableInstallers();

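The publisher uses the MSMQ transport together with MSMQ subscription persistence, which stores subscriptions in the Samples.ManualUnsubscribe.Publisher.Subscriptions queue.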

Subscriber decommissioning

When a subscriber endpoint is decommissioned, publishers have no way to detect that it is no longer available. If its subscriptions aren't removed, publishers keep sending events to a subscriber that no longer exists. Because nothing consumes those messages, they accumulate and can eventually cause storage and quota issues.

To remove a subscription, hit enter in SubscriptionManager to send a ManualUnsubscribe message to Publisher. The message identifies the event type and the subscriber endpoint to remove:

public class ManualUnsubscribe :
    IMessage
{
    public string MessageTypeName { get; set; }
    public string SubscriberEndpoint { get; set; }
}

Unsubscribe process

The Publisher handles the ManualUnsubscribe message. This handling consists of three parts: Handler, Persistence, and Unsubscribe.

1. Handler

The ManualUnsubscribeHandler message handler relies on the ISubscriptionStorage NServiceBus abstraction to perform the unsubscribe request regardless of the subscription storage configured for the publisher.

class ManualUnsubscribeHandler :
    IHandleMessages<ManualUnsubscribe>
{
    ISubscriptionStorage subscriptionStorage;

    public ManualUnsubscribeHandler(ISubscriptionStorage subscriptionStorage)
    {
        this.subscriptionStorage = subscriptionStorage;
    }

    public async Task Handle(ManualUnsubscribe message, IMessageHandlerContext context)
    {
        var emptyContext = new ContextBag();
        var type = Type.GetType(message.MessageTypeName, true);
        var messageType = new MessageType(type);
        var addressesForEndpoint = await GetAddressesForEndpoint(message.SubscriberEndpoint, messageType, emptyContext);
        await UnsubscribeFromEndpoint(addressesForEndpoint, messageType, emptyContext);
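    }

    // GetAddressesForEndpoint and UnsubscribeFromEndpoint, shown in parts 2 and 3,
    // are also members of this class.
}

2. Persistence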

The handler queries the persistence for all subscribers of the message type, then keeps only those whose endpoint name matches the requested endpoint. The comparison is case-insensitive.

async Task<IEnumerable<Subscriber>> GetAddressesForEndpoint(string endpoint, MessageType messageType, ContextBag emptyContext)
{
    var messageTypes = new List<MessageType>
    {
        messageType
    };
    var addressesForMessage = await subscriptionStorage.GetSubscriberAddressesForMessage(messageTypes, emptyContext);
    return addressesForMessage
        .Where(subscriber =>
        {
            return string.Equals(subscriber.Endpoint, endpoint, StringComparison.OrdinalIgnoreCase);
        });
}

3. Unsubscribe

The handler then removes each matching subscriber address from the persistence. The removals are started together and awaited with Task.WhenAll.

Task UnsubscribeFromEndpoint(IEnumerable<Subscriber> addressesForEndpoint, MessageType messageType, ContextBag emptyContext)
{
    var tasks = addressesForEndpoint
        .Select(address => subscriptionStorage.Unsubscribe(
            subscriber: address,
            messageType: messageType,
            context: emptyContext
        ));
    return Task.WhenAll(tasks);
}
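
The query, filter, and remove steps can be tried in isolation. The following is a minimal sketch under stated assumptions: SubscriberEntry and FakeSubscriptionStorage are hypothetical stand-ins for the NServiceBus Subscriber type and ISubscriptionStorage, reduced to the two calls the handler uses. It illustrates that one endpoint name can match several transport addresses and that other endpoints are left untouched.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for a stored subscriber: a transport address plus
// the logical endpoint name it belongs to.
public sealed class SubscriberEntry
{
    public SubscriberEntry(string transportAddress, string endpoint)
    {
        TransportAddress = transportAddress;
        Endpoint = endpoint;
    }

    public string TransportAddress { get; }
    public string Endpoint { get; }
}

// Hypothetical in-memory storage exposing only a query and an unsubscribe call.
public class FakeSubscriptionStorage
{
    readonly List<SubscriberEntry> entries;

    public FakeSubscriptionStorage(IEnumerable<SubscriberEntry> initial)
    {
        entries = initial.ToList();
    }

    // Returns a copy so that callers can remove entries while iterating.
    public Task<IEnumerable<SubscriberEntry>> GetSubscribers()
    {
        return Task.FromResult<IEnumerable<SubscriberEntry>>(entries.ToList());
    }

    public Task Unsubscribe(SubscriberEntry entry)
    {
        entries.Remove(entry);
        return Task.CompletedTask;
    }
}

public static class ManualUnsubscribeSketch
{
    // Removes every address registered for the endpoint and returns how many were removed.
    public static async Task<int> RemoveEndpoint(FakeSubscriptionStorage storage, string endpoint)
    {
        var all = await storage.GetSubscribers();
        var matches = all
            .Where(s => string.Equals(s.Endpoint, endpoint, StringComparison.OrdinalIgnoreCase))
            .ToList();
        await Task.WhenAll(matches.Select(s => storage.Unsubscribe(s)));
        return matches.Count;
    }
}
```

Matching on the endpoint name rather than on a single address means that every registered address of the decommissioned endpoint is removed by one ManualUnsubscribe message.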

SubscriptionManager

SubscriptionManager is an endpoint instance that can be used to send unsubscribe requests to publishers whenever a subscriber is decommissioned:

Console.WriteLine("Press any key to unsubscribe 'Subscriber' from 'Publisher'");
Console.ReadKey();
var endpointConfiguration = new EndpointConfiguration("Samples.ManualUnsubscribe.SubscriptionManager");
endpointConfiguration.UseTransport(new MsmqTransport());
endpointConfiguration.UsePersistence<NonDurablePersistence>();
endpointConfiguration.SendOnly();

var endpointInstance = await Endpoint.Start(endpointConfiguration);
var typeToUnsubscribe = typeof(SomethingHappened);
var unsubscribeMessage = new ManualUnsubscribe
{
    MessageTypeName = typeToUnsubscribe.AssemblyQualifiedName,
    SubscriberEndpoint = "Samples.ManualUnsubscribe.Subscriber"
};
await endpointInstance.Send("Samples.ManualUnsubscribe.Publisher", unsubscribeMessage);

await endpointInstance.Stop();
Console.WriteLine("Unsubscribe message sent. Press any other key to exit");
Console.ReadKey();
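
The message carries the event type as an assembly-qualified name, which the publisher's handler turns back into a Type with Type.GetType. The round trip can be sketched in isolation as follows. SomethingHappenedExample and TypeNameRoundTrip are hypothetical names used only for this illustration.

```csharp
using System;

// Hypothetical event type standing in for the sample's SomethingHappened.
public class SomethingHappenedExample
{
}

public static class TypeNameRoundTrip
{
    // What the sender stores in MessageTypeName.
    public static string ToWireName(Type eventType)
    {
        return eventType.AssemblyQualifiedName;
    }

    // What the receiving handler does. With throwOnError set to true, a name
    // that cannot be resolved raises an exception instead of returning null.
    public static Type FromWireName(string name)
    {
        return Type.GetType(name, throwOnError: true);
    }
}
```

Resolving the name only works if the receiving process can load the assembly that defines the event type, which is one reason the message contracts live in a project that the endpoints share.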

Messages

The Messages project contains the messages and events shared by the endpoints in this sample.
