WebSocket Gateway

Component: Gateway
NuGet Package: NServiceBus.Gateway (4.x)
Target Version: NServiceBus 8.x

Code walk-through

This sample replaces the default HTTP channel of the NServiceBus gateway with a WebSocket channel built on the websocket-sharp library.

Messages

A shared class library that contains the sample message transmitted through the gateway.

SiteA

An endpoint that sends a message to SiteB via the WebSocket gateway.

Gateway and endpoint configuration

  • Maps the site key SiteB to ws://localhost:33335/SiteB
  • Receives incoming messages on ws://localhost:33334/SiteA
  • Enables the gateway
  • Replaces the default channel factories
var config = new EndpointConfiguration("Custom Gateway - SiteA");
config.UseTransport(new LearningTransport());

var gatewaySettings = config.Gateway(new NonDurableDeduplicationConfiguration());
gatewaySettings.ChannelFactories(
    s => new WebSocketChannelSender(),
    s => new WebSocketChannelReceiver()
);

gatewaySettings.AddReceiveChannel(
    address: "ws://localhost:33334/SiteA",
    type: "WebSocket",
    isDefault: true);

gatewaySettings.AddSite(
    siteKey: "SiteB",
    address: "ws://localhost:33335/SiteB",
    type: "WebSocket");

SiteB

An endpoint that receives a message sent from SiteA via the WebSocket gateway.

Gateway and endpoint configuration

  • Maps the site key SiteA to ws://localhost:33334/SiteA
  • Receives incoming messages on ws://localhost:33335/SiteB
  • Enables the gateway
  • Replaces the default channel factories
var config = new EndpointConfiguration("Custom Gateway - SiteB");
// UseTransport returns the routing settings, which map message types to endpoints.
var routing = config.UseTransport(new LearningTransport());
routing.RouteToEndpoint(typeof(SomeMessage), "Custom Gateway - SiteB");

var gatewaySettings = config.Gateway(new NonDurableDeduplicationConfiguration());
gatewaySettings.ChannelFactories(
    s => new WebSocketChannelSender(),
    s => new WebSocketChannelReceiver()
);

gatewaySettings.AddReceiveChannel(
    address: "ws://localhost:33335/SiteB",
    type: "WebSocket",
    isDefault: true);

gatewaySettings.AddSite(
    siteKey: "SiteA",
    address: "ws://localhost:33334/SiteA",
    type: "WebSocket");

WebSocketGateway

A shared library that contains the implementations of the WebSocket gateway channels.

WebSocketChannelSender

  • Connects to a remote WebSocket server
  • Serializes outgoing messages
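public class WebSocketChannelSender :
    IChannelSender
{
    public Task Send(string remoteAddress, IDictionary<string, string> headers, Stream data, CancellationToken cancellationToken = default)
    {
        // The websocket-sharp calls used here are synchronous, so connecting,
        // sending, and closing all block the calling thread.
        using (var webSocket = new WebSocket(remoteAddress))
        {
            webSocket.Connect();
            var bytes = GetBytes(headers, data);
            webSocket.Send(bytes);
            webSocket.Close();
        }
        return Task.CompletedTask;
    }

    // Builds a single binary frame: the serialized headers followed by the raw message body.
    static byte[] GetBytes(IDictionary<string, string> headers, Stream data)
    {
        using (var stream = new MemoryStream())
        {
            // BinaryFormatter is marked obsolete in .NET 5 and later.
            // The receiving side must read the headers with the same serializer.
            var binaryFormatter = new BinaryFormatter();
            binaryFormatter.Serialize(stream, headers);
            data.CopyTo(stream);
            return stream.ToArray();
        }
    }
}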

WebSocketChannelReceiver

  • Listens for incoming WebSocket connections
  • Deserializes incoming messages
public class WebSocketChannelReceiver :
    IChannelReceiver
{
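    // maxConcurrency is part of the IChannelReceiver contract but is not used by this implementation.
    public void Start(string address, int maxConcurrency, Func<DataReceivedOnChannelEventArgs, CancellationToken, Task> dataReceivedOnChannel)
    {
        var uri = new Uri(address);
        // The server listens on the scheme, host, and port; the path selects the service.
        server = new WebSocketServer(uri.GetLeftPart(UriPartial.Authority));

        // The initializer hands the gateway callback to each behavior instance it creates.
        server.AddWebSocketService(uri.AbsolutePath,
            initializer: () =>
            {
                return new WebSocketMessageBehavior(dataReceivedOnChannel);
            });
        server.Start();
    }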

    public Task Stop(CancellationToken token)
    {
        server?.Stop();
        return Task.CompletedTask;
    }

    WebSocketServer server;
}
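
The receiver above hands each incoming frame to WebSocketMessageBehavior, which is not listed here. Whatever that class does, it has to mirror GetBytes in the sender: read the headers first, then treat the remaining bytes as the message body. The following is a minimal sketch of that round trip, not the sample's implementation. FrameCodec is a hypothetical helper introduced only for illustration, and it swaps the sample's BinaryFormatter for a BinaryWriter/BinaryReader encoding of the headers, on the assumption that a serializer available in current .NET versions is preferred.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical helper; not part of the sample or of NServiceBus.Gateway.
public static class FrameCodec
{
    // Frame layout: [header count][key, value pairs][raw body bytes].
    public static byte[] Encode(IDictionary<string, string> headers, Stream data)
    {
        using (var stream = new MemoryStream())
        {
            // leaveOpen keeps the MemoryStream usable after the writer is disposed.
            using (var writer = new BinaryWriter(stream, Encoding.UTF8, leaveOpen: true))
            {
                writer.Write(headers.Count);
                foreach (var pair in headers)
                {
                    // Keys and values are written as length-prefixed strings (null values are not supported).
                    writer.Write(pair.Key);
                    writer.Write(pair.Value);
                }
            }
            // The body follows the headers with no extra framing, as in the sender's GetBytes.
            data.CopyTo(stream);
            return stream.ToArray();
        }
    }

    public static (Dictionary<string, string> Headers, byte[] Body) Decode(byte[] frame)
    {
        using (var stream = new MemoryStream(frame))
        using (var reader = new BinaryReader(stream, Encoding.UTF8, leaveOpen: true))
        {
            var count = reader.ReadInt32();
            var headers = new Dictionary<string, string>();
            for (var i = 0; i < count; i++)
            {
                var key = reader.ReadString();
                headers[key] = reader.ReadString();
            }
            // Everything after the headers is the message body.
            var body = reader.ReadBytes((int)(stream.Length - stream.Position));
            return (headers, body);
        }
    }
}
```

In a channel built this way, the sender would call Encode where the sample calls GetBytes, and the behavior class would call Decode on the received bytes before populating the DataReceivedOnChannelEventArgs passed to the gateway callback. Both sites must use the same encoding, since the frame carries no format marker.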

Related Articles

  • Gateway
    Durable fire-and-forget messaging across physically separated IT infrastructure.
