Simple RavenDB Persistence Usage

Component: RavenDB Persistence
NuGet Package: NServiceBus.RavenDB (4.x)
Target NServiceBus Version: 6.x
When running the RavenDB persistence samples, the following warning is logged:

NServiceBus has detected that a RavenDB DocumentStore is being used with Distributed Transaction Coordinator transactions, but without the recommended production-safe settings for ResourceManagerId or TransactionStorageRecovery.

This warning is expected. For simplicity, the sample does not configure the RavenDB persister's DTC settings for production scenarios.
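For comparison, a production-oriented document store could set both values named in the warning. The following is a minimal sketch, not part of the sample. It assumes the RavenDB 3.x client exposes ResourceManagerId and TransactionRecoveryStorage on DocumentStore, and LocalDirectoryTransactionRecoveryStorage in Raven.Client.Document.DTC. The ProductionDocumentStore helper, the MD5-derived ID and the recovery folder location are illustrative assumptions.

```csharp
using System;
using System.IO;
using System.Security.Cryptography;
using System.Text;
using Raven.Client.Document;
using Raven.Client.Document.DTC;

// Hypothetical helper, not part of the sample.
static class ProductionDocumentStore
{
    public static DocumentStore Create(string endpointName)
    {
        // Derive a stable Guid from the endpoint name so the same
        // ResourceManagerId is used every time the endpoint starts.
        Guid resourceManagerId;
        using (var md5 = MD5.Create())
        {
            var hash = md5.ComputeHash(Encoding.UTF8.GetBytes(endpointName));
            resourceManagerId = new Guid(hash);
        }

        // Assumed location: a folder that survives restarts and is unique per endpoint.
        var programData = Environment.GetFolderPath(Environment.SpecialFolder.CommonApplicationData);
        var recoveryPath = Path.Combine(programData, "NServiceBus.RavenDB", resourceManagerId.ToString());

        return new DocumentStore
        {
            Url = "http://localhost:32076",
            DefaultDatabase = "RavenSampleData",
            ResourceManagerId = resourceManagerId,
            TransactionRecoveryStorage = new LocalDirectoryTransactionRecoveryStorage(recoveryPath)
        };
    }
}
```

The store returned by this helper would still need Initialize() to be called before being passed to the persistence configuration.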

Code walk-through

This sample shows a simple Client + Server scenario.

  • Client sends a StartOrder message to Server.
  • Server starts an OrderSaga.
  • OrderSaga requests a timeout with CompleteOrder data.
  • When the CompleteOrder timeout fires, the OrderSaga publishes an OrderCompleted event.
  • The Server then publishes a message that the client subscribes to.
  • Client handles OrderCompleted event.
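The message types in this flow are not listed on this page. The following sketch shows what they might look like, with the properties inferred from how the saga code further down uses them. ShipOrder is sent by the saga even though it is not part of the list above. How the sample marks these classes as messages (marker interfaces or conventions) is not shown here and is left out as an assumption.

```csharp
using System;

// Sent by the Client to start the saga on the Server.
public class StartOrder
{
    public Guid OrderId { get; set; }
}

// Sent locally by the saga and handled by ShipOrderHandler.
public class ShipOrder
{
    public Guid OrderId { get; set; }
}

// Timeout state requested by the saga.
public class CompleteOrder
{
    public string OrderDescription { get; set; }
}

// Event published by the saga when the timeout fires.
public class OrderCompleted
{
    public Guid OrderId { get; set; }
}
```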

Raven Config

Configure the endpoint to use RavenDB persistence.

var endpointConfiguration = new EndpointConfiguration("Samples.RavenDB.Server");
var documentStore = new DocumentStore
{
    Url = "http://localhost:32076",
    DefaultDatabase = "RavenSampleData"
};
documentStore.Initialize();

var persistence = endpointConfiguration.UsePersistence<RavenDBPersistence>();
// Only required to simplify the sample setup
persistence.DoNotSetupDatabasePermissions();
persistence.SetDefaultDocumentStore(documentStore);

In Process Raven Host

It is possible to self-host RavenDB so that no running instance of RavenDB server is required to run the sample.

class RavenHost :
    IDisposable
{

    public RavenHost()
    {
        documentStore = new EmbeddableDocumentStore
        {
            DataDirectory = "Data",
            UseEmbeddedHttpServer = true,
            DefaultDatabase = "RavenSampleData",
            Configuration =
            {
                Port = 32076,
                PluginsDirectory = Environment.CurrentDirectory,
                HostName = "localhost",
                DefaultStorageTypeName = "esent"
            }
        };
        documentStore.Initialize();
        // Because a RavenDB server is hosted in process, remove its listeners from the trace logging pipeline
        Trace.Listeners.Clear();
        Trace.Listeners.Add(new DefaultTraceListener());
        Console.WriteLine("Raven server started on http://localhost:32076/");
    }

    EmbeddableDocumentStore documentStore;

    public void Dispose()
    {
        documentStore?.Dispose();
    }
}
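A possible way to use RavenHost is to wrap the endpoint's lifetime in a using block, so the embedded server starts before the endpoint and is disposed only after the endpoint has stopped. This is a sketch under assumptions: the Program class and the console prompt are illustrative, and any transport or other endpoint settings the sample needs are omitted.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

class Program
{
    static void Main()
    {
        AsyncMain().GetAwaiter().GetResult();
    }

    static async Task AsyncMain()
    {
        // Start the embedded RavenDB server first; it listens on port 32076.
        using (new RavenHost())
        {
            var endpointConfiguration = new EndpointConfiguration("Samples.RavenDB.Server");
            endpointConfiguration.SendFailedMessagesTo("error");
            // Persistence is configured here as shown under "Raven Config".
            // Remaining endpoint settings are omitted from this sketch.

            var endpointInstance = await Endpoint.Start(endpointConfiguration)
                .ConfigureAwait(false);

            Console.WriteLine("Press any key to exit");
            Console.ReadKey();

            // Stop the endpoint before the using block disposes the embedded server.
            await endpointInstance.Stop()
                .ConfigureAwait(false);
        }
    }
}
```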

Order Saga Data

public class OrderSagaData :
    ContainSagaData
{
    public Guid OrderId { get; set; }
    public string OrderDescription { get; set; }
}

Order Saga

public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleTimeouts<CompleteOrder>
{
    static ILog log = LogManager.GetLogger<OrderSaga>();

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.ConfigureMapping<StartOrder>(message => message.OrderId)
            .ToSaga(sagaData => sagaData.OrderId);
    }

    public async Task Handle(StartOrder message, IMessageHandlerContext context)
    {
        var orderDescription = $"The saga for order {message.OrderId}";
        Data.OrderDescription = orderDescription;
        log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");

        var shipOrder = new ShipOrder
        {
            OrderId = message.OrderId
        };
        await context.SendLocal(shipOrder)
            .ConfigureAwait(false);

        log.Info("Order will complete in 5 seconds");
        var timeoutData = new CompleteOrder
        {
            OrderDescription = orderDescription
        };

        await RequestTimeout(context, TimeSpan.FromSeconds(5), timeoutData)
            .ConfigureAwait(false);
    }

    public Task Timeout(CompleteOrder state, IMessageHandlerContext context)
    {
        log.Info($"Saga with OrderId {Data.OrderId} completed");
        var orderCompleted = new OrderCompleted
        {
            OrderId = Data.OrderId
        };
        MarkAsComplete();
        return context.Publish(orderCompleted);
    }
}

Handler Using Raven Session

The handler accesses the same Raven session that the persistence uses, via context.SynchronizedStorageSession.RavenSession().

public class ShipOrderHandler :
    IHandleMessages<ShipOrder>
{
    public Task Handle(ShipOrder message, IMessageHandlerContext context)
    {
        var session = context.SynchronizedStorageSession.RavenSession();
        var orderShipped = new OrderShipped
        {
            Id = message.OrderId,
            ShippingDate = DateTime.UtcNow,
        };
        return session.StoreAsync(orderShipped);
    }
}
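The OrderShipped document type is not listed on this page. A minimal sketch, inferred from the two properties the handler sets, might look like this. By convention RavenDB treats a property named Id as the document identifier.

```csharp
using System;

// Plain document class stored through the shared Raven session.
public class OrderShipped
{
    // Set from ShipOrder.OrderId in the handler.
    public Guid Id { get; set; }

    // Set to DateTime.UtcNow in the handler.
    public DateTime ShippingDate { get; set; }
}
```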

The Data in RavenDB

The data in RavenDB is stored in three different collections.

The Saga Data

  • IContainSagaData.Id maps to the native RavenDB document Id.
  • IContainSagaData.Originator and IContainSagaData.OriginalMessageId map to simple properties.
  • Custom properties on the SagaData, in this case OrderDescription and OrderId, are also mapped to simple properties.

The Timeouts

  • The subscriber is stored in a Destination with the nested properties Queue and Machine.
  • The endpoint that initiated the timeout is stored in the OwningTimeoutManager property.
  • The connected saga ID is stored in a SagaId property.
  • The serialized data for the message is stored in a State property.
  • The scheduled timestamp for the timeout is stored in a Time property.
  • Any headers associated with the timeout are stored in an array of key value pairs.
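The shape described above can be pictured as a plain class. This is only an illustration of the document layout, not the persister's actual type: the class names, the .NET types chosen for each property and the Headers property name are assumptions.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical mirror of a stored timeout document.
public class TimeoutDocumentSketch
{
    // Where the timeout message is delivered when it fires.
    public TimeoutDestinationSketch Destination { get; set; }

    // The endpoint that initiated the timeout.
    public string OwningTimeoutManager { get; set; }

    // The saga instance the timeout belongs to.
    public Guid SagaId { get; set; }

    // The serialized timeout message (CompleteOrder in this sample).
    public byte[] State { get; set; }

    // When the timeout is scheduled to fire.
    public DateTime Time { get; set; }

    // Message headers, stored as key/value pairs.
    public List<KeyValuePair<string, string>> Headers { get; set; }
}

public class TimeoutDestinationSketch
{
    public string Queue { get; set; }
    public string Machine { get; set; }
}
```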

The Subscriptions

Note that the message type maps to multiple subscriber endpoints.

  • The Subscription message type and version are stored in the MessageType property.
  • The list of subscribers is stored in an array of objects, each containing Queue and MachineName properties.
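As with the timeouts, the subscription document can be pictured as a plain class. The class names and the Subscribers property name are assumptions made for illustration. Only MessageType, Queue and MachineName come from the description above.

```csharp
using System.Collections.Generic;

// Hypothetical mirror of a stored subscription document.
public class SubscriptionDocumentSketch
{
    // Message type name and version, for example the OrderCompleted event.
    public string MessageType { get; set; }

    // One entry per subscribed endpoint, so one message type
    // can map to several subscribers.
    public List<SubscriberSketch> Subscribers { get; set; }
}

public class SubscriberSketch
{
    public string Queue { get; set; }
    public string MachineName { get; set; }
}
```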

The Handler Stored data

Related Articles

  • RavenDB Persistence
  • Sagas
    NServiceBus uses event-driven architecture to include fault-tolerance and scalability in long-term business processes.
