Simple SQL Persistence Usage

Component: Sql Persistence
NuGet Package: NServiceBus.Persistence.Sql (8.x)
Target Version: NServiceBus 9.x

This sample shows a client/server scenario.

Prerequisites

MS SQL Server

Ensure an instance of SQL Server (Version 2016 or above for custom saga finders sample, or Version 2012 or above for other samples) is installed and accessible on localhost and port 1433. A Docker image can be used to accomplish this by running docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_SA_PASSWORD=yourStrong(!)Password' -p 1433:1433 -d mcr.microsoft.com/mssql/server:latest in a terminal.

Alternatively, change the connection string to point to a different SQL Server instance.

At startup each endpoint will create its required SQL assets including databases, tables, and schemas.
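As a minimal sketch of how this startup behavior is typically switched on, installers can be enabled on the endpoint configuration so that the persistence scripts run when the endpoint starts. This is an assumption, because the sample's hosting code is not shown on this page. The endpoint name is hypothetical, inferred from the saga table name queried later in this sample, and the sketch does not cover how the sample creates the database itself.

```csharp
using NServiceBus;

// Hypothetical endpoint name, inferred from the saga table name in this sample.
var endpointConfiguration = new EndpointConfiguration("Samples.SqlPersistence.EndpointSqlServer");

// Assumption: installers are enabled so that the table-creation scripts
// are executed when the endpoint starts.
endpointConfiguration.EnableInstallers();
```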

MySQL

Ensure an instance of MySQL (Version 5.7 or above) is installed and accessible on localhost and port 3306. A Docker image can be used to accomplish this by running docker run --name mysql -e 'MYSQL_ROOT_PASSWORD=yourStrong(!)Password' -p 3306:3306 -d mysql:latest in a terminal.

Alternatively, change the connection string to point to a different MySQL instance.

At startup each endpoint will create the required SQL assets including databases, tables, and schemas.

Oracle

Ensure an instance of Oracle (Version 11g or later) is installed and accessible on localhost and port 1521. A Docker image can be used to accomplish this by running docker run --name oracle -e 'ORACLE_PASSWORD=yourStrong(!)Password' -p 1521:1521 -d gvenzl/oracle-free:23-slim in a terminal.

Alternatively, change the connection string to point to a different Oracle instance.

At startup each endpoint will create the required SQL assets including databases, tables, and schemas.

PostgreSQL

Ensure an instance of PostgreSQL (Version 10 or later) is installed and accessible on localhost and port 5432. A Docker image can be used to accomplish this by running docker run --name postgres -e 'POSTGRES_PASSWORD=yourStrong(!)Password' -p 5432:5432 -d postgres:latest in a terminal.

Alternatively, change the connection string to point to a different PostgreSQL instance.

At startup each endpoint will create the required SQL assets including databases, tables, and schemas.

Projects

SharedMessages

The shared message contracts used by all endpoints.

ServerShared

Contains the OrderSaga functionality and is referenced by the Server endpoints.

Client

  • Sends the StartOrder message to either EndpointMySql or EndpointSqlServer.
  • Receives and handles the OrderCompleted event.
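The client behavior described above could be sketched roughly as follows. The message classes are hypothetical stand-ins for the contracts in SharedMessages, with property types inferred from the saga code in this sample. The choice of the IMessage and IEvent marker interfaces, the OrderSender helper, and the handler name are assumptions made for illustration only.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical message shapes; the real contracts live in SharedMessages.
public class StartOrder : IMessage
{
    public Guid OrderId { get; set; }
}

public class OrderCompleted : IEvent
{
    public Guid OrderId { get; set; }
}

// Hypothetical helper that sends StartOrder to a named server endpoint.
public static class OrderSender
{
    public static Task SendStartOrder(IMessageSession session, string destination)
    {
        var startOrder = new StartOrder
        {
            OrderId = Guid.NewGuid()
        };

        // Explicit destination, so no routing configuration is needed for this sketch.
        return session.Send(destination, startOrder);
    }
}

// Handles the event published by the saga when the order completes.
public class OrderCompletedHandler : IHandleMessages<OrderCompleted>
{
    public Task Handle(OrderCompleted message, IMessageHandlerContext context)
    {
        Console.WriteLine($"Received OrderCompleted for OrderId {message.OrderId}");
        return Task.CompletedTask;
    }
}
```

Passing the destination as a parameter lets the same helper target either server endpoint, at the cost of hard-coding endpoint names at the call site instead of using routing configuration.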

Server projects

  • EndpointMySql, EndpointSqlServer, and EndpointOracle projects act as "servers" to run the saga instance.
  • Receive the StartOrder message and initiate an OrderSaga.
  • OrderSaga requests a timeout, passing a CompleteOrder instance that carries the order description as the timeout state.
  • OrderSaga publishes an OrderCompleted event when the CompleteOrder timeout fires.

SQL scripts

Note that only ServerShared has the NServiceBus.Persistence.Sql NuGet package directly referenced. This will cause the script directory ServerShared\bin\Debug\[TFM]\NServiceBus.Persistence.Sql\[Variant] to be populated at build time.

These scripts will be copied to the output of each endpoint and executed at startup.

The endpoints know which scripts to execute via the persistence.SqlDialect<T>() API at configuration time.

The scripts produced in this sample are promoted to $(SolutionDir)PromotedSqlScripts.

[assembly: SqlPersistenceSettings(
    MsSqlServerScripts = true,
    MySqlScripts = true,
    OracleScripts = true,
    PostgreSqlScripts = true,
    ScriptPromotionPath = "$(SolutionDir)PromotedSqlScripts")]

Persistence config

Configure the endpoint to use SQL Persistence.

MS SQL Server

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.MsSqlServer>();

// for SqlExpress use Data Source=.\SqlExpress;Initial Catalog=NsbSamplesSqlPersistence;Integrated Security=True;Encrypt=false
var connectionString = "Server=localhost,1433;Initial Catalog=NsbSamplesSqlPersistence;User Id=SA;Password=yourStrong(!)Password;Encrypt=false";

persistence.ConnectionBuilder(() => new SqlConnection(connectionString));

MySQL

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.MySql>();

var connection = "server=localhost;user=root;database=sqlpersistencesample;port=3306;password=yourStrong(!)Password;AllowUserVariables=True;AutoEnlist=false";

persistence.ConnectionBuilder(() => new MySqlConnection(connection));

Oracle

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.Oracle>();

var connection = "Data Source=localhost;User Id=SYSTEM; Password=yourStrong(!)Password; Enlist=false";

persistence.ConnectionBuilder(() => new OracleConnection(connection));

PostgreSQL

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
var dialect = persistence.SqlDialect<SqlDialect.PostgreSql>();

var connection = "Host=localhost;Username=postgres;Password=yourStrong(!)Password;Database=NsbSamplesSqlPersistence";

persistence.ConnectionBuilder(() => new NpgsqlConnection(connection));

Order saga data

public class OrderSagaData :
    ContainSagaData
{
    public Guid OrderId { get; set; }
    public string OrderDescription { get; set; }
}

Order saga

public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleTimeouts<CompleteOrder>
{
    static ILog log = LogManager.GetLogger<OrderSaga>();

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.ConfigureMapping<StartOrder>(msg => msg.OrderId).ToSaga(saga => saga.OrderId);
    }

    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        var orderDescription = $"The saga for order {message.OrderId}";
        Data.OrderDescription = orderDescription;
        log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");

        var shipOrder = new ShipOrder
        {
            OrderId = message.OrderId
        };

        log.Info("Order will complete in 5 seconds");
        var timeoutData = new CompleteOrder
        {
            OrderDescription = orderDescription
        };

        return Task.WhenAll(
            context.SendLocal(shipOrder),
            RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData)
        );
    }

    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        log.Info($"Saga with OrderId {Data.OrderId} completed");
        MarkAsComplete();
        var orderCompleted = new OrderCompleted
        {
            OrderId = Data.OrderId
        };
        return context.Publish(orderCompleted);
    }
}

Querying the saga data

SQL persistence uses the Newtonsoft.Json package to serialize saga data and metadata.

The saga data is stored as JSON in the Data column and can be queried using the JSON capabilities of SQL Server (OPENJSON requires SQL Server 2016 or later), as shown here:

SELECT [Correlation_OrderId], OrderData.OrderDescription
FROM [NsbSamplesSqlPersistence].[dbo].[Samples_SqlPersistence_EndpointSqlServer_OrderSaga]
CROSS APPLY OPENJSON([Data]) WITH
(
   OrderId NVARCHAR(500) N'$.OrderId',
   OrderDescription NVARCHAR(2000) N'$.OrderDescription'
) as OrderData
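The same query can also be run from application code. The following is a minimal sketch, assuming the Microsoft.Data.SqlClient package and a connection string whose Initial Catalog is NsbSamplesSqlPersistence. The SagaDataQuery class and its PrintOrderSagas method are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Data.SqlClient;

// Hypothetical helper, not part of the sample.
public static class SagaDataQuery
{
    // Same shape as the query above; the database comes from the connection string.
    const string Sql = @"
SELECT [Correlation_OrderId], OrderData.OrderDescription
FROM [dbo].[Samples_SqlPersistence_EndpointSqlServer_OrderSaga]
CROSS APPLY OPENJSON([Data]) WITH
(
   OrderDescription NVARCHAR(2000) N'$.OrderDescription'
) AS OrderData";

    public static async Task PrintOrderSagas(string connectionString)
    {
        await using var connection = new SqlConnection(connectionString);
        await connection.OpenAsync();

        await using var command = new SqlCommand(Sql, connection);
        await using var reader = await command.ExecuteReaderAsync();

        while (await reader.ReadAsync())
        {
            // Read the correlation column as a plain object to avoid assuming its SQL type.
            var orderId = reader.GetValue(0);
            var description = reader.IsDBNull(1) ? null : reader.GetString(1);
            Console.WriteLine($"{orderId}: {description}");
        }
    }
}
```

A saga row exists only between the StartOrder message and the CompleteOrder timeout, so the query returns rows only while an order is still in progress.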

Related Articles

  • Sagas
    Maintain statefulness in distributed systems with the saga pattern and NServiceBus' event-driven architecture with built-in fault-tolerance and scalability.