The transport infrastructure can be used directly without the need to spin up a full NServiceBus endpoint. This is especially useful when integrating with third-party systems and when building message gateways or bridges.
Configuration
The messaging infrastructure is configured by calling the transport's Initialize method:
var transport = new LearningTransport();

// Host-level settings shared by the whole transport infrastructure
var hostSettings = new HostSettings(
    name: "MyRawEndpoint",
    hostDisplayName: "My Raw Endpoint",
    startupDiagnostic: new StartupDiagnosticEntries(),
    criticalErrorAction: (message, exception, token) =>
    {
        Console.WriteLine("Critical error: " + exception);
    },
    setupInfrastructure: true);

// One receiver ("Primary") and no additional sending addresses
var infrastructure = await transport.Initialize(hostSettings, new[]
{
    new ReceiveSettings(
        id: "Primary",
        receiveAddress: new QueueAddress("MyQueue"),
        usePublishSubscribe: false,
        purgeOnStartup: false,
        errorQueue: "error")
}, new string[0]);

var sender = infrastructure.Dispatcher;
Sending
The following code sends a message to another endpoint using an IMessageDispatcher that is part of the initialized infrastructure:
var body = Serialize();
var headers = new Dictionary<string, string>
{
    ["SomeHeader"] = "SomeValue"
};

var request = new OutgoingMessage(
    messageId: Guid.NewGuid().ToString(),
    headers: headers,
    body: body);

// Address the message to a single destination queue
var operation = new TransportOperation(
    request,
    new UnicastAddressTag("Receiver"));

await sender.Dispatch(
    outgoingMessages: new TransportOperations(operation),
    transaction: new TransportTransaction());
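The Serialize call above, and the Deserialize call used when receiving, are placeholders that this page does not define. As a minimal sketch, assuming the payload is plain UTF-8 text, they could look like the following. The class name RawMessageSerializer and the payload parameter are hypothetical and only serve as illustration; a real integration would use whatever wire format the other system expects.

```csharp
using System;
using System.Text;

// Hypothetical helpers (not part of NServiceBus): a UTF-8 text payload is assumed.
public static class RawMessageSerializer
{
    // Encodes a text payload as UTF-8 bytes suitable for a message body.
    public static byte[] Serialize(string payload) =>
        Encoding.UTF8.GetBytes(payload);

    // Decodes a received message body back into text.
    public static string Deserialize(ReadOnlyMemory<byte> body) =>
        Encoding.UTF8.GetString(body.Span);
}
```

A byte array converts implicitly to ReadOnlyMemory<byte>, so the result of Serialize can be passed as a message body and round-tripped through Deserialize.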
Receiving
The following code starts the configured receiver (identified by the ID "Primary"). Each infrastructure object can contain multiple receivers, and each receiver can be started separately. Once stopped, a receiver cannot be restarted; if pause-like functionality is required, a new infrastructure object must be created each time receiving resumes.
var receiver = infrastructure.Receivers["Primary"];

// Process up to 8 messages concurrently
await receiver.Initialize(new PushRuntimeSettings(8),
    onMessage: (context, token) =>
    {
        var message = Deserialize(context.Body);
        return Console.Out.WriteLineAsync(message);
    },
    onError: (context, token) => Task.FromResult(ErrorHandleResult.RetryRequired));

await receiver.StartReceive();
The TransportTransaction object can be passed from the receiving context to the dispatcher, in order to ensure transactions span both send and receive operations. It's important to ensure that the underlying transport infrastructure supports the SendsAtomicWithReceive transaction mode when using this option.
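To illustrate passing the transaction along, the sketch below builds an onMessage callback that forwards each received message and hands the receive context's TransportTransaction to the dispatcher. The ForwardingHandler class and the destination parameter are hypothetical names introduced for this example, and whether the send is actually atomic with the receive depends on the transport and its configured transaction mode.

```csharp
using System;
using System.Collections.Generic;
using NServiceBus.Routing;
using NServiceBus.Transport;

// Hypothetical helper, shown only as a sketch of the pattern.
public static class ForwardingHandler
{
    // Returns a callback that can be passed as the onMessage argument
    // of receiver.Initialize.
    public static OnMessage Create(IMessageDispatcher dispatcher, string destination)
    {
        return (context, token) =>
        {
            // Copy the incoming headers and body into a new outgoing message.
            var forwarded = new OutgoingMessage(
                messageId: Guid.NewGuid().ToString(),
                headers: new Dictionary<string, string>(context.Headers),
                body: context.Body);

            var operation = new TransportOperation(
                forwarded,
                new UnicastAddressTag(destination));

            // Reuse the receive transaction instead of creating a new one.
            return dispatcher.Dispatch(
                new TransportOperations(operation),
                context.TransportTransaction,
                token);
        };
    }
}
```

With this in place, the receiver could be initialized with onMessage: ForwardingHandler.Create(sender, "Receiver").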
Shutting down
await receiver.StopReceive();
await infrastructure.Shutdown();
Before shutting down the infrastructure, be sure to stop all the receivers.
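When several receivers are configured, the same ordering applies to all of them. A minimal sketch, assuming every configured receiver has been started and using a hypothetical helper class named RawInfrastructureShutdown:

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus.Transport;

// Hypothetical helper, shown only as a sketch of the shutdown order.
public static class RawInfrastructureShutdown
{
    public static async Task StopAll(
        TransportInfrastructure infrastructure,
        CancellationToken token = default)
    {
        // Stop every receiver first and wait for all of them to finish.
        await Task.WhenAll(
            infrastructure.Receivers.Values.Select(r => r.StopReceive(token)));

        // Only then shut down the infrastructure itself.
        await infrastructure.Shutdown(token);
    }
}
```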