The DynamoDB TransactWriteItems API is used to commit outbox and saga changes in a single transaction. Message handlers can add further operations to this transaction with the synchronized session:
public async Task Handle(MyMessage message, IMessageHandlerContext context)
{
    //...
    var dynamoSession = context.SynchronizedStorageSession.DynamoPersistenceSession();
    dynamoSession.Add(new TransactWriteItem
    {
        // add database operations here
    });
    //...
}
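As an illustration of what such an operation might look like, the sketch below enlists a conditional Update. The MyMessage shape, the CustomerOperations helper, the key layout, and the table and attribute names are assumptions made for the example, not part of the persistence. The namespace that exposes DynamoPersistenceSession() is also assumed to be NServiceBus.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Amazon.DynamoDBv2.Model;
using NServiceBus;

// Hypothetical message type used only for this sketch.
public class MyMessage : IMessage
{
    public string CustomerId { get; set; }
}

// Hypothetical helper that builds the operation so it can be inspected in isolation.
public static class CustomerOperations
{
    public static TransactWriteItem MarkPreferred(string customerId) =>
        new TransactWriteItem
        {
            Update = new Update
            {
                TableName = "someTable", // assumed table name
                Key = new Dictionary<string, AttributeValue>
                {
                    { "PK", new AttributeValue { S = $"CUSTOMERS#{customerId}" } },
                    { "SK", new AttributeValue { S = customerId } }
                },
                // Only touch the item if it already exists.
                ConditionExpression = "attribute_exists(PK)",
                UpdateExpression = "SET CustomerPreferred = :preferred",
                ExpressionAttributeValues = new Dictionary<string, AttributeValue>
                {
                    { ":preferred", new AttributeValue { BOOL = true } }
                }
            }
        };
}

public class MarkCustomerPreferredHandler : IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var dynamoSession = context.SynchronizedStorageSession.DynamoPersistenceSession();

        // The update is only enlisted here; it is sent when the
        // synchronized storage commits its transaction.
        dynamoSession.Add(CustomerOperations.MarkPreferred(message.CustomerId));
        return Task.CompletedTask;
    }
}
```

Building the operation in a separate method is a design choice for the sketch; it keeps the handler small and makes the operation easy to assert on in tests.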
Transactions can contain a maximum of 100 operations. This limit is shared with operations enlisted by NServiceBus. Each saga will use one operation. Outbox will use 1 +
operations.
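Because the limit is shared, it can help to estimate how many operations remain for the handler. The following is a minimal sketch of that arithmetic. The TransactionBudget type is hypothetical, and the number of operations enlisted by the outbox is passed in as an input rather than derived.

```csharp
using System;

// Hypothetical helper, not part of NServiceBus or the AWS SDK.
public static class TransactionBudget
{
    // Maximum number of operations per transaction, as stated above.
    public const int MaxOperations = 100;

    // sagaCount: sagas involved in handling the message (one operation each).
    // outboxOperations: operations enlisted by the outbox, supplied by the caller.
    public static int RemainingForHandler(int sagaCount, int outboxOperations)
    {
        if (sagaCount < 0) throw new ArgumentOutOfRangeException(nameof(sagaCount));
        if (outboxOperations < 0) throw new ArgumentOutOfRangeException(nameof(outboxOperations));

        // Never report a negative budget.
        return Math.Max(0, MaxOperations - sagaCount - outboxOperations);
    }
}
```

For example, with one saga and four outbox operations, TransactionBudget.RemainingForHandler(1, 4) returns 95.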
Mapping
The AWS SDK provides a built-in mechanism to map attribute values to custom types and vice versa when using the DynamoDBContext. Unfortunately, the DynamoDBContext comes with built-in assumptions about how classes should be annotated. On top of that, it validates table structures in a synchronous, blocking way that can degrade the system's throughput. Additionally, the validation logic can make it difficult to store custom types together with saga types in the same table when a single-table design is preferred. To circumvent some of these issues, the persistence provides a custom mapper that supports all built-in data types and uses System.Text.Json under the covers.
The following snippet shows how to use the mapper function together with the synchronized storage to map attribute values to custom types and back.
class Customer
{
    public string CustomerId { get; set; }
    public bool CustomerPreferred { get; set; }
}
When the custom type doesn't contain explicitly mapped properties for the partition and sort keys, it is crucial that both keys are added to the mapped attribute values before the item is added to the storage session, as shown below.
var getCustomer = new GetItemRequest
{
    ConsistentRead = true,
    Key = new Dictionary<string, AttributeValue>
    {
        { "PK", new AttributeValue { S = $"CUSTOMERS#{message.CustomerId}" } },
        { "SK", new AttributeValue { S = message.CustomerId } }
    },
    TableName = "someTable"
};
var getCustomerResponse = await client.GetItemAsync(getCustomer, context.CancellationToken);
var customer = Mapper.ToObject<Customer>(getCustomerResponse.Item);
customer.CustomerPreferred = true;
var customerMap = Mapper.ToMap(customer);
// when PK and SK are not defined on the custom type they need to be added again
customerMap["PK"] = getCustomerResponse.Item["PK"];
customerMap["SK"] = getCustomerResponse.Item["SK"];
dynamoSession.Add(new TransactWriteItem
{
    Put = new Put
    {
        Item = customerMap,
        TableName = "someTable"
    }
});
It is also possible to map the partition and sort keys by annotating a property with the corresponding partition or sort key name, expressed with a JsonPropertyName attribute.
class Customer
{
    [JsonPropertyName("PK")]
    public string PartitionKey { get; set; }
    [JsonPropertyName("SK")]
    public string SortKey { get; set; }
    public string CustomerId { get; set; }
    public bool CustomerPreferred { get; set; }
}
With this in place, the custom type can be mapped without further modification:
var customer = Mapper.ToObject<Customer>(getCustomerResponse.Item);
customer.CustomerPreferred = true;
dynamoSession.Add(new TransactWriteItem
{
    Put = new Put
    {
        Item = Mapper.ToMap(customer),
        TableName = "someTable"
    }
});
Supported data types
The mapper supports the following data types:
- Number
- String
- Binary
- Boolean
- Null
- List
- Map
- Sets
The mapper is closely aligned with the DynamoDB v2 type mapping behavior in the AWS .NET SDK.
Binaries must be expressed as MemoryStream. For efficiency reasons, the mapper does not copy the memory stream; it adds a reference to the original MemoryStream directly to the attribute value dictionary, and vice versa.
Sets are automatically used when the type has properties of type ISet.
Hierarchical objects are serialized into maps.
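To make the list above more concrete, the sketch below shows a type whose properties cover the supported data types. The CustomerProfile and Address types are hypothetical, the comments describe the representation the text above leads one to expect rather than verified output, and the namespace of Mapper is assumed to be NServiceBus.Persistence.DynamoDB.

```csharp
using System.Collections.Generic;
using System.IO;
using NServiceBus.Persistence.DynamoDB; // assumed namespace of Mapper

// Hypothetical nested type; hierarchical objects are serialized into maps.
public class Address
{
    public string City { get; set; }
}

// Hypothetical type covering the supported data types.
public class CustomerProfile
{
    public string Name { get; set; }          // String
    public int LoyaltyPoints { get; set; }    // Number
    public bool Preferred { get; set; }       // Boolean
    public string Nickname { get; set; }      // Null when left unset
    public MemoryStream Avatar { get; set; }  // Binary
    public List<string> Notes { get; set; }   // List
    public ISet<string> Tags { get; set; }    // Set
    public Address Address { get; set; }      // Map
}

public static class MappingExample
{
    public static CustomerProfile CreateProfile() => new CustomerProfile
    {
        Name = "Jane",
        LoyaltyPoints = 120,
        Preferred = true,
        Avatar = new MemoryStream(new byte[] { 1, 2, 3 }),
        Notes = new List<string> { "first order" },
        Tags = new HashSet<string> { "vip", "newsletter" },
        Address = new Address { City = "Zurich" }
    };

    public static void Run()
    {
        var profile = CreateProfile();

        // Converts the object into an attribute value dictionary.
        var map = Mapper.ToMap(profile);

        // Converts the dictionary back into the custom type.
        var roundTripped = Mapper.ToObject<CustomerProfile>(map);
    }
}
```

Since the mapper keeps a reference to the original MemoryStream instead of copying it, it is probably safest not to dispose or reuse the stream until the transaction has been committed.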
DynamoDBContext
It is possible to combine DynamoDBContext usage with the synchronized storage, but there are a number of things that must be taken into account.
- Custom types that use renamed attribute names must have a corresponding JsonPropertyName attribute
- In order to participate in the synchronized storage transaction, SaveAsync must not be called
- The AWSSDK.DynamoDBv2 3.7.103 SDK introduced transactional operations on the context. These are currently not supported
Following up on the previous example, when mapping the Customer type with the DynamoDBContext, the CustomerId and the CustomerPreferred properties need the DynamoDBHashKey or DynamoDBProperty attributes, and the JsonPropertyName attribute.
[DynamoDBTable("customers")]
class Customer
{
    [DynamoDBHashKey("PK")]
    [JsonPropertyName("PK")]
    public string CustomerId { get; set; }
    [DynamoDBProperty("customer_preferred")]
    [JsonPropertyName("customer_preferred")]
    public bool CustomerPreferred { get; set; }
}
With mapping in place, loaded customers can be mapped into the storage session and will only be modified when the synchronized storage commits its transaction.
var customer = await dynamoDbContext.LoadAsync<Customer>(message.CustomerId, context.CancellationToken);
customer.CustomerPreferred = true;
// DO NOT call SaveAsync to participate in the synchronized storage transaction
// await dynamoDbContext.SaveAsync(customer, context.CancellationToken);
dynamoSession.Add(new TransactWriteItem
{
    Put = new Put
    {
        Item = Mapper.ToMap(customer),
        TableName = "someTable"
    }
});
Testing
When unit testing a message handler, the TestableDynamoSynchronizedStorageSession class can be used:
[Test]
public async Task UnitTest()
{
    var testableSession = new TestableDynamoSynchronizedStorageSession();
    var testableContext = new TestableMessageHandlerContext
    {
        SynchronizedStorageSession = testableSession
    };
    var handler = new MyMessageHandler();
    await handler.Handle(new MyMessage(), testableContext);
    // assert on transaction items:
    Assert.That(testableSession.TransactWriteItems, Has.Count.EqualTo(1));
}
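The test above does not show the handler under test. The sketch below pairs a possible MyMessageHandler with a test that also inspects the enlisted item. The handler body, the MyMessage shape, and the key and table names are assumptions for illustration, and the testable session is assumed to live in the NServiceBus.Testing namespace.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Amazon.DynamoDBv2.Model;
using NServiceBus;
using NServiceBus.Testing; // assumed namespace of the testable session
using NUnit.Framework;

// Hypothetical message type for this sketch.
public class MyMessage : IMessage
{
    public string CustomerId { get; set; }
}

// Hypothetical handler that enlists exactly one Put operation.
public class MyMessageHandler : IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message, IMessageHandlerContext context)
    {
        var dynamoSession = context.SynchronizedStorageSession.DynamoPersistenceSession();
        dynamoSession.Add(new TransactWriteItem
        {
            Put = new Put
            {
                TableName = "someTable", // assumed table name
                Item = new Dictionary<string, AttributeValue>
                {
                    { "PK", new AttributeValue { S = $"CUSTOMERS#{message.CustomerId}" } },
                    { "SK", new AttributeValue { S = message.CustomerId } }
                }
            }
        });
        return Task.CompletedTask;
    }
}

[TestFixture]
public class MyMessageHandlerTests
{
    [Test]
    public async Task Enlists_a_put_for_the_customer()
    {
        var testableSession = new TestableDynamoSynchronizedStorageSession();
        var testableContext = new TestableMessageHandlerContext
        {
            SynchronizedStorageSession = testableSession
        };

        await new MyMessageHandler().Handle(new MyMessage { CustomerId = "42" }, testableContext);

        // Exactly one operation is expected; Single() throws otherwise.
        var item = testableSession.TransactWriteItems.Single();
        Assert.That(item.Put.TableName, Is.EqualTo("someTable"));
        Assert.That(item.Put.Item["PK"].S, Is.EqualTo("CUSTOMERS#42"));
    }
}
```

Asserting on the table name and key, rather than only on the count, helps catch a handler that enlists the right number of operations against the wrong item.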