Using NServiceBus Sagas with AWS Lambda, SQS, and Aurora

Component: AWS Lambda (SQS)
NuGet Package: NServiceBus.AwsLambda.SQS (1.x)
Target Version: NServiceBus 8.x

This sample shows a basic saga using AWS Lambda with SQS and Aurora.

Prerequisites

The sample includes a CloudFormation template that deploys the Lambda function and creates the queues needed to run the code.

The Amazon.Lambda.Tools CLI can be used to deploy the template to an AWS account.

  1. Install the Amazon.Lambda.Tools CLI using dotnet tool install -g Amazon.Lambda.Tools
  2. Create an S3 bucket in the AWS region of choice
  3. Create a publicly accessible Aurora MySQL database (see AWS documentation for more information)
  4. Update the connection string in the DeployDatabase project and run it to deploy the database schema

Running the sample

Open the file serverless.template in the Sales project and update the value of the environment variable AuroraLambda_ConnectionString with the database's connection string.

Run the following command from the Sales directory to deploy the Lambda project:

dotnet lambda deploy-serverless

The deployment will ask for a stack name and the name of the S3 bucket to deploy the serverless stack to. Once the stack is deployed, running the sample launches a single console window for ClientUI, which works together with the deployed Sales project:

  • ClientUI is a console application that will send a PlaceOrder command to the Samples.Aurora.Lambda.Sales endpoint, whose queue triggers the AWS Lambda function.
  • The deployed Sales project will receive messages from the Samples.Aurora.Lambda.Sales queue and process them using the AWS Lambda runtime.

To try the AWS Lambda:

  1. From the ClientUI window, press Enter to send a PlaceOrder message to the trigger queue.
  2. The AWS Lambda will receive the PlaceOrder message and will start the OrderSaga.
  3. The OrderSaga will publish an OrderReceived event and request an OrderDelayed timeout, which represents the business SLA.
  4. The AWS Lambda receives the OrderReceived event which is handled by the BillCustomerHandler and the StageInventoryHandler. After a delay, each handler publishes an event, CustomerBilled and InventoryStaged, respectively.
  5. The AWS Lambda will receive the events. Once both events are received, the OrderSaga publishes an OrderShipped event. If billing and staging take longer than the business SLA (8 seconds in the saga code), the OrderDelayed timeout fires and the saga publishes OrderDelayed to inform the client that the order is delayed.
  6. The ClientUI will handle the OrderShipped event and log a message to the console. It might occasionally also handle the OrderDelayed event and hand out 10% coupon codes.
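The messages exchanged in the steps above could be declared along the following lines. This is a minimal sketch, not the sample's actual source: the message names come from the steps, while the string type of OrderId, the choice of marker interfaces, and the two-second delay in the handler are assumptions made for illustration.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

// Command sent by ClientUI to start an order (OrderId type is assumed).
public class PlaceOrder : ICommand
{
    public string OrderId { get; set; }
}

// Events published while the order is processed.
public class OrderReceived : IEvent
{
    public string OrderId { get; set; }
}

public class CustomerBilled : IEvent
{
    public string OrderId { get; set; }
}

public class InventoryStaged : IEvent
{
    public string OrderId { get; set; }
}

public class OrderShipped : IEvent
{
    public string OrderId { get; set; }
}

// Used both as the saga timeout state and as the event published when the SLA is missed.
public class OrderDelayed : IEvent
{
    public string OrderId { get; set; }
}

// Sketch of a handler that reacts to OrderReceived and publishes CustomerBilled after a delay.
public class BillCustomerHandler : IHandleMessages<OrderReceived>
{
    public async Task Handle(OrderReceived message, IMessageHandlerContext context)
    {
        // Simulated billing work; the duration is an assumed value.
        await Task.Delay(TimeSpan.FromSeconds(2), context.CancellationToken);

        await context.Publish(new CustomerBilled { OrderId = message.OrderId });
    }
}
```

A StageInventoryHandler would presumably follow the same pattern and publish InventoryStaged instead.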

Code walk-through

The ClientUI console application is an Amazon SQS endpoint that sends PlaceOrder commands and handles the OrderShipped event.
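A rough sketch of how such a ClientUI endpoint might be configured is shown below. It is not the sample's actual code: the endpoint name Samples.Aurora.Lambda.ClientUI, the RunAsync method, the OrderShippedHandler class, and the string OrderId properties are assumptions, and the transport is left to pick up AWS credentials and region from the environment.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

public static class ClientUI
{
    public static async Task RunAsync()
    {
        // Endpoint name is assumed for illustration.
        var endpointConfiguration = new EndpointConfiguration("Samples.Aurora.Lambda.ClientUI");
        endpointConfiguration.UseSerialization<SystemJsonSerializer>();
        endpointConfiguration.EnableInstallers();

        // Use Amazon SQS as the transport and route PlaceOrder to the Lambda-hosted endpoint.
        var routing = endpointConfiguration.UseTransport(new SqsTransport());
        routing.RouteToEndpoint(typeof(PlaceOrder), "Samples.Aurora.Lambda.Sales");

        var endpointInstance = await Endpoint.Start(endpointConfiguration);

        Console.WriteLine("Press Enter to place an order, or any other key to exit.");
        while (Console.ReadKey(intercept: true).Key == ConsoleKey.Enter)
        {
            var orderId = Guid.NewGuid().ToString("N");
            await endpointInstance.Send(new PlaceOrder { OrderId = orderId });
            Console.WriteLine($"Sent PlaceOrder for order {orderId}");
        }

        await endpointInstance.Stop();
    }
}

// Handles the event published by the saga once billing and staging are complete.
public class OrderShippedHandler : IHandleMessages<OrderShipped>
{
    public Task Handle(OrderShipped message, IMessageHandlerContext context)
    {
        Console.WriteLine($"Order {message.OrderId} has shipped.");
        return Task.CompletedTask;
    }
}

// Minimal message contracts so the sketch is self-contained (property types are assumed).
public class PlaceOrder : ICommand
{
    public string OrderId { get; set; }
}

public class OrderShipped : IEvent
{
    public string OrderId { get; set; }
}
```

Because the endpoint contains a handler for OrderShipped, NServiceBus subscribes to that event when the endpoint starts, so no explicit subscription call is sketched here.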

The Sales project is hosted using AWS Lambda. The static NServiceBus endpoint must be configured using details from the AWS Lambda ILambdaContext. Since that context is not available until the function handles a message, creation of the NServiceBus endpoint instance is deferred until the first message is processed, using a lambda expression such as:

static readonly AwsLambdaSQSEndpoint endpoint = new AwsLambdaSQSEndpoint(context =>
{
    var endpointConfiguration = new AwsLambdaSQSEndpointConfiguration("Samples.Aurora.Lambda.Sales");

    endpointConfiguration.UseSerialization<SystemJsonSerializer>();

    var connection = Environment.GetEnvironmentVariable("AuroraLambda_ConnectionString");

    var persistence = endpointConfiguration.AdvancedConfiguration.UsePersistence<SqlPersistence>();
    persistence.SqlDialect<SqlDialect.MySql>();
    persistence.ConnectionBuilder(
        connectionBuilder: () => new MySqlConnection(connection));

    return endpointConfiguration;
});

The same class defines the AWS Lambda function, which hosts the NServiceBus endpoint. The ProcessOrder method hands processing of the message to NServiceBus:

public async Task ProcessOrder(SQSEvent eventData, ILambdaContext context)
{
    context.Logger.Log("ProcessOrder was called");

    await endpoint.Process(eventData, context, CancellationToken.None);
}

The OrderSaga hosted within the AWS Lambda project is a regular NServiceBus saga, which can also send and receive messages itself:

public class OrderSaga : Saga<OrderSagaData>,
  IAmStartedByMessages<PlaceOrder>,
  IHandleMessages<CustomerBilled>,
  IHandleMessages<InventoryStaged>,
  IHandleTimeouts<OrderDelayed>
{
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.MapSaga(sagaData => sagaData.OrderId)
         .ToMessage<PlaceOrder>(s => s.OrderId)
         .ToMessage<CustomerBilled>(s => s.OrderId)
         .ToMessage<InventoryStaged>(s => s.OrderId);
    }

    public async Task Handle(PlaceOrder message, IMessageHandlerContext context)
    {
        log.Info($"Placing order: {Data.OrderId}");

        await RequestTimeout(context, TimeSpan.FromSeconds(8), new OrderDelayed { OrderId = message.OrderId });

        await context.Publish(new OrderReceived
        {
            OrderId = message.OrderId
        });
    }

    public async Task Handle(CustomerBilled message, IMessageHandlerContext context)
    {
        log.Info($"The customer for order {Data.OrderId} has been billed.");
        Data.CustomerBilled = true;

        await ShipItIfPossible(context);
    }

    public async Task Handle(InventoryStaged message, IMessageHandlerContext context)
    {
        log.Info($"The inventory for order {Data.OrderId} has been staged.");
        Data.InventoryStaged = true;

        await ShipItIfPossible(context);
    }

    public async Task Timeout(OrderDelayed state, IMessageHandlerContext context)
    {
        log.Info($"Order {Data.OrderId} is slightly delayed.");

        await context.Publish(state);
    }

    async Task ShipItIfPossible(IMessageHandlerContext context)
    {
        if (Data is { CustomerBilled: true, InventoryStaged: true })
        {
            log.Info($"Order {Data.OrderId} has been shipped.");

            // Publish OrderShipped, then mark the saga as complete.
            await context.Publish(new OrderShipped { OrderId = Data.OrderId });
            MarkAsComplete();
        }
    }

    static readonly ILog log = LogManager.GetLogger(typeof(OrderSaga));
}
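The saga state class is not shown above. Judging from the properties the saga reads and writes, it might look like the following sketch; the string type of OrderId is an assumption, and the real class may contain additional members.

```csharp
using NServiceBus;

// Assumed shape of the saga state, inferred from how OrderSaga uses Data.
public class OrderSagaData : ContainSagaData
{
    // Correlation property mapped in ConfigureHowToFindSaga (type is assumed).
    public string OrderId { get; set; }

    // Set when the CustomerBilled event has been handled.
    public bool CustomerBilled { get; set; }

    // Set when the InventoryStaged event has been handled.
    public bool InventoryStaged { get; set; }
}
```

With the SQL persistence configured earlier, this state is stored in the Aurora MySQL database between Lambda invocations, which is what lets the saga wait for both events across separate function executions.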

Removing the sample stack

To remove the deployed stack, run the following command and provide the previously chosen stack name:

dotnet lambda delete-serverless

Related Articles

  • Sagas
    Maintain statefulness in distributed systems with the saga pattern and NServiceBus' event-driven architecture with built-in fault-tolerance and scalability.