Code walk-through
When the default Saga message mappings do not satisfy the requirements, custom logic can be put in place to allow NServiceBus to find a saga data instance based on which logic best suits the environment.
Prerequisites
MS SQL Server
Ensure an instance of SQL Server (Version 2016 or above for custom saga finders sample, or Version 2012 or above for other samples) is installed and accessible on localhost
and port 1433
. A Docker image can be used to accomplish this by running docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_SA_PASSWORD=yourStrong(!)Password' -p 1433:1433 -d mcr.
in a terminal.
Alternatively, change the connection string to point to different SQL Server instance.
At startup each endpoint will create its required SQL assets including databases, tables, and schemas.
MySQL
Ensure an instance of MySQL (Version 5.7 or above) is installed and accessible on localhost
and port 3306
. A Docker image can be used to accomplish this by running docker run --name mysql -e 'MYSQL_ROOT_PASSWORD=yourStrong(!)Password' -p 3306:3306 -d mysql:latest
in a terminal.
Alternatively, change the connection string to point to different MySQL instance.
At startup each endpoint will create the required SQL assets including databases, tables, and schemas.
Oracle
Ensure an instance of Oracle (Version 11g or later) is installed and accessible on localhost
and port 1521
. A Docker image can be used to accomplish this by running docker run --name oracle -e 'ORACLE_PASSWORD=yourStrong(!)Password' -p 1521:1521 -d gvenzl/
in a terminal.
Alternatively, change the connection string to point to different Oracle instance.
At startup each endpoint will create the required SQL assets including databases, tables, and schemas.
PostgreSQL
Ensure an instance of PostgreSQL (Version 10 or later) is installed and accessible on localhost
and port 5432
. A Docker image can be used to accomplish this by running docker run --name postgres -e 'POSTGRES_PASSWORD=yourStrong(!)Password' -p 5432:5432 -d postgres:latest
in a terminal.
Alternatively, change the connection string to point to different PostgreSQL instance.
At startup each endpoint will create the required SQL assets including databases, tables, and schemas.
Persistence Config
Configure the endpoint to use SQL Persistence.
MS SQL Server
// for SqlExpress use Data Source=.\SqlExpress;Initial Catalog=NsbSamplesSqlSagaFinder;Integrated Security=True;Encrypt=false
var connectionString = @"Server=localhost,1433;Initial Catalog=NsbSamplesSqlSagaFinder;User Id=SA;Password=yourStrong(!)Password;Encrypt=false";
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.MsSqlServer>();
persistence.ConnectionBuilder(
connectionBuilder: () =>
{
return new SqlConnection(connectionString);
});
var subscriptions = persistence.SubscriptionSettings();
subscriptions.CacheFor(TimeSpan.FromMinutes(1));
MySql
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
var password = Environment.GetEnvironmentVariable("MySqlPassword");
if (string.IsNullOrWhiteSpace(password))
{
throw new Exception("Could not extract 'MySqlPassword' from Environment variables.");
}
var username = Environment.GetEnvironmentVariable("MySqlUserName");
if (string.IsNullOrWhiteSpace(username))
{
throw new Exception("Could not extract 'MySqlUserName' from Environment variables.");
}
var connection = $"server=localhost;user={username};database=sqlpersistencesample;port=3306;password={password};AllowUserVariables=True;AutoEnlist=false";
persistence.SqlDialect<SqlDialect.MySql>();
persistence.ConnectionBuilder(
connectionBuilder: () =>
{
return new MySqlConnection(connection);
});
var subscriptions = persistence.SubscriptionSettings();
subscriptions.CacheFor(TimeSpan.FromMinutes(1));
PostgreSql
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
var password = Environment.GetEnvironmentVariable("PostgreSqlPassword");
if (string.IsNullOrWhiteSpace(password))
{
throw new Exception("Could not extract 'PostgreSqlPassword' from Environment variables.");
}
var username = Environment.GetEnvironmentVariable("PostgreSqlUserName");
if (string.IsNullOrWhiteSpace(username))
{
throw new Exception("Could not extract 'PostgreSqlUserName' from Environment variables.");
}
var connection = $"Host=localhost;Username={username};Password={password};Database=NsbSamplesSqlSagaFinder";
persistence.TablePrefix("Finder");
var dialect = persistence.SqlDialect<SqlDialect.PostgreSql>();
dialect.JsonBParameterModifier(
modifier: parameter =>
{
var npgsqlParameter = (NpgsqlParameter)parameter;
npgsqlParameter.NpgsqlDbType = NpgsqlDbType.Jsonb;
});
persistence.ConnectionBuilder(
connectionBuilder: () =>
{
return new NpgsqlConnection(connection);
});
var subscriptions = persistence.SubscriptionSettings();
subscriptions.CacheFor(TimeSpan.FromMinutes(1));
The Saga
This sample contains a very simple order management saga with these responsibilities:
- Handling the creation of an order.
- Offloading the payment process to a another handler.
- Handling the completion of the payment process.
- Completing the order.
public class OrderSaga :
Saga<OrderSagaData>,
IAmStartedByMessages<StartOrder>,
IHandleMessages<CompletePaymentTransaction>,
IHandleMessages<CompleteOrder>
{
static ILog log = LogManager.GetLogger<OrderSaga>();
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
{
mapper.MapSaga(saga => saga.OrderId)
.ToMessage<StartOrder>(msg => msg.OrderId)
.ToMessage<CompleteOrder>(msg => msg.OrderId);
}
public Task Handle(StartOrder message, IMessageHandlerContext context)
{
Data.PaymentTransactionId = Guid.NewGuid().ToString();
log.Info($"Saga with OrderId {Data.OrderId} received StartOrder with OrderId {message.OrderId}");
var issuePaymentRequest = new IssuePaymentRequest
{
PaymentTransactionId = Data.PaymentTransactionId
};
return context.SendLocal(issuePaymentRequest);
}
public Task Handle(CompletePaymentTransaction message, IMessageHandlerContext context)
{
log.Info($"Transaction with Id {Data.PaymentTransactionId} completed for order id {Data.OrderId}");
var completeOrder = new CompleteOrder
{
OrderId = Data.OrderId
};
return context.SendLocal(completeOrder);
}
public Task Handle(CompleteOrder message, IMessageHandlerContext context)
{
log.Info($"Saga with OrderId {Data.OrderId} received CompleteOrder with OrderId {message.OrderId}");
MarkAsComplete();
return Task.CompletedTask;
}
}
It is important to note that the saga is not sending the order ID to the payment processor. Instead, it is sending a payment transaction ID. This saga needs to be correlated by more than one property. For example, OrderId
and PaymentTransactionId
. This requires both of these properties to be treated as unique.
Saga Finders
A Saga Finder is only required for the PaymentTransactionCompleted
message since the other messages (StartOrder
and CompleteOrder
) are correlated based on OrderSagaData.
.
MS SQL Server
On Microsoft SQL Server, the saga finder feature requires the JSON_VALUE function that is only available starting with SQL Server 2016.
class OrderSagaFinder :
ISagaFinder<OrderSagaData, CompletePaymentTransaction>
{
public Task<OrderSagaData> FindBy(CompletePaymentTransaction message, ISynchronizedStorageSession storageSession, IReadOnlyContextBag context, CancellationToken cancellationToken = default)
{
return storageSession.GetSagaData<OrderSagaData>(
context: context,
whereClause: "JSON_VALUE(Data,'$.PaymentTransactionId') = @propertyValue",
appendParameters: (builder, append) =>
{
var parameter = builder();
parameter.ParameterName = "propertyValue";
parameter.Value = message.PaymentTransactionId;
append(parameter);
});
}
}
MySql
class OrderSagaFinder :
ISagaFinder<OrderSagaData, CompletePaymentTransaction>
{
public Task<OrderSagaData> FindBy(CompletePaymentTransaction message, ISynchronizedStorageSession storageSession, IReadOnlyContextBag context, CancellationToken cancellationToken = default)
{
return storageSession.GetSagaData<OrderSagaData>(
context: context,
whereClause: "JSON_EXTRACT(Data,'$.PaymentTransactionId') = @propertyValue",
appendParameters: (builder, append) =>
{
var parameter = builder();
parameter.ParameterName = "propertyValue";
parameter.Value = message.PaymentTransactionId;
append(parameter);
});
}
}
PostgreSql
class OrderSagaFinder :
ISagaFinder<OrderSagaData, CompletePaymentTransaction>
{
public Task<OrderSagaData> FindBy(CompletePaymentTransaction message, ISynchronizedStorageSession storageSession, IReadOnlyContextBag context, CancellationToken cancellationToken = default)
{
return storageSession.GetSagaData<OrderSagaData>(
context: context,
whereClause: @"""Data""->>'PaymentTransactionId' = @propertyValue",
appendParameters: (builder, append) =>
{
var parameter = builder();
parameter.ParameterName = "propertyValue";
parameter.Value = message.PaymentTransactionId;
append(parameter);
});
}
}