Code walk-through
This sample demonstrates replacing the channel that is used for the NServiceBus gateway with a WebSockets channel based on the websocket-sharp library.
Messages
A shared class library for the sample message transmitted through the gateway.
SiteA
An endpoint that sends a message to SiteB via the WebSocket gateway.
Gateway and endpoint configuration
- Maps the site key
SiteB
tows:/
/ localhost:33334/ SiteB - Receives incoming messages on
ws:/
/ localhost:33335/ SiteA - Enables the gateway
- Replaces the default channel factories
var config = new EndpointConfiguration("Custom Gateway - SiteA");
config.UseTransport(new LearningTransport());
var gatewaySettings = config.Gateway(new NonDurableDeduplicationConfiguration());
gatewaySettings.ChannelFactories(
s => new WebSocketChannelSender(),
s => new WebSocketChannelReceiver()
);
gatewaySettings.AddReceiveChannel(
address: "ws://localhost:33334/SiteA",
type: "WebSocket",
isDefault: true);
gatewaySettings.AddSite(
siteKey: "SiteB",
address: "ws://localhost:33335/SiteB",
type: "WebSocket");
SiteB
An endpoint that receives a message sent from SiteA via the WebSocket gateway.
Gateway and endpoint configuration
- Maps the site key
SiteA
tows:/
/ localhost:33334/ SiteA - Receives incoming messages on
ws:/
/ localhost:33335/ SiteB - Enables the gateway
- Replaces the default channel factories
var config = new EndpointConfiguration("Custom Gateway - SiteB");
var transport = config.UseTransport(new LearningTransport());
transport.RouteToEndpoint(typeof(SomeMessage), "Custom Gateway - SiteB");
var gatewaySettings = config.Gateway(new NonDurableDeduplicationConfiguration());
gatewaySettings.ChannelFactories(
s => new WebSocketChannelSender(),
s => new WebSocketChannelReceiver()
);
gatewaySettings.AddReceiveChannel(
address: "ws://localhost:33335/SiteB",
type: "WebSocket",
isDefault: true);
gatewaySettings.AddSite(
siteKey: "SiteA",
address: "ws://localhost:33334/SiteA",
type: "WebSocket");
WebSocketGateway
A shared library that contains the implementations for the WebSocket gateway channels
WebSocketGatewayChannelSender
- Connects to a remote WebSocket server
- Serializes outgoing messages
public class WebSocketChannelSender :
IChannelSender
{
public Task Send(string remoteAddress, IDictionary<string, string> headers, Stream data, CancellationToken cancellationToken = new CancellationToken())
{
using (var webSocket = new WebSocket(remoteAddress))
{
webSocket.Connect();
var bytes = GetBytes(headers, data);
webSocket.Send(bytes);
webSocket.Close();
}
return Task.CompletedTask;
}
static byte[] GetBytes(IDictionary<string, string> headers, Stream data)
{
using (var stream = new MemoryStream())
{
var binaryFormatter = new BinaryFormatter();
binaryFormatter.Serialize(stream, headers);
data.CopyTo(stream);
return stream.ToArray();
}
}
}
WebSocketGatewayChannelReceiver
- Listens for incoming WebSocket connections
- Deserializes incoming messages
public class WebSocketChannelReceiver :
IChannelReceiver
{
public void Start(string address, int maxConcurrency, Func<DataReceivedOnChannelEventArgs, CancellationToken, Task> dataReceivedOnChannel)
{
var uri = new Uri(address);
server = new WebSocketServer(uri.GetLeftPart(UriPartial.Authority));
server.AddWebSocketService(uri.AbsolutePath,
initializer: () =>
{
return new WebSocketMessageBehavior(dataReceivedOnChannel);
});
server.Start();
}
public Task Stop(CancellationToken token)
{
server?.Stop();
return Task.CompletedTask;
}
WebSocketServer server;
}