Migrating from NServiceBus.Persistence.MongoDB

Component: MongoDB Persistence
NuGet Package: NServiceBus.Storage.MongoDB (2.x)
Target NServiceBus Version: 7.x

The NServiceBus.Storage.MongoDB package is designed to be fully compatible with the community NServiceBus.Persistence.MongoDB package, given some minor configuration.

Compatibility with existing data requires additional configuration. Without it, data duplication and incorrect execution of business logic may occur.

Saga data class changes

Saga data classes no longer need to provide an int version property decorated with the [DocumentVersion] attribute. The version property and attribute may be safely removed from saga data class implementations:


class MySagaData : IContainSagaData
{
    public Guid Id { get; set; }
    public string OriginalMessageId { get; set; }
    public string Originator { get; set; }
-   [DocumentVersion]
-   public int Version { get; set; }
}

Subscriptions

In the community NServiceBus.Persistence.MongoDB package, there was a single document per event type containing a collection of subscribers. In NServiceBus.Storage.MongoDB, each subscription is an individual document, so every existing subscriber entry must be converted into an eventsubscription document. The following mongo shell script performs the conversion:

db.subscriptions.find().forEach(type => {
   type.Subscribers.forEach(subscription => {
       var parts = subscription.split('@');
       db.eventsubscription.insert({
           MessageTypeName: type._id.TypeName,
           TransportAddress: parts[0],
           Endpoint: parts[1]
       });
   });
});
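
Before running the conversion against a live database, it can help to preview the documents it would produce. The following is a minimal sketch in plain JavaScript, with no database access, that mirrors the mapping used by the script above. The function name toEventSubscriptions and the sample document are hypothetical, and the "address@endpoint" subscriber format is an assumption inferred from the split('@') call in the script.

```javascript
// Hypothetical helper: maps one legacy subscription document (one per event
// type) to the per-subscriber documents the conversion script would insert.
function toEventSubscriptions(typeDoc) {
    return typeDoc.Subscribers.map(subscriber => {
        // Assumption: each subscriber is stored as "transportAddress@endpoint".
        const parts = subscriber.split('@');
        return {
            MessageTypeName: typeDoc._id.TypeName,
            TransportAddress: parts[0],
            Endpoint: parts[1]
        };
    });
}

// Made-up sample document shaped like the legacy format described above.
const legacy = {
    _id: { TypeName: 'Sales.OrderPlaced' },
    Subscribers: ['Billing@Billing', 'Shipping@Shipping']
};

const converted = toEventSubscriptions(legacy);
console.log(JSON.stringify(converted, null, 2));
```

Each entry in the Subscribers array yields exactly one output document, so the number of eventsubscription documents after conversion should equal the total number of subscriber entries across all legacy documents.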

How Document Versioning Works

MongoDB provides no out-of-the-box concurrency controls. A common pattern for supporting optimistic concurrency is to store an integer version number in each document and use it as a filter for update statements:

UpdateDefinition<BsonDocument> updateDefinition = updateBuilder.Inc(versionFieldName, 1);

//Define other update operations on the document

BsonDocument modifiedDocument = await collection.FindOneAndUpdateAsync<BsonDocument>(
    filter: document => document["_id"] == documentId && document["_version"] == currentVersion,
    update: updateDefinition,
    options: new FindOneAndUpdateOptions<BsonDocument, BsonDocument> { IsUpsert = false, ReturnDocument = ReturnDocument.After });

if (modifiedDocument == null)
{
    //The document was not updated because the version was already incremented.
}

By updating the document with a filter specifying the expected current version value of the document, no update will be made if another process has incremented the version before the current process is able to. This package relies on this pattern to ensure only one process/thread can update the saga at a time.
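
The effect of the version filter can be illustrated without a database. The following is a minimal in-memory sketch, not the package's implementation: the Map stands in for a collection, and the helper name updateIfVersionMatches and the sample saga fields are hypothetical. A real MongoDB update performs the check and the increment atomically on the server, while this single-threaded sketch only imitates the outcome.

```javascript
// In-memory stand-in for a collection, keyed by document id.
const collection = new Map();
collection.set('saga-1', { _id: 'saga-1', _version: 0, Total: 0 });

// Hypothetical helper: applies `changes` only if the stored version still
// equals `expectedVersion`, then increments the version. Returns the updated
// document, or null when the filter matched nothing.
function updateIfVersionMatches(id, expectedVersion, changes) {
    const stored = collection.get(id);
    if (!stored || stored._version !== expectedVersion) {
        return null;
    }
    const updated = { ...stored, ...changes, _version: stored._version + 1 };
    collection.set(id, updated);
    return updated;
}

// Two handlers both read the saga at version 0 and then both try to write.
const first = updateIfVersionMatches('saga-1', 0, { Total: 10 });
const second = updateIfVersionMatches('saga-1', 0, { Total: 20 });

console.log(first);  // updated document, now at version 1
console.log(second); // null: the version filter no longer matches
```

The second writer receives null, which corresponds to the modifiedDocument == null branch in the C# snippet above.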

This pattern requires an element in the BsonDocument to store the current version value. Instead of requiring the user to provide this as a property of their saga data type, this package uses the MongoDB client's BSON serializer to add a version element to the serialized saga data when it is initially created and stored in the collection. When the serialized saga data is later fetched, the version element's current value is retrieved before the document is deserialized to the saga data type. The current value is then retained for the lifetime of saga message processing and is used to create the update filter.

By default, the BsonDocument element is named _version. To maintain compatibility with existing saga data created by community packages, this package must be configured to use the version element name contained in the existing saga data documents instead.
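
The following sketch illustrates why the element name matters. It is a simplified model in plain JavaScript, not the package's serializer: the functions toDocument and fromDocument, the versionElementName parameter, and the sample document are all hypothetical.

```javascript
// Hypothetical write side: adds the version element to the serialized saga data.
function toDocument(sagaData, version, versionElementName = '_version') {
    return { ...sagaData, [versionElementName]: version };
}

// Hypothetical read side: pulls the version element out before the rest of
// the document is treated as saga data.
function fromDocument(document, versionElementName = '_version') {
    const { [versionElementName]: version, ...sagaData } = document;
    return { sagaData, version };
}

// Made-up document as a community package might have stored it, with the
// version kept in an element named "Version".
const existing = { _id: 'saga-1', Originator: 'Sales', OrderId: 42, Version: 3 };

const withDefaultName = fromDocument(existing);
const withConfiguredName = fromDocument(existing, 'Version');

console.log(withDefaultName.version);    // undefined
console.log(withConfiguredName.version); // 3
```

In this model, reading with the default name finds no version at all, so no meaningful update filter could be built from it. Reading with the configured name recovers the stored value.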

Compatibility mode

Compatibility mode allows the MongoDB persistence to work with saga data created with the NServiceBus.Persistence.MongoDB package without modifications to the database.

Configuration

Use the following API to configure the package to work with existing saga data:

var persistence = endpointConfiguration.UsePersistence<MongoPersistence>();
var compatibility = persistence.CommunityPersistenceCompatibility();
compatibility.VersionElementName("Version");

The VersionElementName value must match the BsonDocument element name used by the previous saga data property decorated with the [DocumentVersion] attribute.

The version element name must be set taking into account any member mapping conventions configured for the MongoDB client.
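
As an illustration of how a mapping convention can change the element name, the sketch below assumes a camel-case convention that lowercases the first character of each member name, which is how the MongoDB C# driver's CamelCaseElementNameConvention behaves. The helper camelCaseElementName is hypothetical, and the property name "Version" is an assumption; the actual convention in use should be checked against the existing documents.

```javascript
// Hypothetical helper mirroring a camel-case element naming convention:
// the element name is the member name with its first character lowercased.
function camelCaseElementName(memberName) {
    return memberName.charAt(0).toLowerCase() + memberName.slice(1);
}

// Assumption: the property that carried [DocumentVersion] was named "Version".
const memberName = 'Version';
const elementNameWithoutConvention = memberName;
const elementNameWithCamelCase = camelCaseElementName(memberName);

console.log(elementNameWithoutConvention); // Version
console.log(elementNameWithCamelCase);     // version
```

Under such a convention, the value passed to VersionElementName would be "version" rather than "Version".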

Migrating data

As an alternative to compatibility mode, saga data created by the NServiceBus.Persistence.MongoDB package can be migrated to the data format used by the NServiceBus.Storage.MongoDB package. This approach requires the endpoint to be stopped during migration. Use the mongo shell to connect to the database and execute the following script:

db.getCollectionNames().forEach(collectionName => {
    db[collectionName].updateMany({
        Originator: { $exists: true },
        OriginalMessageId: { $exists: true }
    },
    {
        $rename: { "Version": "_version" }
    })
});

Replace "Version" with the name of the version property on the saga data which was previously decorated with the [DocumentVersion] attribute.

Be sure to create a backup of the database prior to migrating the saga data.
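
To check what the migration script does to a given document before running it, the rename can be modeled on plain objects. The sketch below is an approximation of the filter and $rename behavior shown above, written in plain JavaScript with no database access. The function migrateSagaDocument and the sample documents are hypothetical.

```javascript
// Hypothetical model of the migration: documents that contain both Originator
// and OriginalMessageId are treated as saga data, and their version element
// is renamed to "_version". All other documents are returned unchanged.
function migrateSagaDocument(doc, oldName = 'Version') {
    const isSagaData = 'Originator' in doc && 'OriginalMessageId' in doc;
    if (!isSagaData || !(oldName in doc)) {
        return doc;
    }
    const { [oldName]: version, ...rest } = doc;
    return { ...rest, _version: version };
}

// Made-up sample documents: one saga, one unrelated document.
const saga = { _id: 'saga-1', Originator: 'Sales', OriginalMessageId: 'msg-1', Version: 7 };
const other = { _id: 'timeout-1', Version: 2 };

const migratedSaga = migrateSagaDocument(saga);
const migratedOther = migrateSagaDocument(other);

console.log(migratedSaga);  // Version removed, _version: 7 added
console.log(migratedOther); // unchanged
```

Documents that lack the Originator and OriginalMessageId elements are left alone, which is what keeps the script from touching non-saga collections even though it iterates over every collection name.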
