Raw messaging using NServiceBus

This is a community-maintained project. License and support are independent of Particular Software.
This page targets a pre-release version. Pre-releases are subject to change and samples are not guaranteed to be fully functional.

Starting with NServiceBus version 8, the transport infrastructure can be used directly without the need to spin up a full NServiceBus endpoint. This is especially useful when integrating with third-party systems and when building message gateways or bridges.

Configuration

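The messaging infrastructure is configured by calling the Initialize method on the transport definition, passing the host settings, the settings for each receiver to create, and any additional sending addresses: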

var transport = new LearningTransport();

var hostSettings = new HostSettings(
    name: "MyRawEndpoint",
    hostDisplayName: "My Raw Endpoint",
    startupDiagnostic: new StartupDiagnosticEntries(),
    criticalErrorAction: (message, exception, token) =>
    {
        Console.WriteLine("Critical error: " + exception);
    },
    setupInfrastructure: true);

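// One ReceiveSettings entry per receiver; the last argument lists
// additional sending addresses (none are needed here).
var infrastructure = await transport.Initialize(hostSettings, new[]
    {
        new ReceiveSettings(
            id: "Primary",
            receiveAddress: "MyQueue",
            usePublishSubscribe: false,
            purgeOnStartup: false,
            errorQueue: "error")
    }, Array.Empty<string>())
    .ConfigureAwait(false);

// The dispatcher is used for all outgoing messages.
var sender = infrastructure.Dispatcher;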

Sending

The following code sends a message to another endpoint using an IMessageDispatcher that is part of the initialized infrastructure:

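// Serialize is a placeholder for application-specific code that produces the message body as bytes.
var body = Serialize();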
var headers = new Dictionary<string, string>
{
    ["SomeHeader"] = "SomeValue"
};
var request = new OutgoingMessage(
    messageId: Guid.NewGuid().ToString(),
    headers: headers,
    body: body);

var operation = new TransportOperation(
    request,
    new UnicastAddressTag("Receiver"));

await sender.Dispatch(
        outgoingMessages: new TransportOperations(operation),
        transaction: new TransportTransaction())
    .ConfigureAwait(false);
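
The Serialize call above, and the Deserialize call in the receiving snippet, are left undefined on this page. As a minimal sketch, assuming message bodies are plain UTF-8 text, the two helpers might look like the following. The RawBody class name and the string parameter are hypothetical and not part of NServiceBus; any wire format agreed with the other system would work equally well.

```csharp
using System;
using System.Text;

// Hypothetical helpers for illustration; not part of NServiceBus.
public static class RawBody
{
    // Encodes the text as UTF-8 bytes for use as an outgoing message body.
    public static byte[] Serialize(string text) => Encoding.UTF8.GetBytes(text);

    // Decodes a received body back to text. A byte[] converts implicitly
    // to ReadOnlyMemory<byte>, so either body type can be passed in.
    public static string Deserialize(ReadOnlyMemory<byte> body) =>
        Encoding.UTF8.GetString(body.ToArray());
}
```

With these helpers, the sending snippet would call RawBody.Serialize("some text") and the receiving callback would call RawBody.Deserialize(context.Body).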

Receiving

The following code starts the configured receiver (identified by ID "Primary"). Each infrastructure object can contain multiple receivers. Each receiver can be started separately. Once stopped, receivers cannot be restarted. If pause functionality is needed, create a new infrastructure object each time.

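var receiver = infrastructure.Receivers["Primary"];

// PushRuntimeSettings(8) allows up to 8 messages to be processed concurrently.
await receiver.Initialize(new PushRuntimeSettings(8),
    onMessage: (context, token) =>
    {
        // Deserialize is a placeholder for application-specific code.
        var message = Deserialize(context.Body);
        return Console.Out.WriteLineAsync(message);
    },
    // Always asks the transport to retry, so a message that keeps failing is retried indefinitely.
    onError: (context, token) => Task.FromResult(ErrorHandleResult.RetryRequired))
    .ConfigureAwait(false);

await receiver.StartReceive().ConfigureAwait(false);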
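
The onError callback above returns RetryRequired unconditionally, so a message that can never be processed would be retried forever. The following is a sketch of one possible alternative: retry a limited number of times, then forward the message to an error queue and report it as handled. The RetryPolicy class, the ShouldRetry and Bounded methods, and the maxAttempts parameter are hypothetical. The members used on the error context (ImmediateProcessingFailures, Message, TransportTransaction) are assumed to be available in the targeted pre-release and may differ between versions.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus.Transport;

// Hypothetical helper for illustration; not part of NServiceBus.
public static class RetryPolicy
{
    // Retry while the number of failed attempts is below the limit.
    public static bool ShouldRetry(int failures, int maxAttempts) => failures < maxAttempts;

    // Builds an onError callback that retries up to the limit and then
    // forwards the failed message to the given error queue.
    public static OnError Bounded(IMessageDispatcher dispatcher, string errorQueue, int maxAttempts) =>
        async (errorContext, token) =>
        {
            if (ShouldRetry(errorContext.ImmediateProcessingFailures, maxAttempts))
            {
                return ErrorHandleResult.RetryRequired;
            }

            // Copy the failed message and address it to the error queue.
            var failed = errorContext.Message;
            var copy = new OutgoingMessage(failed.MessageId, failed.Headers, failed.Body);
            var operation = new TransportOperation(copy, new UnicastAddressTag(errorQueue));

            // Dispatch within the receive transaction so that the forward and
            // the removal from the input queue succeed or fail together,
            // where the transport supports it.
            await dispatcher.Dispatch(
                    new TransportOperations(operation),
                    errorContext.TransportTransaction,
                    token)
                .ConfigureAwait(false);

            return ErrorHandleResult.Handled;
        };
}
```

Under these assumptions, the receiver would be initialized with onError: RetryPolicy.Bounded(sender, "error", maxAttempts: 3).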

Shutting down

Stop all receivers before shutting down the infrastructure:

await receiver.StopReceive().ConfigureAwait(false);
await infrastructure.Shutdown().ConfigureAwait(false);
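
When an infrastructure object holds several receivers, the requirement to stop all of them first can be wrapped in a small helper. This is only a sketch: the RawShutdown class and StopAll method are hypothetical, and it assumes that every receiver in the Receivers collection has been initialized and started, since stopping a receiver that was never started may not be supported by every transport.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus.Transport;

// Hypothetical helper for illustration; not part of NServiceBus.
public static class RawShutdown
{
    public static async Task StopAll(
        TransportInfrastructure infrastructure,
        CancellationToken token = default)
    {
        // Stop every receiver concurrently and wait for all of them to finish.
        var stopTasks = infrastructure.Receivers.Values
            .Select(receiver => receiver.StopReceive(token));
        await Task.WhenAll(stopTasks).ConfigureAwait(false);

        // Only then release the transport's resources.
        await infrastructure.Shutdown(token).ConfigureAwait(false);
    }
}
```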

