MSMQ to Azure Service Bus Transport Bridge

Component: Azure Service Bus Transport
NuGet Package: NServiceBus.Azure.Transports.WindowsAzureServiceBus (8-pre)
Target NServiceBus Version: 7.x
This page targets a pre-release version and is subject to change prior to the final release.

Endpoints running on different transports cannot exchange messages directly; connecting them requires additional integration work.

Common examples include:

  • A hybrid solution that spans endpoints deployed on-premises and in a cloud environment.
  • Departments within an organization integrating systems that use different messaging technologies for historical reasons.

Traditionally, such integrations would require native messaging or relaying. Bridging is an alternative that allows endpoints to communicate over different transports without writing low-level messaging code. Later, when the endpoints can standardize on a single transport, the bridge can be removed with minimal impact on the rest of the system.

Prerequisites

An environment variable named AzureServiceBus.ConnectionString with the connection string for the Azure Service Bus namespace.
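
A minimal sketch of how a process might read this variable at startup and fail fast when it is missing. The class name and the error message are illustrative and not taken from the sample.

```csharp
using System;

class ConnectionStringCheck
{
    static void Main()
    {
        // Variable name as given in the prerequisites above.
        var connectionString = Environment.GetEnvironmentVariable("AzureServiceBus.ConnectionString");

        // Stop early with a clear message instead of failing later inside the transport.
        if (string.IsNullOrWhiteSpace(connectionString))
        {
            throw new InvalidOperationException(
                "Could not read the 'AzureServiceBus.ConnectionString' environment variable.");
        }

        Console.WriteLine("Azure Service Bus connection string found.");
    }
}
```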

Azure Service Bus Transport

This sample utilizes the Azure Service Bus Transport.

Code walk-through

This sample shows an integration between two endpoints running on different transports: MsmqEndpoint, running on MSMQ, and AsbEndpoint, running on Azure Service Bus.

The covered scenarios are:

  • Sending commands from the Azure Service Bus endpoint to the MSMQ endpoint.
  • Publishing events from the MSMQ endpoint and subscribing to those events from the Azure Service Bus endpoint.
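
As a rough sketch of what these two scenarios look like in endpoint code, assuming NServiceBus 7 handler signatures: the MSMQ endpoint handles the command, and the Azure Service Bus endpoint handles the event. The message types below are stand-ins for the sample's shared MyCommand and MyEvent contracts; the Property member and the handler class names are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

// Stand-ins for the shared message contracts; the Property member is hypothetical.
public class MyCommand : ICommand
{
    public string Property { get; set; }
}

public class MyEvent : IEvent
{
    public string Property { get; set; }
}

// Would be hosted in MsmqEndpoint: receives commands sent from the Azure Service Bus endpoint.
public class MyCommandHandler : IHandleMessages<MyCommand>
{
    public Task Handle(MyCommand message, IMessageHandlerContext context)
    {
        Console.WriteLine($"Received MyCommand: {message.Property}");
        return Task.CompletedTask;
    }
}

// Would be hosted in AsbEndpoint: receives events published by the MSMQ endpoint.
public class MyEventHandler : IHandleMessages<MyEvent>
{
    public Task Handle(MyEvent message, IMessageHandlerContext context)
    {
        Console.WriteLine($"Received MyEvent: {message.Property}");
        return Task.CompletedTask;
    }
}
```

Neither handler refers to the bridge. Routing through the bridge is part of each endpoint's configuration, not of the message handling code.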

Bridging

Endpoints are bridged using the Transport Bridge. The Bridge project is implemented as a standalone process that runs side by side with the bridged endpoints, MsmqEndpoint and AsbEndpoint.

Azure Service Bus endpoint configuration

The Azure Service Bus endpoint is bridged via the Bridge-ASB queue:

var routing = transport.Routing();
var bridge = routing.ConnectToBridge("Bridge-ASB");

[Figure: Azure Service Bus topology]

Command routing to the MSMQ endpoint is specified using the bridge extension method:

bridge.RouteToEndpoint(typeof(MyCommand), "Samples.Azure.ServiceBus.MsmqEndpoint");

Note: To access the bridge extension methods, the project has to reference the NServiceBus.Bridge.Connector NuGet package.

To subscribe to an event published by the MSMQ endpoint, the Azure Service Bus endpoint must register the publishing endpoint using the bridge extension method:

bridge.RegisterPublisher(typeof(MyEvent), "Samples.Azure.ServiceBus.MsmqEndpoint");

MSMQ endpoint configuration

The MSMQ endpoint is bridged via the Bridge-MSMQ queue.
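
By analogy with the Azure Service Bus side, the MSMQ endpoint's connection to the bridge might look like the fragment below. This is a sketch under assumptions: the endpoint name is inferred from the routing calls in the Azure Service Bus endpoint configuration, and the setup of endpointConfiguration is not taken from the sample.

```csharp
// Assumed endpoint setup; the name matches the one used when routing MyCommand and registering MyEvent.
var endpointConfiguration = new EndpointConfiguration("Samples.Azure.ServiceBus.MsmqEndpoint");
var transport = endpointConfiguration.UseTransport<MsmqTransport>();

// Connect the endpoint to the MSMQ side of the bridge.
var routing = transport.Routing();
var bridge = routing.ConnectToBridge("Bridge-MSMQ");
```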

[Figure: MSMQ topology]

Bridge configuration

The bridge process connects the MSMQ and Azure Service Bus transports and provides the configuration settings each transport requires. For Azure Service Bus, these are the mandatory connection string and topology.

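// connectionString holds the Azure Service Bus connection string (see Prerequisites).
var bridgeConfiguration = Bridge
    .Between<MsmqTransport>("Bridge-MSMQ")
    .And<AzureServiceBusTransport>("Bridge-ASB", transport =>
    {
        // Mandatory Azure Service Bus settings: connection string and topology.
        transport.ConnectionString(connectionString);
        transport.UseForwardingTopology();

        // Register a Newtonsoft-based serializer definition under the "MainSerializer" settings key.
        var settings = transport.GetSettings();
        var serializer = Tuple.Create(new NewtonsoftSerializer() as SerializationDefinition, new SettingsHolder());
        settings.Set("MainSerializer", serializer);
    });

// Create the bridge queues on startup if they do not exist.
bridgeConfiguration.AutoCreateQueues();
// Keep subscriptions in memory; they are lost when the bridge process restarts.
bridgeConfiguration.UseSubscriptionPersistence<InMemoryPersistence>((configuration, persistence) => { });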

The bridge is then created and started, and should keep running for as long as bridging is required.

var bridge = bridgeConfiguration.Create();

await bridge.Start().ConfigureAwait(false);

Console.WriteLine("Press any key to exit");
Console.ReadKey();

await bridge.Stop().ConfigureAwait(false);

Forwarding interception

Where the forwarding operation requires customization, the Bridge provides an option to intercept it. For example, the Azure Service Bus transport requires any transaction scope that does not belong to Azure Service Bus to be suppressed.

// TODO: Bridge 2.x doesn't support this yet

bridgeConfiguration.InterceptForwarding(async (queue, message, forward) =>
{
    using (new TransactionScope(TransactionScopeOption.Suppress, TransactionScopeAsyncFlowOption.Enabled))
    {
        await forward().ConfigureAwait(false);
    }
});
