Code walk-through
When the default saga message mappings do not satisfy the requirements, a custom saga finder can be put in place so that NServiceBus locates a saga data instance using whatever logic best suits the environment.
Prerequisites
Ensure an instance of SQL Server (Version 2016 or above for the custom saga finders sample, or Version 2012 or above for other samples) is installed and accessible on localhost and port 1433. A Docker image can be used to accomplish this by running docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_SA_PASSWORD=yourStrong(!)Password' -p 1433:1433 -d mcr. in a terminal.
Alternatively, change the connection string to point to a different SQL Server instance.
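One way to point the sample at a different instance without editing the source is to read the connection string from an environment variable and fall back to the sample's default. The following is a minimal sketch under that assumption; the variable name NSB_SAMPLES_CONNECTION_STRING and the ConnectionStrings helper are hypothetical and not part of the sample.

```csharp
using System;

public static class ConnectionStrings
{
    // Default used by this sample: SQL Server on localhost, port 1433.
    public const string Default =
        @"Server=localhost,1433;Initial Catalog=NsbSamplesNhCustomSagaFinder;User Id=SA;Password=yourStrong(!)Password;Max Pool Size=100;Encrypt=false";

    // Hypothetical environment variable name, chosen for illustration only.
    public const string VariableName = "NSB_SAMPLES_CONNECTION_STRING";

    // Returns the environment value when it is set and non-blank,
    // otherwise the sample default.
    public static string Resolve()
    {
        var fromEnvironment = Environment.GetEnvironmentVariable(VariableName);
        return string.IsNullOrWhiteSpace(fromEnvironment) ? Default : fromEnvironment;
    }
}
```

The result of ConnectionStrings.Resolve() could then be assigned to the connection string used in the NHibernate setup.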
At startup each endpoint will create its required SQL assets including databases, tables, and schemas.
This sample creates a database named NsbSamplesNhCustomSagaFinder.
NHibernate setup
This sample uses the NHibernate persistence, configured as follows:
// for SqlExpress use Data Source=.\SqlExpress;Initial Catalog=NsbSamplesNhCustomSagaFinder;Integrated Security=True;Max Pool Size=100;Encrypt=false
var connectionString = @"Server=localhost,1433;Initial Catalog=NsbSamplesNhCustomSagaFinder;User Id=SA;Password=yourStrong(!)Password;Max Pool Size=100;Encrypt=false";
var hibernateConfig = new Configuration();
hibernateConfig.DataBaseIntegration(x =>
{
    x.ConnectionString = connectionString;
    x.Dialect<MsSql2012Dialect>();
    x.Driver<MicrosoftDataSqlClientDriver>();
});
var persistence = endpointConfiguration.UsePersistence<NHibernatePersistence>()
    .UseConfiguration(hibernateConfig);
The Saga
This sample contains a very simple order management saga with these responsibilities:
- Handling the creation of an order.
- Offloading the payment process to another handler.
- Handling the completion of the payment process.
- Completing the order.
public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleMessages<CompletePaymentTransaction>,
    IHandleMessages<CompleteOrder>
{
    static readonly ILog log = LogManager.GetLogger<OrderSaga>();

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.MapSaga(saga => saga.OrderId)
            .ToMessage<StartOrder>(msg => msg.OrderId)
            .ToMessage<CompleteOrder>(msg => msg.OrderId);
    }

    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        Data.PaymentTransactionId = Guid.NewGuid().ToString();
        log.Info($"Saga with OrderId {Data.OrderId} received StartOrder with OrderId {message.OrderId}");
        var issuePaymentRequest = new IssuePaymentRequest
        {
            PaymentTransactionId = Data.PaymentTransactionId
        };
        return context.SendLocal(issuePaymentRequest);
    }

    public Task Handle(CompletePaymentTransaction message, IMessageHandlerContext context)
    {
        log.Info($"Transaction with Id {Data.PaymentTransactionId} completed for order id {Data.OrderId}");
        var completeOrder = new CompleteOrder
        {
            OrderId = Data.OrderId
        };
        return context.SendLocal(completeOrder);
    }

    public Task Handle(CompleteOrder message, IMessageHandlerContext context)
    {
        log.Info($"Saga with OrderId {Data.OrderId} received CompleteOrder with OrderId {message.OrderId}");
        MarkAsComplete();
        return Task.CompletedTask;
    }
}
It is important to note that the saga does not send the order ID to the payment processor; it sends a payment transaction ID instead. The saga therefore needs to be correlated by more than one property: OrderId and PaymentTransactionId. Both of these properties must be treated as unique.
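For orientation, the saga data holding both correlation properties might look roughly like the sketch below. This is an assumption based on the properties the saga and finder use, not the sample's actual class. The members are virtual because NHibernate needs overridable members to generate proxies. How uniqueness of PaymentTransactionId is enforced in the database is not shown here.

```csharp
using NServiceBus;

// Sketch of the saga data type referenced by OrderSaga.
// ContainSagaData supplies the Id, Originator, and OriginalMessageId members.
public class OrderSagaData : ContainSagaData
{
    // Correlated through the mappings in ConfigureHowToFindSaga.
    public virtual string OrderId { get; set; }

    // Correlated through a custom saga finder.
    public virtual string PaymentTransactionId { get; set; }
}
```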
class CompletePaymentTransactionSagaFinder : ISagaFinder<OrderSagaData, CompletePaymentTransaction>
{
    public Task<OrderSagaData> FindBy(CompletePaymentTransaction message, ISynchronizedStorageSession storageSession, IReadOnlyContextBag context, CancellationToken cancellationToken = default)
    {
        var session = storageSession.Session();
        var orderSagaData = session.QueryOver<OrderSagaData>()
            .Where(d => d.PaymentTransactionId == message.PaymentTransactionId)
            .SingleOrDefault();
        return Task.FromResult(orderSagaData);
    }
}
A saga finder is a class that implements ISagaFinder<TSagaData, TMessage>. The class will be detected automatically by NServiceBus at endpoint configuration time. The FindBy method will be invoked by NServiceBus each time a message of type TMessage is received by a saga which stores its state using type TSagaData.
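The lookup a finder performs can be pictured without any persistence at all. The sketch below is a hypothetical in-memory stand-in (the OrderState and InMemoryOrderStore types are illustrative and not part of NServiceBus or this sample). It shows the same state being reachable by either key, and a null result when nothing matches.

```csharp
using System.Collections.Generic;
using System.Linq;

// Illustrative stand-in for saga data with two correlation keys.
public class OrderState
{
    public string OrderId { get; set; }
    public string PaymentTransactionId { get; set; }
}

// Illustrative stand-in for the saga storage.
public class InMemoryOrderStore
{
    readonly List<OrderState> items = new List<OrderState>();

    public void Add(OrderState state) => items.Add(state);

    // Analogous to a standard mapping on OrderId.
    public OrderState FindByOrderId(string orderId) =>
        items.SingleOrDefault(s => s.OrderId == orderId);

    // Analogous to a custom finder on PaymentTransactionId.
    // Returns null when no state matches.
    public OrderState FindByPaymentTransactionId(string paymentTransactionId) =>
        items.SingleOrDefault(s => s.PaymentTransactionId == paymentTransactionId);
}
```

SingleOrDefault throws when more than one element matches, which is one reason each key used for a lookup needs to be unique.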
In this sample the ConfigureHowToFindSaga method maps only StartOrder and CompleteOrder, both by OrderId. CompletePaymentTransaction has no mapping there because the saga finder handles it. It is not required to provide a saga finder for every message type. A mix of standard saga mappings using ConfigureHowToFindSaga, and saga finders, is a valid scenario.