Prerequisites
An instance of SQL Server Express installed and accessible as .
.
At startup each endpoint will create its required SQL assets including databases, tables, and schemas.
The databases created by this sample are NsbSamplesStoreAndForwardReceiver
and NsbSamplesStoreAndForwardSender
.
Running the project
- Start the Sender project (right-click the project, select
Debug > Start new instance
). - Start the Receiver project.
- In the Sender's console,
Press
appears when the app is ready.<enter> to send a message - Press enter.
Verifying the sample works correctly
- The Receiver indicates that the order was submitted.
- The Sender indicates that the order was accepted.
- Press enter in the Receiver console to shut it down.
- Go to SQL Server Management Studio and delete the
NsbSamplesStoreAndForwardReceiver
database. - Press enter again in the Sender console
- Notice that the retry mechanism kicks in after 10 seconds and retries sending the message to the destination database.
- Restart the Receiver project.
- Notice that the message has been delivered to the Receiver.
Code walk-through
When the SQL Server transport is used in multi-instance mode, the messages are inserted directly into the remote destination database's table. If the receiving endpoint's database is down or inaccessible (for example, because of network failures), the sending endpoint can't send messages to it. In this situation, the exception is thrown from the Send()
or the Publish()
methods, resulting in a potential message loss.
The message loss problem can be prevented by adding store-and-forward functionality to the SQL Server transport, as explained in this sample.
The sample contains three projects:
- Shared - A class library containing common code including messages definitions.
- Sender - A console application responsible for sending the initial
OrderSubmitted
message and processing the follow-upOrderAccepted
message. - Receiver - A console application responsible for processing the
OrderSubmitted
message.
Sender and Receiver use different databases, just like in a production scenario where two systems are integrated using NServiceBus. Each database contains, apart from business data, queues for the NServiceBus endpoint.
Sender project
The Sender does not store any data. It mimics the front-end system where orders are submitted by the users and passed via the bus to the back-end. It is configured to use the SQL Server transport.
var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
transport.EnableLegacyMultiInstanceMode(GetConnecton);
var pipeline = endpointConfiguration.Pipeline;
pipeline.Register(
stepId: "Forward",
behavior: new ForwardBehavior(),
description: "Forwards messages to destinations.");
pipeline.Register("Store",
factoryMethod: builder =>
{
var localAddress = builder.Build<ReadOnlySettings>().LocalAddress();
return new SendThroughLocalQueueRoutingToDispatchConnector(localAddress);
},
description: "Send messages through local endpoint.");
The Sender registers two custom behaviors, one for the send pipeline and one for the receive pipeline.
Send pipeline
The new behavior is added at the beginning of the send pipeline (in version 2) or in the routing stage (in version 3).
class SendThroughLocalQueueRoutingToDispatchConnector :
ForkConnector<IRoutingContext, IDispatchContext>
{
public SendThroughLocalQueueRoutingToDispatchConnector(string localAddress)
{
this.localAddress = localAddress;
}
public override Task Invoke(IRoutingContext context, Func<Task> next, Func<IDispatchContext, Task> fork)
{
var fromHandler = context.Extensions.TryGet(out IncomingMessage _);
if (context.Extensions.TryGetDeliveryConstraint(out DelayedDeliveryConstraint _) || fromHandler)
{
return next();
}
var operations = context.RoutingStrategies
.Select(s => RouteThroughLocalEndpointInstance(s, context))
.ToArray();
return fork(new DispatchContext(operations, context));
}
TransportOperation RouteThroughLocalEndpointInstance(RoutingStrategy routingStrategy, IRoutingContext context)
{
var outgoingMessage = context.Message;
var headers = new Dictionary<string, string>(outgoingMessage.Headers);
var originalTag = routingStrategy.Apply(headers);
var unicastTag = originalTag as UnicastAddressTag;
if (unicastTag == null)
{
var multicastTag = originalTag as MulticastAddressTag;
if (multicastTag == null)
{
throw new Exception($"Unsupported type of address tag: {originalTag.GetType().FullName}");
}
headers["$.store-and-forward.eventtype"] = multicastTag.MessageType.AssemblyQualifiedName;
}
else
{
headers["$.store-and-forward.destination"] = unicastTag.Destination;
}
var message = new OutgoingMessage(outgoingMessage.MessageId, headers, outgoingMessage.Body);
return new TransportOperation(
message: message,
addressTag: new UnicastAddressTag(localAddress),
requiredDispatchConsistency: DispatchConsistency.Default,
deliveryConstraints: context.Extensions.GetDeliveryConstraints());
}
string localAddress;
class DispatchContext :
ContextBag,
IDispatchContext
{
TransportOperation[] operations;
public ContextBag Extensions => this;
public IBuilder Builder => Get<IBuilder>();
public DispatchContext(TransportOperation[] operations, IBehaviorContext parentContext)
: base(parentContext?.Extensions)
{
this.operations = operations;
}
public IEnumerable<TransportOperation> Operations => operations;
}
}
The behavior ignores:
- Messages sent from a handler, because the incoming message will be retried ensuring the outgoing messages are eventually delivered.
- Deferred messages, because their destination is the local timeout manager satellite.
The behavior captures the destination of the message in a header and overrides the original value so that the message is actually sent to the local endpoint (at the end of the endpoint's incoming queue).
Receive pipeline
In the receive pipeline, the new behavior is placed just before loading the message handlers (in version 2) or in the physical processing stage (in version 3).
var message = context.Message;
var headers = message.Headers;
var body = message.Body;
if (headers.TryGetValue("$.store-and-forward.destination", out var destination))
{
var operation = new TransportOperation(
message: new OutgoingMessage(context.MessageId, headers, body),
addressTag: new UnicastAddressTag(destination));
return fork(new DispatchContext(operation, context));
}
if (headers.TryGetValue("$.store-and-forward.eventtype", out var eventtype))
{
var messageType = Type.GetType(eventtype, true);
var operation = new TransportOperation(
message: new OutgoingMessage(context.MessageId, headers, body),
addressTag: new MulticastAddressTag(messageType));
return fork(new DispatchContext(operation, context));
}
return next();
If the message contains the headers used by the sender-side behavior, it is forwarded to the ultimate destination instead of being processed locally. This is the first time the remote database of the receiver endpoint is contacted. Should it be down, the retry mechanism kicks in and ensures the message is eventually delivered to the destination. In this example, the retry mechanism is configured to retry every 10 seconds up to 100 times.
var recoverability = endpointConfiguration.Recoverability();
recoverability.Delayed(
customizations: delayed =>
{
delayed.NumberOfRetries(100);
delayed.TimeIncrease(TimeSpan.FromSeconds(10));
});
ForwardBehavior
has to include a TransactionScope
that suppresses the ambient transaction before forwarding the message.Receiver project
The Receiver mimics a back-end system. The following code configures it to use multi-instance mode of the SQL Server transport.
var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
var routing = transport.Routing();
routing.RegisterPublisher(
eventType: typeof(OrderSubmitted),
publisherEndpoint: "Samples.SqlServer.StoreAndForwardSender");
transport.EnableLegacyMultiInstanceMode(async address =>
{
string connectionString;
if (address.StartsWith("Samples.SqlServer.StoreAndForwardReceiver") ||
address == "error")
{
connectionString = Connections.ReceiverConnectionString;
}
else
{
connectionString = Connections.SenderConnectionString;
}
var connection = new SqlConnection(connectionString);
await connection.OpenAsync()
.ConfigureAwait(false);
return connection;
});