Configuration

This page targets a pre-release version. Pre-releases are subject to change and samples are not guaranteed to be fully functional.

In NServiceBus.Persistence.AzureStorage, XML-based configuration is no longer available. The behavior of the persister is configured through the code configuration API.

For sagas and subscriptions:

var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence>();
persistence.ConnectionString("connectionString");

Saga configuration

var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence, StorageType.Sagas>();
persistence.ConnectionString("connectionString");
// or
persistence.UseCloudTableClient(cloudTableClient);

The following settings are available for changing the behavior of saga persistence:

  • ConnectionString: Sets the connection string for the storage account to be used for storing saga information.
  • UseCloudTableClient: Sets a fully preconfigured CloudTableClient to use instead of a connection string.
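
As a minimal sketch of the second option, a CloudTableClient could be built from a connection string and handed to the persister. This assumes that CloudTableClient comes from the Microsoft.Azure.Cosmos.Table package for this version of the persister. The helper class and method names are hypothetical; only the UseCloudTableClient call is taken from this page.

```csharp
using Microsoft.Azure.Cosmos.Table;
using NServiceBus;

static class SagaPersistenceWithTableClient
{
    // Hypothetical helper, not part of the persister's API.
    public static void Configure(EndpointConfiguration endpointConfiguration, string connectionString)
    {
        // Assumption: the client type is Microsoft.Azure.Cosmos.Table.CloudTableClient.
        var account = CloudStorageAccount.Parse(connectionString);
        var cloudTableClient = account.CreateCloudTableClient();

        var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence, StorageType.Sagas>();

        // Used instead of persistence.ConnectionString(...).
        persistence.UseCloudTableClient(cloudTableClient);
    }
}
```

Building the client up front makes it possible to adjust client-level options before the endpoint starts, which a plain connection string does not allow.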

Saga compatibility configuration

var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence, StorageType.Sagas>();

var compatibility = persistence.Compatibility();
// e.g. Disable secondary index
compatibility.DisableSecondaryKeyLookupForSagasCorrelatedByProperties();

The following settings are available for changing the compatibility behavior of saga persistence:

  • DisableSecondaryKeyLookupForSagasCorrelatedByProperties: By default, the persistence operates in compatibility mode and tries to find sagas by using the secondary index property. Once no sagas use the secondary index property NServiceBus_2ndIndexKey any longer, the lookup by secondary key can be disabled.
  • AssumeSecondaryKeyUsesANonEmptyRowKeySetToThePartitionKey: Sagas that have been stored with a secondary index used an empty RowKey on the secondary index entry. By enabling this setting the secondary key lookups will assume that the RowKey equals the PartitionKey, which is crucial for Azure Cosmos DB Table API usage.
  • AllowSecondaryKeyLookupToFallbackToFullTableScan: Opt-in to full table scanning for sagas that have been stored with version 1.4 or earlier when running in compatibility mode.
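
The other two settings could be enabled in the same way. The sketch below assumes that both are parameterless methods on the compatibility settings object, by analogy with DisableSecondaryKeyLookupForSagasCorrelatedByProperties shown earlier; the wrapping class and method are hypothetical.

```csharp
using NServiceBus;

static class SagaCompatibilitySettings
{
    // Hypothetical helper, not part of the persister's API.
    public static void Configure(EndpointConfiguration endpointConfiguration)
    {
        var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence, StorageType.Sagas>();
        var compatibility = persistence.Compatibility();

        // Assumption: parameterless, like DisableSecondaryKeyLookupForSagasCorrelatedByProperties.
        // Treats the RowKey of secondary index entries as equal to the PartitionKey.
        compatibility.AssumeSecondaryKeyUsesANonEmptyRowKeySetToThePartitionKey();

        // Assumption: parameterless. Opts in to full table scans for sagas stored with version 1.4 or earlier.
        compatibility.AllowSecondaryKeyLookupToFallbackToFullTableScan();
    }
}
```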

Subscription configuration

var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence, StorageType.Subscriptions>();
persistence.ConnectionString("connectionString");
persistence.TableName("tableName");

// Added in Version 1.3
persistence.CacheFor(TimeSpan.FromMinutes(1));

The following settings are available for changing the behavior of subscription persistence:

  • ConnectionString: Sets the connection string for the storage account to be used for storing subscription information.
  • UseCloudTableClient: Sets a fully preconfigured CloudTableClient to use instead of a connection string.

Configuring a Cloud Table Client Provider

A fully preconfigured CloudTableClient can be registered in the container through a custom provider.

Create a custom provider:

class CustomTableClientProvider
    : IProvideCloudTableClient
{
    // get fully configured via DI container
    public CustomTableClientProvider(CloudTableClient tableClient)
    {
        Client = tableClient;
    }
    public CloudTableClient Client { get; }
}

// optional, only needed when subscriptions are used
class CustomSubscriptionTableClientProvider
    : IProvideCloudTableClientForSubscriptions
{
    // get fully configured via DI container
    public CustomSubscriptionTableClientProvider(CloudTableClient tableClient)
    {
        Client = tableClient;
    }
    public CloudTableClient Client { get; }
}

Then register the provider in the container:

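// register against the provider interface so the persister can resolve it
endpointConfiguration.RegisterComponents(services => services.AddSingleton<IProvideCloudTableClient, CustomTableClientProvider>());

// optional, only needed when subscriptions are used
endpointConfiguration.RegisterComponents(services => services.AddSingleton<IProvideCloudTableClientForSubscriptions, CustomSubscriptionTableClientProvider>());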
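
Because the providers receive the CloudTableClient through their constructors, the client itself also has to be resolvable from the container. A minimal sketch of one way to do that follows, assuming the client type comes from the Microsoft.Azure.Cosmos.Table package; the helper class and method names are hypothetical.

```csharp
using Microsoft.Azure.Cosmos.Table;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus;

static class TableClientRegistration
{
    // Hypothetical helper, not part of the persister's API.
    public static void Configure(EndpointConfiguration endpointConfiguration, string connectionString)
    {
        // Assumption: the client type is Microsoft.Azure.Cosmos.Table.CloudTableClient.
        var cloudTableClient = CloudStorageAccount.Parse(connectionString).CreateCloudTableClient();

        // Registers the instance so the provider constructors can receive it.
        endpointConfiguration.RegisterComponents(services => services.AddSingleton(cloudTableClient));
    }
}
```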

Table name configuration and creation

The default table name will be used for Sagas, Outbox and Subscription storage and can be set as follows:

var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence>();
persistence.ConnectionString("connectionString");
persistence.DefaultTable("TableName");

endpointConfiguration.EnableInstallers();

Configuring the table name

To provide a table at runtime or override the default table, the table information needs to be set as part of the message handling pipeline.

A behavior at the stage of the ITransportReceiveContext:

class ContainerInfoTransportReceiveContextBehavior
    : Behavior<ITransportReceiveContext>
{
    public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        context.Extensions.Set(
            new TableInformation(
                tableName: "tableName"));

        await next().ConfigureAwait(false);
    }
}

A behavior at the stage of the IIncomingLogicalMessageContext can be used as well:

class ContainerInfoLogicalReceiveContextBehavior
    : Behavior<IIncomingLogicalMessageContext>
{
    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        context.Extensions.Set(
            new TableInformation(
                tableName: "tableName"));

        await next().ConfigureAwait(false);
    }
}
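
Either behavior only runs once it has been registered with the endpoint's pipeline. A minimal sketch using the logical-stage behavior defined above; the helper class, method name and description text are placeholders.

```csharp
using NServiceBus;

static class TableInformationBehaviorRegistration
{
    // Hypothetical helper, not part of the persister's API.
    public static void Configure(EndpointConfiguration endpointConfiguration)
    {
        // Adds the behavior to the pipeline; the description is free-form text.
        endpointConfiguration.Pipeline.Register(
            new ContainerInfoLogicalReceiveContextBehavior(),
            "Sets the table information for the incoming message");
    }
}
```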

Enabling automatic table creation

To enable table creation on endpoint start or during runtime, the EnableInstallers API needs to be called on the endpoint configuration.

Note that when the default table is set, the table is created on endpoint start. When the table information is provided as part of the message handling pipeline, the tables are created at runtime.

var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence>();
persistence.ConnectionString("connectionString");
persistence.DefaultTable("TableName");

endpointConfiguration.EnableInstallers();

Opting out of table creation when installers are enabled

If installers are enabled but the tables should not be created, the DisableTableCreation API may be used:

var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence>();
persistence.ConnectionString("connectionString");
persistence.DefaultTable("TableName");

// make sure the table name specified in the DefaultTable exists when calling DisableTableCreation
endpointConfiguration.EnableInstallers();
persistence.DisableTableCreation();

Partitioning and compatibility mode helpers

During a given message handling pipeline, multiple data storage operations may occur. To commit them atomically, in a single transaction, they must share a partition key. In conversations involving a saga, the saga ID is a good candidate for a partition key. Unfortunately, saga IDs are determined late in the message handling process and are not exposed to user code.

This makes it difficult to:

  • Enable transactional saga data storage for existing sagas that were stored with the previous version of the persister.
  • Allow other data operations to participate in the saga data transaction.

To support the above scenarios, IProvidePartitionKeyFromSagaId may be injected into behaviors at the logical pipeline stage:

class OrderIdAsPartitionKeyBehavior : Behavior<IIncomingLogicalMessageContext>
{
    public OrderIdAsPartitionKeyBehavior(IProvidePartitionKeyFromSagaId partitionKeyFromSagaId) =>
        this.partitionKeyFromSagaId = partitionKeyFromSagaId;

    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        var correlationProperty = SagaCorrelationProperty.None;

        if (context.Message.Instance is IProvideOrderId provideOrderId)
        {
            Log.Debug($"Order ID: '{provideOrderId.OrderId}'");

            correlationProperty = new SagaCorrelationProperty("OrderId", provideOrderId.OrderId);
        }

        await partitionKeyFromSagaId.SetPartitionKey<OrderSagaData>(context, correlationProperty)
            .ConfigureAwait(false);

        Log.Debug($"Partition key: {context.Extensions.Get<TableEntityPartitionKey>().PartitionKey}");

        if (context.Headers.TryGetValue(Headers.SagaId, out var sagaId))
        {
            Log.Debug($"Saga ID: {sagaId}");
        }

        if (context.Extensions.TryGet<TableInformation>(out var tableInformation))
        {
            Log.Debug($"Table name: {tableInformation.TableName}");
        }

        await next().ConfigureAwait(false);
    }

    public class Registration : RegisterStep
    {
        public Registration() :
            base(nameof(OrderIdAsPartitionKeyBehavior),
                typeof(OrderIdAsPartitionKeyBehavior),
                "Determines the PartitionKey from the logical message",
                provider => new OrderIdAsPartitionKeyBehavior(provider.GetRequiredService<IProvidePartitionKeyFromSagaId>())) =>
            InsertBefore(nameof(LogicalOutboxBehavior));
    }

    readonly IProvidePartitionKeyFromSagaId partitionKeyFromSagaId;
    static readonly ILog Log = LogManager.GetLogger<OrderIdAsPartitionKeyBehavior>();
}

IProvidePartitionKeyFromSagaId does the following:

  • Sets the partition key on the IIncomingLogicalMessageContext based on the following algorithm:
    • Use the saga ID header value if present. Otherwise:
    • When compatibility mode is enabled, and the correlation property is not SagaCorrelationProperty.None, look up the saga ID either using the secondary index if present, or by table scanning the saga data if that is enabled. Otherwise:
    • Calculate the saga ID based on the specified correlation property.
  • If the table in which all data, including saga data, will be stored is not already set, set it using the saga data name as the table name.
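
The OrderIdAsPartitionKeyBehavior shown earlier takes part in message handling only after its registration step has been added to the pipeline. A minimal sketch, in which the helper class and method names are hypothetical:

```csharp
using NServiceBus;

static class PartitionKeyBehaviorRegistration
{
    // Hypothetical helper, not part of the persister's API.
    public static void Configure(EndpointConfiguration endpointConfiguration)
    {
        // Registration is the nested RegisterStep declared on OrderIdAsPartitionKeyBehavior.
        endpointConfiguration.Pipeline.Register(new OrderIdAsPartitionKeyBehavior.Registration());
    }
}
```

Using the registration step rather than a behavior instance lets the step control its position in the pipeline and resolve IProvidePartitionKeyFromSagaId from the container.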

For more information on connection string configuration, see Configuring Azure Connection Strings.

