SQL Persistence Saga Finding Logic

Component: Sql Persistence | Nuget: NServiceBus.Persistence.Sql (Version: 2.x)
Target NServiceBus Version: 6.x

Code walk-through

When the default saga message mappings do not satisfy the requirements, custom logic can be put in place so that NServiceBus finds a saga data instance using whichever logic best suits the environment.

Prerequisites

MS SQL Server

  1. Ensure an instance of SQL Server Express (Version 2016 or above) is installed and accessible as .\SQLEXPRESS. Create a database named SqlPersistenceSample. Alternatively, change the connection string to point to a different SQL Server instance.

MySql

  1. Ensure an instance of MySql (Version 5.7 or above) is installed and accessible on localhost, port 3306.
  2. Add the username used to access the instance to an environment variable named MySqlUserName.
  3. Add the password used to access the instance to an environment variable named MySqlPassword.

Alternatively, change the connection string to point to a different MySql instance.

Persistence Config

Configure the endpoint to use SQL Persistence.

MS SQL Server

var connection = @"Data Source=.\SqlExpress;Database=SqlPersistenceSample;Integrated Security=True";
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new SqlConnection(connection);
    });
persistence.SqlVariant(SqlVariant.MsSqlServer);
var subscriptions = persistence.SubscriptionSettings();
subscriptions.CacheFor(TimeSpan.FromMinutes(1));

MySql

var transport = endpointConfiguration.UseTransport<MsmqTransport>();
transport.Transactions(TransportTransactionMode.SendsAtomicWithReceive);

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
var password = Environment.GetEnvironmentVariable("MySqlPassword");
if (string.IsNullOrWhiteSpace(password))
{
    throw new Exception("Could not extract 'MySqlPassword' from Environment variables.");
}
var username = Environment.GetEnvironmentVariable("MySqlUserName");
if (string.IsNullOrWhiteSpace(username))
{
    throw new Exception("Could not extract 'MySqlUserName' from Environment variables.");
}
var connection = $"server=localhost;user={username};database=sqlpersistencesample;port=3306;password={password};AllowUserVariables=True;AutoEnlist=false";
persistence.SqlVariant(SqlVariant.MySql);
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new MySqlConnection(connection);
    });
var subscriptions = persistence.SubscriptionSettings();
subscriptions.CacheFor(TimeSpan.FromMinutes(1));

The Saga

The saga shown in the sample is a very simple order management saga that:

  • Handles the creation of an order.
  • Offloads the payment process to a different handler.
  • Handles the completion of the payment process.
  • Completes the order.
public class OrderSaga :
    SqlSaga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleMessages<PaymentTransactionCompleted>,
    IHandleMessages<CompleteOrder>
{
    static ILog log = LogManager.GetLogger<OrderSaga>();

    protected override void ConfigureMapping(IMessagePropertyMapper mapper)
    {
        mapper.ConfigureMapping<StartOrder>(_ => _.OrderId);
        mapper.ConfigureMapping<CompleteOrder>(_ => _.OrderId);
    }

    protected override string CorrelationPropertyName => nameof(OrderSagaData.OrderId);

    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        Data.PaymentTransactionId = Guid.NewGuid().ToString();

        log.Info($"Saga with OrderId {Data.OrderId} received StartOrder with OrderId {message.OrderId}");
        var issuePaymentRequest = new IssuePaymentRequest
        {
            PaymentTransactionId = Data.PaymentTransactionId
        };
        return context.SendLocal(issuePaymentRequest);
    }

    public Task Handle(PaymentTransactionCompleted message, IMessageHandlerContext context)
    {
        log.Info($"Transaction with Id {Data.PaymentTransactionId} completed for order id {Data.OrderId}");
        var completeOrder = new CompleteOrder
        {
            OrderId = Data.OrderId
        };
        return context.SendLocal(completeOrder);
    }

    public Task Handle(CompleteOrder message, IMessageHandlerContext context)
    {
        log.Info($"Saga with OrderId {Data.OrderId} received CompleteOrder with OrderId {message.OrderId}");
        MarkAsComplete();
        return Task.CompletedTask;
    }

}

From a process point of view, it is important to note that the saga does not send the order id to the payment processor; instead it sends a payment transaction id. A saga can be correlated on more than one unique attribute, such as OrderId and PaymentTransactionId, which requires both to be treated as unique.
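
The saga data class is not shown in this walk-through. A minimal sketch of what it might look like follows, assuming that OrderId is a Guid (the type is not stated here) and that the class derives from the NServiceBus ContainSagaData base class:

```csharp
using System;
using NServiceBus;

// Hypothetical sketch of the saga data; the real class is not shown in this sample.
public class OrderSagaData :
    ContainSagaData
{
    // Correlation property named by OrderSaga.CorrelationPropertyName.
    // Guid is an assumption; the walk-through does not state the type.
    public Guid OrderId { get; set; }

    // Set in the StartOrder handler from Guid.NewGuid().ToString(),
    // and queried by the saga finder for PaymentTransactionCompleted.
    public string PaymentTransactionId { get; set; }
}
```

Both properties identify a single saga instance, which is why each value needs to be unique across all stored sagas.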

Saga Finders

A Saga Finder is only required for the PaymentTransactionCompleted message since the other messages (StartOrder and CompleteOrder) are correlated based on OrderSagaData.OrderId.
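
The split between mapped and finder-based correlation is easier to see from the message shapes. The contracts below are a hypothetical sketch: the property names come from the saga code above, while the property types and the use of the IMessage marker interface are assumptions.

```csharp
using System;
using NServiceBus;

// Hypothetical message contracts; property types are assumptions.

// Correlated through the OrderId mapping in ConfigureMapping.
public class StartOrder :
    IMessage
{
    public Guid OrderId { get; set; }
}

// Also correlated through the OrderId mapping.
public class CompleteOrder :
    IMessage
{
    public Guid OrderId { get; set; }
}

// Sent by the saga to the payment handler; carries no OrderId.
public class IssuePaymentRequest :
    IMessage
{
    public string PaymentTransactionId { get; set; }
}

// Carries only the payment transaction id, so it cannot be
// mapped to OrderId and needs a saga finder instead.
public class PaymentTransactionCompleted :
    IMessage
{
    public string PaymentTransactionId { get; set; }
}
```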

MS SQL Server

class OrderSagaFinder :
    IFindSagas<OrderSagaData>.Using<PaymentTransactionCompleted>
{
    public Task<OrderSagaData> FindBy(PaymentTransactionCompleted message, SynchronizedStorageSession session, ReadOnlyContextBag context)
    {
        return session.GetSagaData<OrderSagaData>(
            context: context,
            whereClause: "JSON_VALUE(Data,'$.PaymentTransactionId') = @propertyValue",
            appendParameters: (builder, append) =>
            {
                var parameter = builder();
                parameter.ParameterName = "propertyValue";
                parameter.Value = message.PaymentTransactionId;
                append(parameter);
            });
    }
}

MySql

class OrderSagaFinder :
    IFindSagas<OrderSagaData>.Using<PaymentTransactionCompleted>
{
    public Task<OrderSagaData> FindBy(PaymentTransactionCompleted message, SynchronizedStorageSession session, ReadOnlyContextBag context)
    {
        return session.GetSagaData<OrderSagaData>(
            context: context,
            whereClause: "JSON_EXTRACT(Data,'$.PaymentTransactionId') = @propertyValue",
            appendParameters: (builder, append) =>
            {
                var parameter = builder();
                parameter.ParameterName = "propertyValue";
                parameter.Value = message.PaymentTransactionId;
                append(parameter);
            });
    }
}

Related Articles

  • Sagas
    NServiceBus uses event-driven architecture to include fault-tolerance and scalability in long-term business processes.
  • SQL Persistence
