Using connection pooling
The PostgreSQL transport is built on top of ADO.NET and uses connection pooling. As a result, the connection pool may be shared between the transport, other parts of the endpoint process, and the business logic.
If the concurrent message processing limit is changed, or the database connection is also used for the other purposes mentioned above, adjust the connection pool size so that the pool is not exhausted.
If the maximum pool size is not explicitly set on the connection string, a warning message will be logged. See also Tuning endpoint message processing.
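As a minimal sketch of setting the pool size explicitly, the fragment below builds the connection string with Npgsql's NpgsqlConnectionStringBuilder and sets MaxPoolSize before passing the string to the transport. The host, database, user name, endpoint name, and the numbers 50 and 10 are placeholder assumptions, not recommendations; choose values based on the endpoint's concurrency and any other code that draws from the same pool.

```csharp
using Npgsql;
using NServiceBus;

// Hypothetical connection details, for illustration only.
var connectionStringBuilder = new NpgsqlConnectionStringBuilder(
    "Host=localhost;Database=nservicebus;Username=app")
{
    // Set the pool limit explicitly. The value 50 is an assumption chosen to
    // leave headroom above the concurrency limit configured below.
    MaxPoolSize = 50
};

var transport = new PostgreSqlTransport(connectionStringBuilder.ConnectionString);

// Hypothetical endpoint name and concurrency limit.
var endpointConfiguration = new EndpointConfiguration("Sales");
endpointConfiguration.LimitMessageProcessingConcurrencyTo(10);
```

The same setting can be written directly in the connection string as Maximum Pool Size=50.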
Connection configuration
The connection string is configured in the constructor of the transport object:
var transport = new PostgreSqlTransport("SomeConnectionString");
Token Authentication
To connect using token credentials, the connection string must still include a User ID; the access token is supplied as the password. Because the token is short-lived, use a data source builder to handle password refreshes. The following example uses Microsoft Entra ID.
var connectionString = "Server=test.postgres.database.azure.com;Database=postgres;Port=5432;User Id=<entra user id>;Ssl Mode=Require;";
var dataSourceBuilder = new NpgsqlDataSourceBuilder(connectionString);
// Only register the token provider when no password is present in the connection string
if (string.IsNullOrEmpty(dataSourceBuilder.ConnectionStringBuilder.Password))
{
    dataSourceBuilder.UsePeriodicPasswordProvider(async (_, ct) =>
    {
        var credentials = new DefaultAzureCredential();
        var token = await credentials.GetTokenAsync(new TokenRequestContext(["https://ossrdbms-aad.database.windows.net/.default"]), ct);
        // The access token is used as the password
        return token.Token;
    }, TimeSpan.FromHours(24), TimeSpan.FromSeconds(10));
}
var dataSource = dataSourceBuilder.Build();
var transport = new PostgreSqlTransport(async cancellationToken =>
{
    var connection = dataSource.CreateConnection();
    try
    {
        await connection.OpenAsync(cancellationToken);
        return connection;
    }
    catch
    {
        // Dispose the connection and rethrow if it cannot be opened
        connection.Dispose();
        throw;
    }
});
Custom database schemas
The PostgreSQL transport uses public as the default schema. It is used for every queue if no other schema is explicitly provided in a transport address. This includes all local queues, error, audit, and remote queues of other endpoints.
The default schema can be overridden using the DefaultSchema property:
var transport = new PostgreSqlTransport("connectionString")
{
    DefaultSchema = "myschema"
};
When subscribing to events between endpoints in different database schemas or catalogs, a shared subscription table must be configured.
Custom PostgreSQL transport connection factory
In some environments, it might be necessary to adapt to the database server settings or to perform additional operations. This can be done by passing a custom factory method to the transport. The factory provides open connections at runtime and can perform custom actions on them:
var transport = new PostgreSqlTransport(
    async cancellationToken =>
    {
        var connection = new NpgsqlConnection("SomeConnectionString");
        try
        {
            await connection.OpenAsync(cancellationToken);
            // perform custom operations
            return connection;
        }
        catch
        {
            connection.Dispose();
            throw;
        }
    });
If opening the connection fails, the custom connection factory must dispose of the connection object and rethrow the exception.
When using custom schemas, ensure the database user of the connection returned by the connection factory has sufficient permissions for the endpoint to perform its operations.
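To illustrate what a custom operation in the factory might look like, the sketch below sets a session-level statement timeout after the connection opens. The SET statement_timeout command and the 30-second value are assumptions chosen for illustration; any per-connection setup could take their place. The dispose-and-rethrow pattern is kept unchanged.

```csharp
using Npgsql;
using NServiceBus;

var transport = new PostgreSqlTransport(async cancellationToken =>
{
    var connection = new NpgsqlConnection("SomeConnectionString");
    try
    {
        await connection.OpenAsync(cancellationToken);

        // Hypothetical custom operation: limit how long any statement may
        // run on this session. The value is illustrative only.
        await using var command = new NpgsqlCommand("SET statement_timeout = '30s'", connection);
        await command.ExecuteNonQueryAsync(cancellationToken);

        return connection;
    }
    catch
    {
        // Covers failures from both OpenAsync and the custom operation.
        connection.Dispose();
        throw;
    }
});
```

Because the factory runs each time the transport requests a connection, the custom operation should be inexpensive.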
Circuit breaker
A built-in circuit breaker is used to handle intermittent PostgreSQL connectivity problems. When a failure occurs while trying to connect, the circuit breaker enters an armed state. If the failure is not resolved before the configured wait time elapses, the circuit breaker triggers the critical error handling procedure.
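As a rough sketch of what the critical error handling procedure could do once the circuit breaker triggers, the fragment below registers a critical error action on the endpoint configuration. It assumes an NServiceBus version in which DefineCriticalErrorAction accepts a delegate taking the context and a cancellation token; the endpoint name and the decision to log and stop the endpoint are illustrative assumptions.

```csharp
using System;
using NServiceBus;

// Hypothetical endpoint name.
var endpointConfiguration = new EndpointConfiguration("Sales");

endpointConfiguration.DefineCriticalErrorAction(async (context, cancellationToken) =>
{
    // context.Error describes the failure; context.Exception carries the cause.
    Console.Error.WriteLine($"Critical error: {context.Error} ({context.Exception.Message})");

    // Illustrative choice: stop the endpoint so that the hosting environment
    // can decide whether to restart the process.
    await context.Stop(cancellationToken);
});
```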
Wait time
The circuit breaker's default time to wait before triggering is two minutes. Use the TimeToWaitBeforeTriggeringCircuitBreaker property to change it.
var transport = new PostgreSqlTransport("connectionString")
{
    TimeToWaitBeforeTriggeringCircuitBreaker = TimeSpan.FromMinutes(3)
};