
Migrating saga persistence

Component: NServiceBus
NuGet Package: NServiceBus (8.x)

When saga data is involved, migrating from one type of persistence to another can seem like an insurmountable task. Each persister stores NServiceBus saga data differently, which makes writing a migration script difficult. Even if such a script could be written, executing it might require lengthy downtime, possibly with no way to revert if something goes wrong.

This sample takes a different approach. It shows how to migrate from one saga persister to another gradually, without an offline migration script. Two versions of the endpoint run in parallel, and messages whose saga data cannot be found are forwarded to the new endpoint.

This sample shows how to migrate from NHibernate to SQL Persistence, but the same concept can be used to migrate between any two persistence options.

Prerequisites

Ensure an instance of SQL Server (version 2016 or later for the custom saga finders sample, or version 2012 or later for other samples) is installed and accessible on localhost, port 1433. A Docker image can be used for this by running docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_SA_PASSWORD=yourStrong(!)Password' -p 1433:1433 -d mcr.microsoft.com/mssql/server:latest in a terminal.

Alternatively, change the connection string to point to a different SQL Server instance.

At startup, each endpoint creates the SQL assets it requires, including databases, tables, and schemas.

The database created by this sample is NsbSamplesSagaMigration.

Running the project

  1. Start the solution.
  2. Wait until "Type 'start <SagaId>' or 'complete <SagaId>'" is shown in the "Client" console window.
  3. Start a couple of sagas with easy-to-remember IDs (e.g. start 1, start 2 and start 3).
  4. Verify sagas are started by running SELECT * FROM [NsbSamplesSagaMigration].[dbo].[TestSaga].
  5. Verify that the messages were handled by the "Server" endpoint.
  6. Complete one of the sagas (e.g. complete 1) and observe the message flow. Completion can take up to 10 seconds because the flow involves a saga timeout. The saga should then complete; verify this by re-running the previous query and checking that the corresponding row has been removed from the saga table.
  7. Stop the solution.
  8. Uncomment the #define MIGRATION line in TestSaga.cs.
  9. Start the solution.
  10. Start some new sagas (e.g. start A, start B and start C).
  11. Verify sagas are started by running SELECT * FROM [NsbSamplesSagaMigration].[dbo].[NewTestSaga].
  12. Verify that the messages were handled by the "Server.New" endpoint.
  13. Notice that the "Server" console shows information indicating that the not-found handler has been used.
  14. Complete the previously created sagas (complete 2 or complete 3) to drain the saga store.
  15. Verify the messages are handled by the old "Server" endpoint, not the "Server.New" endpoint.
  16. Complete one of the new sagas (e.g. complete A) to verify it is handled properly by "Server.New".
  17. Complete another saga (e.g. complete B) and stop the solution as soon as "Got a follow-up message." is shown in the console.
  18. Run SELECT [Destination], [SagaId] FROM [NsbSamplesSagaMigration].[dbo].[NewTimeoutData] to verify the timeout is stored in the database and the destination is the "Server.New" queue.
  19. Uncomment the #define POST_MIGRATION line in Program.cs and DrainTempQueueSatelliteFeature.cs of "Server.New". This changes the input queue of "Server.New" back to the well-known Samples.SagaMigration.Server and enables an additional receiver that drains the temporary queue.
  20. Start only the "Server.New" project by right-clicking the project in Solution Explorer and selecting "Debug -> Start new instance".
  21. Notice that "Server.New" prints "Moving message from Samples.SagaMigration.Server.New@<machine> to Samples.SagaMigration.Server@<machine>" followed by "Got timeout. Completing.", which means the timeout has been successfully redirected from the temporary queue. This happens only if there were outstanding timeout messages when the new version of the endpoint replaced the old one.

Code walk-through

This sample contains four projects:

  • Shared - contains definitions of messages exchanged between the Client and the Server.
  • Client - initiates a multi-message conversation with the server.
  • Server - implements a long-running process via the Saga feature. Uses the NHibernate-based saga persister.
  • Server.New - implements the same functionality as Server but uses the SQL-based saga persister.

The sample shows how to gradually migrate from one saga persister to another without an offline migration procedure or script. Here the NHibernate and SQL persisters are the source and target, respectively, but any persister can be used in either role; for example, the same method can be used to migrate from RavenDB to NHibernate.

Message flow

The message flow is designed to demonstrate the correctness of the migration logic:

  • The StartingMessage (sent via the start command) starts the saga and assigns the correlation property.
  • The CorrelatedMessage (sent via the complete command) contains the correlation property value of an already started saga. Handling this message results in the saga sending back a ReplyMessage.
  • The ReplyMessage sent by the saga carries a saga ID header holding the storage ID (not the correlation property value) of the saga instance that sent it.
  • The ReplyFollowUpMessage, sent in response to the ReplyMessage, carries that saga ID header, which is used to look up the target saga.
public Task Handle(StartingMessage message, IMessageHandlerContext context)
{
    log.Info($"{Data.SomeId}: Created new saga instance.");
    return Task.CompletedTask;
}

public Task Handle(ReplyFollowUpMessage message, IMessageHandlerContext context)
{
    log.Info($"{Data.SomeId}: Got a follow-up message.");
    return RequestTimeout<TestTimeout>(context, DateTime.UtcNow.AddSeconds(10));
}

public Task Handle(CorrelatedMessage message, IMessageHandlerContext context)
{
    log.Info($"{Data.SomeId}: Got a correlated message {message.SomeId}. Replying back.");
    var replyMessage = new ReplyMessage
    {
        SomeId = Data.SomeId
    };
    return context.Reply(replyMessage);
}

public Task Timeout(TestTimeout state, IMessageHandlerContext context)
{
    log.Info($"{Data.SomeId}: Got timeout. Completing.");
    MarkAsComplete();
    return Task.CompletedTask;
}

To summarize, sagas can be looked up by either their correlation property value or the storage ID.
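A minimal in-memory sketch can make the two lookup paths concrete. SagaRecord and InMemorySagaStore are hypothetical types written for illustration only; they are not the NServiceBus persistence API, and real persisters index saga data in their own storage-specific way.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical saga data: a storage ID plus the correlation property value.
public sealed record SagaRecord(Guid Id, string SomeId);

// Hypothetical in-memory store with one index per lookup path.
public sealed class InMemorySagaStore
{
    readonly Dictionary<Guid, SagaRecord> byId = new();
    readonly Dictionary<string, Guid> byCorrelation = new();

    // Creates a saga instance, as a starting message would.
    public SagaRecord Create(string someId)
    {
        var saga = new SagaRecord(Guid.NewGuid(), someId);
        byId[saga.Id] = saga;
        byCorrelation[someId] = saga.Id;
        return saga;
    }

    // Lookup by correlation property value (e.g. CorrelatedMessage.SomeId).
    public SagaRecord? FindByCorrelation(string someId) =>
        byCorrelation.TryGetValue(someId, out var id) ? byId[id] : null;

    // Lookup by storage ID (e.g. the saga ID header on ReplyFollowUpMessage).
    public SagaRecord? FindById(Guid id) =>
        byId.TryGetValue(id, out var saga) ? saga : null;

    // Removes a completed saga from both indexes, mirroring what happens
    // to the saga data after MarkAsComplete().
    public void Complete(Guid id)
    {
        if (byId.Remove(id, out var saga))
        {
            byCorrelation.Remove(saga.SomeId);
        }
    }
}
```

In this model, CorrelatedMessage resolves through FindByCorrelation, while a message carrying only the saga ID header resolves through FindById. Either lookup can succeed only in the store that actually holds the saga, which is why, during a migration, a message whose saga is missing from one persister has to reach the other endpoint.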

How it works

The migration procedure requires that the new version of a given endpoint is deployed alongside the old one. In this sample, "Server.New" represents the new version that uses SQL persistence. Until the migration is complete, the new version is not visible to the outside world because it uses a different queue name (Samples.SagaMigration.Server.New).

The new version must be deployed and started before the migration process can begin. Only then can the old version be modified to enable the migration, which requires a couple of code changes. In this sample, both versions share the same source code for the saga definition (copied into separate files), and the changes are made via preprocessor directives. Migration is enabled by uncommenting this line:

//#define MIGRATION
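C# #define symbols apply only to the file in which they appear and must come before any other code in that file, which is why the same line has to be uncommented in every affected file (for example, both Program.cs and DrainTempQueueSatelliteFeature.cs for POST_MIGRATION). The following is a minimal sketch of the pattern; MigrationSwitch is a hypothetical type used only for illustration.

```csharp
// Remove the leading "//" to switch this file into migration mode.
//#define MIGRATION

// A #define must appear before the first code token in a file, and it
// affects only the file it appears in.
public static class MigrationSwitch
{
#if MIGRATION
    public const bool IsMigrationEnabled = true;
#else
    public const bool IsMigrationEnabled = false;
#endif
}
```

As an alternative design, a symbol can be defined for a whole project through the DefineConstants MSBuild property in the .csproj file. That avoids editing several files, but makes the switch less visible next to the saga code.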

Handling not found sagas

To eventually complete the migration to the new persistence, all new saga instances must be created by the "Server.New" endpoint. To ensure this, the old "Server" endpoint must include a saga-not-found handler that forwards such messages to the new endpoint:

class NotFoundHandler :
    IHandleSagaNotFound
{
    public Task Handle(object message, IMessageProcessingContext context)
    {
        return context.ForwardCurrentMessageTo("Samples.SagaMigration.Server.New");
    }
}

Messages are forwarded as-is, without side effects, and are handled normally by the destination.

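This handler is invoked only for messages that target an existing saga, either through a correlation property mapping or through a saga ID header. It is never invoked for a message that is allowed to start a saga, so in migration mode the old endpoint's saga is modified so that StartingMessage no longer starts it. Instead, StartingMessage is handled as a regular saga message: because no matching saga exists in the old persister, the not-found handler forwards it to "Server.New", where it starts a new saga instance.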
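The routing rules described above can be checked with a small, self-contained simulation. This is a hypothetical model: SimEndpoint and SimMessage are invented for illustration, sagas are reduced to their correlation values, and completion is collapsed into a single step, whereas the real sample completes a saga only after a reply, a follow-up message, and a timeout.

```csharp
#nullable enable
using System.Collections.Generic;

// Hypothetical message: a type name plus the correlation property value.
public sealed record SimMessage(string Type, string SomeId);

// Hypothetical endpoint that applies the routing rules used during migration.
public sealed class SimEndpoint
{
    readonly HashSet<string> sagas = new();   // correlation values of stored sagas
    readonly bool startsSagas;                // false for the old endpoint in MIGRATION mode
    readonly SimEndpoint? notFoundTarget;     // where the not-found handler forwards to

    public SimEndpoint(string name, bool startsSagas, SimEndpoint? notFoundTarget = null)
    {
        Name = name;
        this.startsSagas = startsSagas;
        this.notFoundTarget = notFoundTarget;
    }

    public string Name { get; }
    public List<string> Log { get; } = new();

    // Simulates a saga created before migration mode was enabled.
    public void Seed(string someId) => sagas.Add(someId);
    public bool HasSaga(string someId) => sagas.Contains(someId);

    public void Receive(SimMessage message)
    {
        if (message.Type == "StartingMessage" && startsSagas)
        {
            sagas.Add(message.SomeId);
            Log.Add($"started {message.SomeId}");
        }
        else if (sagas.Contains(message.SomeId))
        {
            // Simplification: completion happens immediately here.
            if (message.Type == "CorrelatedMessage")
            {
                sagas.Remove(message.SomeId);
                Log.Add($"completed {message.SomeId}");
            }
        }
        else if (notFoundTarget != null)
        {
            // Plays the role of ForwardCurrentMessageTo in the not-found handler.
            Log.Add($"forwarded {message.Type} {message.SomeId} to {notFoundTarget.Name}");
            notFoundTarget.Receive(message);
        }
        else
        {
            Log.Add($"saga not found for {message.SomeId}");
        }
    }
}
```

For example, if the old endpoint is created with startsSagas: false and forwards to the new endpoint, a StartingMessage for A ends up stored only in the new endpoint, while a CorrelatedMessage for a saga seeded in the old endpoint is still completed there. This roughly mirrors what steps 10 to 16 of the walk-through show in the console windows.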

public class TestSaga :
        Saga<TestSaga.TestSagaData>,
        IHandleMessages<ReplyFollowUpMessage>,
        IHandleMessages<CorrelatedMessage>,
        IHandleTimeouts<TestTimeout>,
#if MIGRATION
        IHandleMessages<StartingMessage>,
        IAmStartedByMessages<DummyMessage>
#else
        IAmStartedByMessages<StartingMessage>
#endif

Note that DummyMessage is needed only as a saga starter, because NServiceBus validation does not allow sagas without any starting message. DummyMessage is never sent; it exists only to satisfy this validation.

The correlation property mappings must also include DummyMessage:

mapper.MapSaga(s => s.SomeId)
    .ToMessage<StartingMessage>(m => m.SomeId)
    .ToMessage<CorrelatedMessage>(m => m.SomeId)
#if MIGRATION
    .ToMessage<DummyMessage>(m => m.SomeId)
#endif
    ;

Forwarding messages from temporary queue

When no sagas are left in the old persistence, the old version of the endpoint can be decommissioned. However, the temporary queue of the new version may still receive messages, for example timeouts that were requested while "Server.New" used that queue as its input queue. To avoid losing them, the new version must include, for a certain period of time, a redirection satellite that receives any remaining messages from the temporary queue and moves them to the regular queue.

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus;
using NServiceBus.Features;
using NServiceBus.Logging;
using NServiceBus.Routing;
using NServiceBus.Transport;

public class DrainTempQueueSatelliteFeature :
    Feature
{
    static ILog log = LogManager.GetLogger<DrainTempQueueSatelliteFeature>();

    public DrainTempQueueSatelliteFeature()
    {
        EnableByDefault();
    }

    protected override void Setup(FeatureConfigurationContext context)
    {
        // Listen on the temporary queue that "Server.New" used during the migration.
        context.AddSatelliteReceiver(
            name: "DrainTempQueueSatellite",
            transportAddress: new QueueAddress("Samples.SagaMigration.Server.New"),
            runtimeSettings: new PushRuntimeSettings(maxConcurrency: 1),
            recoverabilityPolicy: (config, errorContext) =>
            {
                // No retries: a message that cannot be moved goes straight to the error queue.
                return RecoverabilityAction.MoveToError(config.Failed.ErrorQueue);
            },
            onMessage: OnMessage);
    }

    Task OnMessage(IServiceProvider serviceProvider, MessageContext context, CancellationToken cancellationToken)
    {
        var receiveAddresses = serviceProvider.GetRequiredService<ReceiveAddresses>();
        var dispatcher = serviceProvider.GetRequiredService<IMessageDispatcher>();
        var headers = context.Headers;

        // Reuse the native message ID, headers, and body unchanged and address
        // the copy to the endpoint's main input queue.
        var message = new OutgoingMessage(context.NativeMessageId, headers, context.Body);
        var operation = new TransportOperation(message, new UnicastAddressTag(receiveAddresses.MainReceiveAddress));

        log.Info($"Moving message from {context.ReceiveAddress} to {receiveAddresses.MainReceiveAddress}");

        // Passing the receive transaction lets the transport include the send in
        // the receive operation when its transaction mode supports it.
        var operations = new TransportOperations(operation);
        return dispatcher.Dispatch(operations, context.TransportTransaction, cancellationToken);
    }
}
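The essential property of the satellite is that messages are moved unchanged. The following in-memory model shows only that property; TransportMessage and TempQueueDrainer are hypothetical stand-ins, and unlike the real feature this sketch has no transactions, no recoverability policy, and no concurrency limit.

```csharp
using System.Collections.Generic;

// Hypothetical stand-in for a transport message: native ID, headers, and body.
public sealed record TransportMessage(
    string MessageId,
    IReadOnlyDictionary<string, string> Headers,
    byte[] Body);

public static class TempQueueDrainer
{
    // Moves every message from the temporary queue to the main queue and
    // returns the number of messages moved.
    public static int Drain(Queue<TransportMessage> temporaryQueue, Queue<TransportMessage> mainQueue)
    {
        var moved = 0;
        while (temporaryQueue.TryDequeue(out var message))
        {
            // The same instance is enqueued, so ID, headers, and body are preserved.
            mainQueue.Enqueue(message);
            moved++;
        }
        return moved;
    }
}
```

Because nothing about the message changes, the regular handlers on the main queue (for example the saga's Timeout handler) process it exactly as if it had been delivered there directly.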

Decommissioning the old endpoint

Once all the sagas stored in the old persister are complete, the old endpoint can be decommissioned.

  1. Ensure the saga table is empty by running a query such as SELECT * FROM [NsbSamplesSagaMigration].[dbo].[TestSaga]; it should return no rows.
  2. Stop the old endpoint ("Server").
  3. Replace the binaries and configuration of the old endpoint with those of the new endpoint, and enable the redirection of the temporary queue (in this sample, by uncommenting #define POST_MIGRATION).
  4. Start the endpoint.
  5. Remove the redirection of the temporary queue once no timeout messages target the temporary queue. This can be checked, for example, with a query like SELECT [Destination], [SagaId] FROM [NsbSamplesSagaMigration].[dbo].[NewTimeoutData] WHERE [Destination] = <address of temp queue>.
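The check in step 5 can also be expressed in code, for example in a deployment script that decides whether the redirection is still needed. This sketch runs over hypothetical in-memory TimeoutRow values instead of querying the NewTimeoutData table; the case-insensitive comparison is an assumption meant to mirror SQL Server's commonly used case-insensitive default collation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for a row of the timeout table.
public sealed record TimeoutRow(string Destination, Guid SagaId);

public static class RedirectionCheck
{
    // The redirection can be removed only when no stored timeout still
    // targets the temporary queue.
    public static bool CanRemoveRedirection(IEnumerable<TimeoutRow> timeouts, string temporaryQueueAddress) =>
        !timeouts.Any(t => string.Equals(t.Destination, temporaryQueueAddress, StringComparison.OrdinalIgnoreCase));
}
```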

Related Articles

  • NHibernate Persistence
    NHibernate-based persistence for NServiceBus.
  • Sagas
    NServiceBus uses event-driven architecture to include fault-tolerance and scalability in long-term business processes.
  • SQL Persistence
    A persister that targets relational databases, including SQL Server, Oracle, MySQL, and PostgreSQL.
