Getting Started
Architecture
NServiceBus
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

SQL Persistence Saga Rename

Component: Sql Persistence
NuGet Package: NServiceBus.Persistence.Sql (8.x)
Target Version: NServiceBus 9.x

This sample shows two sagas that need to be renamed.

The saga implementations are contrived and exist only to illustrate the renaming of sagas. For sample purposes, the functionality is split between the two versions: saga starting logic in one endpoint, and the handling logic in the other endpoint.

The sample consists of two versions of a single endpoint. The first version will start the sagas. The second version will handle the renaming of the sagas.

Prerequisites

Ensure an instance of SQL Server (Version 2016 or above for custom saga finders sample, or Version 2012 or above for other samples) is installed and accessible on localhost and port 1433. A Docker image can be used to accomplish this by running docker run -e 'ACCEPT_EULA=Y' -e 'MSSQL_SA_PASSWORD=yourStrong(!)Password' -p 1433:1433 -d mcr.microsoft.com/mssql/server:latest in a terminal.

Alternatively, change the connection string to point to different SQL Server instance.

At startup each endpoint will create its required SQL assets including databases, tables, and schemas.

The database created by this sample is NsbSamplesSqlPersistenceRenameSaga.

Timeout saga

The timeout saga sends a timeout at startup and then handles that timeout.

This saga will be renamed from MyNamespace1.MyTimeoutSagaVersion1 to MyNamespace2.MyTimeoutSagaVersion2.

This scenario demonstrates that when the timeout message is received, its header needs to be translated over the new saga name.

The saga type is stored in the headers of the TimeoutData table (SQL Server and MySQL) but will be converted back to a message header when the timeout is executed.

Reply saga

The reply saga sends a request at startup and then handles the response for that message.

This saga will be renamed from MyNamespace1.MyReplySagaVersion1 to MyNamespace2.MyReplySagaVersion2.

This scenario demonstrates that when the reply message is received, its header needs to be translated over the new saga name.

Projects

The sample consists of several projects:

Shared

The shared message contracts and common configuration for both versions. This project exists for the purposes of code reuse between the two endpoint versions. In a production codebase, this project would not be required as both versions of sagas would not co-exist in the same solution.

Endpoint configuration

Shared endpoint configuration.

var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.MsSqlServer>();

var subscriptions = persistence.SubscriptionSettings();
subscriptions.CacheFor(TimeSpan.FromMinutes(1));

// for SqlExpress use Data Source=.\SqlExpress;Initial Catalog=NsbSamplesSqlPersistenceRenameSaga;Integrated Security=True;Encrypt=false
var connectionString = @"Server=localhost,1433;Initial Catalog=NsbSamplesSqlPersistenceRenameSaga;User Id=SA;Password=yourStrong(!)Password;Encrypt=false";

persistence.ConnectionBuilder(
    () => new SqlConnection(connectionString));

endpointConfiguration.EnableInstallers();
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
endpointConfiguration.UseTransport(new LearningTransport());

Reply handler

A reply handler which takes a request and sends a reply. In this case the reply will always be to a saga. It also duplicates the OriginatingSagaType header on to the message for logging purposes when the reply is executed on the saga.

public class RequestHandler :
    IHandleMessages<Request>
{
    readonly static ILog log = LogManager.GetLogger<RequestHandler>();

    public Task Handle(Request message, IMessageHandlerContext context)
    {
        log.Warn("Got Request. Will send Reply");

        var headers = context.MessageHeaders;

        var reply = new Reply
        {
            TheId = message.TheId,
            OriginatingSagaType = headers["NServiceBus.OriginatingSagaType"]
        };

        return context.Reply(reply);
    }
}

EndpointVersion1

This project contains the first versions of the sagas. It also sends messages to start both sagas at startup.

Starting sagas

Sends messages to start both the Reply saga and the Timeout saga.

var startReplySaga = new StartReplySaga
{
    TheId = Guid.NewGuid()
};

await endpointInstance.SendLocal(startReplySaga);

var startTimeoutSaga = new StartTimeoutSaga
{
    TheId = Guid.NewGuid()
};

await endpointInstance.SendLocal(startTimeoutSaga);

Reply saga

At startup, sends a new Request message. The message is delayed to give EndpointVersion1 time to shut down prior to attempting to handle the Reply message.

The Handle method for the Reply message throws an exception since in this sample, this handler will not be executed.

namespace MyNamespace1
{
    public class MyReplySagaVersion1 :
        Saga<MyReplySagaVersion1.SagaData>,
        IAmStartedByMessages<StartReplySaga>,
        IHandleMessages<Reply>
    {
        readonly static ILog log = LogManager.GetLogger<MyReplySagaVersion1>();

        protected override void ConfigureHowToFindSaga(SagaPropertyMapper<SagaData> mapper)
        {
            mapper.MapSaga(saga => saga.TheId)
                .ToMessage<StartReplySaga>(msg => msg.TheId)
                .ToMessage<Reply>(msg => msg.TheId);
        }

        public Task Handle(StartReplySaga message, IMessageHandlerContext context)
        {
            var sendOptions = new SendOptions();
            sendOptions.RouteToThisEndpoint();
            sendOptions.DelayDeliveryWith(TimeSpan.FromSeconds(10));

            var request = new Request
            {
                TheId = message.TheId
            };

            log.Warn("Saga started. Sending Request");

            return context.Send(request, sendOptions);
        }

        public Task Handle(Reply reply, IMessageHandlerContext context)
        {
            // throw only for sample purposes
            throw new Exception("Expected Reply in MyReplySagaVersion2. EndpointVersion1 may have been incorrectly started.");
        }

        public class SagaData :
            ContainSagaData
        {
            public Guid TheId { get; set; }
        }
    }
}

Timeout saga

At startup, requests a SagaTimeout. The timeout is delayed to give EndpointVersion1 time to shut down prior to attempting to handle the timeout.

The Handle method for the SagaTimeout message throws an exception since in this sample, this handler will not be executed.

namespace MyNamespace1
{
    public class MyTimeoutSagaVersion1 :
        Saga<MyTimeoutSagaVersion1.SagaData>,
        IAmStartedByMessages<StartTimeoutSaga>,
        IHandleTimeouts<SagaTimeout>
    {
        readonly static ILog log = LogManager.GetLogger<MyTimeoutSagaVersion1>();

        protected override void ConfigureHowToFindSaga(SagaPropertyMapper<SagaData> mapper)
        {
            mapper.MapSaga(saga => saga.TheId)
                .ToMessage<StartTimeoutSaga>(msg => msg.TheId);
        }

        public Task Handle(StartTimeoutSaga message, IMessageHandlerContext context)
        {
            var timeout = new SagaTimeout
            {
                OriginatingSagaType = GetType().Name
            };

            log.Warn("Saga started. Sending Timeout");

            return RequestTimeout(context, TimeSpan.FromSeconds(10), timeout);
        }

        public Task Timeout(SagaTimeout state, IMessageHandlerContext context)
        {
            // throw only for sample purposes
            throw new Exception("Expected Timeout in MyTimeoutSagaVersion2. EndpointVersion1 may have been incorrectly started.");
        }

        public class SagaData :
            ContainSagaData
        {
            public Guid TheId { get; set; }
        }
    }
}

EndpointVersion2

This project contains the second versions of the sagas.

Reply saga

Handles the Reply message that has been sent from MyReplySagaVersion1.

The Handle method for the StartReplySaga message throws an exception since in this sample, this handler will not be executed.

namespace MyNamespace2
{
    public class MyReplySagaVersion2 :
        Saga<MyReplySagaVersion2.SagaData>,
        IAmStartedByMessages<StartReplySaga>,
        IHandleMessages<Reply>
    {
        readonly static ILog log = LogManager.GetLogger<MyReplySagaVersion2>();

        protected override void ConfigureHowToFindSaga(SagaPropertyMapper<SagaData> mapper)
        {
            mapper.MapSaga(saga => saga.TheId)
                .ToMessage<StartReplySaga>(msg => msg.TheId)
                .ToMessage<Reply>(msg => msg.TheId);
        }

        public Task Handle(StartReplySaga message, IMessageHandlerContext context)
        {
            // throw only for sample purposes
            throw new Exception("Expected StartReplySaga in MyReplySagaVersion1.");
        }

        public Task Handle(Reply reply, IMessageHandlerContext context)
        {
            log.Warn($"Received Reply from {reply.OriginatingSagaType}");

            MarkAsComplete();

            return Task.CompletedTask;
        }

        public class SagaData :
            ContainSagaData
        {
            public Guid TheId { get; set; }
        }
    }
}

Timeout saga

Handles the timeout that has been sent from MyTimeoutSagaVersion1.

The Handle method for the StartTimeoutSaga message throws an exception since in this sample, this handler will not be executed.

namespace MyNamespace2
{
    public class MyTimeoutSagaVersion2 :
        Saga<MyTimeoutSagaVersion2.SagaData>,
        IAmStartedByMessages<StartTimeoutSaga>,
        IHandleTimeouts<SagaTimeout>
    {
        readonly static ILog log = LogManager.GetLogger<MyTimeoutSagaVersion2>();

        protected override void ConfigureHowToFindSaga(SagaPropertyMapper<SagaData> mapper)
        {
            mapper.MapSaga(saga => saga.TheId)
                .ToMessage<StartTimeoutSaga>(msg => msg.TheId);
        }

        public Task Handle(StartTimeoutSaga message, IMessageHandlerContext context)
        {
            // throw only for sample purposes
            throw new Exception("Expected StartTimeoutSaga in MyTimeoutSagaVersion1.");
        }

        public Task Timeout(SagaTimeout state, IMessageHandlerContext context)
        {
            log.Warn($"Received Timeout from {state.OriginatingSagaType}");

            MarkAsComplete();
            return Task.CompletedTask;
        }

        public class SagaData :
            ContainSagaData
        {
            public Guid TheId { get; set; }
        }
    }
}

Timeout saga

// for SqlExpress use Data Source=.\SqlExpress;Initial Catalog=NsbSamplesSqlPersistenceRenameSaga;Integrated Security=True;Encrypt=false
var connectionString = @"Server=localhost,1433;Initial Catalog=NsbSamplesSqlPersistenceRenameSaga;User Id=SA;Password=yourStrong(!)Password;Encrypt=false";

using var connection = new SqlConnection(connectionString);

await connection.OpenAsync();

using var renameReplySaga = connection.CreateCommand();

renameReplySaga.CommandText = "exec sp_rename 'Samples_RenameSaga_MyReplySagaVersion1', 'Samples_RenameSaga_MyReplySagaVersion2'";
await renameReplySaga.ExecuteNonQueryAsync();

using var renameTimeoutSaga = connection.CreateCommand();

renameTimeoutSaga.CommandText = "exec sp_rename 'Samples_RenameSaga_MyTimeoutSagaVersion1', 'Samples_RenameSaga_MyTimeoutSagaVersion2'";
await renameTimeoutSaga.ExecuteNonQueryAsync();

Mutator

The mutator is an incoming transport mutator that translates saga reply headers on incoming messages to the new saga names.

This is required to handle the following scenario

  • When EndpointVersion1 is shut down there can still be messages in the queues or pending timeouts stored in the database.
  • These messages and timeouts have headers referring to the old versions of the saga types.
  • When EndpointVersion2 is started these messages and timeouts will be processed but the headers need to be translated to the new versions of the sagas.
public class ReplyMutator :
    IMutateIncomingTransportMessages
{
    readonly static Dictionary<string, string> sagaRenameMap = new Dictionary<string, string>
    {
        {"MyNamespace1.MyReplySagaVersion1", typeof(MyNamespace2.MyReplySagaVersion2).AssemblyQualifiedName},
        {"MyNamespace1.MyTimeoutSagaVersion1", typeof(MyNamespace2.MyTimeoutSagaVersion2).AssemblyQualifiedName}
    };

    public Task MutateIncoming(MutateIncomingTransportMessageContext context)
    {
        var headers = context.Headers;

        if (headers.TryGetValue(Headers.OriginatingSagaType, out var assemblyQualifiedType))
        {
            // Since OriginatingSagaType is the AssemblyQualifiedName
            // only map on the TypeName
            var type = assemblyQualifiedType.Split(',').First();

            if (sagaRenameMap.TryGetValue(type, out var newSagaName))
            {
                headers[Headers.OriginatingSagaType] = newSagaName;
            }
        }
        return Task.CompletedTask;
    }
}

For reply messages, the mutator should not be removed for at least message discard time - if specified for those messages. Note that messages in the error queue might be stored by messaging infrastructure for even longer. If no discard period is specified, it is recommended to leave the mutator until it is reasonable to assume there are no more messages that need to be mutated.

For timeouts, the safe period of time to leave the mutator in place is dependent on the lifetime of the given saga. The business rules of a given saga, how long it is expected to exist before it is marked as complete, define its lifetime. Alternatively, the TimeoutData table can be queried, using json_value, to check if there are any pending timeouts that target the old saga:

select Id
from Samples_RenameSaga_TimeoutData
where json_value(Headers,'$."NServiceBus.OriginatingSagaType"')
    like 'MyNamespace1.MyTimeoutSagaVersion1%'

If a saga is extremely long running then the data can be migrated programmatically in SQL using json_modify and json_value. For example:

update Samples_RenameSaga_TimeoutData
set Headers = json_modify(
        Headers,
        '$."NServiceBus.OriginatingSagaType"',
        'MyNamespace1.MyTimeoutSagaVersion1, Endpoint, Version=0.0.0.0, Culture=neutral, PublicKeyToken=null')
where json_value(Headers, '$."NServiceBus.OriginatingSagaType"')
    like 'MyNamespace1.MyTimeoutSagaVersion1%'
Registration

The mutator is registered at endpoint startup.

endpointConfiguration.RegisterMessageMutator(new ReplyMutator());

Related Articles

  • Sagas
    Maintain statefulness in distributed systems with the saga pattern and NServiceBus' event-driven architecture with built-in fault-tolerance and scalability.