SQL Persistence Saga rename

Component: Sql Persistence | Nuget: NServiceBus.Persistence.Sql (Version: 1.x)
Target NServiceBus Version: 6.x

Prerequisites

MS SQL Server

  1. Ensure an instance of SQL Server Express (Version 2016 or above) is installed and accessible as .\SQLEXPRESS. Create a database named SqlPersistenceSample. Alternatively, change the connection string to point to a different SQL Server instance.

MySql

  1. Ensure an instance of MySql (Version 5.7 or above) is installed and accessible on localhost and port 3306.
  2. Add the username to access the instance to an environment variable named MySqlUserName.
  3. Add the password to access the instance to an environment variable named MySqlPassword.

Alternatively, change the connection string to point to a different MySql instance.
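As a rough illustration of how the two environment variables could feed a connection string, the following is a minimal sketch. The class name MySqlSettings, the database name sqlpersistencesample, and the exact set of connection string keys are assumptions made for this example and are not taken from the sample itself.

```csharp
using System;

public static class MySqlSettings
{
    // Hypothetical helper: combines the MySqlUserName and MySqlPassword
    // environment variables into a connection string for localhost:3306.
    // The database name below is an assumption for illustration only.
    public static string BuildConnectionString(Func<string, string> readVariable = null)
    {
        // The lookup can be replaced (for example in tests);
        // by default the process environment is used.
        if (readVariable == null)
        {
            readVariable = Environment.GetEnvironmentVariable;
        }

        var userName = readVariable("MySqlUserName");
        var password = readVariable("MySqlPassword");

        if (string.IsNullOrEmpty(userName))
        {
            throw new InvalidOperationException("Environment variable MySqlUserName is not set.");
        }
        if (string.IsNullOrEmpty(password))
        {
            throw new InvalidOperationException("Environment variable MySqlPassword is not set.");
        }

        return $"server=localhost;port=3306;user={userName};password={password};database=sqlpersistencesample";
    }
}
```

Failing fast when a variable is missing tends to give a clearer error than a rejected login at endpoint startup.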

Scenario

This sample shows two sagas that need to be renamed.

The saga implementations are contrived and exist only to illustrate the renaming of sagas. For sample purposes, the functionality is split between the two versions: the logic that starts the sagas exists in one endpoint version and the handling logic exists in the other.

The sample consists of two versions of a single endpoint. The first version starts the sagas. The second version handles the renaming of the sagas.

Timeout Saga

The timeout saga sends a timeout at startup and then handles that timeout.

This Saga will be renamed from MyNamespace1.MyTimeoutSagaVersion1 to MyNamespace2.MyTimeoutSagaVersion2.

This scenario illustrates how, when the timeout message is received, its header needs to be translated to the new saga name.

The saga type is stored in the Headers column of the TimeoutData table (SQL Server and MySql) and is converted back to a message header when the timeout is executed.

Reply Saga

The reply saga sends a request at startup and then handles the response to that request.

This Saga will be renamed from MyNamespace1.MyReplySagaVersion1 to MyNamespace2.MyReplySagaVersion2.

This scenario illustrates how, when the reply message is received, its header needs to be translated to the new saga name.

Projects

The sample consists of several projects:

Shared

The shared message contracts and common configuration for both versions. This project only exists for the purposes of code reuse between the two endpoint versions. In a normal codebase this project would not be required as both versions of sagas would never co-exist in the same solution.

Endpoint Configuration

The endpoint configuration shared by both versions.

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlVariant(SqlVariant.MsSqlServer);
var connection = @"Data Source=.\SQLEXPRESS;Initial Catalog=SqlPersistenceSample;Integrated Security=True";
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new SqlConnection(connection);
    });

endpointConfiguration.SendFailedMessagesTo("error");
endpointConfiguration.UseSerialization<JsonSerializer>();
endpointConfiguration.EnableInstallers();

Reply Handler

A reply handler that takes a request and sends a reply. In this sample the reply will always go to a saga. The handler also copies the OriginatingSagaType header onto the reply message so that it can be logged when the saga handles the reply.

public class RequestHandler :
    IHandleMessages<Request>
{
    static ILog log = LogManager.GetLogger<RequestHandler>();

    public Task Handle(Request message, IMessageHandlerContext context)
    {
        log.Warn("Got Request. Will send Reply");
        var headers = context.MessageHeaders;
        var reply = new Reply
        {
            TheId = message.TheId,
            OriginatingSagaType = headers["NServiceBus.OriginatingSagaType"]
        };
        return context.Reply(reply);
    }

}

EndpointVersion1

This project contains the first versions of the sagas. It also sends messages to start both sagas at startup.

Starting Sagas

Sends messages to start both the Reply Saga and the Timeout Saga.

var startReplySaga = new StartReplySaga
{
    TheId = Guid.NewGuid()
};
await endpointInstance.SendLocal(startReplySaga)
    .ConfigureAwait(false);

var startTimeoutSaga = new StartTimeoutSaga
{
    TheId = Guid.NewGuid()
};
await endpointInstance.SendLocal(startTimeoutSaga)
    .ConfigureAwait(false);

Reply Saga

At startup, sends a new Request message. The message is delayed to give EndpointVersion1 time to shut down before the Reply message is handled.

The Handle method for the Reply message throws an exception since, in this sample's execution steps, that method should never be executed.

namespace MyNamespace1
{
    [SqlSaga(
        correlationProperty: nameof(SagaData.TheId)
    )]
    public class MyReplySagaVersion1 :
        SqlSaga<MyReplySagaVersion1.SagaData>,
        IAmStartedByMessages<StartReplySaga>,
        IHandleMessages<Reply>
    {
        static ILog log = LogManager.GetLogger<MyReplySagaVersion1>();

        protected override void ConfigureMapping(MessagePropertyMapper<SagaData> mapper)
        {
            mapper.MapMessage<StartReplySaga>(_ => _.TheId);
            mapper.MapMessage<Reply>(_ => _.TheId);
        }

        public Task Handle(StartReplySaga message, IMessageHandlerContext context)
        {
            var sendOptions = new SendOptions();
            sendOptions.RouteToThisEndpoint();
            sendOptions.DelayDeliveryWith(TimeSpan.FromSeconds(10));
            var request = new Request
            {
                TheId = message.TheId
            };
            log.Warn("Saga started. Sending Request");
            return context.Send(request, sendOptions);
        }

        public Task Handle(Reply reply, IMessageHandlerContext context)
        {
            // throw only for sample purposes
            throw new Exception("Expected Reply in MyReplySagaVersion2. EndpointVersion1 may have been incorrectly started.");
        }

        public class SagaData :
            ContainSagaData
        {
            public Guid TheId { get; set; }
        }
    }
}

Timeout Saga

At startup, requests a SagaTimeout. The timeout is delayed to give EndpointVersion1 time to shut down before the timeout is handled.

The Timeout method for the SagaTimeout message throws an exception since, in this sample's execution steps, that method should never be executed.

namespace MyNamespace1
{
    [SqlSaga(
        correlationProperty: nameof(SagaData.TheId)
    )]
    public class MyTimeoutSagaVersion1 :
        SqlSaga<MyTimeoutSagaVersion1.SagaData>,
        IAmStartedByMessages<StartTimeoutSaga>,
        IHandleTimeouts<SagaTimeout>
    {
        static ILog log = LogManager.GetLogger<MyTimeoutSagaVersion1>();

        protected override void ConfigureMapping(MessagePropertyMapper<SagaData> mapper)
        {
            mapper.MapMessage<StartTimeoutSaga>(_ => _.TheId);
        }

        public Task Handle(StartTimeoutSaga message, IMessageHandlerContext context)
        {
            var timeout = new SagaTimeout
            {
                OriginatingSagaType = GetType().Name
            };
            log.Warn("Saga started. Sending Timeout");
            return RequestTimeout(context, TimeSpan.FromSeconds(10), timeout);
        }

        public Task Timeout(SagaTimeout state, IMessageHandlerContext context)
        {
            // throw only for sample purposes
            throw new Exception("Expected Timeout in MyTimeoutSagaVersion2. EndpointVersion1 may have been incorrectly started.");
        }

        public class SagaData :
            ContainSagaData
        {
            public Guid TheId { get; set; }
        }
    }
}

EndpointVersion2

This project contains the second versions of the sagas.

Reply Saga

Handles the Reply message that has been sent from MyReplySagaVersion1.

The Handle method for the StartReplySaga message throws an exception since, in this sample's execution steps, that method should never be executed.

namespace MyNamespace2
{
    [SqlSaga(
        correlationProperty: nameof(SagaData.TheId)
    )]
    public class MyReplySagaVersion2 :
        SqlSaga<MyReplySagaVersion2.SagaData>,
        IAmStartedByMessages<StartReplySaga>,
        IHandleMessages<Reply>
    {
        static ILog log = LogManager.GetLogger<MyReplySagaVersion2>();

        protected override void ConfigureMapping(MessagePropertyMapper<SagaData> mapper)
        {
            mapper.MapMessage<StartReplySaga>(_ => _.TheId);
            mapper.MapMessage<Reply>(_ => _.TheId);
        }

        public Task Handle(StartReplySaga message, IMessageHandlerContext context)
        {
            // throw only for sample purposes
            throw new Exception("Expected StartReplySaga in MyReplySagaVersion1.");
        }

        public Task Handle(Reply reply, IMessageHandlerContext context)
        {
            log.Warn($"Received Reply from {reply.OriginatingSagaType}");
            MarkAsComplete();
            return Task.FromResult(0);
        }

        public class SagaData :
            ContainSagaData
        {
            public Guid TheId { get; set; }
        }
    }
}

Timeout Saga

Handles the timeout that has been sent from MyTimeoutSagaVersion1.

The Handle method for the StartTimeoutSaga message throws an exception since, in this sample's execution steps, that method should never be executed.

namespace MyNamespace2
{
    [SqlSaga(
        correlationProperty: nameof(SagaData.TheId)
    )]
    public class MyTimeoutSagaVersion2 :
        SqlSaga<MyTimeoutSagaVersion2.SagaData>,
        IAmStartedByMessages<StartTimeoutSaga>,
        IHandleTimeouts<SagaTimeout>
    {
        static ILog log = LogManager.GetLogger<MyTimeoutSagaVersion2>();

        protected override void ConfigureMapping(MessagePropertyMapper<SagaData> mapper)
        {
            mapper.MapMessage<StartTimeoutSaga>(_ => _.TheId);
        }

        public Task Handle(StartTimeoutSaga message, IMessageHandlerContext context)
        {
            // throw only for sample purposes
            throw new Exception("Expected StartTimeoutSaga in MyTimeoutSagaVersion1.");
        }

        public Task Timeout(SagaTimeout state, IMessageHandlerContext context)
        {
            log.Warn($"Received Timeout from {state.OriginatingSagaType}");
            MarkAsComplete();
            return Task.FromResult(0);
        }

        public class SagaData :
            ContainSagaData
        {
            public Guid TheId { get; set; }
        }
    }
}

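Saga table rename

The saga tables created by EndpointVersion1 are renamed to match the new saga names. This snippet targets MS SQL Server.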

var connectionString = @"Data Source=.\SQLEXPRESS;Initial Catalog=SqlPersistenceSample;Integrated Security=True";
using (var connection = new SqlConnection(connectionString))
{
    await connection.OpenAsync()
        .ConfigureAwait(false);
    using (var command = connection.CreateCommand())
    {
        command.CommandText = "exec sp_rename 'Samples_RenameSaga_MyReplySagaVersion1', 'Samples_RenameSaga_MyReplySagaVersion2'";
        await command.ExecuteNonQueryAsync()
            .ConfigureAwait(false);
    }
    using (var command = connection.CreateCommand())
    {
        command.CommandText = "exec sp_rename 'Samples_RenameSaga_MyTimeoutSagaVersion1', 'Samples_RenameSaga_MyTimeoutSagaVersion2'";
        await command.ExecuteNonQueryAsync()
            .ConfigureAwait(false);
    }
}

In a production scenario, this code would be executed as part of an endpoint migration prior to starting the new version of the endpoint.
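The rename above uses sp_rename, which is specific to MS SQL Server. The following is a minimal sketch of how the rename statement could be produced for either database. The SqlDialect enum and SagaTableRename class are hypothetical names introduced for this example, and the MySql statement uses the standard rename table syntax rather than anything taken from the sample.

```csharp
using System;

// Hypothetical enum for this sketch; it is not the NServiceBus SqlVariant type.
public enum SqlDialect
{
    MsSqlServer,
    MySql
}

public static class SagaTableRename
{
    // Builds the statement that renames one saga table.
    // tablePrefix is the prefix shared by the endpoint's tables,
    // for example "Samples_RenameSaga_" as seen in the snippet above.
    public static string BuildCommand(SqlDialect dialect, string tablePrefix, string oldSagaName, string newSagaName)
    {
        var oldTable = Validate(tablePrefix + oldSagaName);
        var newTable = Validate(tablePrefix + newSagaName);

        switch (dialect)
        {
            case SqlDialect.MsSqlServer:
                return $"exec sp_rename '{oldTable}', '{newTable}'";
            case SqlDialect.MySql:
                return $"rename table `{oldTable}` to `{newTable}`";
            default:
                throw new ArgumentOutOfRangeException(nameof(dialect));
        }
    }

    // Table names are concatenated into SQL text, so only letters,
    // digits and underscores are accepted.
    static string Validate(string tableName)
    {
        if (string.IsNullOrEmpty(tableName))
        {
            throw new ArgumentException("Table name must not be empty.");
        }
        foreach (var c in tableName)
        {
            if (!char.IsLetterOrDigit(c) && c != '_')
            {
                throw new ArgumentException($"Unexpected character '{c}' in table name '{tableName}'.");
            }
        }
        return tableName;
    }
}
```

For example, calling BuildCommand with SqlDialect.MsSqlServer, the prefix Samples_RenameSaga_, and the names MyReplySagaVersion1 and MyReplySagaVersion2 yields the same text as the first command in the snippet above.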

Mutator

The mutator is an Incoming Transport Mutator that translates saga reply headers on incoming messages to the new saga names.

This is required to handle the following scenario:

  • When EndpointVersion1 is shut down, there can still be messages on the wire or pending timeouts stored in the database.
  • These messages and timeouts have headers that correlate to the old versions of the sagas.
  • When EndpointVersion2 is started, these messages and timeouts will be processed, but their headers need to be translated to the new versions of the sagas.

public class ReplyMutator :
    IMutateIncomingTransportMessages
{

    static Dictionary<string, string> sagaRenameMap = new Dictionary<string, string>
    {
        {"MyNamespace1.MyReplySagaVersion1", typeof(MyNamespace2.MyReplySagaVersion2).AssemblyQualifiedName},
        {"MyNamespace1.MyTimeoutSagaVersion1", typeof(MyNamespace2.MyTimeoutSagaVersion2).AssemblyQualifiedName}
    };

    public Task MutateIncoming(MutateIncomingTransportMessageContext context)
    {
        var headers = context.Headers;
        string assemblyQualifiedType;
        if (headers.TryGetValue(Headers.OriginatingSagaType, out assemblyQualifiedType))
        {
            // Since OriginatingSagaType is the AssemblyQualifiedName
            // only map on the TypeName
            var type = assemblyQualifiedType.Split(',').First();
            string newSagaName;
            if (sagaRenameMap.TryGetValue(type, out newSagaName))
            {
                headers[Headers.OriginatingSagaType] = newSagaName;
            }
        }
        return Task.FromResult(0);
    }
}

This mutator must remain in place until all messages and timeouts (that target the old saga versions) are processed.
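The translation step can also be exercised without an endpoint. The following is a minimal sketch that applies the same idea to a plain header dictionary. The SagaHeaderTranslator class is a hypothetical name introduced for this example and is not part of NServiceBus. The sketch assumes non-generic saga types, since it treats the first comma as the end of the type name.

```csharp
using System;
using System.Collections.Generic;

public static class SagaHeaderTranslator
{
    // The same header key that the RequestHandler above reads.
    public const string OriginatingSagaType = "NServiceBus.OriginatingSagaType";

    // Rewrites the originating saga type header when it points at a renamed saga.
    // Returns true when the header was changed.
    public static bool Translate(
        IDictionary<string, string> headers,
        IReadOnlyDictionary<string, string> renameMap)
    {
        if (!headers.TryGetValue(OriginatingSagaType, out var assemblyQualifiedName))
        {
            // Not a saga reply or timeout; nothing to translate.
            return false;
        }

        // An assembly qualified name looks like
        // "Namespace.Type, Assembly, Version=..., Culture=..., PublicKeyToken=...".
        // Only the part before the first comma is used as the lookup key.
        var separator = assemblyQualifiedName.IndexOf(',');
        var typeName = separator < 0
            ? assemblyQualifiedName
            : assemblyQualifiedName.Substring(0, separator);

        if (!renameMap.TryGetValue(typeName.Trim(), out var newSagaName))
        {
            // The header targets a saga that has not been renamed.
            return false;
        }

        headers[OriginatingSagaType] = newSagaName;
        return true;
    }
}
```

Keeping the lookup separate from the mutator makes it possible to unit test the rename map before deploying the new endpoint version.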

For reply messages targeting the saga, a safe period of time to leave the mutator in place is the configured discard time for those messages. Note that this period may be superseded by messages in the error queue being retried.

For timeouts targeting the saga, a safe period of time to leave the mutator in place depends on the lifetime of the given saga, that is, how long the business rules of that saga expect it to exist before it is marked as complete. Alternatively, the TimeoutData table can be queried, using json_value, to check whether any pending timeouts still target the old saga:

select Id
from Samples_RenameSaga_TimeoutData
where json_value(Headers,'$."NServiceBus.OriginatingSagaType"')
    like 'MyNamespace1.MyTimeoutSagaVersion1%'

If a saga is extremely long running then the data can be migrated programmatically in SQL using json_modify and json_value. For example:

update Samples_RenameSaga_TimeoutData
set Headers = json_modify(
        Headers,
        '$."NServiceBus.OriginatingSagaType"',
        'MyNamespace2.MyTimeoutSagaVersion2, Endpoint, Version=0.0.0.0, Culture=neutral, PublicKeyToken=null')
where json_value(Headers, '$."NServiceBus.OriginatingSagaType"')
    like 'MyNamespace1.MyTimeoutSagaVersion1%'

Registration

The mutator is registered at endpoint startup.

endpointConfiguration.RegisterComponents(
    registration: components =>
    {
        components.ConfigureComponent<ReplyMutator>(DependencyLifecycle.InstancePerCall);
    });

Related Articles

  • Sagas
    NServiceBus uses event-driven architecture to include fault-tolerance and scalability in long-term business processes.
