
DynamoDB persistence transactions

NuGet Package: NServiceBus.Persistence.DynamoDB (2.x)
Target Version: NServiceBus 9.x

The DynamoDB TransactWriteItems API is used to commit outbox and saga changes in a single transaction. Message handlers can add further operations to this transaction with the synchronized session:

public async Task Handle(MyMessage message, IMessageHandlerContext context)
{
    //...

    var dynamoSession = context.SynchronizedStorageSession.DynamoPersistenceSession();
    dynamoSession.Add(new TransactWriteItem
    {
        // add database operations here
    });

    //...
}

Transactions can contain a maximum of 100 operations. This limit is shared with the operations enlisted by NServiceBus: each saga uses one operation, and the outbox uses 1 + <number of outgoing messages> operations.
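As a rough illustration of this budget, the sketch below computes how many operations remain for a handler's own writes. TransactionBudget and its members are hypothetical names and are not part of the persistence package; the numbers are taken from the limits stated above.

```csharp
using System;

// Hypothetical helper, not part of NServiceBus.Persistence.DynamoDB.
static class TransactionBudget
{
    // Maximum number of operations in a single transaction, as stated above.
    public const int MaxOperations = 100;

    // Operations enlisted by NServiceBus: one per saga, plus
    // 1 + <number of outgoing messages> when the outbox is enabled.
    public static int ReservedByNServiceBus(int sagaCount, int outgoingMessages, bool outboxEnabled = true)
    {
        var outboxOperations = outboxEnabled ? 1 + outgoingMessages : 0;
        return sagaCount + outboxOperations;
    }

    // Operations left for TransactWriteItem entries added by the handler.
    public static int Remaining(int sagaCount, int outgoingMessages, bool outboxEnabled = true)
        => Math.Max(0, MaxOperations - ReservedByNServiceBus(sagaCount, outgoingMessages, outboxEnabled));
}
```

For example, a handler that runs inside one saga and sends three messages with the outbox enabled reserves 1 + (1 + 3) = 5 operations, so TransactionBudget.Remaining(1, 3) leaves 95 for the handler.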

Mapping

The AWS SDK provides a built-in mechanism to map attribute values to custom types and vice versa when using the DynamoDBContext. Unfortunately, the DynamoDBContext comes with built-in assumptions about how classes should be annotated. On top of that, it validates table structures in a synchronous, blocking way that can degrade the system's throughput. The validation logic can also make it difficult to store custom types together with saga types in the same table when a single-table design is preferred. To circumvent some of these issues, the persistence provides a custom mapper that supports all built-in data types and uses System.Text.Json under the covers.

The following snippets show how to use the mapper together with the synchronized storage session to map attribute values to a custom type and back, starting with the custom type itself:

class Customer
{
    public string CustomerId { get; set; }

    public bool CustomerPreferred { get; set; }
}

When the custom type doesn't contain explicitly mapped properties for the partition and sort keys, both keys must be added to the mapped attribute values before the item is added to the storage session, as shown below.

var getCustomer = new GetItemRequest
{
    ConsistentRead = true,
    Key = new Dictionary<string, AttributeValue>
    {
        { "PK", new AttributeValue { S = $"CUSTOMERS#{message.CustomerId}" } },
        { "SK", new AttributeValue { S = message.CustomerId } }
    },
    TableName = "someTable"
};

var getCustomerResponse = await client.GetItemAsync(getCustomer, context.CancellationToken);

var customer = Mapper.ToObject<Customer>(getCustomerResponse.Item);
customer.CustomerPreferred = true;

var customerMap = Mapper.ToMap(customer);
// when PK and SK are not defined on the custom type they need to be added again
customerMap["PK"] = getCustomerResponse.Item["PK"];
customerMap["SK"] = getCustomerResponse.Item["SK"];

dynamoSession.Add(new TransactWriteItem
{
    Put = new Put
    {
        Item = customerMap,
        TableName = "someTable"
    }
});

The partition and sort keys can also be mapped by annotating a property for each key with a JsonPropertyName attribute that carries the corresponding key name.

class Customer
{
    [JsonPropertyName("PK")]
    public string PartitionKey { get; set; }

    [JsonPropertyName("SK")]
    public string SortKey { get; set; }

    public string CustomerId { get; set; }

    public bool CustomerPreferred { get; set; }
}

With this in place, the custom type can be mapped without further modification:

var customer = Mapper.ToObject<Customer>(getCustomerResponse.Item);
customer.CustomerPreferred = true;

dynamoSession.Add(new TransactWriteItem
{
    Put = new Put
    {
        Item = Mapper.ToMap(customer),
        TableName = "someTable"
    }
});
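Because the mapper uses System.Text.Json under the covers, the effect of JsonPropertyName on property names can be previewed with the plain JsonSerializer. The following is a minimal sketch under that assumption. It does not call the persistence's Mapper, and KeyedCustomer and JsonNameDemo are hypothetical names introduced only for illustration.

```csharp
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical type mirroring the annotated Customer class above.
class KeyedCustomer
{
    [JsonPropertyName("PK")]
    public string PartitionKey { get; set; }

    [JsonPropertyName("SK")]
    public string SortKey { get; set; }

    public string CustomerId { get; set; }

    public bool CustomerPreferred { get; set; }
}

// Hypothetical demo class; uses System.Text.Json directly, not the Mapper.
static class JsonNameDemo
{
    public static string Serialize()
    {
        var customer = new KeyedCustomer
        {
            PartitionKey = "CUSTOMERS#42",
            SortKey = "42",
            CustomerId = "42",
            CustomerPreferred = true
        };

        // JsonPropertyName replaces the CLR property name in the serialized output.
        return JsonSerializer.Serialize(customer);
    }
}
```

The serialized JSON uses PK and SK as property names instead of PartitionKey and SortKey, which is the renaming the mapped item relies on for its key attributes.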

Supported data types

The mapper supports the following data types:

  • Number
  • String
  • Binary
  • Boolean
  • Null
  • List
  • Map
  • Sets

The mapper is closely aligned with the DynamoDB v2 type mapping behavior in the AWS .NET SDK.

Binaries must be expressed as MemoryStream. For efficiency reasons the mapper does not copy the memory stream but directly adds a reference to the original MemoryStream into the attribute value dictionary and vice versa.

Sets are automatically used when the type has properties of type ISet<>.

Hierarchical objects are serialized into maps.
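To make these rules more concrete, the sketch below shows a hypothetical type whose properties exercise several of the supported data types. Order and Address are illustrative names that do not appear in the persistence, and the per-property comments describe the attribute type each property is expected to map to based on the rules above, not verified output.

```csharp
using System.Collections.Generic;
using System.IO;

// Hypothetical type used only to illustrate the supported data types.
class Order
{
    public string OrderId { get; set; }           // expected: String
    public decimal Total { get; set; }            // expected: Number
    public bool Shipped { get; set; }             // expected: Boolean
    public MemoryStream Receipt { get; set; }     // expected: Binary (stream is referenced, not copied)
    public List<string> Comments { get; set; }    // expected: List
    public ISet<string> Tags { get; set; }        // expected: Set, because the property type is ISet<>
    public Address ShippingAddress { get; set; }  // expected: Map (hierarchical object)
}

// Hypothetical nested type that would be serialized into a map.
class Address
{
    public string City { get; set; }
}
```

Since the MemoryStream is shared by reference rather than copied, it is safer not to dispose or reuse the stream until the transaction has been committed.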

DynamoDBContext

DynamoDBContext can be combined with the synchronized storage session, but a number of things must be taken into account:

  • Custom types that use renamed attribute names must have a corresponding JsonPropertyName attribute
  • In order to participate in the synchronized storage transaction, SaveAsync must not be called
  • The AWSSDK.DynamoDBv2 3.7.103 SDK introduced transactional operations on the context. These are currently not supported

Following up on the previous example, when mapping the Customer type with the DynamoDBContext, the CustomerId and the CustomerPreferred properties need the DynamoDBHashKey or DynamoDBProperty attributes, and the JsonPropertyName attribute.

[DynamoDBTable("customers")]
class Customer
{
    [DynamoDBHashKey("PK")]
    [JsonPropertyName("PK")]
    public string CustomerId { get; set; }

    [DynamoDBProperty("customer_preferred")]
    [JsonPropertyName("customer_preferred")]
    public bool CustomerPreferred { get; set; }
}

With mapping in place, loaded customers can be mapped into the storage session and will only be modified when the synchronized storage commits its transaction.

var customer = await dynamoDbContext.LoadAsync<Customer>(message.CustomerId, context.CancellationToken);

customer.CustomerPreferred = true;

// To participate in the synchronized storage transaction, DO NOT call SaveAsync
// await dynamoDbContext.SaveAsync(customer, context.CancellationToken);

dynamoSession.Add(new TransactWriteItem
{
    Put = new Put
    {
        Item = Mapper.ToMap(customer),
        TableName = "someTable"
    }
});

Testing

When unit testing a message handler, the TestableDynamoSynchronizedStorageSession class can be used:

[Test]
public async Task UnitTest()
{
    var testableSession = new TestableDynamoSynchronizedStorageSession();
    var testableContext = new TestableMessageHandlerContext
    {
        SynchronizedStorageSession = testableSession
    };

    var handler = new MyMessageHandler();
    await handler.Handle(new MyMessage(), testableContext);

    // assert on transaction items:
    Assert.That(testableSession.TransactWriteItems, Has.Count.EqualTo(1));
}
