Integrates the SQL Server transport with SQL persistence and ADO.NET user data store using outbox.
Prerequisites
Ensure an instance of SQL Server (Version 2016 or above for custom saga finders sample, or Version 2012 or above for other samples) is installed and accessible on localhost
and port 1433
. A Docker image can be used to accomplish this by running docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_SA_PASSWORD=yourStrong(!)Password' -p 1433:1433 -d mcr.
in a terminal.
Alternatively, change the connection string to point to different SQL Server instance.
At startup each endpoint will create its required SQL assets including databases, tables, and schemas.
The database created by this sample is NsbSamplesSqlOutbox
.
The outbox feature is designed to provide exactly once delivery guarantees without the Distributed Transaction Coordinator (DTC) running. Disable the DTC service to avoid seeing warning messages in the console window. If the DTC service is not disabled, when the sample project is started it will display a DtcRunningWarning
message in the console window.
Running the project
- Start the Solution
- The text
Press
is displayed in the Sender's console window.<enter> to send a message - Press enter to send a new message.
Verifying that the sample works correctly
- The Receiver displays information that an order was submitted.
- The Sender displays information that the order was accepted.
- After a few seconds, the Receiver displays confirmation that the timeout message has been received.
- Open SQL Server Management Studio and go to the
NsbSamplesSqlOutbox
database. Verify that there is a row in the saga state table (receiver.
) and in the orders table (OrderLifecycleSaga receiver.
)SubmittedOrder
Code walk-through
This sample contains three projects:
- Shared - A class library containing common code including the message definitions.
- Sender - A console application responsible for sending the initial
OrderSubmitted
message and processing the follow-upOrderAccepted
message. - Receiver - A console application responsible for processing the order message.
Sender and Receiver use different schemas in the same database. Apart from business data the database also contains tables representing queues for the NServiceBus endpoint and tables for NServiceBus persistence.
Sender project
The Sender does not store any data. It mimics the front-end system where orders are submitted by the users and passed via the bus to the back-end. It is configured to use the SQL Server transport with SQL persistence (backed by SQL Server) and the outbox feature.
// for SqlExpress use Data Source=.\SqlExpress;Initial Catalog=NsbSamplesSqlOutbox;Integrated Security=True;Max Pool Size=100;Encrypt=false
var connectionString = @"Server=localhost,1433;Initial Catalog=NsbSamplesSqlOutbox;User Id=SA;Password=yourStrong(!)Password;Max Pool Size=100;Encrypt=false";
var transport = new SqlServerTransport(connectionString)
{
DefaultSchema = "sender",
TransportTransactionMode = TransportTransactionMode.ReceiveOnly
};
transport.SchemaAndCatalog.UseSchemaForQueue("error", "dbo");
transport.SchemaAndCatalog.UseSchemaForQueue("audit", "dbo");
endpointConfiguration.UseTransport(transport);
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.ConnectionBuilder(
connectionBuilder: () =>
{
return new SqlConnection(connectionString);
});
var dialect = persistence.SqlDialect<SqlDialect.MsSqlServer>();
dialect.Schema("sender");
persistence.TablePrefix("");
transport.Subscriptions.DisableCaching = true;
transport.Subscriptions.SubscriptionTableName = new SubscriptionTableName(
table: "Subscriptions",
schema: "dbo");
endpointConfiguration.EnableOutbox();
Receiver project
The Receiver mimics a back-end system. It is also configured to use the SQL Server transport with SQL persistence and the outbox. It uses ADO.NET to store business data (orders).
var transport = new SqlServerTransport(connectionString)
{
DefaultSchema = "receiver",
TransportTransactionMode = TransportTransactionMode.ReceiveOnly
};
transport.SchemaAndCatalog.UseSchemaForQueue("error", "dbo");
transport.SchemaAndCatalog.UseSchemaForQueue("audit", "dbo");
var routing = endpointConfiguration.UseTransport(transport);
routing.UseSchemaForEndpoint("Samples.SqlOutbox.Sender", "sender");
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.ConnectionBuilder(
connectionBuilder: () =>
{
return new SqlConnection(connectionString);
});
var dialect = persistence.SqlDialect<SqlDialect.MsSqlServer>();
dialect.Schema("receiver");
persistence.TablePrefix("");
transport.Subscriptions.DisableCaching = true;
transport.Subscriptions.SubscriptionTableName = new SubscriptionTableName(
table: "Subscriptions",
schema: "dbo");
endpointConfiguration.EnableOutbox();
When the message arrives at the Receiver, it is dequeued using a native SQL Server transaction. Then a separate Outbox SQL Server transaction is created that encompasses
- persisting business data:
var session = context.SynchronizedStorageSession.SqlPersistenceSession();
var sql = @"insert into receiver.SubmittedOrder
(Id, Value)
values (@Id, @Value)";
using (var command = new SqlCommand(
cmdText: sql,
connection: (SqlConnection)session.Connection,
transaction: (SqlTransaction)session.Transaction))
{
var parameters = command.Parameters;
parameters.AddWithValue("Id", message.OrderId);
parameters.AddWithValue("Value", message.Value);
await command.ExecuteNonQueryAsync(context.CancellationToken);
}
- persisting saga data of
OrderLifecycleSaga
, - storing the reply message and the timeout request in the Outbox:
public Task Handle(OrderSubmitted message, IMessageHandlerContext context)
{
var orderTimeout = new OrderTimeout();
return RequestTimeout(context, TimeSpan.FromSeconds(5), orderTimeout);
}
Once the outbox transaction is committed, both the business data changes and the outgoing messages are durably stored in the database. Finally the messages in the outbox are pushed to their destinations. The timeout message gets stored in the NServiceBus timeout store and is sent back to the saga after the requested delay of 5 seconds.
See Accessing the ambient database details for information about using other ORMs.