Getting Started
Architecture
NServiceBus
Transports
Persistence
ServiceInsight
ServicePulse
ServiceControl
Monitoring
Samples

Configuration

Target Version: NServiceBus 9.x

In NServiceBus.Persistence.AzureStorage XML-based configuration is no longer available. Configuring the behavior of the persister is done using the code configuration API.

For sagas and subscriptions:

var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence>();
persistence.ConnectionString("connectionString");

Saga configuration

var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence, StorageType.Sagas>();
persistence.ConnectionString("connectionString");

// or

TableServiceClient tableServiceClient = new TableServiceClient("connectionString");
persistence.UseTableServiceClient(tableServiceClient);

The following settings are available for changing the behavior of saga persistence section:

  • ConnectionString: Sets the connection string for the storage account to be used for storing saga information.
  • UseTableServiceClient: Allows to set a fully pre-configured Table Service client instead of using a connection string.

Subscription configuration

var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence, StorageType.Subscriptions>();
persistence.ConnectionString("connectionString");
persistence.TableName("tableName");

// Added in Version 1.3
persistence.CacheFor(TimeSpan.FromMinutes(1));

The following settings are available for changing the behavior of subscription persistence:

  • ConnectionString: Sets the connection string for the storage account to be used for storing subscription information.
  • UseTableServiceClient: Allows to set a fully pre-configured Table Service client instead of using a connection string.

Configuring a Table Service Client Provider

A fully preconfigured TableServiceClient can be registered in the container through a custom provider.

Create a customer provider:

class CustomTableClientProvider : IProvideTableServiceClient
{
    // get fully configured via DI container
    public CustomTableClientProvider(TableServiceClient tableServiceClient)
    {
        Client = tableServiceClient;
    }
    public TableServiceClient Client { get; }
}

// optionally when subscriptions used
class CustomSubscriptionTableClientProvider : IProvideTableServiceClientForSubscriptions
{
    // get fully configured via DI container
    public CustomSubscriptionTableClientProvider(TableServiceClient tableServiceClient)
    {
        Client = tableServiceClient;
    }
    public TableServiceClient Client { get; }
}

Then register the provider in the container:

endpointConfiguration.RegisterComponents(services => services.AddSingleton<CustomTableClientProvider>());

// optionally when subscriptions used
endpointConfiguration.RegisterComponents(services => services.AddSingleton<CustomSubscriptionTableClientProvider>());

Table name configuration and creation

The default table name will be used for Sagas, Outbox and Subscription storage and can be set as follows:

var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence>();
persistence.ConnectionString("connectionString");
persistence.DefaultTable("TableName");

endpointConfiguration.EnableInstallers();

Configuring the table name

To provide a table at runtime or override the default table, the table information needs to be set as part of the message handling pipeline.

A behavior at the stage of theITransportReceiveContext:

class ContainerInfoTransportReceiveContextBehavior
    : Behavior<ITransportReceiveContext>
{
    public override async Task Invoke(ITransportReceiveContext context, Func<Task> next)
    {
        context.Extensions.Set(
            new TableInformation(
                tableName: "tableName"));

        await next();
    }
}

A behavior at the stage of the IIncomingLogicalMessageContext can be used as well:

class ContainerInfoLogicalReceiveContextBehavior
    : Behavior<IIncomingLogicalMessageContext>
{
    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        context.Extensions.Set(
            new TableInformation(
                tableName: "tableName"));

        await next();
    }
}

Enabling automatic table creation

To enable table creation on endpoint start or during runtime, the EnableInstallers API needs to be called on the endpoint configuration.

Note that when the default table is set, the table will be created on endpoint-start. When the table information is provided as part of the message handling pipeline, the tables will be created at runtime.

var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence>();
persistence.ConnectionString("connectionString");
persistence.DefaultTable("TableName");

endpointConfiguration.EnableInstallers();

Opting out from table creation when installers are enabled

In case installers are enabled, but there's a need to opt out from creating the tables, the DisableTableCreation-API may be used:

var persistence = endpointConfiguration.UsePersistence<AzureTablePersistence>();
persistence.ConnectionString("connectionString");
persistence.DefaultTable("TableName");

// make sure the table name specified in the DefaultTable exists when calling DisableTableCreation
endpointConfiguration.EnableInstallers();
persistence.DisableTableCreation();

Partitioning and compatibility mode helpers

During a given message handling pipeline, multiple data storage operations may occur. To commit them atomically, in a single transaction, they must share a partition key. In conversations involving a saga, the saga ID is a good candidate for a partition key. Unfortunately, saga IDs are determined late in the message handling process and are not exposed to user code.

This makes it difficult to:

  • Enable transactional saga data storage for existing sagas that were stored with the previous version of the persister.
  • Allow other data operations to participate in the saga data transaction.

To support the above scenarios, IProvidePartitionKeyFromSagaId may be injected into behaviors at the logical pipeline stage:

class OrderIdAsPartitionKeyBehavior : Behavior<IIncomingLogicalMessageContext>
{
    public OrderIdAsPartitionKeyBehavior(IProvidePartitionKeyFromSagaId partitionKeyFromSagaId) =>
        this.partitionKeyFromSagaId = partitionKeyFromSagaId;

    public override async Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
    {
        var correlationProperty = SagaCorrelationProperty.None;

        if (context.Message.Instance is IProvideOrderId provideOrderId)
        {
            Log.Debug($"Order ID: '{provideOrderId.OrderId}'");

            correlationProperty = new SagaCorrelationProperty("OrderId", provideOrderId.OrderId);
        }

        await partitionKeyFromSagaId.SetPartitionKey<OrderSagaData>(context, correlationProperty);

        Log.Debug($"Partition key: {context.Extensions.Get<TableEntityPartitionKey>().PartitionKey}");

        if (context.Headers.TryGetValue(Headers.SagaId, out var sagaId))
        {
            Log.Debug($"Saga ID: {sagaId}");
        }

        if (context.Extensions.TryGet<TableInformation>(out var tableInformation))
        {
            Log.Debug($"Table name: {tableInformation.TableName}");
        }

        await next();
    }

    public class Registration : RegisterStep
    {
        public Registration() :
            base(nameof(OrderIdAsPartitionKeyBehavior),
                typeof(OrderIdAsPartitionKeyBehavior),
                "Determines the PartitionKey from the logical message",
                provider => new OrderIdAsPartitionKeyBehavior(provider.GetRequiredService<IProvidePartitionKeyFromSagaId>())) =>
            InsertBefore(nameof(LogicalOutboxBehavior));
    }

    readonly IProvidePartitionKeyFromSagaId partitionKeyFromSagaId;
    static readonly ILog Log = LogManager.GetLogger<OrderIdAsPartitionKeyBehavior>();
}

IProvidePartitionKeyFromSagaId does the folllowing:

  • Sets the partition key on the IIncomingLogicalMessageContext based on the following algorithm:
    • Use the saga ID header value if present. Otherwise:
    • When compatibility mode is enabled, and the correlation property is not SagaCorrelationProperty.None, look up the saga ID either using the secondary index if present, or by table scanning the saga data if that is enabled. Otherwise:
    • Calculate the saga ID based on the specified correlation property.
  • If the table in which all data, including saga data, will be stored is not already set, set it using the saga data name as the table name.

For more information on connection string configuration see Configuring Azure Connection Strings.