Migrating saga persistence

Component: NServiceBus
NuGet Package: NServiceBus (7-pre)
This page targets a pre-release version and is subject to change prior to the final release.

Prerequisites

An instance of SQL Server Express must be installed and accessible as .\SqlExpress.

At startup, each endpoint creates its required SQL assets, including databases, tables and schemas.

The database created by this sample is NsbSamplesSagaMigration.

Running the project

  1. Start the solution.
  2. Wait until the prompt "Type 'start <SagaId>' or 'complete <SagaId>'" is shown in the "Client" console window.
  3. Start a couple of sagas with easy-to-remember IDs (e.g. start 1, start 2 and start 3).
  4. Verify that the sagas have started by running SELECT * FROM [nservicebus].[dbo].[TestSaga].
  5. Verify that the messages were handled by the "Server" endpoint.
  6. Complete one of the sagas (e.g. complete 1) and observe the message flow. Completion can take up to 10 seconds because the flow involves a saga timeout. The saga should then be complete; verify this by running the previous query again and checking that the corresponding row has been removed from the saga table.
  7. Stop the solution.
  8. Uncomment the #define MIGRATION line in TestSaga.cs.
  9. Start the solution.
  10. Start some new sagas (e.g. start A, start B and start C).
  11. Verify that the sagas have started by running SELECT * FROM [nservicebus].[dbo].[NewTestSaga].
  12. Verify that the messages were handled by the "Server.New" endpoint.
  13. Notice that the "Server" console shows output indicating that the not-found handler has been used.
  14. Complete the previously created sagas (complete 2 and complete 3) to drain the old saga store.
  15. Verify that these messages are handled by the old "Server" endpoint, not by "Server.New".
  16. Complete one of the new sagas (e.g. complete A) to verify that it is handled properly by "Server.New".
  17. Complete another saga (e.g. complete B) and stop the solution as soon as "Got a follow-up message." is shown in the console.
  18. Run SELECT [Destination], [SagaId] FROM [nservicebus].[dbo].[NewTimeoutData] to verify that the timeout is stored in the database and that its destination is the "Server.New" queue.
  19. Uncomment the #define POST_MIGRATION line in Program.cs and DrainTempQueueSatelliteFeature.cs in the "Server.New" project. This changes the input queue of "Server.New" back to the well-known Samples.SagaMigration.Server and enables an additional receiver that drains the temporary queue.
  20. Start only the "Server.New" project by right-clicking it in Solution Explorer and selecting "Debug -> Start new instance".
  21. Notice that "Server.New" prints "Moving message from Samples.SagaMigration.Server.New@<machine> to Samples.SagaMigration.Server@<machine>" followed by "Got timeout. Completing.", which means the timeout has been successfully redirected from the temporary queue. This happens only if there were outstanding timeout messages when the new version of the endpoint replaced the old one.
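The reason for steps 17 to 21 can be sketched as a tiny C# model (top-level statements, C# 9 or later). It is hypothetical and not part of the sample: WhoReceives is an invented helper, and the queue names are taken from the log output in step 21 with the machine suffix omitted. A timeout stored while "Server.New" listened on the temporary queue still targets that queue after the cutover, so it is only picked up if the draining receiver is enabled.

```csharp
using System;

// Hypothetical model of steps 17 to 21; queue names come from the sample's log output
// (machine suffix omitted). WhoReceives is illustrative, not an NServiceBus API.
const string MainQueue = "Samples.SagaMigration.Server";
const string TempQueue = "Samples.SagaMigration.Server.New";

// Steps 17-18: the stored timeout targets the queue Server.New used when it requested the timeout.
string timeoutDestination = TempQueue;

// After POST_MIGRATION, Server.New consumes MainQueue, and the drain satellite
// (if enabled) consumes TempQueue. The old "Server" endpoint no longer runs.
string WhoReceives(string destination, bool drainEnabled)
{
    if (destination == MainQueue)
        return "Server.New (main receiver)";
    if (destination == TempQueue && drainEnabled)
        return "Server.New (moved by drain satellite)";
    return "nobody: message stays in the queue";
}

Console.WriteLine(WhoReceives(timeoutDestination, drainEnabled: true));
Console.WriteLine(WhoReceives(timeoutDestination, drainEnabled: false));
```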

Code walk-through

This sample contains four projects:

  • Shared - contains definitions of messages exchanged between the Client and the Server.
  • Client - initiates a multi-message conversation with the server.
  • Server - implements a long-running process using the saga feature, with the NHibernate-based saga persister.
  • Server.New - implements the same functionality as Server but uses the SQL-based saga persister.

The sample shows how to gradually migrate from one saga persister to another without requiring an offline migration procedure or script. This example uses the NHibernate and SQL persisters as source and target respectively, but any persister can be used in either role; for example, the same method can be used to migrate from the RavenDB persister to the NHibernate persister.
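The routing rule behind this gradual approach can be sketched as a small, self-contained C# model (top-level statements, C# 9 or later). Everything in it is hypothetical: the two HashSet<string> stores stand in for the NHibernate and SQL persisters, sagas are keyed only by their correlation property value, and Deliver is an invented helper rather than an NServiceBus API. A message sent to the well-known queue reaches the old endpoint, which keeps handling sagas it already owns and forwards everything else to the new endpoint, the only place new sagas are created.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-ins for the two saga stores, keyed by the correlation property (SomeId).
var oldStore = new HashSet<string> { "2", "3" };   // sagas started before migration (old persister)
var newStore = new HashSet<string>();              // sagas started after migration (new persister)

// Messages sent to the well-known queue reach the old endpoint first.
string Deliver(string someId, bool isStartingMessage)
{
    if (oldStore.Contains(someId))
        return "Server";               // existing old saga: handled with the old persister
    // Not found in the old store: the not-found handler forwards the message as-is.
    if (isStartingMessage)
        newStore.Add(someId);          // the new endpoint creates the saga in the new store
    return "Server.New";
}

Console.WriteLine(Deliver("2", isStartingMessage: false)); // Server
Console.WriteLine(Deliver("A", isStartingMessage: true));  // Server.New
Console.WriteLine(Deliver("A", isStartingMessage: false)); // Server.New
```

Because the old store never receives new sagas, it only shrinks as its sagas complete, which is what eventually allows the old endpoint to be decommissioned.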

Message flow

The message flow is designed to demonstrate the correctness of migration logic:

  • The StartingMessage (sent via the start command) starts the saga and assigns the correlation property.
  • The CorrelatedMessage (sent via the complete command) contains the correlation property value of an already started saga. Handling this message results in the saga sending back a ReplyMessage.
  • The ReplyMessage sent by the saga carries a saga ID header containing the storage ID (not the correlation property value) of the saga instance that sent it.
  • The ReplyFollowUpMessage, sent in response to the ReplyMessage, carries that saga ID header, which is used to look up the target saga.

public Task Handle(StartingMessage message, IMessageHandlerContext context)
{
    log.Info($"{Data.SomeId}: Created new saga instance.");
    return Task.CompletedTask;
}

public Task Handle(ReplyFollowUpMessage message, IMessageHandlerContext context)
{
    log.Info($"{Data.SomeId}: Got a follow-up message.");
    return RequestTimeout<TestTimeout>(context, DateTime.UtcNow.AddSeconds(10));
}

public Task Handle(CorrelatedMessage message, IMessageHandlerContext context)
{
    log.Info($"{Data.SomeId}: Got a correlated message {message.SomeId}. Replying back.");
    var replyMessage = new ReplyMessage
    {
        SomeId = Data.SomeId
    };
    return context.Reply(replyMessage);
}

public Task Timeout(TestTimeout state, IMessageHandlerContext context)
{
    log.Info($"{Data.SomeId}: Got timeout. Completing.");
    MarkAsComplete();
    return Task.CompletedTask;
}

To summarize, a saga instance can be looked up either by its correlation property value or by its storage ID.
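The difference between the two lookups can be illustrated with a hypothetical in-memory saga store in C# (top-level statements, C# 9 or later). The dictionary and the Start, FindByCorrelation and FindById helpers are invented for this sketch; a real persister does this work internally. The storage ID is a Guid assigned when the saga instance is created and is unrelated to the SomeId correlation value, which is why the follow-up message needs the saga ID header.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory saga store: storage ID (Guid) -> correlation property value (SomeId).
var store = new Dictionary<Guid, string>();

Guid Start(string someId)
{
    var storageId = Guid.NewGuid();   // assigned at creation time, unrelated to SomeId
    store[storageId] = someId;
    return storageId;
}

// CorrelatedMessage path: look up by the correlation property value.
Guid? FindByCorrelation(string someId) =>
    store.Where(kv => kv.Value == someId).Select(kv => (Guid?)kv.Key).FirstOrDefault();

// ReplyFollowUpMessage path: look up by the storage ID carried in the saga ID header.
string? FindById(Guid sagaIdHeader) =>
    store.TryGetValue(sagaIdHeader, out var someId) ? someId : null;

var id = Start("1");
Console.WriteLine(FindByCorrelation("1") == id);      // True
Console.WriteLine(FindById(id));                      // 1
Console.WriteLine(FindById(Guid.NewGuid()) is null);  // True: unknown storage ID, saga not found
```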

How it works

The migration procedure requires that the new version of a given endpoint is deployed alongside the old one. In this sample, "Server.New" represents the new version that uses SQL persistence. Until the migration is complete, the new version is not visible to the outside world because it uses a different queue name.

The new version has to be deployed and started before the migration process can begin. Only then should the old version be modified to enable the migration, which requires a couple of code changes. In this sample, both versions share the same source file for the saga definition, and the changes are made via preprocessor directives. Migration is enabled by uncommenting this directive:

//#define MIGRATION
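The effect of the directive can be tried out in isolation. The following stand-alone C# sketch (top-level statements, C# 9 or later) reproduces the #if MIGRATION && !SERVER_NEW condition used in TestSaga.cs, with plain string arrays standing in for the saga interfaces; the arrays are illustrative only. In C#, a #define must appear before any code in the file, so the commented-out line has to stay at the top of the source file.

```csharp
// Stand-alone sketch; in the sample, MIGRATION is toggled in TestSaga.cs and
// SERVER_NEW comes from the Server.New project settings, so it is undefined here.
#define MIGRATION
using System;

#if MIGRATION && !SERVER_NEW
// Old endpoint with migration enabled: StartingMessage is only handled, never starts a saga.
var starters = new[] { "DummyMessage" };
var handledOnly = new[] { "StartingMessage", "ReplyFollowUpMessage", "CorrelatedMessage" };
#else
// New endpoint, or old endpoint before migration.
var starters = new[] { "StartingMessage" };
var handledOnly = new[] { "ReplyFollowUpMessage", "CorrelatedMessage" };
#endif

Console.WriteLine($"Started by: {string.Join(", ", starters)}");
Console.WriteLine($"Handled only: {string.Join(", ", handledOnly)}");
```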

Handling not found sagas

In order to eventually migrate to the new persistence, all new saga instances need to be created by the "Server.New" endpoint. To ensure this, the old "Server" endpoint has to include a handler for sagas that are not found, which forwards the messages to the new endpoint:

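using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Sagas;

// Invoked by the old endpoint when a message cannot be matched to a saga in the old store
class NotFoundHandler :
    IHandleSagaNotFound
{
    public Task Handle(object message, IMessageProcessingContext context)
    {
        // Pass the message, unchanged, to the temporary queue of the new endpoint
        return context.ForwardCurrentMessageTo("Samples.SagaMigration.Server.New");
    }
}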

The messages are forwarded as-is, without any side effects, and are handled normally by the destination.

This handler is invoked only for messages that are expected to target an existing saga (either through correlation or through a saga ID header) when no matching saga instance is found. It is never invoked for a message that can start a saga, so the saga code has to be modified so that StartingMessage no longer starts a saga in the old endpoint.

//SERVER_NEW is defined in Server.New project settings
#if SERVER_NEW
[NServiceBus.Persistence.Sql.SqlSaga(correlationProperty: "SomeId")]
#endif
public class TestSaga :
        Saga<TestSaga.TestSagaData>,
        IHandleMessages<ReplyFollowUpMessage>,
        IHandleMessages<CorrelatedMessage>,
        IHandleTimeouts<TestTimeout>,
#if MIGRATION && !SERVER_NEW
        // Old endpoint during migration: StartingMessage is handled but no longer starts a saga
        IHandleMessages<StartingMessage>,
        IAmStartedByMessages<DummyMessage>
#else
        IAmStartedByMessages<StartingMessage>
#endif
{
    // Saga data, mappings and handlers omitted; see the other snippets on this page
}
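The resulting behavior of the old endpoint can be summarized as a decision function. This is a simplified, hypothetical C# model (top-level statements, C# 9 or later): OldEndpointOutcome is not an NServiceBus API, message types are plain strings, and timeouts are ignored. It shows why StartingMessage must stop being a starter: otherwise a StartingMessage with no matching saga would create a new saga in the old store instead of reaching the not-found handler.

```csharp
using System;

// Hypothetical model of how the old "Server" endpoint treats an incoming message.
// migration: whether #define MIGRATION is active; sagaFound: whether the old store has a match.
string OldEndpointOutcome(string messageType, bool migration, bool sagaFound)
{
    bool isStarter = migration ? messageType == "DummyMessage" : messageType == "StartingMessage";
    if (sagaFound) return "handled by existing saga in old store";
    if (isStarter) return "new saga created in old store";
    return "not-found handler: forwarded to Server.New";
}

Console.WriteLine(OldEndpointOutcome("StartingMessage", migration: false, sagaFound: false));
Console.WriteLine(OldEndpointOutcome("StartingMessage", migration: true, sagaFound: false));
Console.WriteLine(OldEndpointOutcome("CorrelatedMessage", migration: true, sagaFound: true));
```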

Note that DummyMessage is necessary as a saga starter because NServiceBus validation logic does not allow sagas without any starter messages. DummyMessage is never sent; it is only there to satisfy the validation.

The correlation property mappings also need to include DummyMessage:

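protected override void ConfigureHowToFindSaga(SagaPropertyMapper<TestSagaData> mapper)
{
    mapper.ConfigureMapping<StartingMessage>(m => m.SomeId)
        .ToSaga(s => s.SomeId);
    mapper.ConfigureMapping<CorrelatedMessage>(m => m.SomeId)
        .ToSaga(s => s.SomeId);
#if MIGRATION && !SERVER_NEW
    // Needed because DummyMessage is declared as a saga starter in this configuration
    mapper.ConfigureMapping<DummyMessage>(m => m.SomeId)
        .ToSaga(s => s.SomeId);
#endif
}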

Forwarding messages from temporary queue

When there are no sagas left in the old persistence, the old version of the endpoint can be decommissioned. To avoid losing messages, the new version has to include, for a certain period of time, a redirection satellite that receives any remaining messages from the temporary queue and moves them to the regular queue.

using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Features;
using NServiceBus.Logging;
using NServiceBus.ObjectBuilder;
using NServiceBus.Routing;
using NServiceBus.Transport;

public class DrainTempQueueSatelliteFeature :
    Feature
{
    static ILog log = LogManager.GetLogger<DrainTempQueueSatelliteFeature>();
    string tempQueue;
    string mainQueue;

    public DrainTempQueueSatelliteFeature()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        var settings = context.Settings;
        // The temporary queue is the endpoint's logical address qualified with "New"
        var qualifiedAddress = settings.LogicalAddress()
            .CreateQualifiedAddress("New");
        tempQueue = settings.GetTransportAddress(qualifiedAddress);
        mainQueue = settings.LocalAddress();

        // Receive from the temporary queue, one message at a time
        context.AddSatelliteReceiver(
            name: "DrainTempQueueSatellite",
            transportAddress: tempQueue,
            runtimeSettings: new PushRuntimeSettings(maxConcurrency: 1),
            recoverabilityPolicy: (config, errorContext) =>
            {
                // No retries: failed messages go straight to the error queue
                return RecoverabilityAction.MoveToError(config.Failed.ErrorQueue);
            },
            onMessage: OnMessage);
    }

    Task OnMessage(IBuilder builder, MessageContext context)
    {
        var dispatcher = builder.Build<IDispatchMessages>();
        var headers = context.Headers;

        // Re-send the message unchanged (same ID, headers and body) to the main queue
        var message = new OutgoingMessage(context.MessageId, headers, context.Body);
        var operation = new TransportOperation(message, new UnicastAddressTag(mainQueue));

        log.Info($"Moving message from {tempQueue} to {mainQueue}");

        var operations = new TransportOperations(operation);
        // Dispatch within the receive transaction, where the transport supports it
        return dispatcher.Dispatch(operations, context.TransportTransaction, context.Context);
    }
}
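The essential property of the satellite is that each message is moved unchanged. The C# sketch below (top-level statements, C# 9 or later) models that with two in-memory queues; the tuple shape, the "Example-Type" header and the Drain helper are assumptions for illustration, not the transport API. In the real feature, passing context.TransportTransaction to Dispatch lets the send take part in the receive transaction where the transport supports it, so a message should not be lost between the two queues.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory queues; a message is modeled as (Id, Headers, Body).
var tempQueue = new Queue<(string Id, Dictionary<string, string> Headers, byte[] Body)>();
var mainQueue = new Queue<(string Id, Dictionary<string, string> Headers, byte[] Body)>();

// A leftover timeout addressed to the temporary queue ("Example-Type" is a made-up header).
tempQueue.Enqueue(("msg-1", new Dictionary<string, string> { ["Example-Type"] = "TestTimeout" }, new byte[] { 1, 2, 3 }));

// Like OnMessage: re-send each message with the same ID, headers and body, one at a time.
int Drain()
{
    var moved = 0;
    while (tempQueue.TryDequeue(out var message))
    {
        Console.WriteLine($"Moving message {message.Id} from temp queue to main queue");
        mainQueue.Enqueue(message);
        moved++;
    }
    return moved;
}

var movedCount = Drain();
Console.WriteLine($"Moved {movedCount} message(s); temp queue now holds {tempQueue.Count}.");
```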

Decommissioning the old endpoint

Once all the sagas stored in the old persister are complete, the old endpoint can be decommissioned.

  1. Ensure the old saga table is empty by running a query similar to SELECT * FROM [nservicebus].[dbo].[TestSaga].
  2. Stop the old endpoint ("Server").
  3. Replace the binaries and/or configuration of the old endpoint with those of the new endpoint, and enable the redirection of the temporary queue.
  4. Start the endpoint.
  5. The redirection of the temporary queue can be removed once there are no timeout messages left for the temporary queue (check, e.g., by running a query like SELECT [Destination], [SagaId] FROM [nservicebus].[dbo].[NewTimeoutData] WHERE [Destination] = <address of temp queue>).
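The two checks in this procedure can be expressed as simple predicates. In this hypothetical C# sketch (top-level statements, C# 9 or later) the query results are hard-coded snapshots, and the temporary queue address is an assumed value; the actual Destination string depends on the transport and may, for example, include a machine name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical snapshots of the query results in steps 1 and 5.
int oldSagaRows = 0; // row count from SELECT * FROM [nservicebus].[dbo].[TestSaga]
var timeouts = new List<(string Destination, Guid SagaId)>
{
    // rows from SELECT [Destination], [SagaId] FROM [nservicebus].[dbo].[NewTimeoutData]
    ("Samples.SagaMigration.Server", Guid.NewGuid())
};
// Assumed address of the temporary queue; the real value depends on the transport.
const string TempQueueAddress = "Samples.SagaMigration.Server.New";

// Step 1: the old endpoint may only be replaced once its saga table is empty.
bool CanDecommissionOldEndpoint() => oldSagaRows == 0;

// Step 5: the redirection may only be removed once no timeout targets the temporary queue.
bool CanRemoveRedirection() => timeouts.All(t => t.Destination != TempQueueAddress);

Console.WriteLine(CanDecommissionOldEndpoint()); // True
Console.WriteLine(CanRemoveRedirection());       // True
```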
