Starting with NServiceBus version 8, the transport infrastructure can be used directly without the need to spin up a full NServiceBus endpoint. This is especially useful when integrating with third-party systems and when building message gateways or bridges.
Configuration
Configuration of the messaging infrastructure is done via the Initialize
method:
var transport = new LearningTransport();
var hostSettings = new HostSettings(
name: "MyRawEndpoint",
hostDisplayName: "My Raw Endpoint",
startupDiagnostic: new StartupDiagnosticEntries(),
criticalErrorAction: (message, exception, token) =>
{
Console.WriteLine("Critical error: " + exception);
},
setupInfrastructure: true);
var infrastructure = await transport.Initialize(hostSettings, new[]
{
new ReceiveSettings(
id: "Primary",
receiveAddress: new QueueAddress("MyQueue"),
usePublishSubscribe: false,
purgeOnStartup: false,
errorQueue: "error")
}, new string[0]);
var sender = infrastructure.Dispatcher;
Sending
The following code sends a message to another endpoint using an IMessageDispatcher
that is part of the initialized infrastructure:
var body = Serialize();
var headers = new Dictionary<string, string>
{
["SomeHeader"] = "SomeValue"
};
var request = new OutgoingMessage(
messageId: Guid.NewGuid().ToString(),
headers: headers,
body: body);
var operation = new TransportOperation(
request,
new UnicastAddressTag("Receiver"));
await sender.Dispatch(
outgoingMessages: new TransportOperations(operation),
transaction: new TransportTransaction());
Receiving
The following code starts the configured receiver (identified by ID "Primary"
). Each infrastructure object can contain multiple receivers. Each receiver can be started separately. Once stopped, receivers cannot be restarted. If pause functionality is needed, create a new infrastructure object each time.
var receiver = infrastructure.Receivers["Primary"];
await receiver.Initialize(new PushRuntimeSettings(8),
onMessage: (context, token) =>
{
var message = Deserialize(context.Body);
return Console.Out.WriteLineAsync(message);
},
onError: (context, token) => Task.FromResult(ErrorHandleResult.RetryRequired));
await receiver.StartReceive();
Shutting down
await receiver.StopReceive();
await infrastructure.Shutdown();
Before shutting down the infrastructure, be sure to stop all the receivers.