MSMQ-to-SQL Relay

Component: MSMQ Transport
NuGet Package: NServiceBus.Transport.Msmq (2.x)
Target Version: NServiceBus 8.x

This sample is an example of transport integration, showing how to receive events published by an endpoint whose transport is different from that of the subscriber. Here, events published from an MSMQ endpoint will be relayed to an SQL endpoint for downstream SQL subscribers. The strategy shown can be applied to any transport integration, such as integrating the work of different teams, or crossing the boundary between on-premises and cloud-based systems.

The solution consists of four projects: MsmqPublisher, an endpoint that publishes events over MSMQ; SqlRelay, an endpoint that uses the SQL transport; NativeMsmqToSql, a program that moves messages from MSMQ to SQL; and Shared, an assembly that contains the event definitions.

Shared

This project contains the schema for the events that will be published by the MsmqPublisher endpoint. It contains an event called SomethingHappened:

public class SomethingHappened :
    IEvent
{
}

MsmqPublisher

  • This endpoint uses MSMQ as the transport. It publishes a SomethingHappened event every time Enter is pressed; pressing any other key exits.
  • This endpoint uses NHibernate persistence with a database called PersistenceForMsmqTransport. Use the CreateDatabase.sql script contained in this project to create the PersistenceForMsmqTransport database and the tables required by NHibernatePersistence. Alternatively, running MsmqPublisher within Visual Studio in debug mode also creates the tables needed for subscription and timeout storage.

use [master]
go

create database [PersistenceForMsmqTransport]
go

use [PersistenceForMsmqTransport]
go

create table [dbo].[Subscription](
	[SubscriberEndpoint] [varchar](450) not null,
	[MessageType] [varchar](450) not null,
	[LogicalEndpoint] [varchar](450),
	[Version] [varchar](450),
	[TypeName] [varchar](450),
	primary key clustered
	(
		[SubscriberEndpoint],
		[MessageType]
	)
)

create table [dbo].[TimeoutEntity](
	[Id] [uniqueidentifier] not null,
	[Destination] [nvarchar](1024),
	[SagaId] [uniqueidentifier],
	[State] [varbinary](max),
	[Time] [datetime],
	[Headers] [nvarchar](max),
	[Endpoint] [nvarchar](440),
	primary key nonclustered
	(
		[Id]
	)
)

go

Endpoint configuration for MsmqPublisher.

var endpointConfiguration = new EndpointConfiguration("MsmqPublisher");
endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
endpointConfiguration.UseTransport(new MsmqTransport());
endpointConfiguration.EnableInstallers();
var persistence = endpointConfiguration.UsePersistence<NHibernatePersistence>();
// for SqlExpress use Data Source=.\SqlExpress;Initial Catalog=PersistenceForMsmqTransport;Integrated Security=True;Max Pool Size=100;Encrypt=false
var connectionString = @"Server=localhost,1433;Initial Catalog=PersistenceForMsmqTransport;User Id=SA;Password=yourStrong(!)Password;Max Pool Size=100;Encrypt=false";
var hibernateConfig = new Configuration();
hibernateConfig.DataBaseIntegration(x =>
{
    x.ConnectionString = connectionString;
    x.Dialect<MsSql2012Dialect>();
    x.Driver<MicrosoftDataSqlClientDriver>();
});
persistence.UseConfiguration(hibernateConfig);

Publishing events

while (true)
{
    var key = Console.ReadKey();
    Console.WriteLine();

    if (key.Key != ConsoleKey.Enter)
    {
        return;
    }
    await messageSession.Publish(new SomethingHappened());
    Console.WriteLine("SomethingHappened Event published");
}

Additional entry to the list of subscribers

For each event published by MsmqPublisher, add a corresponding entry to the Subscription table so that the event is also delivered to the MsmqToSqlRelay queue, which the NativeMsmqToSql program reads from.

Run the AddSubscriber.sql script contained in this project to add an entry for the SomethingHappened event to the Subscription table:

use PersistenceForMsmqTransport
go

insert into Subscription
	([SubscriberEndpoint]
	,[MessageType]
	,[LogicalEndpoint]
	,[Version]
	,[TypeName])
values
	('MsmqToSqlRelay@localhost',
	'Shared.SomethingHappened,0.0.0.0',
	'MsmqToSqlRelay' ,
	'0.0.0.0',
	'Shared.SomethingHappened')
go

NativeMsmqToSql

NativeMsmqToSql is a program that reads messages from a queue using the native .NET MSMQ messaging API and uses the SqlClient API to write each message to the SQL table of the SqlRelay endpoint.

Since this is not an NServiceBus endpoint, the required transactional queue named MsmqToSqlRelay must be created manually.
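
One way to create the queue is through the same messaging API the program uses to read from it. The following is a minimal sketch, assuming the MessageQueue type comes from the System.Messaging namespace of .NET Framework; the RelayQueueSetup class and its EnsureQueueExists method are hypothetical names and are not part of the sample.

```csharp
using System;
using System.Messaging; // assumption: the .NET Framework MSMQ API

public static class RelayQueueSetup
{
    // Same path that the NativeMsmqToSql program reads from
    public const string QueuePath = @".\private$\MsmqToSqlRelay";

    // Creates the private queue as transactional if it does not exist yet
    public static void EnsureQueueExists()
    {
        if (MessageQueue.Exists(QueuePath))
        {
            Console.WriteLine($"Queue {QueuePath} already exists");
            return;
        }

        // The second argument marks the new queue as transactional
        using (MessageQueue.Create(QueuePath, true))
        {
            Console.WriteLine($"Created transactional queue {QueuePath}");
        }
    }
}
```

Calling RelayQueueSetup.EnsureQueueExists() once before the relay starts would be enough. The sketch does not check whether an existing queue is transactional, so a queue created earlier by other means may still need to be verified by hand.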

How it works

When messages arrive in the MsmqToSqlRelay queue, they are read using the .NET Messaging API.

// The address of the queue that will be receiving messages from other MSMQ publishers
var queuePath = @".\private$\MsmqToSqlRelay";

// Create message queue object
var addressOfMsmqBridge = new MessageQueue(queuePath, QueueAccessMode.SendAndReceive);
// Read every message property for the sake of the sample.
// Production code may want to narrow this to the properties it needs.
addressOfMsmqBridge.MessageReadPropertyFilter.SetAll();

// Add an event handler for the ReceiveCompleted event.
addressOfMsmqBridge.ReceiveCompleted += MsmqBridgeOnReceiveCompleted;

// Begin the asynchronous receive operation.
addressOfMsmqBridge.BeginReceive();

When a message arrives at the queue, its body and headers are extracted and written to the SqlRelay table using the SqlClient API.

var messageBody = ConvertStreamToByteArray(message.BodyStream);

// Deserialize the headers into a dictionary
var headerString = Encoding.UTF8.GetString(message.Extension)
    .TrimEnd('\0');
var headers = ExtractHeaders(headerString);

// If this queue is going to be receiving messages from endpoints
// older than v4.0, then set the header["NServiceBus.MessageId"]
// to be a deterministic guid based on the Id as Sql transport
// expects a Guid for the MessageId and not an Id in the MSMQ format.

// All the raw information needed has been read from the queue,
// so write it to SQL.
Console.WriteLine("Forwarding message to SQLRelay endpoint");
SendMessageToSql(sqlConnectionStr, sqlRelayEndpointName, messageBody, headers);
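
The snippet above calls ExtractHeaders without showing it, and its comment mentions a deterministic GUID without showing how to build one. The following is a sketch of both, under two assumptions that the sample text does not confirm: that the MSMQ transport stores headers in the message extension as an XML-serialized list of key/value entries (represented here by a HeaderInfo class), and that hashing the MSMQ message ID is an acceptable way to derive the GUID. MsmqMessageHelpers and DeterministicGuid are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Xml.Serialization;

// Assumed shape of a single header entry in the MSMQ message extension
public class HeaderInfo
{
    public string Key { get; set; }
    public string Value { get; set; }
}

public static class MsmqMessageHelpers
{
    static readonly XmlSerializer headerSerializer =
        new XmlSerializer(typeof(List<HeaderInfo>));

    // Turns the XML header string into a dictionary of header name to value
    public static Dictionary<string, string> ExtractHeaders(string headerString)
    {
        using (var reader = new StringReader(headerString))
        {
            var entries = (List<HeaderInfo>)headerSerializer.Deserialize(reader);
            return entries
                .Where(entry => entry.Key != null)
                .ToDictionary(entry => entry.Key, entry => entry.Value);
        }
    }

    // Returns the same Guid for the same input by hashing it.
    // MD5 is used only because it yields the 16 bytes a Guid needs.
    public static Guid DeterministicGuid(string msmqMessageId)
    {
        using (var md5 = MD5.Create())
        {
            var hash = md5.ComputeHash(Encoding.UTF8.GetBytes(msmqMessageId));
            return new Guid(hash);
        }
    }
}
```

For messages from older endpoints, the header could then be set before forwarding with headers["NServiceBus.MessageId"] = MsmqMessageHelpers.DeterministicGuid(message.Id).ToString(). Because the GUID is derived from the MSMQ ID rather than generated randomly, processing the same MSMQ message twice yields the same message ID on the SQL side.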

SqlRelay

This endpoint uses the SQL transport with a database called PersistenceForSqlTransport. Use the script CreateDatabase.sql contained in this project to create the PersistenceForSqlTransport database.

use [master]
go
create database [PersistenceForSqlTransport]
go

  • This endpoint references the Shared message schema.
  • Messages arrive at this endpoint because the NativeMsmqToSql program writes them natively to its table. Since no SQL endpoint publishes these events, automatic subscription for events is turned off for this endpoint.
  • This endpoint has a handler for the SomethingHappened event. The handler republishes the event so that downstream SQL subscribers can receive it.

The SqlRelay configuration

var endpointConfiguration = new EndpointConfiguration("SqlRelay");
endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.EnableInstallers();
endpointConfiguration.DisableFeature<AutoSubscribe>();

// for SqlExpress use Data Source=.\SqlExpress;Initial Catalog=PersistenceForSqlTransport;Integrated Security=True;Max Pool Size=100;Encrypt=false
endpointConfiguration.UseTransport(new SqlServerTransport(@"Server=localhost,1433;Initial Catalog=PersistenceForSqlTransport;User Id=SA;Password=yourStrong(!)Password;Max Pool Size=100;Encrypt=false"));

The event handler

class SomethingHappenedHandler :
    IHandleMessages<SomethingHappened>
{
    static ILog log = LogManager.GetLogger<SomethingHappenedHandler>();

    public Task Handle(SomethingHappened message, IMessageHandlerContext context)
    {
        log.Info("Sql Relay has now received this event from the MSMQ. Publishing this event for downstream SQLSubscribers");
        // relay this event to other interested SQL subscribers
        return context.Publish(message);
    }
}

Summary

  1. Create a new transactional queue (MsmqToSqlRelay) that the MSMQ publisher sends its events to, in addition to its current subscribers.
  2. For each event published by MsmqPublisher, add a corresponding entry to the Subscription table.
  3. The NativeMsmqToSql program reads the messages that arrive at this new transactional queue and writes the corresponding message information to the SQL table of the SqlRelay endpoint.
  4. The SqlRelay endpoint receives the message and publishes the event for downstream SQL subscribers.

The deployment steps that create the required queues and tables can be automated.
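
Step 3 is where a message crosses from one transport to the other. The following is a rough sketch of what the write to SQL could look like, not the sample's actual SendMessageToSql implementation. It assumes that the SqlRelay queue table has Id, Recoverable, Headers and Body columns, that headers are stored as JSON, and that the Microsoft.Data.SqlClient package is referenced; SqlRelayWriter and BuildInsertSql are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Text.Json;
using Microsoft.Data.SqlClient; // assumption: System.Data.SqlClient exposes the same types

public static class SqlRelayWriter
{
    // Builds the insert statement for the receiving endpoint's queue table.
    // The column names are an assumption about the transport's table layout.
    public static string BuildInsertSql(string queueTable)
    {
        return $"insert into [{queueTable}] (Id, Recoverable, Headers, Body) " +
               "values (@Id, @Recoverable, @Headers, @Body)";
    }

    // Writes one message (raw body plus headers) as a row in the queue table
    public static void SendMessageToSql(
        string connectionString,
        string queueTable,
        byte[] messageBody,
        Dictionary<string, string> headers)
    {
        using (var connection = new SqlConnection(connectionString))
        {
            connection.Open();
            using (var command = new SqlCommand(BuildInsertSql(queueTable), connection))
            {
                var parameters = command.Parameters;
                parameters.Add("@Id", SqlDbType.UniqueIdentifier).Value = Guid.NewGuid();
                parameters.Add("@Recoverable", SqlDbType.Bit).Value = true;
                // Headers are assumed to be stored as a JSON object
                parameters.Add("@Headers", SqlDbType.NVarChar, -1).Value =
                    JsonSerializer.Serialize(headers);
                parameters.Add("@Body", SqlDbType.VarBinary, -1).Value = messageBody;
                command.ExecuteNonQuery();
            }
        }
    }
}
```

This sketch does not coordinate the MSMQ receive with the SQL insert, so a failure between the two could lose or duplicate a message. A production relay would likely need a transaction or an idempotent write around this step.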

Related Articles

  • MSMQ Transport
    MSMQ is a solid durable communications technology but does not dynamically detect network interfaces.
  • SQL Server transport
    An overview of the NServiceBus SQL Server transport.
  • Transports
    An overview of the transports available in NServiceBus.
