This sample shows a basic saga using AWS Lambda with SQS and Aurora.
Prerequisites
The sample includes a CloudFormation template, which will deploy the Lambda function and create the necessary queues to run the code.
The Amazon.Lambda.Tools CLI can be used to deploy the template to an AWS account.
- Install the Amazon.Lambda.Tools CLI using dotnet tool install -g Amazon.Lambda.Tools
- Create an S3 bucket in the AWS region of choice
- Create a publicly accessible Aurora MySQL database (see AWS documentation for more information)
- Update the connection string in the DeployDatabase project and run it to deploy the database schema
A publicly accessible Aurora cluster is required for the purpose of running this sample, but is not required for production scenarios. Make sure to configure the appropriate access to the database cluster.
Running the sample
It is not possible at this stage to use the AWS .NET Mock Lambda Test Tool to run the sample locally.
Open the file serverless. in the Sales project and update the value of the environment variable AuroraLambda_ConnectionString with the database's connection string.
Run the following command from the Sales directory to deploy the Lambda project:
dotnet lambda deploy-serverless
The deployment will ask for a stack name and an S3 bucket name to deploy the serverless stack. After that, running the sample will launch a single console window:
- ClientUI is a console application that will send a PlaceOrder command to the Samples.Aurora.Lambda.Sales endpoint, which is monitored by the AWS Lambda.
- The deployed Sales project will receive messages from the Samples.Aurora.Lambda.Sales queue and process them using the AWS Lambda runtime.
To try the AWS Lambda:
- From the ClientUI window, press Enter to send a PlaceOrder message to the trigger queue.
- The AWS Lambda will receive the PlaceOrder message and will start the OrderSaga.
- The OrderSaga will publish an OrderReceived event and request a business SLA timeout message, OrderDelayed.
- The AWS Lambda receives the OrderReceived event, which is handled by the BillCustomerHandler and the StageInventoryHandler. After a delay, each handler publishes an event, CustomerBilled and InventoryStaged, respectively.
- The AWS Lambda will receive the events. Once both events are received, the OrderSaga publishes an OrderShipped event. If billing and staging take longer than the defined business SLA, the saga publishes OrderDelayed to inform the client that the order is delayed.
- The ClientUI will handle the OrderShipped event and log a message to the console. It might occasionally also handle the OrderDelayed event and hand out 10% coupon codes.
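The message flow above can be sketched without NServiceBus or AWS. The following is a minimal in-memory simulation, not the sample's implementation: message types are plain strings, the `published` list stands in for the message bus, and the `Handle` and `ShipIfPossible` local functions are hypothetical names chosen for illustration. It assumes that a completed saga no longer reacts to a late timeout.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-ins for the bus and the saga data (not NServiceBus types).
var published = new List<string>();
bool customerBilled = false, inventoryStaged = false, completed = false;

void ShipIfPossible()
{
    // The order ships only once both billing and staging have been confirmed.
    if (customerBilled && inventoryStaged)
    {
        published.Add("OrderShipped");
        completed = true; // mirrors MarkAsComplete() in the saga
    }
}

void Handle(string message)
{
    // Assumption: once the saga is complete, later messages such as a late timeout are ignored.
    if (completed) return;

    switch (message)
    {
        case "PlaceOrder": published.Add("OrderReceived"); break;
        case "CustomerBilled": customerBilled = true; ShipIfPossible(); break;
        case "InventoryStaged": inventoryStaged = true; ShipIfPossible(); break;
        case "OrderDelayed": published.Add("OrderDelayed"); break; // SLA timeout fired
    }
}

// The SLA timeout fires after billing but before staging has finished.
foreach (var m in new[] { "PlaceOrder", "CustomerBilled", "OrderDelayed", "InventoryStaged" })
{
    Handle(m);
}

Console.WriteLine(string.Join(", ", published));
// prints: OrderReceived, OrderDelayed, OrderShipped
```

In this ordering the client would see both OrderDelayed and OrderShipped. If the timeout message arrives after both events, the simulation drops it and only OrderShipped is published.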
Code walk-through
The ClientUI console application is an Amazon SQS endpoint that sends PlaceOrder commands and handles the OrderShipped event.
The Sales project is hosted using AWS Lambda. The static NServiceBus endpoint must be configured using details that come from the AWS Lambda ILambdaContext. Since that context is not available until a message is handled by the function, creation of the NServiceBus endpoint instance is deferred until the first message is processed, using a lambda expression such as:
static readonly AwsLambdaSQSEndpoint endpoint = new AwsLambdaSQSEndpoint(context =>
{
    var endpointConfiguration = new AwsLambdaSQSEndpointConfiguration("Samples.Aurora.Lambda.Sales");
    endpointConfiguration.UseSerialization<SystemJsonSerializer>();

    // The connection string is supplied through the Lambda's environment variables.
    var connection = Environment.GetEnvironmentVariable("AuroraLambda_ConnectionString");

    // Store saga data in Aurora using SQL persistence with the MySQL dialect.
    var persistence = endpointConfiguration.AdvancedConfiguration.UsePersistence<SqlPersistence>();
    persistence.SqlDialect<SqlDialect.MySql>();
    persistence.ConnectionBuilder(
        connectionBuilder: () => new MySqlConnection(connection));

    return endpointConfiguration;
});
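The deferral described above follows a general lazy-initialization pattern: store a factory up front and invoke it only when the first message arrives, at which point the context is known. The sketch below illustrates that pattern in isolation. It is not how AwsLambdaSQSEndpoint is implemented; `factory`, `ProcessMessage`, and the string-based "configuration" are hypothetical stand-ins.

```csharp
using System;

int factoryCalls = 0;

// Hypothetical stand-in for the configuration callback: it needs a value
// (here, a function name) that is only known once a message is being handled.
Func<string, string> factory = functionName =>
{
    factoryCalls++;
    return $"endpoint configured for {functionName}";
};

// Built lazily; stays null until the first message is processed.
string? configuration = null;

string ProcessMessage(string functionName, string message)
{
    // The factory runs once, on the first call; later calls reuse the result.
    configuration ??= factory(functionName);
    return $"{configuration}: handled {message}";
}

Console.WriteLine(ProcessMessage("sales-fn", "msg-1"));
Console.WriteLine(ProcessMessage("sales-fn", "msg-2"));
Console.WriteLine(factoryCalls); // prints 1: the factory ran only for the first message
```

The cost of this approach is that the first invocation pays for endpoint setup, while later invocations in the same process reuse the already configured instance.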
The same class defines the AWS Lambda, which hosts the NServiceBus endpoint. The ProcessOrder method hands processing of the message to NServiceBus:
public async Task ProcessOrder(SQSEvent eventData, ILambdaContext context)
{
    context.Logger.Log("ProcessOrder was called");

    // Hand the SQS batch to NServiceBus, which dispatches each message to its handlers.
    await endpoint.Process(eventData, context, CancellationToken.None);
}
Meanwhile, the OrderSaga hosted within the AWS Lambda project is a regular NServiceBus saga which is also capable of sending and receiving messages itself.
using System;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Logging;

public class OrderSaga : Saga<OrderSagaData>,
    IAmStartedByMessages<PlaceOrder>,
    IHandleMessages<CustomerBilled>,
    IHandleMessages<InventoryStaged>,
    IHandleTimeouts<OrderDelayed>
{
    // Correlate all incoming messages to the saga instance by OrderId.
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.MapSaga(sagaData => sagaData.OrderId)
            .ToMessage<PlaceOrder>(s => s.OrderId)
            .ToMessage<CustomerBilled>(s => s.OrderId)
            .ToMessage<InventoryStaged>(s => s.OrderId);
    }

    public async Task Handle(PlaceOrder message, IMessageHandlerContext context)
    {
        logger.InfoFormat("Placing order: {0}", Data.OrderId);

        // Business SLA: raise OrderDelayed if the order has not shipped within 8 seconds.
        await RequestTimeout(context, TimeSpan.FromSeconds(8), new OrderDelayed { OrderId = message.OrderId });

        await context.Publish(new OrderReceived
        {
            OrderId = message.OrderId
        });
    }

    public async Task Handle(CustomerBilled message, IMessageHandlerContext context)
    {
        logger.InfoFormat("The customer for order {0} has been billed.", Data.OrderId);
        Data.CustomerBilled = true;
        await ShipItIfPossible(context);
    }

    public async Task Handle(InventoryStaged message, IMessageHandlerContext context)
    {
        logger.InfoFormat("The inventory for order {0} has been staged.", Data.OrderId);
        Data.InventoryStaged = true;
        await ShipItIfPossible(context);
    }

    public async Task Timeout(OrderDelayed state, IMessageHandlerContext context)
    {
        logger.InfoFormat("Order {0} is slightly delayed.", Data.OrderId);
        await context.Publish(state);
    }

    async Task ShipItIfPossible(IMessageHandlerContext context)
    {
        if (Data is { CustomerBilled: true, InventoryStaged: true })
        {
            logger.InfoFormat("Order {0} has been shipped.", Data.OrderId);
            // Send duplicate message for outbox test.
            await context.Publish(new OrderShipped { OrderId = Data.OrderId });
            MarkAsComplete();
        }
    }

    static readonly ILog logger = LogManager.GetLogger(typeof(OrderSaga));
}
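The ConfigureHowToFindSaga mapping means that every message carrying the same OrderId is routed to the same saga instance, and only PlaceOrder may start a new one. A minimal sketch of that correlation idea, using a plain dictionary as a hypothetical saga store (the `store` variable and `Dispatch` function are illustrative and not part of NServiceBus), might look like this:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory saga store keyed by the correlation property (OrderId).
// Each entry records the message types that the instance has handled.
var store = new Dictionary<string, List<string>>();

// Returns true when the message was routed to a saga instance.
bool Dispatch(string messageType, string orderId)
{
    if (!store.TryGetValue(orderId, out var history))
    {
        // Only the starting message may create a new instance;
        // anything else without a matching saga is dropped.
        if (messageType != "PlaceOrder") return false;

        history = new List<string>();
        store[orderId] = history;
    }

    history.Add(messageType);
    return true;
}

Console.WriteLine(Dispatch("CustomerBilled", "A")); // False: no saga exists for order A yet
Console.WriteLine(Dispatch("PlaceOrder", "A"));     // True: starts the saga for order A
Console.WriteLine(Dispatch("CustomerBilled", "A")); // True: routed to the same instance
Console.WriteLine(store["A"].Count);                // 2
```

In the sample itself the lookup is performed by SQL persistence against the Aurora database rather than an in-memory dictionary.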
Removing the sample stack
Remove the deployed stack with the following command:
dotnet lambda delete-serverless
and provide the previously chosen stack name.