Code walk-through
When the default Saga message mappings do not satisfy the requirements, custom logic can be put in place to allow NServiceBus to find a saga data instance based on which logic best suits the environment.
Prerequisites
MS SQL Server
- Ensure an instance of SQL Server Express (Version 2016 or above for custom saga finders sample, or Version 2012 or above for other samples) is installed and accessible as
.
.\ SqlExpress
Or, alternatively, change the connection string to point to different SQL Server instance.
At startup each endpoint will create its required SQL assets including databases, tables and schemas.
MySQL
- Ensure an instance of MySQL (Version 5.7 or above) is installed and accessible on
localhost
and port3306
. - Add the username to access the instance to an environment variable named
MySqlUserName
. - Add the password to access the instance to an environment variable named
MySqlPassword
.
Or, alternatively, change the connection string to point to different MySQL instance.
Oracle
- Ensure an instance of Oracle Database (Version 11g or later) is installed and accessible on
localhost
on port1521
with service nameXE
. - Add the username to access the instance to an environment variable named
OracleUserName
. - Add the password to access the instance to an environment variable named
OraclePassword
.
Or, alternatively, change the connection string to point to different Oracle instance.
PostgreSQL
- Ensure an instance of PostgreSQL (Version 10 or later) is installed and accessible on
localhost
. - Add the username to access the instance to an environment variable named
PostgreSqlUserName
. - Add the password to access the instance to an environment variable named
PostgreSqlPassword
.
Or, alternatively, change the connection string to point to different PostgreSQL instance.
Persistence Config
Configure the endpoint to use SQL Persistence.
MS SQL Server
var connection = @"Data Source=.\SqlExpress;Database=NsbSamplesSqlSagaFinder;Integrated Security=True";
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.MsSqlServer>();
persistence.ConnectionBuilder(
connectionBuilder: () =>
{
return new SqlConnection(connection);
});
var subscriptions = persistence.SubscriptionSettings();
subscriptions.CacheFor(TimeSpan.FromMinutes(1));
MySql
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
var password = Environment.GetEnvironmentVariable("MySqlPassword");
if (string.IsNullOrWhiteSpace(password))
{
throw new Exception("Could not extract 'MySqlPassword' from Environment variables.");
}
var username = Environment.GetEnvironmentVariable("MySqlUserName");
if (string.IsNullOrWhiteSpace(username))
{
throw new Exception("Could not extract 'MySqlUserName' from Environment variables.");
}
var connection = $"server=localhost;user={username};database=sqlpersistencesample;port=3306;password={password};AllowUserVariables=True;AutoEnlist=false";
persistence.SqlDialect<SqlDialect.MySql>();
persistence.ConnectionBuilder(
connectionBuilder: () =>
{
return new MySqlConnection(connection);
});
var subscriptions = persistence.SubscriptionSettings();
subscriptions.CacheFor(TimeSpan.FromMinutes(1));
PostgreSql
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
var password = Environment.GetEnvironmentVariable("PostgreSqlPassword");
if (string.IsNullOrWhiteSpace(password))
{
throw new Exception("Could not extract 'PostgreSqlPassword' from Environment variables.");
}
var username = Environment.GetEnvironmentVariable("PostgreSqlUserName");
if (string.IsNullOrWhiteSpace(username))
{
throw new Exception("Could not extract 'PostgreSqlUserName' from Environment variables.");
}
var connection = $"Host=localhost;Username={username};Password={password};Database=NsbSamplesSqlSagaFinder";
persistence.TablePrefix("Finder");
var dialect = persistence.SqlDialect<SqlDialect.PostgreSql>();
dialect.JsonBParameterModifier(
modifier: parameter =>
{
var npgsqlParameter = (NpgsqlParameter)parameter;
npgsqlParameter.NpgsqlDbType = NpgsqlDbType.Jsonb;
});
persistence.ConnectionBuilder(
connectionBuilder: () =>
{
return new NpgsqlConnection(connection);
});
var subscriptions = persistence.SubscriptionSettings();
subscriptions.CacheFor(TimeSpan.FromMinutes(1));
The Saga
This sample contains a very simple order management saga with these responsibilities:
- Handling the creation of an order.
- Offloading the payment process to a another handler.
- Handling the completion of the payment process.
- Completing the order.
public class OrderSaga :
Saga<OrderSagaData>,
IAmStartedByMessages<StartOrder>,
IHandleMessages<CompletePaymentTransaction>,
IHandleMessages<CompleteOrder>
{
static ILog log = LogManager.GetLogger<OrderSaga>();
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
{
mapper.MapSaga(saga => saga.OrderId)
.ToMessage<StartOrder>(msg => msg.OrderId)
.ToMessage<CompleteOrder>(msg => msg.OrderId);
}
public Task Handle(StartOrder message, IMessageHandlerContext context)
{
Data.PaymentTransactionId = Guid.NewGuid().ToString();
log.Info($"Saga with OrderId {Data.OrderId} received StartOrder with OrderId {message.OrderId}");
var issuePaymentRequest = new IssuePaymentRequest
{
PaymentTransactionId = Data.PaymentTransactionId
};
return context.SendLocal(issuePaymentRequest);
}
public Task Handle(CompletePaymentTransaction message, IMessageHandlerContext context)
{
log.Info($"Transaction with Id {Data.PaymentTransactionId} completed for order id {Data.OrderId}");
var completeOrder = new CompleteOrder
{
OrderId = Data.OrderId
};
return context.SendLocal(completeOrder);
}
public Task Handle(CompleteOrder message, IMessageHandlerContext context)
{
log.Info($"Saga with OrderId {Data.OrderId} received CompleteOrder with OrderId {message.OrderId}");
MarkAsComplete();
return Task.CompletedTask;
}
}
It is important to note that the saga is not sending the order ID to the payment processor. Instead, it is sending a payment transaction ID. This saga needs to be correlated by more than one property. For example, OrderId
and PaymentTransactionId
. This requires both of these properties to be treated as unique.
Saga Finders
A Saga Finder is only required for the PaymentTransactionCompleted
message since the other messages (StartOrder
and CompleteOrder
) are correlated based on OrderSagaData.
.
MS SQL Server
class OrderSagaFinder :
IFindSagas<OrderSagaData>.Using<CompletePaymentTransaction>
{
public Task<OrderSagaData> FindBy(CompletePaymentTransaction message, SynchronizedStorageSession session, ReadOnlyContextBag context)
{
return session.GetSagaData<OrderSagaData>(
context: context,
whereClause: "JSON_VALUE(Data,'$.PaymentTransactionId') = @propertyValue",
appendParameters: (builder, append) =>
{
var parameter = builder();
parameter.ParameterName = "propertyValue";
parameter.Value = message.PaymentTransactionId;
append(parameter);
});
}
}
MySql
class OrderSagaFinder :
IFindSagas<OrderSagaData>.Using<CompletePaymentTransaction>
{
public Task<OrderSagaData> FindBy(CompletePaymentTransaction message, SynchronizedStorageSession session, ReadOnlyContextBag context)
{
return session.GetSagaData<OrderSagaData>(
context: context,
whereClause: "JSON_EXTRACT(Data,'$.PaymentTransactionId') = @propertyValue",
appendParameters: (builder, append) =>
{
var parameter = builder();
parameter.ParameterName = "propertyValue";
parameter.Value = message.PaymentTransactionId;
append(parameter);
});
}
}
PostgreSql
class OrderSagaFinder :
IFindSagas<OrderSagaData>.Using<CompletePaymentTransaction>
{
public Task<OrderSagaData> FindBy(CompletePaymentTransaction message, SynchronizedStorageSession session, ReadOnlyContextBag context)
{
return session.GetSagaData<OrderSagaData>(
context: context,
whereClause: @"""Data""->>'PaymentTransactionId' = @propertyValue",
appendParameters: (builder, append) =>
{
var parameter = builder();
parameter.ParameterName = "propertyValue";
parameter.Value = message.PaymentTransactionId;
append(parameter);
});
}
}