Multi-tenant endpoints

Component: NServiceBus
NuGet Package NServiceBus (6.x)

Prerequisites

An instance of SQL Server Express is installed and accessible as .\SqlExpress.

At startup each endpoint will create its required SQL assets including databases, tables and schemas.

The databases created by this sample are:

  • NsbSamplesMultiTenantSender
  • NsbSamplesMultiTenantReceiver
  • NsbSamplesMultiTenantA
  • NsbSamplesMultiTenantB

Running the project

  1. Start the Sender project (right-click on the project, select the Debug > Start new instance option).
  2. The text Press <enter> to send a message should be displayed in the Sender's console window.
  3. Start the Receiver project (right-click on the project, select the Debug > Start new instance option).
  4. The Sender should display the subscription confirmation Subscribe from Receiver on message type OrderSubmitted.
  5. Press A or B on the Sender console to send a new message to one of the tenants.

Verifying that the sample works correctly

  1. The Receiver displays information that an order was submitted.
  2. The Sender displays information that the order was accepted.
  3. Finally, after a couple of seconds, the Receiver displays confirmation that the timeout message has been received.
  4. Open SQL Server Management Studio and go to the tenant databases. Verify that there are rows in the saga state table (dbo.OrderLifecycleSagaData) and in the orders table (dbo.Orders) for each message sent.

Timeouts are stored in the shared Receiver database, so do not include any sensitive information in them. Keep such information in the saga data and use timeouts only as notifications.

Code walk-through

This sample contains three projects:

  • Shared - A class library containing common code, including message definitions.
  • Sender - A console application responsible for sending the initial OrderSubmitted message and processing the follow-up OrderAccepted message.
  • Receiver - A console application responsible for processing the OrderSubmitted message, sending the OrderAccepted message and randomly generating exceptions.

Sender project

The Sender does not store any data. It mimics the front-end system where orders are submitted by the users and passed via the bus to the back-end. It is configured to use MSMQ transport with NHibernate persistence and Outbox.

Receiver project

The Receiver mimics a back-end system. It is also configured to use MSMQ transport with NHibernate persistence and Outbox.

Creating the schema

var persistence = endpointConfiguration.UsePersistence<NHibernatePersistence>();
persistence.UseConfiguration(tenantDatabasesConfiguration);
persistence.UseSubscriptionStorageConfiguration(sharedDatabaseConfiguration);
persistence.UseTimeoutStorageConfiguration(sharedDatabaseConfiguration);
persistence.DisableSchemaUpdate();

endpointConfiguration.EnableOutbox();

var settings = endpointConfiguration.GetSettings();
settings.Set("NHibernate.Timeouts.AutoUpdateSchema", true);
settings.Set("NHibernate.Subscriptions.AutoUpdateSchema", true);
CreateSchema(tenantDatabasesConfiguration, Connections.TenantA);
CreateSchema(tenantDatabasesConfiguration, Connections.TenantB);

The above code makes sure that the user, saga and outbox tables are created in the tenant databases, while the timeout and subscription tables are created in the shared database.

Tenant database

To allow for database isolation between the tenants, the actual connection to the database needs to be created based on the message being processed. This requires the cooperation of several components:

  • Custom ConnectionProvider for NHibernate
  • A behavior that inspects an incoming message and opens a new connection based on the tenant ID found in the headers of that message
  • A behavior that propagates the tenant ID information to outgoing messages

For example, the behavior that handles outgoing messages copies the tenant ID from the message processing context to the headers of each outgoing message:

if (context.Extensions.TryGet("TenantId", out string tenant))
{
    context.Headers["TenantId"] = tenant;
}
return next();

Connection provider

The custom connection provider has to be registered with NHibernate:

x.ConnectionProvider<MultiTenantConnectionProvider>();

The connection provider looks at the current message processing context. If a tenant connection string has been set for the message being processed, it opens a new connection using that connection string. Otherwise it defaults to creating a connection to the shared database.

var connectionString = ExtractTenantConnectionStringBehavior.ConnectionStringHolder.Value;
if (connectionString != null)
{
    var connection = Driver.CreateConnection();
    connection.ConnectionString = connectionString;
    connection.Open();
    return connection;
}

The connection provider is only used by OutboxPersister's TryGet and MarkAsDispatched methods, which execute in a separate transaction from all the other storage operations.

The connection provider is a simple implementation that is not thread-safe. For this example, the Maximum Concurrency Level is set to 1, which makes it run in single-threaded mode.

Opening connection to tenant database

The ExtractTenantConnectionStringBehavior behavior extracts the TenantId header from the incoming message and looks up the matching connection string in the app.config file.

if (!context.Message.Headers.TryGetValue("TenantId", out var tenant))
{
    throw new InvalidOperationException("No tenant id");
}
var connectionString = Connections.GetTenant(tenant);

ConnectionStringHolder.Value = connectionString;
try
{
    await next()
        .ConfigureAwait(false);
}
finally
{
    ConnectionStringHolder.Value = null;
}
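
The steps above (look up the tenant's connection string, make it available while the rest of the pipeline runs, then clear it) can be sketched without any NServiceBus types. This is a minimal sketch, not the sample's implementation: the TenantConnections class, the tenant IDs "A" and "B", the connection string format and the use of AsyncLocal<string> as the holder are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper; the sample's own Connections class is not shown on this page.
public static class TenantConnections
{
    // Assumed mapping from tenant ID to connection string. The database names
    // come from the list of databases created by this sample; the rest is assumed.
    static readonly Dictionary<string, string> map = new Dictionary<string, string>
    {
        ["A"] = @"Data Source=.\SqlExpress;Database=NsbSamplesMultiTenantA;Integrated Security=True",
        ["B"] = @"Data Source=.\SqlExpress;Database=NsbSamplesMultiTenantB;Integrated Security=True"
    };

    // Assumed holder: the value flows with the async call chain of one message.
    public static readonly AsyncLocal<string> Current = new AsyncLocal<string>();

    public static string GetTenant(string tenantId)
    {
        if (!map.TryGetValue(tenantId, out var connectionString))
        {
            throw new InvalidOperationException($"Unknown tenant '{tenantId}'");
        }
        return connectionString;
    }

    // Sets the tenant connection string, runs the rest of the work,
    // and always clears the value afterwards, even if 'next' throws.
    public static async Task RunForTenant(string tenantId, Func<Task> next)
    {
        Current.Value = GetTenant(tenantId);
        try
        {
            await next().ConfigureAwait(false);
        }
        finally
        {
            Current.Value = null;
        }
    }
}
```

Code running inside next can read TenantConnections.Current.Value to decide which database to connect to, which is the role the connection provider plays in this sample. Clearing the value in the finally block keeps a failed message from leaving a stale tenant connection string behind.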

This behavior needs to be registered in the pipeline at configuration time:

var pipeline = endpointConfiguration.Pipeline;
pipeline.Register<ExtractTenantConnectionStringBehavior.Registration>();

Propagating the tenant information downstream

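Finally, the PropagateIncomingTenantIdBehavior and PropagateOutgoingTenantIdBehavior behaviors make sure that tenant information is not lost and that all outgoing messages have the same tenant ID as the message being processed. The incoming behavior stores the tenant ID in the message processing context: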

if (context.MessageHeaders.TryGetValue("TenantId", out var tenant))
{
    context.Extensions.Set("TenantId", tenant);
}
return next();

These behaviors also need to be registered at configuration time.

var pipeline = endpointConfiguration.Pipeline;
pipeline.Register<PropagateOutgoingTenantIdBehavior.Registration>();
pipeline.Register<PropagateIncomingTenantIdBehavior.Registration>();
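
The round trip performed by the two propagation behaviors can be illustrated with plain dictionaries. This is only a sketch of the idea under stated assumptions: MessageScope is a hypothetical stand-in for the per-message context (context.Extensions in the snippets above), and no NServiceBus types are involved.

```csharp
using System.Collections.Generic;

// Hypothetical stand-in for the state shared while one message is processed.
public class MessageScope
{
    readonly Dictionary<string, string> bag = new Dictionary<string, string>();

    // Incoming side: remember the tenant ID found in the incoming headers, if any.
    public void CaptureIncoming(IReadOnlyDictionary<string, string> incomingHeaders)
    {
        if (incomingHeaders.TryGetValue("TenantId", out var tenant))
        {
            bag["TenantId"] = tenant;
        }
    }

    // Outgoing side: stamp the remembered tenant ID on an outgoing message.
    // Messages sent outside a tenant context are left untouched.
    public void StampOutgoing(IDictionary<string, string> outgoingHeaders)
    {
        if (bag.TryGetValue("TenantId", out var tenant))
        {
            outgoingHeaders["TenantId"] = tenant;
        }
    }
}
```

With this shape, a message received with the header TenantId set to A results in every message sent while handling it carrying the same header, which is what allows the next endpoint to pick the matching tenant database.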
