Configuring an endpoint
To use Azure Service Bus as the underlying transport:
var transport = new AzureServiceBusTransport("Endpoint=sb://[NAMESPACE].servicebus.windows.net/;SharedAccessKeyName=[KEYNAME];SharedAccessKey=[KEY]", TopicTopology.Default);
endpointConfiguration.UseTransport(transport);
Connectivity
These settings control how the transport connects to the broker.
Transport
UseWebSockets
: Configures the transport to use AMQP over WebSockets.
WebProxy
: Configures an optional web proxy to use with AMQP over WebSockets.
TimeToWaitBeforeTriggeringCircuitBreaker
: The time to wait before triggering the circuit breaker after a critical error occurred. Defaults to 2 minutes.
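As a rough sketch, the connectivity settings might be applied as follows. This assumes the settings are exposed as settable properties on the transport instance, in the same way as the other property-style snippets on this page; the proxy address and the five-minute value are purely hypothetical.

```csharp
// Assumption: these settings are plain properties on the transport instance.
transport.UseWebSockets = true;

// Hypothetical proxy address, shown for illustration only.
transport.WebProxy = new System.Net.WebProxy("http://proxy.example.internal:8080");

// Wait longer than the 2-minute default before the circuit breaker triggers.
transport.TimeToWaitBeforeTriggeringCircuitBreaker = TimeSpan.FromMinutes(5);
```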
Retry-policy
CustomRetryPolicy(RetryPolicy)
: Allows replacement of the default retry policy.
Token-credentials
Enables Microsoft Entra ID authentication, such as managed identities for Azure resources, instead of the shared secret in the connection string.
var transportWithTokenCredentials = new AzureServiceBusTransport("[NAMESPACE].servicebus.windows.net", new DefaultAzureCredential(), TopicTopology.Default);
endpointConfiguration.UseTransport(transportWithTokenCredentials);
Entity creation
These settings control how the transport creates entities in the Azure Service Bus namespace.
Entity creation settings are applied only at creation time of the corresponding entities; they are not updated on subsequent startups.
Access rights
The transport can be run without Manage rights when using a shared access policy, or without the Azure Service Bus Data Owner role when authenticating with managed identities.
To run without Manage rights:
- Make sure that installers are not configured to run
- Use operational scripting to provision queues, topics, and subscriptions
Topology
Topology
: The topology used to publish and subscribe to events between endpoints. The topology is shared by the endpoints that need to publish and subscribe to events from each other. The topology has to be explicitly passed into the constructor.
Endpoints that do not require backward compatibility with the previous single-topic topology should be using TopicTopology.
which represents the new default topic-per-event topology. For transports that require compatibility during the migration towards the topic-per-event topology, the upgrade guide describes the migration topology in more detail.
Topic names must adhere to the limits outlined in the Microsoft documentation on topic creation.
Mapping
Options
It is possible to configure a topology entirely from configuration by loading a serialized version of the options and using the TopicTopology.
to create the topology.
This allows loading topology configuration from application configuration or any other source. The options layer also supports source-generated serializer options through TopologyOptionsSerializationContext.
The following snippet demonstrates raw deserialization of options and creating the topology from those options. Usage may vary depending on the use case. For more details on how to load options in the generic host, consult the options sample.
using var stream = File.OpenRead("topology-options.json");
var options = JsonSerializer.Deserialize<TopologyOptions>(stream, TopologyOptionsSerializationContext.Default.Options);
var jsonTopology = TopicTopology.FromOptions(options);
The topology JSON document for the topic-per-event topology looks as follows:
{
  "$type": "topology-options",
  "PublishedEventToTopicsMap": {
    "MyNamespace.SomeEvent": "some-event"
  },
  "SubscribedEventToTopicsMap": {
    "MyNamespace.SomeEvent": "some-event"
  },
  "QueueNameToSubscriptionNameMap": {
    "Publisher": "PublisherSubscriptionName"
  }
}
In order to support polymorphic events, one event (base type) can be mapped to multiple topics (where the derived events are published):
{
  "$type": "topology-options",
  "PublishedEventToTopicsMap": {
    "MyNamespace.SomeEvent": "some-event"
  },
  "SubscribedEventToTopicsMap": {
    "MyNamespace.SomeEvent": [
      "some-event",
      "some-other-event"
    ]
  },
  "QueueNameToSubscriptionNameMap": {
    "Publisher": "PublisherSubscriptionName"
  }
}
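The two shapes a subscribed-event mapping can take (a single topic name or an array of topic names) can be illustrated with a small reader. This is a minimal sketch using System.Text.Json's JsonDocument, not the transport's own deserialization logic; the helper name ReadSubscribedTopics and the second event entry in the sample are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper: reads SubscribedEventToTopicsMap, where each value is
// either a single topic name or an array of topic names.
Dictionary<string, List<string>> ReadSubscribedTopics(string json)
{
    using var document = JsonDocument.Parse(json);
    var result = new Dictionary<string, List<string>>();

    var map = document.RootElement.GetProperty("SubscribedEventToTopicsMap");
    foreach (var entry in map.EnumerateObject())
    {
        var topics = new List<string>();
        if (entry.Value.ValueKind == JsonValueKind.Array)
        {
            // Polymorphic case: the base event is mapped to several topics.
            foreach (var topic in entry.Value.EnumerateArray())
            {
                topics.Add(topic.GetString() ?? string.Empty);
            }
        }
        else
        {
            // Simple case: one event, one topic.
            topics.Add(entry.Value.GetString() ?? string.Empty);
        }
        result[entry.Name] = topics;
    }
    return result;
}

// "MyNamespace.SomeOtherEvent" is an illustrative entry, not taken from the documents above.
var sampleJson = @"{
  ""$type"": ""topology-options"",
  ""SubscribedEventToTopicsMap"": {
    ""MyNamespace.SomeEvent"": [ ""some-event"", ""some-other-event"" ],
    ""MyNamespace.SomeOtherEvent"": ""some-other-event""
  }
}";

foreach (var pair in ReadSubscribedTopics(sampleJson))
{
    var joined = string.Join(", ", pair.Value);
    Console.WriteLine(pair.Key + " -> " + joined);
}
```

Normalizing both shapes to a list keeps downstream code from having to branch on the JSON value kind again.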
Loading from JSON is also supported for the migration topology:
{
  "$type": "migration-topology-options",
  "TopicToPublishTo": "TopicToPublishTo",
  "TopicToSubscribeOn": "TopicToSubscribeOn",
  "EventsToMigrateMap": [
    "MyNamespace.NotYetMigratedEvent"
  ],
  "SubscribedEventToRuleNameMap": {
    "MyNamespace.NotYetMigratedEvent": "EventRuleName"
  },
  "PublishedEventToTopicsMap": {
    "MyNamespace.MigratedEvent": "MigratedEvent"
  },
  "SubscribedEventToTopicsMap": {
    "MyNamespace.MigratedEvent": "MigratedEvent"
  },
  "QueueNameToSubscriptionNameMap": {
    "Publisher": "PublisherSubscriptionName"
  }
}
Validation
When the transport starts, the topology configuration is validated against well-known limitations, such as entity, subscription, and rule name lengths, and some consistency checks are executed.
The default validator uses data validations and source-generated options validation. The default validator can be overridden, or validation can be disabled entirely.
transport.Topology.OptionsValidator = new TopologyOptionsDisableValidationValidator();
Disabling the validator might be desirable in generic hosting scenarios where the topology options are loaded from the application configuration and a validator is already registered to validate at startup. Disabling it on the transport avoids validating the same options twice.
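To give a feel for the kind of name-length check mentioned above, here is a minimal standalone sketch. It is not the transport's validator: the helper name is hypothetical, and the limits (260 characters for topic names, 50 for subscription names) are assumptions that should be confirmed against the Microsoft documentation on quotas and limits.

```csharp
using System;
using System.Collections.Generic;

// Assumed limits for illustration only; confirm them in the Microsoft quota documentation.
const int MaxTopicNameLength = 260;
const int MaxSubscriptionNameLength = 50;

// Hypothetical helper: returns one message per name that exceeds its assumed limit.
List<string> FindNameLengthViolations(IEnumerable<string> topicNames, IEnumerable<string> subscriptionNames)
{
    var violations = new List<string>();
    foreach (var topic in topicNames)
    {
        if (topic.Length > MaxTopicNameLength)
        {
            violations.Add("Topic name too long: " + topic);
        }
    }
    foreach (var subscription in subscriptionNames)
    {
        if (subscription.Length > MaxSubscriptionNameLength)
        {
            violations.Add("Subscription name too long: " + subscription);
        }
    }
    return violations;
}

var problems = FindNameLengthViolations(
    new[] { "some-event" },
    new[] { "PublisherSubscriptionName", new string('x', 51) });

Console.WriteLine(problems.Count); // 1: only the 51-character subscription name is flagged
```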
Settings
EntityMaximumSize
: The maximum entity size in GB. The value must correspond to a valid value for the namespace type. Defaults to 5. See the Microsoft documentation on quotas and limits for valid values.
EnablePartitioning
: Partitioned entities offer higher availability, reliability, and throughput over conventional non-partitioned queues and topics. For more information about partitioned entities see the Microsoft documentation on partitioned messaging entities.
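A sketch of how these entity settings might be applied, assuming they are exposed as settable properties on the transport instance. Because entity creation settings only take effect when the entities are first created, changing them later is not expected to alter existing queues or topics.

```csharp
// Assumption: both settings are plain properties on the transport instance.
transport.EntityMaximumSize = 5;      // size in GB; must be valid for the namespace type
transport.EnablePartitioning = true;  // applies only to entities created by the transport
```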
Controlling the prefetch count
When consuming messages from the broker, throughput can be improved by having the consumer prefetch additional messages. The prefetch count is calculated by multiplying maximum concurrency by the prefetch multiplier. The default value of the multiplier is 10, but it can be changed by using the following:
transport.PrefetchMultiplier = 3;
Alternatively, the whole calculation can be overridden by setting the prefetch count directly using the following:
transport.PrefetchCount = 100;
To disable prefetching, set the prefetch count to zero.
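The calculation described above can be sketched in a few lines. The helper name EffectivePrefetchCount is hypothetical and only mirrors the rule as stated here (an explicit prefetch count replaces the product of maximum concurrency and multiplier); it is not the transport's internal code.

```csharp
using System;

// Hypothetical helper mirroring the rule above:
// an explicit prefetch count wins, otherwise concurrency x multiplier is used.
int EffectivePrefetchCount(int maxConcurrency, int prefetchMultiplier = 10, int? prefetchCount = null)
    => prefetchCount ?? maxConcurrency * prefetchMultiplier;

Console.WriteLine(EffectivePrefetchCount(4));                         // 40 (default multiplier of 10)
Console.WriteLine(EffectivePrefetchCount(4, prefetchMultiplier: 3));  // 12
Console.WriteLine(EffectivePrefetchCount(4, prefetchCount: 100));     // 100 (calculation overridden)
Console.WriteLine(EffectivePrefetchCount(4, prefetchCount: 0));       // 0 (prefetching disabled)
```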
The lock duration for all prefetched messages starts as soon as they are fetched. To avoid LockLostException, ensure the lock-renewal duration is longer than the total time it takes to process all prefetched messages (i.e., message handler execution time multiplied by the prefetch count). In addition, it's important to consider how endpoints are scaled. If the prefetch count is high, the lock may deprive other endpoint instances of messages, making those instances redundant.
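The rule of thumb above (handler execution time multiplied by the prefetch count) can be checked with simple arithmetic. The sketch below uses hypothetical helper names and illustrative values (a 2-second handler and a 5-minute renewal window); it follows the worst case as stated and does not model concurrent processing.

```csharp
using System;

// Worst case as described above: handler execution time multiplied by the prefetch count.
TimeSpan WorstCaseProcessingTime(TimeSpan handlerTime, int prefetchCount)
    => handlerTime * prefetchCount;

// True when the renewal window is longer than the worst-case processing time.
bool RenewalCoversPrefetchedBatch(TimeSpan renewalWindow, TimeSpan handlerTime, int prefetchCount)
    => renewalWindow > WorstCaseProcessingTime(handlerTime, prefetchCount);

var perMessage = TimeSpan.FromSeconds(2);   // illustrative handler execution time
var window = TimeSpan.FromMinutes(5);       // illustrative lock-renewal duration

Console.WriteLine(RenewalCoversPrefetchedBatch(window, perMessage, 100)); // True: 200 s is under 300 s
Console.WriteLine(RenewalCoversPrefetchedBatch(window, perMessage, 200)); // False: 400 s exceeds 300 s
```

When the check fails, either the renewal window needs to grow or the prefetch count needs to shrink.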
Lock-renewal
For all supported transport transaction modes (except TransportTransactionMode.
), the transport utilizes a peek-lock mechanism to ensure that only one instance of an endpoint can process a message. The default lock duration is set during entity creation. By default, the transport uses the SDK's default maximum auto lock renewal duration of 5 minutes.
To ensure smooth processing, it is recommended to configure the MaxAutoLockRenewalDuration property to be greater than the duration of the longest-running handler for the endpoint. This helps avoid LockLostException and ensures the message is properly handled by the recoverability process.
The lock can be auto-renewed beyond the default by overriding MaxAutoLockRenewalDuration.
transport.MaxAutoLockRenewalDuration = TimeSpan.FromMinutes(10);
Message lock renewal is initiated by client code, not the broker. If a request to renew a lock fails after all the SDK built-in retries (e.g., due to connection loss), the lock won't be renewed, and the message will become unlocked and available for processing by competing consumers. Lock renewal should be treated as a best effort, not as a guaranteed operation.
The following approaches may be considered to minimize or avoid the occurrence of message lock renewals:
- Optimise the message handlers to reduce their execution time.
- Reduce the prefetch count. All messages are locked on peek, so when they are prefetched, they remain locked until they are all processed.