WebSocket Gateway

Component: Gateway
NuGet Package: NServiceBus.Gateway (2.x)
Target NServiceBus Version: 6.x

Code walk-through

This sample demonstrates replacing the default HTTP channel used by the NServiceBus Gateway with a WebSocket channel based on the websocket-sharp library.

Messages

A shared class library containing the sample message that is transmitted through the gateway.

SiteA

An endpoint that sends a message to SiteB via the WebSocket gateway.

Gateway configuration

  • Maps the site key SiteB to ws://localhost:33335/SiteB
  • Receives incoming messages on ws://localhost:33334/SiteA
<configSections>
  <section name="GatewayConfig"
           type="NServiceBus.Config.GatewayConfig, NServiceBus.Gateway" />
</configSections>
<GatewayConfig>
  <Sites>
    <Site Key="SiteB"
          Address="ws://localhost:33335/SiteB"
          ChannelType="WebSocket"/>
  </Sites>
  <Channels>
    <Channel Address="ws://localhost:33334/SiteA"
             ChannelType="WebSocket"
             Default="True" />
  </Channels>
</GatewayConfig>

Endpoint configuration

  • Enables the gateway
  • Replaces the default channel factories
var config = new EndpointConfiguration("Custom Gateway - SiteA");
config.UseTransport<LearningTransport>();
// NOTE: The LearningPersistence does not support the gateway
config.UsePersistence<InMemoryPersistence>();

config.EnableFeature<Gateway>();
var gatewaySettings = config.Gateway();
gatewaySettings.ChannelFactories(
    s => new WebSocketChannelSender(),
    s => new WebSocketChannelReceiver()
);

SiteB

An endpoint that receives a message sent from SiteA via the WebSocket gateway.

Gateway configuration

  • Maps the site key SiteA to ws://localhost:33334/SiteA
  • Receives incoming messages on ws://localhost:33335/SiteB
<configSections>
  <section name="GatewayConfig"
           type="NServiceBus.Config.GatewayConfig, NServiceBus.Gateway" />
</configSections>
<GatewayConfig>
  <Sites>
    <Site Key="SiteA"
          Address="ws://localhost:33334/SiteA"
          ChannelType="WebSocket"/>
  </Sites>
  <Channels>
    <Channel Address="ws://localhost:33335/SiteB"
             ChannelType="WebSocket"
             Default="True" />
  </Channels>
</GatewayConfig>

Endpoint configuration

  • Enables the gateway
  • Replaces the default channel factories
var config = new EndpointConfiguration("Custom Gateway - SiteB");
var transport = config.UseTransport<LearningTransport>();
var routing = transport.Routing();
routing.RouteToEndpoint(typeof(SomeMessage), "Custom Gateway - SiteB");
// NOTE: The LearningPersistence does not support the gateway
config.UsePersistence<InMemoryPersistence>();

config.EnableFeature<Gateway>();
var gatewaySettings = config.Gateway();
gatewaySettings.ChannelFactories(
    s => new WebSocketChannelSender(),
    s => new WebSocketChannelReceiver()
);

WebSocketGateway

A shared library that contains the implementations of the WebSocket gateway channels.

WebSocketChannelSender

  • Connects to a remote WebSocket server
  • Serializes outgoing messages
public class WebSocketChannelSender :
    IChannelSender
{
    public Task Send(string remoteAddress, IDictionary<string, string> headers, Stream data)
    {
        // Open a connection for this message, send it as one binary payload, then close.
        using (var webSocket = new WebSocket(remoteAddress))
        {
            webSocket.Connect();
            var bytes = GetBytes(headers, data);
            webSocket.Send(bytes);
            webSocket.Close();
        }
        return Task.CompletedTask;
    }

    // Payload layout: the serialized headers first, followed by the raw message body.
    static byte[] GetBytes(IDictionary<string, string> headers, Stream data)
    {
        using (var stream = new MemoryStream())
        {
            var binaryFormatter = new BinaryFormatter();
            binaryFormatter.Serialize(stream, headers);
            data.CopyTo(stream);
            return stream.ToArray();
        }
    }
}
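
The payload layout produced by GetBytes (serialized headers followed by the raw body) can be read back in the same order. The following is a minimal sketch of that round trip, not the sample's actual receiving code: the class PayloadSketch and its Pack and Unpack methods are hypothetical names introduced for illustration. It assumes a runtime where BinaryFormatter is available, such as the .NET Framework targeted by NServiceBus 6; newer .NET versions disable it.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;

// Hypothetical helper, for illustration only.
public static class PayloadSketch
{
    // Same layout as the sender's GetBytes: serialized headers first, raw body after.
    public static byte[] Pack(IDictionary<string, string> headers, Stream data)
    {
        using (var stream = new MemoryStream())
        {
            new BinaryFormatter().Serialize(stream, headers);
            data.CopyTo(stream);
            return stream.ToArray();
        }
    }

    // Reads the headers back, then treats every remaining byte as the message body.
    public static IDictionary<string, string> Unpack(byte[] payload, out byte[] body)
    {
        using (var stream = new MemoryStream(payload))
        {
            // Deserialize consumes only the header object, leaving the
            // stream positioned at the first byte of the body.
            var headers = (IDictionary<string, string>)new BinaryFormatter().Deserialize(stream);
            using (var rest = new MemoryStream())
            {
                stream.CopyTo(rest);
                body = rest.ToArray();
            }
            return headers;
        }
    }
}
```

Because the headers are written first, the receiving side does not need a length prefix: whatever follows the deserialized header object is the body.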

WebSocketChannelReceiver

  • Listens for incoming WebSocket connections
  • Deserializes incoming messages
public class WebSocketChannelReceiver :
    IChannelReceiver
{
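    public void Start(string address, int maxConcurrency, Func<DataReceivedOnChannelArgs, Task> dataReceivedOnChannel)
    {
        // Split the channel address into the server base URL (scheme, host, port)
        // and the path under which the WebSocket service is registered.
        var uri = new Uri(address);
        server = new WebSocketServer(uri.GetLeftPart(UriPartial.Authority));

        // Each behavior instance is handed the gateway callback for received data.
        server.AddWebSocketService(uri.AbsolutePath,
            initializer: () =>
            {
                return new WebSocketMessageBehavior(dataReceivedOnChannel);
            });
        server.Start();
    }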

    public Task Stop()
    {
        server?.Stop();
        return Task.CompletedTask;
    }

    WebSocketServer server;
}
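
The Start method derives two values from the configured channel address: the base URL the server listens on and the path of the WebSocket service. The sketch below isolates that split using only System.Uri, so it can be tried without the gateway. ChannelAddressSketch and its method names are hypothetical and not part of the sample.

```csharp
using System;

// Hypothetical helper, for illustration only.
public static class ChannelAddressSketch
{
    // Scheme, host, and port, e.g. "ws://localhost:33335".
    public static string ServerUrl(string address) =>
        new Uri(address).GetLeftPart(UriPartial.Authority);

    // Path portion of the address, e.g. "/SiteB".
    public static string ServicePath(string address) =>
        new Uri(address).AbsolutePath;
}
```

For the SiteB channel address ws://localhost:33335/SiteB, ServerUrl returns ws://localhost:33335 and ServicePath returns /SiteB.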

Related Articles

  • Gateway
    Durable fire-and-forget messaging across physically separated IT infrastructure.
