This sample shows how to remove subscriptions from message transports that do not have native pub/sub support.
While this sample uses the MSMQ transport, the concepts apply to any transport that relies on message-driven subscriptions, that is, any transport without native pub/sub support. For more information, see Publish-Subscribe.
Running the sample
Initial State
Run the sample and the three endpoints (SubscriptionManager, Subscriber, and Publisher) will start.
Subscriber initially subscribes to the SomethingHappened message in Publisher.
Publish event
Hit enter in Publisher and a SomethingHappened event will be published via the following process:
The publisher queries the persistence to see if there are any subscribers for the event. The persistence returns the list of subscribers for the SomethingHappened event, in this case Subscriber. Publisher sends the SomethingHappened event to Subscriber.
Unsubscribe
Hit enter on SubscriptionManager. A ManualUnsubscribe command for SomethingHappened will be sent to Publisher.
Now hit enter in Publisher. The publisher queries the persistence to see if there are any subscribers for the event. The persistence returns no subscribers. SomethingHappened will not be published to any endpoints.
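The lookup-then-send flow described in the two steps above can be sketched without any transport at all. The following is a minimal illustration only: InMemorySubscriptions and PublishFlowDemo are hypothetical names, the storage is a plain dictionary rather than the MSMQ subscription persistence, and "publishing" just returns the list of destinations instead of sending messages.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for the publisher's subscription storage.
// It is not the MSMQ subscription persistence used by the sample.
public class InMemorySubscriptions
{
    readonly Dictionary<string, HashSet<string>> subscribersByEvent =
        new Dictionary<string, HashSet<string>>();

    public void Subscribe(string eventName, string subscriber)
    {
        if (!subscribersByEvent.TryGetValue(eventName, out var subscribers))
        {
            subscribers = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
            subscribersByEvent[eventName] = subscribers;
        }
        subscribers.Add(subscriber);
    }

    public void Unsubscribe(string eventName, string subscriber)
    {
        if (subscribersByEvent.TryGetValue(eventName, out var subscribers))
        {
            subscribers.Remove(subscriber);
        }
    }

    // Returns the destinations the event would be sent to.
    public IReadOnlyList<string> Publish(string eventName)
    {
        return subscribersByEvent.TryGetValue(eventName, out var subscribers)
            ? subscribers.ToList()
            : new List<string>();
    }
}

public static class PublishFlowDemo
{
    public static void Main()
    {
        var storage = new InMemorySubscriptions();
        storage.Subscribe("SomethingHappened", "Samples.ManualUnsubscribe.Subscriber");

        // Initial state: one subscriber is returned, so the event goes to it.
        Console.WriteLine(storage.Publish("SomethingHappened").Count); // prints 1

        // After the manual unsubscribe, the lookup returns no subscribers.
        storage.Unsubscribe("SomethingHappened", "Samples.ManualUnsubscribe.Subscriber");
        Console.WriteLine(storage.Publish("SomethingHappened").Count); // prints 0
    }
}
```

The point of the sketch is that the publisher never contacts the subscriber to decide whether to send: the stored subscription alone drives delivery, which is why a stale entry keeps receiving events until it is removed.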
Sample solution structure
Subscriber
An endpoint subscribed to an event that will be published by Publisher:
class SomethingHappenedHandler :
    IHandleMessages<SomethingHappened>
{
    static ILog log = LogManager.GetLogger<SomethingHappenedHandler>();

    public Task Handle(SomethingHappened message, IMessageHandlerContext context)
    {
        log.Info("Subscriber has received SomethingHappened event.");
        return Task.CompletedTask;
    }
}
Publisher
The publisher configuration
var endpointConfiguration = new EndpointConfiguration("Samples.ManualUnsubscribe.Publisher");
endpointConfiguration.UseTransport<MsmqTransport>();
var persistence = endpointConfiguration.UsePersistence<MsmqPersistence, StorageType.Subscriptions>();
persistence.SubscriptionQueue("Samples.ManualUnsubscribe.Publisher.Subscriptions");
endpointConfiguration.DisableFeature<TimeoutManager>();
endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.AuditProcessedMessagesTo("audit");
endpointConfiguration.EnableInstallers();
This sample is configured to use MSMQ Subscription Persistence.
Subscriber decommissioning
Publishers have no way to detect that a subscriber endpoint has been decommissioned. If its subscriptions aren't removed, publishers will continue to send events to subscribers that no longer exist, which will eventually lead to storage and quota issues.
To remove a subscription, send a ManualUnsubscribe message to the Publisher from the SubscriptionManager by hitting enter:
public class ManualUnsubscribe :
    IMessage
{
    public string MessageTypeName { get; set; }
    public string SubscriberEndpoint { get; set; }
}
Unsubscribe process
The Publisher handles the ManualUnsubscribe message. This handling consists of three parts: Handler, Persistence, and Unsubscribe.
1. Handler
The ManualUnsubscribeHandler message handler relies on the ISubscriptionStorage NServiceBus abstraction to perform the unsubscribe request regardless of the subscription storage configured for the publisher.
class ManualUnsubscribeHandler :
    IHandleMessages<ManualUnsubscribe>
{
    ISubscriptionStorage subscriptionStorage;

    public ManualUnsubscribeHandler(ISubscriptionStorage subscriptionStorage)
    {
        this.subscriptionStorage = subscriptionStorage;
    }

    public async Task Handle(ManualUnsubscribe message, IMessageHandlerContext context)
    {
        var emptyContext = new ContextBag();
        var type = Type.GetType(message.MessageTypeName, true);
        var messageType = new MessageType(type);
        var addressesForEndpoint = await GetAddressesForEndpoint(message.SubscriberEndpoint, messageType, emptyContext);
        await UnsubscribeFromEndpoint(addressesForEndpoint, messageType, emptyContext);
    }
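The handler resolves the event type from MessageTypeName, which SubscriptionManager fills with the assembly-qualified name of SomethingHappened. The sketch below shows that round trip using only the .NET base class library. SomethingHappened is redeclared here purely so the example compiles on its own, and TypeNameRoundTrip is a hypothetical helper name, not part of the sample.

```csharp
using System;

// Stand-in for the sample's event type, declared here only so the sketch is self-contained.
public class SomethingHappened
{
}

public static class TypeNameRoundTrip
{
    public static Type Resolve(string typeName)
    {
        // throwOnError: true raises an exception instead of returning null
        // when the type or its assembly cannot be found.
        return Type.GetType(typeName, throwOnError: true);
    }

    public static void Main()
    {
        // What the sender puts on the wire.
        var qualifiedName = typeof(SomethingHappened).AssemblyQualifiedName;

        // What the receiver gets back after resolving the name.
        Console.WriteLine(Resolve(qualifiedName) == typeof(SomethingHappened)); // prints True

        try
        {
            Resolve("No.Such.Type, No.Such.Assembly");
        }
        catch (Exception exception)
        {
            Console.WriteLine($"Could not resolve type: {exception.GetType().Name}");
        }
    }
}
```

Because the name is assembly-qualified, resolution only succeeds if the receiving process can load the assembly that declares the type. In this sample that presumably holds because both endpoints reference the shared messages project.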
2. Persistence
The handler then queries the persistence for all subscriptions that match the message type and endpoint name.
    async Task<IEnumerable<Subscriber>> GetAddressesForEndpoint(string endpoint, MessageType messageType, ContextBag emptyContext)
    {
        var messageTypes = new List<MessageType>
        {
            messageType
        };
        var addressesForMessage = await subscriptionStorage.GetSubscriberAddressesForMessage(messageTypes, emptyContext);
        return addressesForMessage
            .Where(subscriber =>
            {
                return string.Equals(subscriber.Endpoint, endpoint, StringComparison.OrdinalIgnoreCase);
            });
    }
3. Unsubscribe
The handler then removes the subscription from the persistence.
    Task UnsubscribeFromEndpoint(IEnumerable<Subscriber> addressesForEndpoint, MessageType messageType, ContextBag emptyContext)
    {
        var tasks = addressesForEndpoint
            .Select(address => subscriptionStorage.Unsubscribe(
                subscriber: address,
                messageType: messageType,
                context: emptyContext
            ));
        return Task.WhenAll(tasks);
    }
}
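The persistence and unsubscribe steps amount to "filter by endpoint name, then remove every match". The following sketch reproduces that shape with hypothetical stand-in types (SubscriberEntry, FakeSubscriptionStorage, UnsubscribeSketch) instead of the NServiceBus Subscriber and ISubscriptionStorage types. It assumes, for illustration, that one logical endpoint may be registered under more than one transport address; the machine names are made up.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for a stored subscription entry.
public class SubscriberEntry
{
    public SubscriberEntry(string transportAddress, string endpoint)
    {
        TransportAddress = transportAddress;
        Endpoint = endpoint;
    }

    public string TransportAddress { get; }
    public string Endpoint { get; }
}

// Hypothetical stand-in for a subscription storage holding one event type.
public class FakeSubscriptionStorage
{
    readonly List<SubscriberEntry> entries = new List<SubscriberEntry>();

    public int Count => entries.Count;

    public void Add(string transportAddress, string endpoint)
    {
        entries.Add(new SubscriberEntry(transportAddress, endpoint));
    }

    public Task<List<SubscriberEntry>> GetSubscribers()
    {
        // Return a copy so callers can enumerate while entries are removed.
        return Task.FromResult(entries.ToList());
    }

    public Task Unsubscribe(SubscriberEntry entry)
    {
        entries.Remove(entry);
        return Task.CompletedTask;
    }
}

public static class UnsubscribeSketch
{
    public static async Task<int> RemoveEndpoint(FakeSubscriptionStorage storage, string endpoint)
    {
        var all = await storage.GetSubscribers();

        // Match on the logical endpoint name, ignoring case, as the handler does.
        var matches = all
            .Where(entry => string.Equals(entry.Endpoint, endpoint, StringComparison.OrdinalIgnoreCase))
            .ToList();

        // One unsubscribe call per matching address, awaited together.
        await Task.WhenAll(matches.Select(entry => storage.Unsubscribe(entry)));
        return matches.Count;
    }

    public static async Task Main()
    {
        var storage = new FakeSubscriptionStorage();
        storage.Add("Subscriber@MachineA", "Samples.ManualUnsubscribe.Subscriber");
        storage.Add("Subscriber@MachineB", "Samples.ManualUnsubscribe.Subscriber");
        storage.Add("Other@MachineC", "Samples.Other");

        var removed = await RemoveEndpoint(storage, "samples.manualunsubscribe.subscriber");
        Console.WriteLine($"{removed} removed, {storage.Count} remaining"); // prints "2 removed, 1 remaining"
    }
}
```

Matching on the endpoint name rather than on a single address means the sender of the unsubscribe request only needs to know the logical name of the decommissioned endpoint, not every address it was reachable at.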
SubscriptionManager
SubscriptionManager is an endpoint instance that can be used to send unsubscribe requests to publishers whenever a subscriber is decommissioned:
Console.WriteLine("Press any key to unsubscribe 'Subscriber' from 'Publisher'");
Console.ReadKey();
var endpointConfiguration = new EndpointConfiguration("Samples.ManualUnsubscribe.SubscriptionManager");
endpointConfiguration.UseTransport<MsmqTransport>();
endpointConfiguration.UsePersistence<InMemoryPersistence>();
endpointConfiguration.SendOnly();
var endpointInstance = await Endpoint.Start(endpointConfiguration);
var typeToUnsubscribe = typeof(SomethingHappened);
var unsubscribeMessage = new ManualUnsubscribe
{
    MessageTypeName = typeToUnsubscribe.AssemblyQualifiedName,
    SubscriberEndpoint = "Samples.ManualUnsubscribe.Subscriber"
};
await endpointInstance.Send("Samples.ManualUnsubscribe.Publisher", unsubscribeMessage);
await endpointInstance.Stop();
Console.WriteLine("Unsubscribe message sent. Press any other key to exit");
Console.ReadKey();
This sample uses a standalone send-only endpoint to send the unsubscribe message. However, that message could also be sent from any endpoint, process, or script that knows the subscriber endpoint and the message type to unsubscribe from.
Messages
The project contains the shared messages and events used by this sample.