RavenDB Custom Saga Finding Logic

Component: RavenDB Persistence
NuGet Package: NServiceBus.RavenDB (5.x)
Target NServiceBus Version: 7.x

RavenDB's implementation of distributed transactions contains a bug that could, under certain rare conditions, cause an endpoint to lose data. If RavenDB is configured to enlist in distributed transactions, read "DTC not supported for RavenDB Persistence".

Code walk-through

When the default saga message mappings do not satisfy the requirements, custom logic can be put in place to allow NServiceBus to find a saga data instance using whichever lookup best suits the environment.

This sample also uses multiple Unique attributes, relying on the default RavenDB Unique Constraint bundle.

RavenDB setup

This sample requires the RavenDB persistence package and a running RavenDB instance configured accordingly.

NServiceBus does not support saga data with multiple Unique attributes out of the box. To achieve that, the default RavenDB UniqueConstraint bundle can be used. Follow the instructions on the RavenDB site to install the bundle in the RavenDB server. The client side of the bundle must also be configured by registering the UniqueConstraintsStoreListener, as shown below:

using (var documentStore = new DocumentStore
{
    Url = "http://localhost:32076",
    DefaultDatabase = "NServiceBus"
})
{
    documentStore.RegisterListener(new UniqueConstraintsStoreListener());
    documentStore.Initialize();

    var persistence = endpointConfiguration.UsePersistence<RavenDBPersistence>();
    // Only required to simplify the sample setup
    persistence.DoNotSetupDatabasePermissions();
    persistence.DisableSubscriptionVersioning();
    persistence.SetDefaultDocumentStore(documentStore);

    // Start, run, and stop the endpoint inside this block (omitted here)
}

If running this sample against an external RavenDB server, ensure that the RavenDB.Bundles.UniqueConstraints bundle is correctly installed according to the RavenDB documentation on extending RavenDB. If the server side of the bundle is not loaded correctly, the saga will not be found and the SagaNotFoundHandler will be invoked.

When the RavenDB DocumentStore is created by the user at endpoint configuration time, it is important to dispose of it, by calling the Dispose() method, before shutting down the endpoint process.
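
The disposal order described above can be sketched as follows. This is a minimal illustration rather than the sample's actual Program class: the endpoint name and the use of LearningTransport are assumptions made for the sketch, and `async Task Main` requires C# 7.1 or later.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;
using Raven.Client.Document;
using Raven.Client.UniqueConstraints;

class Program
{
    static async Task Main()
    {
        // Endpoint name and transport are assumptions made for this sketch
        var endpointConfiguration = new EndpointConfiguration("Samples.RavenDBCustomSagaFinder");
        endpointConfiguration.UseTransport<LearningTransport>();

        using (var documentStore = new DocumentStore
        {
            Url = "http://localhost:32076",
            DefaultDatabase = "NServiceBus"
        })
        {
            documentStore.RegisterListener(new UniqueConstraintsStoreListener());
            documentStore.Initialize();

            var persistence = endpointConfiguration.UsePersistence<RavenDBPersistence>();
            persistence.SetDefaultDocumentStore(documentStore);

            var endpointInstance = await Endpoint.Start(endpointConfiguration)
                .ConfigureAwait(false);

            Console.WriteLine("Press any key to exit");
            Console.ReadKey();

            // Stop the endpoint first so that no handler is still using the store
            await endpointInstance.Stop()
                .ConfigureAwait(false);
        } // Leaving the using block disposes the DocumentStore
    }
}
```

Wrapping the store in a using block keeps the disposal in one place: the endpoint is stopped inside the block, and the store is disposed only afterwards.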

In Process Raven Host

It is possible to self-host RavenDB so that no running instance of RavenDB server is required to run the sample.

class RavenHost :
    IDisposable
{
    public RavenHost()
    {
        documentStore = new EmbeddableDocumentStore
        {
            DataDirectory = "Data",
            UseEmbeddedHttpServer = true,
            DefaultDatabase = "NServiceBus",
            Configuration =
            {
                Port = 32076,
                PluginsDirectory = "Plugins",
                HostName = "localhost",
                Settings =
                {
                    { "Raven/ActiveBundles", "Unique Constraints" }
                },
                DefaultStorageTypeName = "esent"
            }
        };
        documentStore.RegisterListener(new UniqueConstraintsStoreListener());
        documentStore.Initialize();

        // RavenDB is hosted in process, so remove it from the logging pipeline by resetting the trace listeners
        Trace.Listeners.Clear();
        Trace.Listeners.Add(new DefaultTraceListener());
        Console.WriteLine("Raven server started on http://localhost:32076/");
    }

    EmbeddableDocumentStore documentStore;

    public void Dispose()
    {
        documentStore?.Dispose();
    }
}

The Saga

The saga shown in the sample is a very simple order management saga that:

  • Handles the creation of an order.
  • Offloads the payment process to a different handler.
  • Handles the completion of the payment process.
  • Completes the order.

public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleMessages<CompletePaymentTransaction>,
    IHandleMessages<CompleteOrder>
{
    static ILog log = LogManager.GetLogger<OrderSaga>();

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.ConfigureMapping<StartOrder>(_ => _.OrderId)
            .ToSaga(_ => _.OrderId);
        mapper.ConfigureMapping<CompleteOrder>(_ => _.OrderId)
            .ToSaga(_ => _.OrderId);
    }

    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        Data.PaymentTransactionId = Guid.NewGuid().ToString();

        log.Info($"Saga with OrderId {Data.OrderId} received StartOrder with OrderId {message.OrderId}");
        var issuePaymentRequest = new IssuePaymentRequest
        {
            PaymentTransactionId = Data.PaymentTransactionId
        };
        return context.SendLocal(issuePaymentRequest);
    }

    public Task Handle(CompletePaymentTransaction message, IMessageHandlerContext context)
    {
        log.Info($"Transaction with Id {Data.PaymentTransactionId} completed for order id {Data.OrderId}");
        var completeOrder = new CompleteOrder
        {
            OrderId = Data.OrderId
        };
        return context.SendLocal(completeOrder);
    }

    public Task Handle(CompleteOrder message, IMessageHandlerContext context)
    {
        log.Info($"Saga with OrderId {Data.OrderId} received CompleteOrder with OrderId {message.OrderId}");
        MarkAsComplete();

        return Task.CompletedTask;
    }
}
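
The saga refers to several message types and to a separate payment handler that are not listed on this page. A plausible sketch of them is shown here. The property names are taken from how the saga uses each message; the handler class name, the choice of IMessage as the marker interface, and the handler body are assumptions made for illustration.

```csharp
using System.Threading.Tasks;
using NServiceBus;

public class StartOrder :
    IMessage
{
    public string OrderId { get; set; }
}

public class IssuePaymentRequest :
    IMessage
{
    public string PaymentTransactionId { get; set; }
}

public class CompletePaymentTransaction :
    IMessage
{
    public string PaymentTransactionId { get; set; }
}

public class CompleteOrder :
    IMessage
{
    public string OrderId { get; set; }
}

// Hypothetical handler standing in for the payment process
class IssuePaymentRequestHandler :
    IHandleMessages<IssuePaymentRequest>
{
    public Task Handle(IssuePaymentRequest message, IMessageHandlerContext context)
    {
        // Only the payment transaction id travels back to the saga, never the order id
        var completePayment = new CompletePaymentTransaction
        {
            PaymentTransactionId = message.PaymentTransactionId
        };
        return context.SendLocal(completePayment);
    }
}
```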

From the process point of view, it is important to note that the saga does not send the order id to the payment processor; instead, it sends a payment transaction id. A saga can be correlated by more than one unique attribute, such as OrderId and PaymentTransactionId, which requires both to be treated as unique.

public class OrderSagaData :
    ContainSagaData
{
    [UniqueConstraint(CaseInsensitive = true)]
    public string OrderId { get; set; }

    [UniqueConstraint(CaseInsensitive = true)]
    public string PaymentTransactionId { get; set; }
}

A property's uniqueness can be expressed by using the UniqueConstraint attribute provided by the RavenDB bundle.

At start-up the sample sends a StartOrder message. Since the saga data class is decorated with custom attributes, custom logic is also required to find a saga data instance:

class CompletePaymentTransactionSagaFinder :
    IFindSagas<OrderSagaData>.Using<CompletePaymentTransaction>
{
    public Task<OrderSagaData> FindBy(CompletePaymentTransaction message, SynchronizedStorageSession storageSession, ReadOnlyContextBag context)
    {
        var session = storageSession.RavenSession();
        return session.LoadByUniqueConstraintAsync<OrderSagaData>(d => d.PaymentTransactionId, message.PaymentTransactionId);
    }
}

Building a saga finder requires defining a class that implements the IFindSagas<TSagaData>.Using<TMessage> interface. NServiceBus picks up the class automatically at configuration time and invokes its FindBy method each time a message of type TMessage, which is expected to load a saga of type TSagaData, is received.

In the sample, the required ConfigureHowToFindSaga method maps StartOrder and CompleteOrder to the saga by OrderId, while CompletePaymentTransaction is resolved by the saga finder. It is not required to provide a saga finder for every message type; a mix of standard saga mappings and custom saga finding is a valid scenario.
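
When no saga instance is found for a message that cannot start a saga, for example because the server side of the Unique Constraints bundle is not loaded, NServiceBus invokes any registered saga-not-found handler. A minimal sketch of such a handler, assuming it only logs the event (the log text is illustrative and not taken from the sample):

```csharp
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Logging;
using NServiceBus.Sagas;

class SagaNotFoundHandler :
    IHandleSagaNotFound
{
    static ILog log = LogManager.GetLogger<SagaNotFoundHandler>();

    public Task Handle(object message, IMessageProcessingContext context)
    {
        // Invoked when a message targets a saga but no saga instance could be located
        log.Warn($"No saga found for message of type {message.GetType().Name}");
        return Task.CompletedTask;
    }
}
```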

Related Articles

  • RavenDB Persistence
  • Sagas
    NServiceBus uses event-driven architecture to include fault-tolerance and scalability in long-term business processes.
