Simple SQL Persistence Usage

Component: Sql Persistence
NuGet Package NServiceBus.Persistence.Sql (3.x)
Target NServiceBus Version: 7.x

This sample shows a Client + Server scenario.

Prerequisites

MS SQL Server

  1. Ensure an instance of SQL Server Express (Version 2016 or above for custom saga finders sample, or Version 2012 or above for other samples) is installed and accessible as .\SqlExpress.

Or, alternatively, change the connection string to point to different SQL Server instance.

At startup each endpoint will create its required SQL assets including databases, tables and schemas.

MySQL

  1. Ensure an instance of MySQL (Version 5.7 or above) is installed and accessible on localhost and port 3306.
  2. Add the username to access the instance to an environment variable named MySqlUserName.
  3. Add the password to access the instance to an environment variable named MySqlPassword.

Or, alternatively, change the connection string to point to different MySQL instance.

Oracle

  1. Ensure an instance of Oracle Database (Version 11g or later) is installed and accessible on localhost on port 1521 with service name XE.
  2. Add the username to access the instance to an environment variable named OracleUserName.
  3. Add the password to access the instance to an environment variable named OraclePassword.

Or, alternatively, change the connection string to point to different Oracle instance.

PostgreSQL

  1. Ensure an instance of PostgreSQL (Version 10 or later) is installed and accessible on localhost.
  2. Add the username to access the instance to an environment variable named PostgreSqlUserName.
  3. Add the password to access the instance to an environment variable named PostgreSqlPassword.

Or, alternatively, change the connection string to point to different PostgreSQL instance.

Projects

SharedMessages

The shared message contracts used by all endpoints.

ServerShared

Contains the OrderSaga functionality and is referenced by the Server endpoints

Client

  • Sends the StartOrder message to either EndpointMySql or EndpointSqlServer.
  • Receives and handles the OrderCompleted event.

Servers

  • EndpointMySql, EndpointSqlServer, and EndpointOracle projects act as "servers" to run the saga instance.
  • Receive the StartOrder message and initiate a OrderSaga.
  • OrderSaga requests a timeout with a CompleteOrder data.
  • When the CompleteOrder timeout fires the OrderSaga publishes a OrderCompleted event.

SQL Scripts

Note that only ServerShared has the NServiceBus.Persistence.Sql.MsBuild NuGet package installed. This will cause the script director ServerShared\bin\Debug\NServiceBus.Persistence.Sql\[Variant] to be populated at build time.

These scripts will then be copied to the output of each endpoint and executed at startup.

The endpoints know which scripts to execute via the use of the persistence.SqlVariant(); API usage at configuration time.

The scripts produced in this sample are promoted to $(SolutionDir)PromotedSqlScripts.

[assembly: SqlPersistenceSettings(
    MsSqlServerScripts = true,
    MySqlScripts = true,
    OracleScripts = true,
    PostgreSqlScripts = true,
    ScriptPromotionPath = "$(SolutionDir)PromotedSqlScripts")]

Persistence Config

Configure the endpoint to use SQL Persistence.

MS SQL Server

var endpointConfiguration = new EndpointConfiguration("Samples.SqlPersistence.EndpointSqlServer");

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
var connection = @"Data Source=.\SqlExpress;Initial Catalog=NsbSamplesSqlPersistence;Integrated Security=True";
persistence.SqlDialect<SqlDialect.MsSqlServer>();
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new SqlConnection(connection);
    });
var subscriptions = persistence.SubscriptionSettings();
subscriptions.CacheFor(TimeSpan.FromMinutes(1));

MySQL

var endpointConfiguration = new EndpointConfiguration("Samples.SqlPersistence.EndpointMySql");

var transport = endpointConfiguration.UseTransport<MsmqTransport>();
transport.Transactions(TransportTransactionMode.SendsAtomicWithReceive);

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();

var password = Environment.GetEnvironmentVariable("MySqlPassword");
if (string.IsNullOrWhiteSpace(password))
{
    throw new Exception("Could not extract 'MySqlPassword' from Environment variables.");
}
var username = Environment.GetEnvironmentVariable("MySqlUserName");
if (string.IsNullOrWhiteSpace(username))
{
    throw new Exception("Could not extract 'MySqlUserName' from Environment variables.");
}
var connection = $"server=localhost;user={username};database=sqlpersistencesample;port=3306;password={password};AllowUserVariables=True;AutoEnlist=false";
persistence.SqlDialect<SqlDialect.MySql>();
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new MySqlConnection(connection);
    });
var subscriptions = persistence.SubscriptionSettings();
subscriptions.CacheFor(TimeSpan.FromMinutes(1));

Oracle

var endpointConfiguration = new EndpointConfiguration("EndpointOracle");

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
var password = Environment.GetEnvironmentVariable("OraclePassword");
if (string.IsNullOrWhiteSpace(password))
{
    throw new Exception("Could not extract 'OraclePassword' from Environment variables.");
}
var username = Environment.GetEnvironmentVariable("OracleUserName");
if (string.IsNullOrWhiteSpace(username))
{
    throw new Exception("Could not extract 'OracleUserName' from Environment variables.");
}
var connection = $"Data Source=localhost;User Id={username}; Password={password}; Enlist=false";
persistence.SqlDialect < SqlDialect.Oracle>();
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new OracleConnection(connection);
    });
var subscriptions = persistence.SubscriptionSettings();
subscriptions.CacheFor(TimeSpan.FromMinutes(1));

PostgreSql

var endpointConfiguration = new EndpointConfiguration("EndpointPostgreSql");

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
var password = Environment.GetEnvironmentVariable("PostgreSqlPassword");
if (string.IsNullOrWhiteSpace(password))
{
    throw new Exception("Could not extract 'PostgreSqlPassword' from Environment variables.");
}
var username = Environment.GetEnvironmentVariable("PostgreSqlUserName");
if (string.IsNullOrWhiteSpace(username))
{
    throw new Exception("Could not extract 'PostgreSqlUserName' from Environment variables.");
}

var connection = $"Host=localhost;Username={username};Password={password};Database=NsbSamplesSqlPersistence";
var dialect = persistence.SqlDialect<SqlDialect.PostgreSql>();
dialect.JsonBParameterModifier(
    modifier: parameter =>
    {
        var npgsqlParameter = (NpgsqlParameter)parameter;
        npgsqlParameter.NpgsqlDbType = NpgsqlDbType.Jsonb;
    });
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new NpgsqlConnection(connection);
    });
var subscriptions = persistence.SubscriptionSettings();
subscriptions.CacheFor(TimeSpan.FromMinutes(1));

Order Saga Data

public class OrderSagaData :
    ContainSagaData
{
    public Guid OrderId { get; set; }
    public string OrderDescription { get; set; }
}

Order Saga

public class OrderSaga :
    SqlSaga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleTimeouts<CompleteOrder>
{
    static ILog log = LogManager.GetLogger<OrderSaga>();

    protected override void ConfigureMapping(IMessagePropertyMapper mapper)
    {
        mapper.ConfigureMapping<StartOrder>(_ => _.OrderId);
    }

    protected override string CorrelationPropertyName => nameof(OrderSagaData.OrderId);

    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        var orderDescription = $"The saga for order {message.OrderId}";
        Data.OrderDescription = orderDescription;
        log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");

        var shipOrder = new ShipOrder
        {
            OrderId = message.OrderId
        };

        log.Info("Order will complete in 5 seconds");
        var timeoutData = new CompleteOrder
        {
            OrderDescription = orderDescription
        };

        return Task.WhenAll(
            context.SendLocal(shipOrder),
            RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData)
        );
    }

    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        log.Info($"Saga with OrderId {Data.OrderId} completed");
        MarkAsComplete();
        var orderCompleted = new OrderCompleted
        {
            OrderId = Data.OrderId
        };
        return context.Publish(orderCompleted);
    }
}

Related Articles

  • Sagas
    NServiceBus uses event-driven architecture to include fault-tolerance and scalability in long-term business processes.

Last modified