Connection pooling
The PostgreSQL transport is built on top of ADO.NET and will use connection pooling. This may result in sharing of the connection pool by the transport, as well as other parts of the endpoint process and the business logic.
If increasing the concurrent message processing limit, or if the database connection is used for other purposes mentioned above, increase the connection pool size to ensure it is not exhausted.
If the maximum pool size is not explicitly set on the connection string, a warning message will be logged. See also Tuning endpoint message processing.
Connection configuration
The connection string is configured in the constructor of the transport object:
var transport = new PostgreSqlTransport("SomeConnectionString");
Token Authentication
To connect using token credentials, the following must be provided in the connection string:
- A User ID.
- The password taken from the access token.
Since tokens are short-lived, a data source builder must be utilized to handle password refreshes. The following example uses Microsoft Entra ID.
var connection = "Server=test.postgres.database.azure.com;Database=postgres;Port=5432;User Id=<entra user id>;Ssl Mode=Require;";
var dataSourceBuilder = new NpgsqlDataSourceBuilder(connection);
if (string.IsNullOrEmpty(dataSourceBuilder.ConnectionStringBuilder.Password))
{
dataSourceBuilder.UsePeriodicPasswordProvider(async (_, ct) =>
{
var credentials = new DefaultAzureCredential();
var token = await credentials.GetTokenAsync(new TokenRequestContext(["https://ossrdbms-aad.database.windows.net/.default"]), ct);
return token.Token;
}, TimeSpan.FromHours(24), TimeSpan.FromSeconds(10));
}
var builder = dataSourceBuilder.Build();
var transport = new PostgreSqlTransport(async cancellationToken =>
{
var connection = builder.CreateConnection();
try
{
await connection.OpenAsync(cancellationToken);
return connection;
}
catch
{
connection.Dispose();
throw;
}
});
Custom database schemas
The transport uses public
as a default schema. It is used for every queue if no other schema is explicitly provided in a transport address. This includes all local queues, error, audit, and remote queues of other endpoints.
The default schema can be overridden using the DefaultSchema
method:
var transport = new PostgreSqlTransport("connectionString")
{
DefaultSchema = "myschema"
};
Subscribing to events between endpoints in different database schemas or catalogs requires a shared subscription table to be configured.
Custom connection factory
Some environments need additional connection operations, such as adapting to the database server settings. To achieve this, pass a custom factory method to the transport that will provide connection strings at runtime and can perform custom actions:
var transport = new PostgreSqlTransport(
async cancellationToken =>
{
var connection = new NpgsqlConnection("SomeConnectionString");
try
{
await connection.OpenAsync(cancellationToken);
// perform custom operations
return connection;
}
catch
{
connection.Dispose();
throw;
}
});
If opening the connection fails, the custom connection factory must dispose of the connection object and rethrow the exception.
When using custom schemas, ensure the connection returned by the connection factory is granted sufficient permissions for the endpoint to perform its operations.
Circuit breaker
A built-in circuit breaker is used to handle intermittent PostgreSQL connectivity problems. When a failure occurs while trying to connect, a circuit breaker enters an armed state. If the failure is not resolved before the configured wait time elapses, the circuit breaker triggers the critical errors handling procedure.
Wait time
The circuit breaker's default time to wait before triggering is two minutes. Use the TimeToWaitBeforeTriggeringCircuitBreaker
method to change it.
var transport = new PostgreSqlTransport("connectionString")
{
TimeToWaitBeforeTriggeringCircuitBreaker = TimeSpan.FromMinutes(3)
};