This sample shows how to manually remove subscriptions when subscribers are decommissioned. The solution comprises of 4 projects.
Running the sample
Initial State
Run the sample and all three endpoints will start.
Subscriber initially subscribes to the SomethingHappened
message in Publisher.
Publish event
Hit enter
in Publisher and a SomethingHappened
will be published via the following process:
The SomethingHappened
event will be received by Subscriber.
Unsubscribe
Hit enter on SubscriptionManager. A ManualUnsubscribe
message for SomethingHappened
will be sent to Publisher.
Now hit enter
in Publisher and no SomethingHappened
will be published.
Solution structure
Subscriber
A sample endpoint subscribed to an event that will be published by Publisher
:
class SomethingHappenedHandler :
IHandleMessages<SomethingHappened>
{
static ILog log = LogManager.GetLogger<SomethingHappenedHandler>();
public Task Handle(SomethingHappened message, IMessageHandlerContext context)
{
log.Info("Subscriber has received SomethingHappened event.");
return Task.CompletedTask;
}
}
Publisher
The publisher configuration
var endpointConfiguration = new EndpointConfiguration("Samples.ManualUnsubscribe.Publisher");
endpointConfiguration.UseTransport(new MsmqTransport());
var persistence = endpointConfiguration.UsePersistence<MsmqPersistence, StorageType.Subscriptions>();
persistence.SubscriptionQueue("Samples.ManualUnsubscribe.Publisher.Subscriptions");
endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.AuditProcessedMessagesTo("audit");
endpointConfiguration.EnableInstallers();
This specific sample is configured to use MSMQ Subscription Persistence.
Subscriber decommissioning
In such a configuration, when a subscriber endpoint is decommissioned it may happen that subscriptions remain stored at the publisher. Publishers have no way to detect that a subscriber is no longer available. They will continue to publish events even for subscribers that no longer exist. This behavior will eventually lead to storage and quota issues.
To remove a subscription a message can be sent to the publisher:
public class ManualUnsubscribe :
IMessage
{
public string MessageTypeName { get; set; }
public string SubscriberEndpoint { get; set; }
}
Unsubscribe process
The Publisher handles the ManualUnsubscribe
message. This handling consists of three parts:
1. Handler
The IHandleMessages
implementation.
class ManualUnsubscribeHandler :
IHandleMessages<ManualUnsubscribe>
{
ISubscriptionStorage subscriptionStorage;
public ManualUnsubscribeHandler(ISubscriptionStorage subscriptionStorage)
{
this.subscriptionStorage = subscriptionStorage;
}
public async Task Handle(ManualUnsubscribe message, IMessageHandlerContext context)
{
var emptyContext = new ContextBag();
var type = Type.GetType(message.MessageTypeName, true);
var messageType = new MessageType(type);
var addressesForEndpoint = await GetAddressesForEndpoint(message.SubscriberEndpoint, messageType, emptyContext)
.ConfigureAwait(false);
await UnsubscribeFromEndpoint(addressesForEndpoint, messageType, emptyContext)
.ConfigureAwait(false);
}
The message handler relies on the ISubscriptionStorage
NServiceBus abstraction to perform the unsubscribe request regardless of the subscription storage configured for the publisher.
2. Query persistence for subscriptions
The handler then query the persistence for all subscriptions that match the message type and endpoint name.
async Task<IEnumerable<Subscriber>> GetAddressesForEndpoint(string endpoint, MessageType messageType, ContextBag emptyContext)
{
var messageTypes = new List<MessageType>
{
messageType
};
var addressesForMessage = await subscriptionStorage.GetSubscriberAddressesForMessage(messageTypes, emptyContext)
.ConfigureAwait(false);
return addressesForMessage
.Where(subscriber =>
{
return string.Equals(subscriber.Endpoint, endpoint, StringComparison.OrdinalIgnoreCase);
});
}
3. Unsubscribe
The handler then communicates the unsubscribe actions with the persistence.
Task UnsubscribeFromEndpoint(IEnumerable<Subscriber> addressesForEndpoint, MessageType messageType, ContextBag emptyContext)
{
var tasks = addressesForEndpoint
.Select(address => subscriptionStorage.Unsubscribe(
subscriber: address,
messageType: messageType,
context: emptyContext
));
return Task.WhenAll(tasks);
}
SubscriptionManager
SubscriptionManager
is a sample endpoint instance that can be used by operations or DevOps personnel to send unsubscribe requests to publishers whenever a subscriber is decommissioned:
Console.WriteLine("Press any key to unsubscribe 'Subscriber' from 'Publisher'");
Console.ReadKey();
var endpointConfiguration = new EndpointConfiguration("Samples.ManualUnsubscribe.SubscriptionManager");
endpointConfiguration.UseTransport(new MsmqTransport());
endpointConfiguration.UsePersistence<NonDurablePersistence>();
endpointConfiguration.SendOnly();
var endpointInstance = await Endpoint.Start(endpointConfiguration)
.ConfigureAwait(false);
var typeToUnscubscribe = typeof(SomethingHappened);
var unsubscribeMessage = new ManualUnsubscribe
{
MessageTypeName = typeToUnscubscribe.AssemblyQualifiedName,
SubscriberEndpoint = "Samples.ManualUnsubscribe.Subscriber"
};
await endpointInstance.Send("Samples.ManualUnsubscribe.Publisher", unsubscribeMessage)
.ConfigureAwait(false);
await endpointInstance.Stop()
.ConfigureAwait(false);
Console.WriteLine("Unsubscribe message sent. Press any other key to exit");
Console.ReadKey();
Messages
The shared messages and events used by this sample.