In this sample, the SQL Server transport is used in conjunction with the SQL persister. The sample shows how to use the same database connection for both transport and persistence operations and how to access (using multiple ORMs) the current SQL connection and transaction from within a message handler to persist business objects to the database.
The persister is able to reuse the connection and transaction managed by SQL Server transport, so the endpoints can run in the TransportTransactionMode.
while ensuring exactly once message processing guarantees. This is true as long as persistence session APIs are used to access connection and transaction.
Prerequisites
Ensure an instance of SQL Server (Version 2016 or above for custom saga finders sample, or Version 2012 or above for other samples) is installed and accessible on localhost
and port 1433
. A Docker image can be used to accomplish this by running docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_SA_PASSWORD=yourStrong(!)Password' -p 1433:1433 -d mcr.
in a terminal.
Alternatively, change the connection string to point to different SQL Server instance.
At startup each endpoint will create its required SQL assets including databases, tables, and schemas.
The database created by this sample is NsbSamplesSql
.
Procedure
- Start the Sender and Receiver projects.
- In the Sender's console, press enter> to send a message when the app is ready.
- On the Receiver console, notice that the order was submitted.
- On the Sender console, notice that the order was accepted.
- Finally, after a couple of seconds, on the Receiver console, notice that the timeout message has been received.
- Open SQL Server Management Studio and go to the
NsbSamplesSql
database. Verify that there is a row in the saga state table (receiver.
) and in the orders table (OrderLifecycleSaga receiver.
)SubmittedOrder
Code walk-through
This sample contains three projects:
- Shared - A class library containing common code, including the message definitions.
- Sender - A console application responsible for sending the initial
OrderSubmitted
message and processing the follow-upOrderAccepted
message. - Receiver - A console application responsible for processing the order message.
Sender and Receiver use different schemas within one database. This creates a logical separation (since schemas can be secured independently) while retaining the benefits of having a single physical database. Apart from business data, each schema contains queues for the NServiceBus endpoint and tables for the NServiceBus persister. If no schema is specified, the transport will default to the dbo
schema.
Sender project
The Sender mimics the front-end system where orders are submitted by customers and passed as messages to the back-end. It is configured to use the SQL Server transport and the non-durable persister. Non-durable persistence is used since for the purposes of this sample, Sender does not need to persist data. The transport is configured to use sender
for the schema.
// for SqlExpress use Data Source=.\SqlExpress;Initial Catalog=NsbSamplesSql;Integrated Security=True;Max Pool Size=100;Encrypt=false
var connectionString = @"Server=localhost,1433;Initial Catalog=NsbSamplesSql;User Id=SA;Password=yourStrong(!)Password;Max Pool Size=100;Encrypt=false";
var transport = new SqlServerTransport(connectionString)
{
DefaultSchema = "sender",
Subscriptions =
{
SubscriptionTableName = new SubscriptionTableName(
table: "Subscriptions",
schema: "dbo")
},
TransportTransactionMode = TransportTransactionMode.SendsAtomicWithReceive
};
transport.SchemaAndCatalog.UseSchemaForQueue("error", "dbo");
transport.SchemaAndCatalog.UseSchemaForQueue("audit", "dbo");
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
endpointConfiguration.UseTransport(transport);
Receiver project
The Receiver mimics a back-end system. It is also configured to use the SQL Server transport and the SQL persister. The transport is configured to use receiver
for the schema and to send messages addressed to the receiver
endpoint to a different schema.
var endpointConfiguration = new EndpointConfiguration("Samples.Sql.Receiver");
endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.AuditProcessedMessagesTo("audit");
endpointConfiguration.EnableInstallers();
// for SqlExpress use Data Source=.\SqlExpress;Initial Catalog=NsbSamplesSql;Integrated Security=True;Max Pool Size=100;Encrypt=false
var connectionString = @"Server=localhost,1433;Initial Catalog=NsbSamplesSql;User Id=SA;Password=yourStrong(!)Password;Max Pool Size=100;Encrypt=false";
var transport = new SqlServerTransport(connectionString)
{
DefaultSchema = "receiver",
TransportTransactionMode = TransportTransactionMode.SendsAtomicWithReceive,
Subscriptions =
{
CacheInvalidationPeriod = TimeSpan.FromMinutes(1),
SubscriptionTableName = new SubscriptionTableName(table: "Subscriptions", schema: "dbo")
}
};
transport.SchemaAndCatalog.UseSchemaForQueue("error", "dbo");
transport.SchemaAndCatalog.UseSchemaForQueue("audit", "dbo");
transport.SchemaAndCatalog.UseSchemaForQueue("Samples.Sql.Sender", "sender");
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
var routing = endpointConfiguration.UseTransport(transport);
routing.RouteToEndpoint(typeof(OrderAccepted), "Samples.Sql.Sender");
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
var dialect = persistence.SqlDialect<SqlDialect.MsSqlServer>();
dialect.Schema("receiver");
persistence.ConnectionBuilder(() => new SqlConnection(connectionString));
persistence.TablePrefix("");
When the message arrives at the Receiver, a TransactionScope
is created that:
- Dequeues the message.
- Opens a DBConnection and starts a DBTransaction, supplying both to all handlers and sagas.
- Persists saga data for
OrderLifecycleSaga
. - Sends a reply message and s timeout request.
Script output
The Receiver contains an attribute in the SqlPersistenceSettings.cs file which instructs the SQL persister to generate table creation scripts for only Microsoft SQL Server.
[assembly: SqlPersistenceSettings(
MsSqlServerScripts = true)]
The SQL scripts, generated by the SQL persister at build time, will be created in the following directory:
Receiver\bin\Debug\net462\NServiceBus.Persistence.Sql
Accessing the ambient database details
When persisting data to the same database, it is recommended to use the same DBConnection and DBTransaction that is used by the underlying persister and transport. The approach to this differs depending on the approach used to persist the data. Several approaches to data access, including raw ADO.NET and several ORMs, are listed below.
The connection and transaction are managed by NServiceBus, so there is no need to explicitly commit a transaction or dispose the connection. Using the database state managed by NServiceBus ensures that database interactions, both in handlers and sagas, execute in the same connection and transaction context.
Raw ADO.NET
Persisting Data
var session = context.SynchronizedStorageSession.SqlPersistenceSession();
var sql = @"insert into receiver.SubmittedOrder
(Id, Value)
values (@Id, @Value)";
using (var command = new SqlCommand(
cmdText: sql,
connection: (SqlConnection)session.Connection,
transaction: (SqlTransaction)session.Transaction))
{
var parameters = command.Parameters;
parameters.AddWithValue("Id", $"Raw-{message.OrderId}");
parameters.AddWithValue("Value", message.Value);
await command.ExecuteNonQueryAsync(context.CancellationToken);
}
Dapper
Model
public class SubmittedOrder
{
public string Id { get; set; }
public int Value { get; set; }
}
Persisting Data
var session = context.SynchronizedStorageSession.SqlPersistenceSession();
var order = new SubmittedOrder
{
Id = $"Dapper-{message.OrderId}",
Value = message.Value,
};
var sql = @"insert into receiver.SubmittedOrder
(Id, Value)
values (@Id, @Value)";
return session.Connection.ExecuteAsync(sql: sql,
param: order,
transaction: session.Transaction);
EntityFramework
Model
The schema is defined using an attribute. The table name is redefined to be the class name as it is a required parameter.
[Table("SubmittedOrder", Schema = "receiver")]
public class SubmittedOrder
{
public string Id { get; set; }
public int Value { get; set; }
}
DBContext
Entity Framework requires an implementation of DBContext to persist data.
public class SubmittedOrderDbContext : DbContext
{
DbConnection connection;
public DbSet<SubmittedOrder> SubmittedOrder { get; set; }
public SubmittedOrderDbContext(DbConnection connection)
{
this.connection = connection;
}
protected override void OnConfiguring(DbContextOptionsBuilder options)
{
options.UseSqlServer(connection);
}
}
Persisting data
var session = context.SynchronizedStorageSession.SqlPersistenceSession();
var order = new SubmittedOrder
{
Id = $"EF-{message.OrderId}",
Value = message.Value,
};
using (var dbContext = new SubmittedOrderDbContext(session.Connection))
{
dbContext.Database.UseTransaction(session.Transaction);
await dbContext.SubmittedOrder.AddAsync(order, context.CancellationToken);
await dbContext.SaveChangesAsync(context.CancellationToken);
}
ServiceStack OrmLite
Model
The schema is defined using an attribute.
[Schema("receiver")]
public class SubmittedOrder
{
public string Id { get; set; }
public int Value { get; set; }
}
Persisting data
var session = context.SynchronizedStorageSession.SqlPersistenceSession();
var order = new SubmittedOrder
{
Id = $"OrmLite-{message.OrderId}",
Value = message.Value,
};
return session.Connection.UpdateAsync(
obj: order,
commandFilter: command => command.Transaction = session.Transaction,
context.CancellationToken);