MsmqToSql Relay

Component: NServiceBus
NuGet Package NServiceBus.Transport.Msmq (1-pre)
Target NServiceBus Version: 1-pre
This page targets a pre-release version and is subject to change prior to the final release.
Standard support for version 1-pre of NServiceBus has expired. For more information see our Support Policy.

SQL Relay (Transport Integration)

This sample shows how to receive events published by an endpoint whose transport is different from that of the subscriber. In this particular example, events published from an MSMQ endpoint will be relayed to an SQL endpoint for downstream SQL subscribers.

The solution comprises of these four projects.

Shared

This project contains the schema for the events that will be published by MsmqPublisher endpoint. It contains an event called, SomethingHappened

public class SomethingHappened :
    IEvent
{
}

MsmqPublisher

  • This endpoint uses MSMQ as the transport. It publishes events every time Enter key is pressed.
  • This endpoint uses NHibernate persistence. It uses a database called, PersistenceForMsmqTransport for the persistence. Use the script CreateDatabase.sql contained in this project to create the PersistenceForMsmqTransport database and the tables required for NHibernatePersistence. Alternatively running the MsmqPublisher within Visual Studio in debug mode will also create the necessary tables for Subscription and Timeout storage.
use [master]
go

create database [PersistenceForMsmqTransport]
go

use [PersistenceForMsmqTransport]
go

create table [dbo].[Subscription](
	[SubscriberEndpoint] [varchar](450) not null,
	[MessageType] [varchar](450) not null,
	[LogicalEndpoint] [varchar](450),
	[Version] [varchar](450),
	[TypeName] [varchar](450),
	primary key clustered
	(
		[SubscriberEndpoint],
		[MessageType]
	)
)

create table [dbo].[TimeoutEntity](
	[Id] [uniqueidentifier] not null,
	[Destination] [nvarchar](1024),
	[SagaId] [uniqueidentifier],
	[State] [varbinary](max),
	[Time] [datetime],
	[Headers] [nvarchar](max),
	[Endpoint] [nvarchar](440),
	primary key nonclustered
	(
		[Id]
	)
)

go

Endpoint configuration for MsmqPublisher.

var endpointConfiguration = new EndpointConfiguration("MsmqPublisher");
endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.UseTransport<MsmqTransport>();
endpointConfiguration.EnableInstallers();
var persistence = endpointConfiguration.UsePersistence<NHibernatePersistence>();
persistence.ConnectionString(@"Data Source=.\SqlExpress;Database=PersistenceForMsmqTransport;Integrated Security=True");

Publishing events

while (true)
{
    var key = Console.ReadKey();
    Console.WriteLine();

    if (key.Key != ConsoleKey.Enter)
    {
        return;
    }
    busSession.Publish(new SomethingHappened());
    Console.WriteLine("SomethingHappened Event published");
}

Additional entry to the list of subscribers

For all the events published by the MsmqPublisher add a corresponding new entry in the Subscriptions table so that the NativeMsmqToSql endpoint will start to receive these events.

Run the AddSubscriber.sql script contained in this project to add a new entry for the SomethingHappened event to the Subscriptions table:

use PersistenceForMsmqTransport
go

insert into Subscription
	([SubscriberEndpoint]
	,[MessageType]
	,[LogicalEndpoint]
	,[Version]
	,[TypeName])
values
	('MsmqToSqlRelay@localhost',
	'Shared.SomethingHappened,0.0.0.0',
	'MsmqToSqlRelay' ,
	'0.0.0.0',
	'Shared.SomethingHappened')
go

NativeMsmqToSql

NativeMsmqToSql is a program that reads messages from a queue using native Messaging MSMQ API and uses SqlClient API to write information in the corresponding SQL table for the SqlRelay endpoint.

Since this is not an NServiceBus endpoint, create the required transactional queue named MsmqToSqlRelay.

How does it work

When messages arrive in the MsmqToSqlRelay queue, they are read using .NET Messaging API.

// The address of the queue that will be receiving messages from other MSMQ publishers
var queuePath = @".\private$\MsmqToSqlRelay";

// Create message queue object
var addressOfMsmqBridge = new MessageQueue(queuePath, QueueAccessMode.SendAndReceive);
// for the sample's sake. Might need to fine tune the exact filters required.
addressOfMsmqBridge.MessageReadPropertyFilter.SetAll();

// Add an event handler for the ReceiveCompleted event.
addressOfMsmqBridge.ReceiveCompleted += MsmqBridgeOnReceiveCompleted;

// Begin the asynchronous receive operation.
addressOfMsmqBridge.BeginReceive();

When a message arrives at the queue, the body and the header of the message is extracted and using SQL Client API, the information is stored in the SQL table, SqlRelay.

var messageBody = ConvertStreamToByteArray(message.BodyStream);

// Serialize to dictionary
var headerString = Encoding.UTF8.GetString(message.Extension)
    .TrimEnd('\0');
var headers = ExtractHeaders(headerString);

// If this queue is going to be receiving messages from endpoints
// older than v4.0, then set the header["NServiceBus.MessageId"]
// to be a deterministic guid based on the Id as Sql transport
// expects a Guid for the MessageId and not an Id in the MSMQ format.

// Have the necessary raw information from queue
// Therefore write it to Sql.
Console.WriteLine("Forwarding message to SQLRelay endpoint");
SendMessageToSql(sqlConnectionStr, sqlRelayEndpointName, messageBody, headers);

SqlRelay

  • This endpoint uses SQL transport. It uses a database called, PersistenceForSqlTransport Use the script CreateDatabase.sql contained in this project to create the PersistenceForSqlTransport database.
use [master]
go
create database [PersistenceForSqlTransport]
go
  • References the Shared message schema.
  • Messages to this endpoint are being written natively by the NativeMsmqToSql program. Since no SQL endpoint is publishing the events, this endpoint is configured to have its auto subscription for events turned off.
  • Has a handler for the event that does a publish in the handler, so that downstream SQL subscribers can receive the event.

The SqlRelay configuration

var endpointConfiguration = new EndpointConfiguration("SqlRelay");
endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.EnableInstallers();
endpointConfiguration.DisableFeature<AutoSubscribe>();
var persistence = endpointConfiguration.UsePersistence<NHibernatePersistence>();
persistence.ConnectionString(@"Data Source=.\SqlExpress;Database=PersistenceForSqlTransport;Integrated Security=True");
var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
transport.ConnectionString(@"Data Source=.\SqlExpress;Database=PersistenceForSqlTransport;Integrated Security=True");

The event handler

class SomethingHappenedHandler :
    IHandleMessages<SomethingHappened>
{
    static ILog log = LogManager.GetLogger<SomethingHappenedHandler>();

    public Task Handle(SomethingHappened message, IMessageHandlerContext context)
    {
        log.Info("Sql Relay has now received this event from the MSMQ. Publishing this event for downstream SQLSubscribers");
        // relay this event to other interested SQL subscribers
        return context.Publish(message);
    }
}

Summary

  1. Create a new transactional queue that the MSMQ publisher will be sending its events to in addition to its current subscribers.
  2. For all the events published by the MsmqPublisher add a corresponding new entry in the Subscriptions table.
  3. The NativeMsmqToSql app will read the messages that arrive at this new transactional queue and write the corresponding message information in the SQL table of the SqlRelay endpoint.
  4. The SqlRelay endpoint receives the message and publishes the event for downstream SQL subscribers.
The deployment steps can be automated to create the needed queues and tables.

Related Articles

  • MSMQ Transport
    MSMQ is the primary durable communications technology for Microsoft but does not dynamically detect network interfaces.
  • SQL Server Transport
    High-level description of NServiceBus SQL Server Transport.
  • Transports

Last modified