This sample demonstrates how an endpoint can use a different transport for auditing than its main transport. In this instance, Azure Service Bus is the endpoint's main transport, and Azure Storage Queues is used as the transport for audit messages.
Because two different transports are being used, the Azure Storage Queues transport dispatcher does not participate in the handler transaction. This means the audit message may be sent successfully even though an error later in the pipeline causes the receive operation to be rolled back. The result would be two conflicting instances of the same message visible in the ServicePulse all messages view: one showing as an error and the other as successfully processed.
Additionally, the audit instance cannot directly communicate with the error instance, so plugin messages (e.g. Heartbeats, Custom Checks, Saga Audit, etc.) will fail to send. No destination specified for message errors will appear in the audit instance log file.
Prerequisites
Ensure that:
- an instance of the latest Azurite emulator is running
- the Azure Service Bus connection string is stored in an environment variable called AzureServiceBus_ConnectionString
- the audit queue exists in Azure Storage Queues
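One way to satisfy these prerequisites, assuming Docker and the Azure CLI are available (the connection string value is a placeholder; the queue name audit and the UseDevelopmentStorage=true emulator connection string come from the sample):

```shell
# Start the latest Azurite emulator (blob/queue/table on the default ports)
docker run -d --name azurite \
  -p 10000:10000 -p 10001:10001 -p 10002:10002 \
  mcr.microsoft.com/azure-storage/azurite

# Store the Azure Service Bus connection string (placeholder value shown)
export AzureServiceBus_ConnectionString="<your-asb-connection-string>"

# Create the audit queue in the local storage emulator
az storage queue create --name audit --connection-string "UseDevelopmentStorage=true"
```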
Optional
To see how the audit messages appear in ServicePulse, two ServiceControl instances need to be set up.
- A ServiceControl Error instance with the Azure Service Bus transport and the same connection string as used in the sample.
- A ServiceControl Audit instance with the Azure Storage Queues transport and the same connection string as used in the sample (the default local emulator UseDevelopmentStorage=true).
A docker-compose.yml file is provided which will set up the instances in a local container environment.
Projects
CustomAuditTransport
Main endpoint running on Azure Service Bus. It enables the audit feature. The queue name configured via AuditProcessedMessagesTo will be used as the queue name on the Azure Storage Queues transport.
AuditViaASQ
A feature that uses Azure Storage Queues for audit messages instead of the transport used by the endpoint being audited.
The feature is turned on by default, provided that auditing is enabled and an audit queue has been defined.
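For context, a minimal sketch of the endpoint configuration that would activate the feature; the endpoint name and the exact transport configuration API are illustrative and depend on the NServiceBus and transport versions used:

```csharp
var endpointConfiguration = new EndpointConfiguration("Samples.CustomAuditTransport");

// Main transport: Azure Service Bus, using the connection string from the
// environment variable named in the prerequisites
var connectionString = Environment.GetEnvironmentVariable("AzureServiceBus_ConnectionString");
endpointConfiguration.UseTransport(new AzureServiceBusTransport(connectionString));

// Enabling auditing activates the built-in Audit feature, which AuditViaASQ
// depends on; the queue name configured here is reused on the ASQ side.
endpointConfiguration.AuditProcessedMessagesTo("audit");
```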
AuditViaASQFeature()
{
    EnableByDefault();
    DependsOn<Audit>();
}

protected override void Setup(FeatureConfigurationContext context)
{
    context.RegisterStartupTask(() => new AuditViaASQFeatureStartup());
    context.Pipeline.Replace("AuditToDispatchConnector", new AuditDispatchTerminator());
}
It uses a PipelineTerminator, a special type of behavior, to replace the existing AuditToDispatchConnector step, which is the last step in the audit pipeline.
context.Pipeline.Replace("AuditToDispatchConnector", new AuditDispatchTerminator());
public class AuditDispatchTerminator : PipelineTerminator<IAuditContext>
{
    protected override async Task Terminate(IAuditContext context)
    {
        foreach (var item in context.AuditMetadata)
        {
            context.Message.Headers[item.Key] = item.Value;
        }

        // NOTE: the ASQ transport has a message size limit of 64 KB, so larger
        // messages will be rejected. Checks would need to be put in place to
        // handle that scenario.
        var transportOperations = CreateTransportOperations(context.Message, context.AuditAddress);

        // The transport transaction is set to null since the existing ASB transaction cannot be used here.
        // Each audit message is processed one at a time, so there's also no point in creating an ASQ transaction for it.
        await AuditViaASQFeatureStartup.AsqDispatcher!.Dispatch(transportOperations, null, context.CancellationToken);
    }

    private static TransportOperations CreateTransportOperations(OutgoingMessage message, string auditQueueAddress)
    {
        return new TransportOperations(new TransportOperation(message, new UnicastAddressTag(auditQueueAddress)));
    }
}
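The 64 KB limit noted in the code matters in practice. Below is a rough, self-contained sketch of how a size check could work, assuming the body ends up Base64-encoded on the queue (whether encoding applies depends on the storage SDK and transport settings, and the helper name is hypothetical, not part of the sample):

```csharp
using System;

// 48 KiB of raw payload fits exactly once Base64-encoded; one more byte does not
Console.WriteLine(AsqSizeCheck.FitsWhenBase64Encoded(48 * 1024));     // True
Console.WriteLine(AsqSizeCheck.FitsWhenBase64Encoded(48 * 1024 + 1)); // False

static class AsqSizeCheck
{
    // Azure Storage Queues accept messages up to 64 KiB. If the body is
    // Base64-encoded before being enqueued, the raw payload limit shrinks
    // to roughly three quarters of that (~48 KiB).
    const int MaxQueueMessageBytes = 64 * 1024;

    public static bool FitsWhenBase64Encoded(int rawBodyBytes)
    {
        // Base64 expands every 3 input bytes to 4 output characters
        var encodedBytes = 4 * (int)Math.Ceiling(rawBodyBytes / 3.0);
        return encodedBytes <= MaxQueueMessageBytes;
    }
}
```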
Running the sample
CustomAuditTransport endpoint
Build the solution and run the CustomAuditTransport project.
Press s to send a message that will be successfully processed. Press e to send a message that will error.
ServiceControl and ServicePulse
Pull the latest images
Before running the containers, ensure you're using the latest version of each image by executing the following command:
docker compose pull
This command checks for any updates to the images specified in the docker-compose.yml file and pulls them if available.
Start the containers
After pulling the latest images, modify the service-platform-error. and env/ environment files, if necessary, and then start the containers using:
docker compose up -d
Once composed, ServicePulse can be accessed at http://localhost:9090 to see how the messages appear in the All Messages view.
Implementation details
- The ports for all services are exposed to localhost:
  - 33333: ServiceControl API
  - 44444: Audit API
  - 8080: Database backend
  - 9090: ServicePulse UI
- One instance of the servicecontrol-ravendb container is used for both the servicecontrol and servicecontrol-audit containers.
- A single database container should not be shared between multiple ServiceControl instances in production scenarios.
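For reference, a trimmed docker-compose sketch matching the ports above. The image names are the public ServiceControl images on Docker Hub, but the environment variables, transport wiring, and other required settings are omitted here; consult the compose file shipped with the sample for the full configuration:

```yaml
services:
  servicecontrol-ravendb:
    image: particular/servicecontrol-ravendb
    ports:
      - "8080:8080"      # database backend, shared by both instances (dev only)

  servicecontrol:
    image: particular/servicecontrol
    ports:
      - "33333:33333"    # ServiceControl (error) API

  servicecontrol-audit:
    image: particular/servicecontrol-audit
    ports:
      - "44444:44444"    # ServiceControl Audit API

  servicepulse:
    image: particular/servicepulse
    ports:
      - "9090:9090"      # ServicePulse UI
```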