Simple RavenDB Persistence Usage

Component: RavenDB Persistence
NuGet NServiceBus.RavenDB (3.x)
Target NServiceBus Version: 5.x
When running RavenDB Persister Samples the following warning will be logged.

NServiceBus has detected that a RavenDB DocumentStore is being used with Distributed Transaction Coordinator transactions, but without the recommended production-safe settings for ResourceManagerId or TransactionStorageRecovery.

The reason for this is that, for simplicity, the Raven Persister DTC settings have not been configured for production scenarios.

Code walk-through

This sample shows a simple Client + Server scenario.

  • Client sends a StartOrder message to Server.
  • Server starts an OrderSaga.
  • OrderSaga requests a timeout with a CompleteOrder data.
  • When the CompleteOrder timeout fires the OrderSaga publishes a OrderCompleted event.
  • The Server then publishes a message that the client subscribes to.
  • Client handles OrderCompleted event.

Raven Config

Configure the endpoint to use RavenDB persistence.

var busConfiguration = new BusConfiguration();
busConfiguration.EndpointName("Samples.RavenDB.Server");
var documentStore = new DocumentStore
{
    Url = "http://localhost:32076",
    DefaultDatabase = "RavenSampleData"
};
documentStore.Initialize();

var persistence = busConfiguration.UsePersistence<RavenDBPersistence>();
// Only required to simplify the sample setup
persistence.DoNotSetupDatabasePermissions();
persistence.SetDefaultDocumentStore(documentStore);

In Process Raven Host

It is possible to self-host RavenDB so that no running instance of RavenDB server is required to run the sample.

class RavenHost :
    IDisposable
{

    public RavenHost()
    {
        documentStore = new EmbeddableDocumentStore
        {
            DataDirectory = "Data",
            UseEmbeddedHttpServer = true,
            DefaultDatabase = "RavenSampleData",
            Configuration =
            {
                Port = 32076,
                PluginsDirectory = Environment.CurrentDirectory,
                HostName = "localhost"
            }
        };
        documentStore.Initialize();
        // since hosting a fake raven server in process need to remove it from the logging pipeline
        Trace.Listeners.Clear();
        Trace.Listeners.Add(new DefaultTraceListener());
        Console.WriteLine("Raven server started on http://localhost:32076/");
    }

    EmbeddableDocumentStore documentStore;

    public void Dispose()
    {
        documentStore?.Dispose();
    }
}

Order Saga Data

public class OrderSagaData :
    IContainSagaData
{
    public Guid Id { get; set; }
    public string Originator { get; set; }
    public string OriginalMessageId { get; set; }

    [Unique]
    public Guid OrderId { get; set; }
    public string OrderDescription { get; set; }
}

Order Saga

public class OrderSaga :
    Saga<OrderSagaData>,
    IAmStartedByMessages<StartOrder>,
    IHandleTimeouts<CompleteOrder>
{
    IBus bus;
    static ILog log = LogManager.GetLogger<OrderSaga>();

    public OrderSaga(IBus bus)
    {
        this.bus = bus;
    }

    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.ConfigureMapping<StartOrder>(message => message.OrderId)
            .ToSaga(sagaData => sagaData.OrderId);
    }

    public void Handle(StartOrder message)
    {
        Data.OrderId = message.OrderId;
        var orderDescription = $"The saga for order {message.OrderId}";
        Data.OrderDescription = orderDescription;
        log.Info($"Received StartOrder message {Data.OrderId}. Starting Saga");

        var shipOrder = new ShipOrder
        {
            OrderId = message.OrderId
        };
        Bus.SendLocal(shipOrder);

        log.Info("Order will complete in 5 seconds");
        var timeoutData = new CompleteOrder
        {
            OrderDescription = orderDescription
        };
        RequestTimeout(TimeSpan.FromSeconds(5), timeoutData);
    }

    public void Timeout(CompleteOrder state)
    {
        log.Info($"Saga with OrderId {Data.OrderId} completed");
        var orderCompleted = new OrderCompleted
        {
            OrderId = Data.OrderId
        };
        bus.Publish(orderCompleted);
        MarkAsComplete();
    }

}

Handler Using Raven Session

The handler access the same Raven ISession via ISessionProvider.

public class ShipOrderHandler :
    IHandleMessages<ShipOrder>
{
    ISessionProvider sessionProvider;

    public ShipOrderHandler(ISessionProvider sessionProvider)
    {
        this.sessionProvider = sessionProvider;
    }

    public void Handle(ShipOrder message)
    {
        var session = sessionProvider.Session;
        session.Store(new OrderShipped
        {
            Id = message.OrderId,
            ShippingDate = DateTime.UtcNow,
        });
    }
}

The Data in RavenDB

The data in RavenDB is stored in three different collections.

The Saga Data

  • IContainSagaData.Id maps to the native RavenDB document Id.
  • IContainSagaData.Originator and IContainSagaData.OriginalMessageId map to simple properties pairs.
  • Custom properties on the SagaData, in this case OrderDescription and OrderId, are also mapped to simple properties.

The Timeouts

  • The subscriber is stored in a Destination with the nested properties Queue and Machine.
  • The endpoint that initiated the timeout is stored in the OwningTimeoutManager property.
  • The connected saga ID is stored in a SagaId property.
  • The serialized data for the message is stored in a State property.
  • The scheduled timestamp for the timeout is stored in a Time property.
  • Any headers associated with the timeout are stored in an array of key value pairs.

The Subscriptions

Note that the message type maps to multiple subscriber endpoints.

  • The Subscription message type and version are stored in the MessageType property.
  • The list of subscribers is stored in a array of objects each containing Queue and MachineName properties.

The Handler Stored data

Related Articles

  • RavenDB Persistence
  • Sagas
    NServiceBus uses event-driven architecture to include fault-tolerance and scalability in long-term business processes.

Last modified