
Using Outbox with CosmosDB

Integrates the RabbitMQ Server transport with CosmosDb persistence.

This sample demonstrates how the Outbox feature makes message processing atomic when using CosmosDb persistence: the messages sent and received stay consistent with any modifications made to business data in the database.

This sample uses Docker Compose to provide dependencies. It is not necessary to have installed instances of RabbitMQ or the CosmosDb emulator.

Prerequisites

  1. Install Docker.
  2. Install Docker Compose.
  3. If running Docker on Windows, set Docker to use Linux containers.
  4. In the sample directory, execute the following to set up the RabbitMQ and CosmosDb emulator instances:
> docker compose up --detach

Once complete, the RabbitMQ management UI can be reached at http://localhost:15672/ with username rabbitmq and password rabbitmq.

Running the project

The code consists of a single NServiceBus endpoint project, which simulates receiving duplicated messages (normally received due to at-least-once delivery guarantees of the message broker) and processing them under three different circumstances.

  1. Without protection, resulting in duplicated processing of messages.
  2. Using the Outbox but with a maximum message concurrency of 1.
  3. Using the Outbox but with multiple messages being processed simultaneously, relying on the concurrency exception thrown by the database to ensure exactly-once successful processing of messages.

Each step is selected by commenting or uncommenting code in the Program.cs file:

// STEP 1: Run code as is, duplicates can be observed in console and database
var transactionInformation = persistence.TransactionInformation();
transactionInformation.ExtractPartitionKeyFromHeader("PartitionKeyHeader");

// STEP 2: Uncomment this line to enable the Outbox. Duplicates will be suppressed.
// endpointConfiguration.EnableOutbox();

// STEP 3: Comment out this line to allow concurrent processing. Concurrency exceptions will
// occur in the console window, but only 5 entries will be made in the database.
endpointConfiguration.LimitMessageProcessingConcurrencyTo(1);
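
The Step 3 comment above relies on the database rejecting one of two concurrent writes for the same message. The following is a minimal in-memory sketch of that idea, not the sample's actual code: InMemoryStore and ConcurrencyConflictException are hypothetical names, and a ConcurrentDictionary stands in for the database. It assumes the Outbox record and the business record are committed as one atomic write keyed by MessageId.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical exception standing in for the conflict error a database raises.
public sealed class ConcurrencyConflictException : Exception
{
    public ConcurrencyConflictException(string messageId)
        : base($"A record for MessageId {messageId} already exists") { }
}

// Hypothetical store: a single atomic commit represents writing the
// Outbox record and the business record together.
public sealed class InMemoryStore
{
    private readonly ConcurrentDictionary<string, string> committed = new();

    public int BusinessRecordCount => committed.Count;

    // TryAdd is atomic, so only one of several concurrent commits
    // for the same MessageId can succeed; the others throw.
    public void Commit(string messageId, string businessRecord)
    {
        if (!committed.TryAdd(messageId, businessRecord))
        {
            throw new ConcurrencyConflictException(messageId);
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        var store = new InMemoryStore();

        // 5 sets of duplicate messages, as in the sample's console prompt.
        var deliveries = Enumerable.Range(1, 5)
            .SelectMany(i => new[] { $"msg-{i}", $"msg-{i}" });

        // Process every delivery concurrently, with no concurrency limit.
        var tasks = deliveries.Select(id => Task.Run(() =>
        {
            try
            {
                store.Commit(id, $"business data for {id}");
                Console.WriteLine($"Processing MessageId {id}");
            }
            catch (ConcurrencyConflictException ex)
            {
                Console.WriteLine($"Conflict: {ex.Message}");
            }
        })).ToArray();

        await Task.WhenAll(tasks);

        // Always 5: each MessageId is committed exactly once.
        Console.WriteLine($"Business records written: {store.BusinessRecordCount}");
    }
}
```

The order of the console lines varies from run to run, but the final count does not. In a real endpoint the losing attempt would typically be retried and then recognized as a duplicate; the sketch only logs the conflict.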

Step 1: No protection

First, run the sample as-is. It's easy to see from the console output that each MessageId is processed twice. The endpoint has no way to know that it's handling duplicated messages.

Endpoint started. Press Enter to send 5 sets of duplicate messages...

Processing MessageId f4589be7-efaa-4d7b-8fc7-414ac6e5ddfa
Processing MessageId f4589be7-efaa-4d7b-8fc7-414ac6e5ddfa
Processing MessageId 78c4be6d-02f1-4dc6-9aa0-e1298dcb28f9
Processing MessageId 78c4be6d-02f1-4dc6-9aa0-e1298dcb28f9
Processing MessageId 96ed249b-ac80-4eea-be16-eede5f368b09
Processing MessageId 96ed249b-ac80-4eea-be16-eede5f368b09
Processing MessageId 95c6f325-22e2-4832-8a2a-0988ce318f40
Processing MessageId 95c6f325-22e2-4832-8a2a-0988ce318f40
Processing MessageId 1333e38d-b076-41e1-92d6-a2b7a699f62f
Processing MessageId 1333e38d-b076-41e1-92d6-a2b7a699f62f
Press any key to exit

The message handler also writes the received MessageId to the BusinessObject table in the database, as a simulation of writing business data. To see it, execute select * from BusinessObject when connected to the database:

Id   MessageId
1    f4589be7-efaa-4d7b-8fc7-414ac6e5ddfa
2    f4589be7-efaa-4d7b-8fc7-414ac6e5ddfa
3    78c4be6d-02f1-4dc6-9aa0-e1298dcb28f9
4    78c4be6d-02f1-4dc6-9aa0-e1298dcb28f9
5    96ed249b-ac80-4eea-be16-eede5f368b09
6    96ed249b-ac80-4eea-be16-eede5f368b09
7    95c6f325-22e2-4832-8a2a-0988ce318f40
8    95c6f325-22e2-4832-8a2a-0988ce318f40
9    1333e38d-b076-41e1-92d6-a2b7a699f62f
10   1333e38d-b076-41e1-92d6-a2b7a699f62f

Step 2: Outbox, 1 message at a time

Next, uncomment the line in Program.cs commented as Step 2, which enables the Outbox feature.

It's clear from the console output that each MessageId is only processed a single time, and the message handler does not execute for the duplicate messages:

Endpoint started. Press Enter to send 5 sets of duplicate messages...

Processing MessageId 4d9b207b-f7e3-48f3-a8a9-8bf00afe0bd5
Processing MessageId cfda075b-2ebd-4256-b127-20fbf719873c
Processing MessageId 48cb433c-2cda-4835-a40c-1f06e7c4ace5
Processing MessageId 93132c07-e39d-4eb1-9d34-db4dee493f17
Processing MessageId 247706a4-c231-441c-b5fd-9f2defe6bb6f
Press any key to exit

The same is true in CosmosDb:

Id   MessageId
11   4d9b207b-f7e3-48f3-a8a9-8bf00afe0bd5
12   cfda075b-2ebd-4256-b127-20fbf719873c
13   48cb433c-2cda-4835-a40c-1f06e7c4ace5
14   93132c07-e39d-4eb1-9d34-db4dee493f17
15   247706a4-c231-441c-b5fd-9f2defe6bb6f
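
The deduplication seen in Step 2 can be illustrated with a minimal in-memory sketch. This is not how the Outbox is implemented: InMemoryOutbox is a hypothetical class, and a HashSet stands in for the Outbox records that the real feature stores in the configured persistence. It only shows the core idea of remembering processed MessageIds and skipping the handler for any that have been seen before.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the Outbox. The real feature stores
// its records together with the business data in one atomic operation.
public sealed class InMemoryOutbox
{
    private readonly HashSet<string> processed = new();

    // Runs the handler only when the MessageId has not been recorded yet.
    // Returns false when the message is recognized as a duplicate.
    public bool TryProcess(string messageId, Action handler)
    {
        if (processed.Contains(messageId))
        {
            return false;
        }

        handler();
        processed.Add(messageId);
        return true;
    }
}

public static class Program
{
    public static void Main()
    {
        var outbox = new InMemoryOutbox();
        var businessData = new List<string>();

        // Simulate the broker delivering every message twice,
        // one message at a time (concurrency of 1).
        foreach (var id in new[] { "msg-1", "msg-1", "msg-2", "msg-2" })
        {
            var handled = outbox.TryProcess(id, () => businessData.Add(id));
            Console.WriteLine(handled
                ? $"Processing MessageId {id}"
                : $"Skipped duplicate {id}");
        }

        // Prints 2: one business record per distinct MessageId.
        Console.WriteLine($"Business records written: {businessData.Count}");
    }
}
```

Because messages are handled sequentially here, a simple lookup is enough. The sketch is not thread-safe, which is why it corresponds to the configuration with a maximum message concurrency of 1.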

Code walk-through

In Program.cs, an NServiceBus endpoint is created and configured to use the RabbitMQ transport, connecting to the broker instance hosted in Docker:

var endpointConfiguration = new EndpointConfiguration("Samples.CosmosDb.Outbox");

var rabbitMqTransport = new RabbitMQTransport(RoutingTopology.Conventional(QueueType.Classic), "host=localhost;username=rabbitmq;password=rabbitmq")
{
    TransportTransactionMode = TransportTransactionMode.ReceiveOnly
};
endpointConfiguration.UseSerialization<SystemJsonSerializer>();
endpointConfiguration.UseTransport(rabbitMqTransport);

Next, CosmosDb Persistence is configured to connect to the local CosmosDb emulator instance hosted in Docker.

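// connectionString is assumed to hold the connection string of the local CosmosDb emulator
var persistence = endpointConfiguration.UsePersistence<CosmosPersistence>();

// Configure the client and database on the same persistence instance
persistence.CosmosClient(new CosmosClient(connectionString));
persistence.DatabaseName("Samples.Database.Demo");

// Store records in the "Outbox" container, partitioned by /messageId
persistence.DefaultContainer("Outbox", "/messageId");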

MyHandler.cs contains the message handler.

public async Task Handle(MyMessage message, IMessageHandlerContext context)
{
    Console.WriteLine($"Processing MessageId {context.MessageId}");
    await Task.CompletedTask;
}

The message handler logs the MessageId to the console.

Cleaning up

Once finished with the sample, the RabbitMQ and local CosmosDb instances can be cleaned up using:

> docker compose down
