Simple RavenDB Persistence Usage

Component: RavenDB Persistence
NuGet Package NServiceBus.RavenDB (5-pre)
Target NServiceBus Version: 7.x
This page targets a pre-release version and is subject to change prior to the final release.
RavenDB's implementation of distributed transactions contains a bug that could cause an endpoint, in certain (rare) conditions, to lose data. If RavenDB is configured to enlist in distributed transactions, read DTC not supported for RavenDB Persistence.
When running the RavenDB persister samples, the following warning is logged:

NServiceBus has detected that a RavenDB DocumentStore is being used with Distributed Transaction Coordinator transactions, but without the recommended production-safe settings for ResourceManagerId or TransactionStorageRecovery.

The warning appears because, for simplicity, the samples do not configure the Raven persister DTC settings for production scenarios.

Code walk-through

This sample shows a simple Client + Server scenario.

  • Client sends a StartOrder message to Server.
  • Server starts an OrderSaga.
  • OrderSaga requests a timeout with CompleteOrder data.
  • When the CompleteOrder timeout fires, the OrderSaga publishes an OrderCompleted event.
  • The Server then publishes a message that the client subscribes to.
  • Client handles OrderCompleted event.
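
The message types themselves are not shown on this page. A minimal sketch follows, with property names inferred from how the saga and handler code on this page uses them. The property types are assumptions, and the real sample presumably marks these classes with NServiceBus marker interfaces or message conventions, which are left out so the block compiles on its own.

```csharp
using System;

// Hypothetical message contracts: property names follow their usage in the
// saga and handler code on this page; property types are assumed.

// Sent by Client to Server; starts the OrderSaga.
public class StartOrder
{
    public Guid OrderId { get; set; }
}

// State carried by the timeout that the saga requests.
public class CompleteOrder
{
    public string OrderDescription { get; set; }
}

// Sent locally by the saga and handled by ShipOrderHandler.
public class ShipOrder
{
    public Guid OrderId { get; set; }
}

// Published by the saga when the CompleteOrder timeout fires.
public class OrderCompleted
{
    public Guid OrderId { get; set; }
}
```

OrderId appears on every message that has to be correlated back to an order, which is what allows the saga mapping to locate the matching saga instance.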

Raven Config

Configure the endpoint to use RavenDB persistence.

var endpointConfiguration = new EndpointConfiguration("Samples.RavenDB.Server");
using (var documentStore = new DocumentStore
{
    Url = "http://localhost:32076",
    DefaultDatabase = "RavenSampleData"
})
{
    documentStore.Initialize();

    var persistence = endpointConfiguration.UsePersistence<RavenDBPersistence>();
    // Only required to simplify the sample setup
    persistence.DoNotSetupDatabasePermissions();
    persistence.DisableSubscriptionVersioning();
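    persistence.SetDefaultDocumentStore(documentStore);

    // Start and stop the endpoint inside this using block (omitted here),
    // so that the document store is disposed only after the endpoint has stopped.
}

When the RavenDB DocumentStore is created by the user at endpoint configuration time, it is important to dispose of it, by calling its Dispose() method or wrapping it in a using block, before shutting down the endpoint process.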

In Process Raven Host

It is possible to self-host RavenDB so that no running instance of RavenDB server is required to run the sample.

class RavenHost :
    IDisposable
{
    public RavenHost()
    {
        documentStore = new EmbeddableDocumentStore
        {
            DataDirectory = "Data",
            UseEmbeddedHttpServer = true,
            DefaultDatabase = "RavenSampleData",
            Configuration =
            {
                Port = 32076,
                PluginsDirectory = Environment.CurrentDirectory,
                HostName = "localhost",
                DefaultStorageTypeName = "esent"
            }
        };
        documentStore.Initialize();
        // The embedded Raven server runs in process, so reset the trace listeners
        // to keep its output out of the logging pipeline.
        Trace.Listeners.Clear();
        Trace.Listeners.Add(new DefaultTraceListener());
        Console.WriteLine("Raven server started on http://localhost:32076/");
    }

    EmbeddableDocumentStore documentStore;

    public void Dispose()
    {
        documentStore?.Dispose();
    }
}

Order Saga Data

public class OrderSagaData :
    ContainSagaData
{
    public Guid OrderId { get; set; }
    public string OrderDescription { get; set; }
}

Order Saga

public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleTimeouts<CompleteOrder>
{
    static ILog log = LogManager.GetLogger<OrderSaga>();

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.ConfigureMapping<StartOrder>(message => message.OrderId)
            .ToSaga(sagaData => sagaData.OrderId);
    }

    public Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        var orderDescription = $"The saga for order {message.OrderId}";
        Data.OrderDescription = orderDescription;
        log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");

        var shipOrder = new ShipOrder
        {
            OrderId = message.OrderId
        };

        log.Info("Order will complete in 5 seconds");
        var timeoutData = new CompleteOrder
        {
            OrderDescription = orderDescription
        };

        return Task.WhenAll(
            context.SendLocal(shipOrder),
            RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData)
        );
    }

    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        log.Info($"Saga with OrderId {Data.OrderId} completed");
        var orderCompleted = new OrderCompleted
        {
            OrderId = Data.OrderId
        };
        MarkAsComplete();
        return context.Publish(orderCompleted);
    }
}

Handler Using Raven Session

The handler accesses the same Raven session as the rest of the message-handling pipeline via context.SynchronizedStorageSession.RavenSession().

public class ShipOrderHandler :
    IHandleMessages<ShipOrder>
{
    public Task Handle(ShipOrder message, IMessageHandlerContext context)
    {
        var session = context.SynchronizedStorageSession.RavenSession();
        var orderShipped = new OrderShipped
        {
            Id = message.OrderId,
            ShippingDate = DateTime.UtcNow,
        };
        return session.StoreAsync(orderShipped);
    }
}

The Data in RavenDB

The data in RavenDB is stored in three different collections.

The Saga Data

  • IContainSagaData.Id maps to the native RavenDB document Id.
  • IContainSagaData.Originator and IContainSagaData.OriginalMessageId map to simple properties.
  • Custom properties on the SagaData, in this case OrderDescription and OrderId, are also mapped to simple properties.

The Timeouts

  • The subscriber is stored in a Destination with the nested properties Queue and Machine.
  • The endpoint that initiated the timeout is stored in the OwningTimeoutManager property.
  • The connected saga ID is stored in a SagaId property.
  • The serialized data for the message is stored in a State property.
  • The scheduled timestamp for the timeout is stored in a Time property.
  • Any headers associated with the timeout are stored in an array of key value pairs.

The Subscriptions

Note that the message type maps to multiple subscriber endpoints.

  • The Subscription message type and version are stored in the MessageType property.
  • The list of subscribers is stored in an array of objects, each containing Queue and MachineName properties.
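
The subscription document described above can be pictured with a similar sketch. The class names SubscriptionDocument and SubscriberEntry and the Subscribers property name are hypothetical; only MessageType, Queue and MachineName come from the description.

```csharp
using System.Collections.Generic;

// Hypothetical model of a stored subscription document, for illustration only.
public class SubscriberEntry
{
    public string Queue { get; set; }
    public string MachineName { get; set; }
}

public class SubscriptionDocument
{
    // The subscribed message type and version.
    public string MessageType { get; set; }

    // One message type maps to any number of subscriber endpoints.
    public List<SubscriberEntry> Subscribers { get; set; }
        = new List<SubscriberEntry>();
}
```

Keeping the subscribers in a list on a single document per message type reflects the one-to-many relationship noted above.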

The Handler Stored data

Related Articles

  • RavenDB Persistence
  • Sagas
    NServiceBus uses event-driven architecture to include fault-tolerance and scalability in long-term business processes.
