SQL Bridge

Component: NServiceBus
NuGet Package NServiceBus (5.x)

SQL Bridge (Transport Integration)

This sample has been deprecated. Refer to the MsmqToSqlRelay sample instead.

This sample shows how to set up a SQL subscriber so that it can subscribe to events from an MSMQ publisher. The solution comprises five projects: Shared, MsmqPublisher, MsmqSubscriber, SqlBridge, and SqlSubscriber.

This sample uses NHibernate persistence. The MSMQ transport endpoints use a database called PersistenceForMsmqTransport, and the SQL Server transport endpoints use a different database called PersistenceForSqlTransport.

Shared

The event that will be published by MsmqPublisher

public class SomethingHappened :
    IEvent
{
}

MsmqPublisher

The publisher configuration

var busConfiguration = new BusConfiguration();
busConfiguration.EndpointName("MsmqPublisher");
busConfiguration.EnableInstallers();
var persistence = busConfiguration.UsePersistence<NHibernatePersistence>();
persistence.ConnectionString(@"Data Source=.\SqlExpress;Database=PersistenceForMsmqTransport;Integrated Security=True");

The publish loop

Console.WriteLine("Press Enter to publish the SomethingHappened Event");
while (Console.ReadLine() != null)
{
    bus.Publish(new SomethingHappened());
    Console.WriteLine("Event published");
}

Additional entry to the list of subscribers

Add an entry for the new queue specified in the app.config to the list of subscribers in the MsmqPublisher's subscription storage, so that the publisher also delivers its events to the bridge.

Run this script to add the new entry, replacing MachineName with the name of the machine that hosts the queue:

Use PersistenceForMsmqTransport
Go

INSERT INTO Subscription
       ([SubscriberEndpoint]
       ,[MessageType]
       ,[Version]
       ,[TypeName])
 VALUES
       ('SqlMsmqTransportBridge@MachineName',
       'Shared.SomethingHappened,0.0.0.0',
       '0.0.0.0',
       'Shared.SomethingHappened')
GO

MsmqSubscriber

Subscribes to the events published by the MsmqPublisher.

The MSMQ subscriber configuration

var busConfiguration = new BusConfiguration();
busConfiguration.EndpointName("MsmqSubscriber");
busConfiguration.EnableInstallers();
var persistence = busConfiguration.UsePersistence<NHibernatePersistence>();
persistence.ConnectionString(@"Data Source=.\SqlExpress;Database=PersistenceForMsmqTransport;Integrated Security=True");

The MSMQ subscriber handler

public class SomethingHappenedHandler :
    IHandleMessages<SomethingHappened>
{
    static ILog log = LogManager.GetLogger<SomethingHappenedHandler>();

    public void Handle(SomethingHappened message)
    {
        log.Info("MSMQ Subscriber has now received the event: SomethingHappened");
    }
}

SqlBridge

This endpoint is set up to read messages that arrive in an MSMQ queue by means of an IAdvancedSatellite.

The bridge configuration

var busConfiguration = new BusConfiguration();
busConfiguration.EndpointName("SqlBridge");
busConfiguration.EnableInstallers();
var persistence = busConfiguration.UsePersistence<NHibernatePersistence>();
persistence.ConnectionString(@"Data Source=.\SqlExpress;Database=PersistenceForSqlTransport;Integrated Security=True");
var transport = busConfiguration.UseTransport<SqlServerTransport>();
transport.ConnectionString(@"Data Source=.\SqlExpress;Database=NServiceBus;Integrated Security=True");

The Satellite

// Implements an advanced satellite, which allows
// receiving messages on a different transport.
class MsmqReceiver :
    IAdvancedSatellite
{
    Configure configure;
    CriticalError criticalError;
    // Since this endpoint's transport is SQL Server, IPublishMessages
    // will use the SQL Server transport to publish messages.
    IPublishMessages publisher;
    static ILog log = LogManager.GetLogger<MsmqReceiver>();
    public bool Disabled => false;

    public MsmqReceiver(Configure configure, CriticalError criticalError, IPublishMessages publisher)
    {
        this.configure = configure;
        this.criticalError = criticalError;
        this.publisher = publisher;
    }

    // To listen for the events published over MSMQ, a new MsmqDequeueStrategy
    // is constructed. It reads from the satellite's input queue, which
    // receives all the events from the MSMQ publishers.
    public Action<NServiceBus.Unicast.Transport.TransportReceiver> GetReceiverCustomization()
    {
        return tr =>
        {
            tr.Receiver = new MsmqDequeueStrategy(configure, criticalError, new MsmqUnitOfWork())
            {
                ErrorQueue = Address.Parse(ConfigErrorQueue.errorQueue)
            };
        };
    }


    // Invoked whenever an MSMQ publisher publishes an event and delivers it
    // to the bridge. The bridge's input is an MSMQ queue, and the publishers
    // have an entry for this queue in their subscription storage.
    public bool Handle(TransportMessage msmqMessage)
    {
        var eventTypes = ExtractEventTypes(msmqMessage.Headers);
        var sqlMessage = TranslateToSqlTransportMessage(msmqMessage);

        log.Info("Forwarding message to all the SQL subscribers via a Publish");

        publisher.Publish(sqlMessage, new PublishOptions(eventTypes.First()));
        return true;
    }

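    static Type[] ExtractEventTypes(Dictionary<string, string> headers)
    {
        // The header may list several assembly-qualified type names separated by ';'.
        return headers["NServiceBus.EnclosedMessageTypes"]
            .Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries)
            .Select(typeName => Type.GetType(typeName.Trim()))
            .ToArray();
    }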

    static TransportMessage TranslateToSqlTransportMessage(TransportMessage msmqMessage)
    {
        var headers = msmqMessage.Headers;
        var msmqId = headers["NServiceBus.MessageId"];
        var sqlId = msmqId;

        // Set the Id to a deterministic Guid, as SQL message Ids are Guids and
        // MSMQ message Ids have the form guid\nnnn. Newer versions of NServiceBus
        // already return just a Guid for the message Id, so create a new Guid
        // only if the Id is not already a valid one. This check matters because
        // it affects retries if the message is rolled back: if the Ids differ,
        // recoverability won't know it is the same message.
        if (!Guid.TryParse(msmqId, out var msmqGuid))
        {
            sqlId = GuidBuilder.BuildDeterministicGuid(msmqId).ToString();
            headers["NServiceBus.MessageId"] = sqlId;
        }

        return new TransportMessage(sqlId, headers)
        {
            Body = msmqMessage.Body,
            // MSMQ and SQL Server transports signal lack of TimeToBeReceived differently.
            // MSMQ uses Message.InfiniteTimeout value to indicate that a message should
            // never be discarded. SQL Server transport expects TimeSpan.MaxValue to
            // indicate the same behaviour. This would normally be handled by NServiceBus
            // pipeline when transforming a transport message into a logical message.
            // This sample skips the pipeline and deals with transport messages directly,
            // thus making this translation required.
            TimeToBeReceived = (msmqMessage.TimeToBeReceived < Message.InfiniteTimeout) ? msmqMessage.TimeToBeReceived : TimeSpan.MaxValue
        };
    }

    // Address of the MSMQ that will be receiving all of the events from
    // all of the MSMQ publishers.
    public Address InputAddress => Address.Parse("SqlMsmqTransportBridge");

    public void Start()
    {
    }

    public void Stop()
    {
    }
}

Since SqlBridge does not use the native MSMQ transport, the SqlMsmqTransportBridge queue is not created automatically and must be created manually.
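One possible way to create the queue from code is sketched below. It assumes the .NET Framework System.Messaging assembly and a local private queue. The class and method names (BridgeQueueInstaller, PrivateQueuePath, EnsureQueueExists) are hypothetical and not part of the sample, and queue permissions are left at their defaults.

```csharp
using System;
using System.Messaging; // .NET Framework assembly System.Messaging.dll

// Hypothetical helper, not part of the sample.
static class BridgeQueueInstaller
{
    // Builds the path of a private queue on the local machine,
    // for example .\private$\SqlMsmqTransportBridge
    public static string PrivateQueuePath(string queueName)
    {
        return @".\private$\" + queueName;
    }

    // Creates the queue as transactional if it does not exist yet.
    public static void EnsureQueueExists(string queueName)
    {
        var path = PrivateQueuePath(queueName);
        if (MessageQueue.Exists(path))
        {
            return;
        }

        // The second argument requests a transactional queue.
        using (MessageQueue.Create(path, true))
        {
        }
    }
}
```

Calling BridgeQueueInstaller.EnsureQueueExists("SqlMsmqTransportBridge") before starting the bus would then be enough for a single-machine setup.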
TransportMessage objects for different transports may use certain properties in an incompatible way (e.g. TimeToBeReceived) and values of such properties need to be translated before use on another transport.
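The two translations that the satellite performs can also be written as small pure functions, which makes them easier to test in isolation. The following is only a sketch: the BridgeTranslation class and its members are hypothetical, the MD5-based Guid is one assumed way to implement a deterministic Guid (the sample's GuidBuilder is not shown here), and MsmqInfiniteTimeout is assumed to match System.Messaging.Message.InfiniteTimeout.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper, not part of the sample.
static class BridgeTranslation
{
    // Assumption: same value as System.Messaging.Message.InfiniteTimeout.
    // Declared locally so the sketch needs no reference to System.Messaging.
    public static readonly TimeSpan MsmqInfiniteTimeout = TimeSpan.FromSeconds(uint.MaxValue);

    // MSMQ marks "never discard" with its infinite timeout value,
    // while the SQL Server transport expects TimeSpan.MaxValue.
    public static TimeSpan ToSqlTimeToBeReceived(TimeSpan msmqValue)
    {
        return msmqValue < MsmqInfiniteTimeout ? msmqValue : TimeSpan.MaxValue;
    }

    // Returns the Id unchanged when it is already a Guid; otherwise derives
    // a Guid from the MD5 hash of the Id, so the same input always maps to
    // the same output.
    public static string ToSqlMessageId(string msmqMessageId)
    {
        Guid parsed;
        if (Guid.TryParse(msmqMessageId, out parsed))
        {
            return msmqMessageId;
        }

        using (var md5 = MD5.Create())
        {
            var hash = md5.ComputeHash(Encoding.UTF8.GetBytes(msmqMessageId));
            return new Guid(hash).ToString();
        }
    }
}
```

Because the Id is derived from a hash rather than generated randomly, a message that is rolled back and received again gets the same SQL Id, which is what retries rely on.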

How the advanced satellite works

  • It references the Shared message schema DLL.
  • Its input queue is defined as SqlMsmqTransportBridge in the InputAddress property of the satellite.
  • It uses an MSMQ dequeue strategy, set in GetReceiverCustomization, to read messages from that MSMQ input queue.
  • The satellite automatically processes any message that is received in that MSMQ queue.
  • The satellite publishes the received event. Since this endpoint uses the SQL Server transport, it publishes to its SQL queues.

SqlSubscriber

Receives events from the SqlBridge. The publisher address that this endpoint subscribes to is the SqlBridge address, not the original MSMQ publisher's address.

The SQL Subscriber configuration

var busConfiguration = new BusConfiguration();
busConfiguration.EndpointName("SqlSubscriber");
busConfiguration.EnableInstallers();
var persistence = busConfiguration.UsePersistence<NHibernatePersistence>();
persistence.ConnectionString(@"Data Source=.\SqlExpress;Database=PersistenceForSqlTransport;Integrated Security=True");
var transport = busConfiguration.UseTransport<SqlServerTransport>();
transport.ConnectionString(@"Data Source=.\SqlExpress;Database=NServiceBus;Integrated Security=True");

The event handler

class SomethingHappenedHandler :
    IHandleMessages<SomethingHappened>
{
    static ILog log = LogManager.GetLogger<SomethingHappenedHandler>();

    public void Handle(SomethingHappened message)
    {
        log.Info("Sql subscriber has now received this event. This was originally published by MSMQ publisher.");
    }
}

Summary

  1. Create a new transactional queue to which the MSMQ publisher will send its events.
  2. In the original MSMQ publisher's subscription storage, add an entry for the queue that the SqlBridge listens to, alongside the existing subscribers.
  3. The SqlBridge endpoint, set up to read from that input queue, processes each message and publishes the event.
  4. The SqlSubscriber subscribes to the SqlBridge.

Creating the queue and adding the extra entry to the publisher's subscription storage can both be automated for deployment.
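As an illustration of such automation, the subscription entry could be added from a deployment step roughly as follows. This is a sketch under assumptions: the BridgeSubscriptionInstaller class is hypothetical, the Subscription table is assumed to already exist (for example because the publisher has run its installers), and the column values are the ones used in the script shown earlier.

```csharp
using System;
using System.Data.SqlClient;

// Hypothetical helper, not part of the sample.
static class BridgeSubscriptionInstaller
{
    // Builds the subscriber address in queue@machine form.
    public static string SubscriberEndpoint(string queueName, string machineName)
    {
        return queueName + "@" + machineName;
    }

    // Inserts the row only when it is missing, so the step can be re-run safely.
    const string Sql = @"
IF NOT EXISTS (SELECT 1 FROM Subscription
               WHERE SubscriberEndpoint = @endpoint AND MessageType = @messageType)
INSERT INTO Subscription (SubscriberEndpoint, MessageType, Version, TypeName)
VALUES (@endpoint, @messageType, @version, @typeName)";

    public static void EnsureSubscription(string connectionString, string machineName)
    {
        using (var connection = new SqlConnection(connectionString))
        using (var command = new SqlCommand(Sql, connection))
        {
            command.Parameters.AddWithValue("@endpoint", SubscriberEndpoint("SqlMsmqTransportBridge", machineName));
            command.Parameters.AddWithValue("@messageType", "Shared.SomethingHappened,0.0.0.0");
            command.Parameters.AddWithValue("@version", "0.0.0.0");
            command.Parameters.AddWithValue("@typeName", "Shared.SomethingHappened");

            connection.Open();
            command.ExecuteNonQuery();
        }
    }
}
```

The connection string passed in would be the one for the PersistenceForMsmqTransport database, and Environment.MachineName is a reasonable choice for machineName when the bridge runs on the same machine as the deployment step.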

Related Articles

  • MSMQ Transport
    MSMQ is the primary durable communications technology for Microsoft but does not dynamically detect network interfaces.
  • SQL Server Transport
    High-level description of NServiceBus SQL Server Transport.
  • Transports
