This sample shows a client/server scenario.
By default all endpoints are started when the solution is run, which means that the sample requires all databases (i.e. SQL Server, MySQL, Oracle, PostgreSQL) to be configured to run correctly. In order to run the sample with just one database, disable the relevant endpoints.
Prerequisites
MS SQL Server
- Ensure an instance of SQL Server (Version 2016 or above for custom saga finders sample, or Version 2012 or above for other samples) is installed and accessible on `localhost` and port `1433`.
Alternatively, change the connection string to point to a different SQL Server instance.
At startup each endpoint will create the required SQL assets including databases, tables, and schemas.
MySQL
- Ensure an instance of MySQL (Version 5.7 or above) is installed and accessible on `localhost` and port `3306`.
- Add the username to access the instance to an environment variable named `MySqlUserName`.
- Add the password to access the instance to an environment variable named `MySqlPassword`.
Alternatively, change the connection string to point to a different MySQL instance.
At startup each endpoint will create the required SQL assets including databases, tables, and schemas.
Oracle
- Ensure an instance of Oracle Database (Version 11g or later) is installed and accessible on `localhost` on port `1521` with service name `XE`.
- Add the username to access the instance to an environment variable named `OracleUserName`.
- Add the password to access the instance to an environment variable named `OraclePassword`.
Alternatively, change the connection string to point to a different Oracle instance.
At startup each endpoint will create the required SQL assets including databases, tables, and schemas.
PostgreSQL
- Ensure an instance of PostgreSQL (Version 10 or later) is installed and accessible on `localhost`.
- Add the username to access the instance to an environment variable named `PostgreSqlUserName`.
- Add the password to access the instance to an environment variable named `PostgreSqlPassword`.
Alternatively, change the connection string to point to a different PostgreSQL instance.
At startup each endpoint will create the required SQL assets including databases, tables, and schemas.
Projects
SharedMessages
The shared message contracts used by all endpoints.
ServerShared
Contains the `OrderSaga` functionality and is referenced by the Server endpoints.
Client
- Sends the `StartOrder` message to either `EndpointMySql` or `EndpointSqlServer`.
- Receives and handles the `OrderCompleted` event.
Server projects
The `EndpointMySql`, `EndpointSqlServer`, `EndpointOracle`, and `EndpointPostgreSql` projects act as "servers" to run the saga instance.
- Receive the `StartOrder` message and initiate an `OrderSaga`.
- `OrderSaga` requests a timeout with an instance of `CompleteOrder` with the saga data.
- `OrderSaga` publishes an `OrderCompleted` event when the `CompleteOrder` timeout fires.
SQL scripts
Note that only `ServerShared` has the NServiceBus.Persistence.Sql NuGet package directly referenced. This will cause the script directory `ServerShared\` to be populated at build time.
These scripts will be copied to the output of each endpoint and executed at startup.
The endpoints know which scripts to execute via the `persistence.SqlDialect<T>()` API at configuration time.
The scripts produced in this sample are promoted to `$(SolutionDir)PromotedSqlScripts`.
// Assembly-level SQL persistence settings: turns on the MS SQL Server, MySQL,
// Oracle and PostgreSQL script flags and sets the script promotion path to
// $(SolutionDir)PromotedSqlScripts.
[assembly: SqlPersistenceSettings(
MsSqlServerScripts = true,
MySqlScripts = true,
OracleScripts = true,
PostgreSqlScripts = true,
ScriptPromotionPath = "$(SolutionDir)PromotedSqlScripts")]
Persistence config
Configure the endpoint to use SQL Persistence.
MS SQL Server
// Connection string for the SQL Server instance on localhost, port 1433.
// for SqlExpress use Data Source=.\SqlExpress;Initial Catalog=NsbSamplesSqlPersistence;Integrated Security=True;Encrypt=false
var connectionString = @"Server=localhost,1433;Initial Catalog=NsbSamplesSqlPersistence;User Id=SA;Password=yourStrong(!)Password;Encrypt=false";

var endpointConfiguration = new EndpointConfiguration("Samples.SqlPersistence.EndpointSqlServer");
endpointConfiguration.UseSerialization<SystemJsonSerializer>();

// Use SQL persistence with the MS SQL Server dialect; every connection is
// created from the connection string above.
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.MsSqlServer>();
persistence.ConnectionBuilder(() =>
{
    return new SqlConnection(connectionString);
});

// Subscription settings: cache for one minute.
var subscriptions = persistence.SubscriptionSettings();
subscriptions.CacheFor(TimeSpan.FromMinutes(1));
MySQL
var endpointConfiguration = new EndpointConfiguration("Samples.SqlPersistence.EndpointMySql");
endpointConfiguration.UseSerialization<SystemJsonSerializer>();

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();

// Credentials are read from environment variables rather than being hard-coded.
// A missing or blank value is reported with InvalidOperationException instead of
// the bare System.Exception type; it still derives from Exception, so any
// existing catch (Exception) keeps working.
var password = Environment.GetEnvironmentVariable("MySqlPassword");
if (string.IsNullOrWhiteSpace(password))
{
    throw new InvalidOperationException("Could not extract 'MySqlPassword' from Environment variables.");
}

var username = Environment.GetEnvironmentVariable("MySqlUserName");
if (string.IsNullOrWhiteSpace(username))
{
    throw new InvalidOperationException("Could not extract 'MySqlUserName' from Environment variables.");
}

var connection = $"server=localhost;user={username};database=sqlpersistencesample;port=3306;password={password};AllowUserVariables=True;AutoEnlist=false";

// Use the MySQL dialect; every connection is created from the string above.
persistence.SqlDialect<SqlDialect.MySql>();
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new MySqlConnection(connection);
    });

// Subscription settings: cache for one minute.
var subscriptions = persistence.SubscriptionSettings();
subscriptions.CacheFor(TimeSpan.FromMinutes(1));
Oracle
var endpointConfiguration = new EndpointConfiguration("EndpointOracle");
endpointConfiguration.UseSerialization<SystemJsonSerializer>();

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();

// Credentials are read from environment variables rather than being hard-coded.
// A missing or blank value is reported with InvalidOperationException instead of
// the bare System.Exception type; it still derives from Exception, so any
// existing catch (Exception) keeps working.
var password = Environment.GetEnvironmentVariable("OraclePassword");
if (string.IsNullOrWhiteSpace(password))
{
    throw new InvalidOperationException("Could not extract 'OraclePassword' from Environment variables.");
}

var username = Environment.GetEnvironmentVariable("OracleUserName");
if (string.IsNullOrWhiteSpace(username))
{
    throw new InvalidOperationException("Could not extract 'OracleUserName' from Environment variables.");
}

var connection = $"Data Source=localhost;User Id={username}; Password={password}; Enlist=false";

// Use the Oracle dialect; every connection is created from the string above.
persistence.SqlDialect<SqlDialect.Oracle>();
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new OracleConnection(connection);
    });

// Subscription settings: cache for one minute.
var subscriptions = persistence.SubscriptionSettings();
subscriptions.CacheFor(TimeSpan.FromMinutes(1));
PostgreSQL
var endpointConfiguration = new EndpointConfiguration("EndpointPostgreSql");
endpointConfiguration.UseSerialization<SystemJsonSerializer>();

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();

// Credentials are read from environment variables rather than being hard-coded.
// A missing or blank value is reported with InvalidOperationException instead of
// the bare System.Exception type; it still derives from Exception, so any
// existing catch (Exception) keeps working.
var password = Environment.GetEnvironmentVariable("PostgreSqlPassword");
if (string.IsNullOrWhiteSpace(password))
{
    throw new InvalidOperationException("Could not extract 'PostgreSqlPassword' from Environment variables.");
}

var username = Environment.GetEnvironmentVariable("PostgreSqlUserName");
if (string.IsNullOrWhiteSpace(username))
{
    throw new InvalidOperationException("Could not extract 'PostgreSqlUserName' from Environment variables.");
}

var connection = $"Host=localhost;Username={username};Password={password};Database=NsbSamplesSqlPersistence";

// Use the PostgreSQL dialect. The modifier sets the NpgsqlDbType of each
// parameter it receives to Jsonb.
var dialect = persistence.SqlDialect<SqlDialect.PostgreSql>();
dialect.JsonBParameterModifier(
    modifier: parameter =>
    {
        var npgsqlParameter = (NpgsqlParameter)parameter;
        npgsqlParameter.NpgsqlDbType = NpgsqlDbType.Jsonb;
    });

// Every connection is created from the connection string above.
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new NpgsqlConnection(connection);
    });

// Subscription settings: cache for one minute.
var subscriptions = persistence.SubscriptionSettings();
subscriptions.CacheFor(TimeSpan.FromMinutes(1));
Order saga data
/// <summary>
/// State held by <c>OrderSaga</c> for a single order.
/// </summary>
public class OrderSagaData : ContainSagaData
{
    /// <summary>Identifier of the order; the saga maps <c>StartOrder.OrderId</c> to this property.</summary>
    public Guid OrderId { get; set; }

    /// <summary>Description text assigned when the saga handles <c>StartOrder</c>.</summary>
    public string OrderDescription { get; set; }
}
Order saga
/// <summary>
/// Saga started by <see cref="StartOrder"/>. It records an order description,
/// sends a <see cref="ShipOrder"/> message locally, requests a five-second
/// <see cref="CompleteOrder"/> timeout and, when that timeout fires, marks itself
/// complete and publishes <see cref="OrderCompleted"/>.
/// </summary>
public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleTimeouts<CompleteOrder>
{
    // The logger is assigned once and never replaced, so it is readonly.
    static readonly ILog log = LogManager.GetLogger<OrderSaga>();

    /// <summary>
    /// Maps <c>StartOrder.OrderId</c> to <c>OrderSagaData.OrderId</c>.
    /// </summary>
    /// <param name="mapper">Mapper used to register the message-to-saga mapping.</param>
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.ConfigureMapping<StartOrder>(msg => msg.OrderId).ToSaga(saga => saga.OrderId);
    }

    /// <summary>
    /// Handles <see cref="StartOrder"/>: stores the order description in the saga
    /// data, sends <see cref="ShipOrder"/> to the local endpoint and requests a
    /// <see cref="CompleteOrder"/> timeout five seconds from now.
    /// </summary>
    /// <param name="message">The incoming start message.</param>
    /// <param name="context">Handler context used to send the message and request the timeout.</param>
    /// <returns>A task that completes when both the send and the timeout request have completed.</returns>
    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        var orderDescription = $"The saga for order {message.OrderId}";
        Data.OrderDescription = orderDescription;
        log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");

        var shipOrder = new ShipOrder
        {
            OrderId = message.OrderId
        };

        log.Info("Order will complete in 5 seconds");

        // The timeout state carries the same description that was stored in the saga data.
        var timeoutData = new CompleteOrder
        {
            OrderDescription = orderDescription
        };

        return Task.WhenAll(
            context.SendLocal(shipOrder),
            RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData)
        );
    }

    /// <summary>
    /// Handles the <see cref="CompleteOrder"/> timeout: marks the saga as complete
    /// and publishes <see cref="OrderCompleted"/> for the saga's order.
    /// </summary>
    /// <param name="state">The timeout state; not read by this handler.</param>
    /// <param name="context">Handler context used to publish the event.</param>
    /// <returns>The task returned by the publish operation.</returns>
    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        log.Info($"Saga with OrderId {Data.OrderId} completed");
        MarkAsComplete();

        var orderCompleted = new OrderCompleted
        {
            OrderId = Data.OrderId
        };

        return context.Publish(orderCompleted);
    }
}
Querying the saga data
SQL persistence uses the Newtonsoft.Json package to serialize saga data and metadata.
The saga data can be queried using the JSON querying capabilities of SQL Server. It is stored inside the `Data` column and can be queried as shown here:
-- Returns each saga row's correlation id together with the OrderDescription
-- value extracted from the JSON document stored in the [Data] column.
SELECT
    [Correlation_OrderId],
    OrderData.OrderDescription
FROM [NsbSamplesSqlPersistence].[dbo].[Samples_SqlPersistence_EndpointSqlServer_OrderSaga]
CROSS APPLY OPENJSON([Data])
WITH (
    OrderId          NVARCHAR(500)  N'$.OrderId',
    OrderDescription NVARCHAR(2000) N'$.OrderDescription'
) AS OrderData