This sample shows a basic saga using AWS Lambda with SQS and Aurora.
Prerequisites
The sample includes a CloudFormation template, which will deploy the Lambda function and create the necessary queues to run the code.
The Amazon.Lambda.Tools CLI can be used to deploy the template to an AWS account.
- Install the Amazon.Lambda.Tools CLI using dotnet tool install -g Amazon.Lambda.Tools
- Create an S3 bucket in the AWS region of choice
- Create a publicly accessible Aurora MySQL database (see AWS documentation for more information)
- Update the connection string in the DeployDatabase project and run it to deploy the database schema
A publicly accessible Aurora cluster is required for the purpose of running this sample, but is not required for production scenarios. Make sure to configure the appropriate access to the database cluster.
Running the sample
It is not possible at this stage to use the AWS .NET Mock Lambda Test Tool to run the sample locally.
Open the file serverless.
in the Sales project and update the value of the environment variable AuroraLambda_ConnectionString with the database's connection string.
Run the following command from the Sales directory to deploy the Lambda project:
dotnet lambda deploy-serverless
The deployment will ask for a stack name and an S3 bucket name to deploy the serverless stack. After that, running the sample will launch a single console window:
- ClientUI is a console application that will send a PlaceOrder command to the Samples.Aurora.Lambda.Sales endpoint, which is monitored by the AWS Lambda.
- The deployed Sales project will receive messages from the Samples.Aurora.Lambda.Sales queue and process them using the AWS Lambda runtime.
To try the AWS Lambda:
- From the ClientUI window, press Enter to send a PlaceOrder message to the trigger queue.
- The AWS Lambda will receive the PlaceOrder message and will start the OrderSaga.
- The OrderSaga will publish an OrderReceived event and request an OrderDelayed timeout, which represents the business SLA.
- The AWS Lambda receives the OrderReceived event, which is handled by the BillCustomerHandler and the StageInventoryHandler. After a delay, each handler publishes an event, CustomerBilled and InventoryStaged, respectively.
- The AWS Lambda will receive the events. Once both events are received, the OrderSaga publishes an OrderShipped event. If billing and staging take longer than the defined business SLA, the saga informs the client of the delay by publishing OrderDelayed.
- The ClientUI will handle the OrderShipped event and log a message to the console. It might occasionally also handle the OrderDelayed event and hand out 10% coupon codes.
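The handler step above can be pictured with a minimal sketch of BillCustomerHandler. This is an illustration rather than the sample's actual code: the message classes, the string type of OrderId, and the two-second delay are all assumptions, and the block needs a reference to the NServiceBus package to compile.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Logging;

// Hypothetical message contracts; the sample's real definitions may differ.
public class OrderReceived : IEvent
{
    public string OrderId { get; set; }
}

public class CustomerBilled : IEvent
{
    public string OrderId { get; set; }
}

public class BillCustomerHandler : IHandleMessages<OrderReceived>
{
    static readonly ILog log = LogManager.GetLogger<BillCustomerHandler>();

    public async Task Handle(OrderReceived message, IMessageHandlerContext context)
    {
        log.Info($"Billing customer for order {message.OrderId}.");

        // Simulate the time it takes to bill (the duration is an assumption).
        await Task.Delay(TimeSpan.FromSeconds(2));

        // Publishing CustomerBilled lets the OrderSaga continue.
        await context.Publish(new CustomerBilled { OrderId = message.OrderId });
    }
}
```

A StageInventoryHandler would presumably follow the same shape, publishing InventoryStaged instead of CustomerBilled.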
Code walk-through
The ClientUI console application is an Amazon SQS endpoint that sends PlaceOrder commands and handles the OrderShipped event.
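As a rough sketch of the sending side, the snippet below shows how a PlaceOrder command might be sent to the Sales queue through an IMessageSession. The PlaceOrder class shape, the string OrderId, and the OrderSender helper are hypothetical; only the destination name Samples.Aurora.Lambda.Sales comes from the sample. The block needs a reference to the NServiceBus package to compile.

```csharp
using System;
using System.Threading.Tasks;
using NServiceBus;

// Hypothetical command contract; the sample's real definition may differ.
public class PlaceOrder : ICommand
{
    public string OrderId { get; set; }
}

// Hypothetical helper, not part of the sample.
public static class OrderSender
{
    public static Task SendPlaceOrder(IMessageSession session)
    {
        var command = new PlaceOrder { OrderId = Guid.NewGuid().ToString() };

        // Send directly to the queue that triggers the Lambda function.
        return session.Send("Samples.Aurora.Lambda.Sales", command);
    }
}
```

Sending to an explicit destination keeps the sketch free of routing configuration; the actual ClientUI may configure routing for PlaceOrder instead.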
The Sales project is hosted using AWS Lambda. The static NServiceBus endpoint must be configured using details that come from the AWS Lambda ILambdaContext. Since that context is not available until a message is handled by the function, creation of the NServiceBus endpoint instance is deferred until the first message is processed, using a lambda expression such as:
static readonly AwsLambdaSQSEndpoint endpoint = new AwsLambdaSQSEndpoint(context =>
{
    var endpointConfiguration = new AwsLambdaSQSEndpointConfiguration("Samples.Aurora.Lambda.Sales");
    endpointConfiguration.UseSerialization<SystemJsonSerializer>();

    var connection = Environment.GetEnvironmentVariable("AuroraLambda_ConnectionString");

    var persistence = endpointConfiguration.AdvancedConfiguration.UsePersistence<SqlPersistence>();
    persistence.SqlDialect<SqlDialect.MySql>();
    persistence.ConnectionBuilder(
        connectionBuilder: () => new MySqlConnection(connection));

    return endpointConfiguration;
});
The same class defines the AWS Lambda, which hosts the NServiceBus endpoint. The ProcessOrder method hands processing of the message to NServiceBus:
public async Task ProcessOrder(SQSEvent eventData, ILambdaContext context)
{
    context.Logger.Log("ProcessOrder was called");

    await endpoint.Process(eventData, context, CancellationToken.None);
}
Meanwhile, the OrderSaga hosted within the AWS Lambda project is a regular NServiceBus saga, which can itself send and receive messages.
public class OrderSaga : Saga<OrderSagaData>,
    IAmStartedByMessages<PlaceOrder>,
    IHandleMessages<CustomerBilled>,
    IHandleMessages<InventoryStaged>,
    IHandleTimeouts<OrderDelayed>
{
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<OrderSagaData> mapper)
    {
        mapper.MapSaga(sagaData => sagaData.OrderId)
            .ToMessage<PlaceOrder>(s => s.OrderId)
            .ToMessage<CustomerBilled>(s => s.OrderId)
            .ToMessage<InventoryStaged>(s => s.OrderId);
    }

    public async Task Handle(PlaceOrder message, IMessageHandlerContext context)
    {
        log.Info($"Placing order: {Data.OrderId}");

        await RequestTimeout(context, TimeSpan.FromSeconds(8), new OrderDelayed { OrderId = message.OrderId });

        await context.Publish(new OrderReceived
        {
            OrderId = message.OrderId
        });
    }

    public async Task Handle(CustomerBilled message, IMessageHandlerContext context)
    {
        log.Info($"The customer for order {Data.OrderId} has been billed.");
        Data.CustomerBilled = true;
        await ShipItIfPossible(context);
    }

    public async Task Handle(InventoryStaged message, IMessageHandlerContext context)
    {
        log.Info($"The inventory for order {Data.OrderId} has been staged.");
        Data.InventoryStaged = true;
        await ShipItIfPossible(context);
    }

    public async Task Timeout(OrderDelayed state, IMessageHandlerContext context)
    {
        log.Info($"Order {Data.OrderId} is slightly delayed.");
        await context.Publish(state);
    }

    async Task ShipItIfPossible(IMessageHandlerContext context)
    {
        if (Data is { CustomerBilled: true, InventoryStaged: true })
        {
            log.Info($"Order {Data.OrderId} has been shipped.");

            // Send duplicate message for outbox test.
            await context.Publish(new OrderShipped { OrderId = Data.OrderId });
            MarkAsComplete();
        }
    }

    static ILog log = LogManager.GetLogger(typeof(OrderSagaData));
}
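The saga reads and writes Data.OrderId, Data.CustomerBilled, and Data.InventoryStaged, so its state class presumably looks something like the sketch below. The string type of OrderId and the use of the ContainSagaData base class are assumptions; the sample's actual OrderSagaData may differ. The block needs a reference to the NServiceBus package to compile.

```csharp
using NServiceBus;

// Assumed shape of the saga state, inferred from the properties the saga uses.
public class OrderSagaData : ContainSagaData
{
    // Correlation property mapped in ConfigureHowToFindSaga.
    public string OrderId { get; set; }

    // Set when the CustomerBilled event has been handled.
    public bool CustomerBilled { get; set; }

    // Set when the InventoryStaged event has been handled.
    public bool InventoryStaged { get; set; }
}
```

Both flags start as false, so ShipItIfPossible only publishes OrderShipped after both events have arrived, in whichever order they are received.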
Removing the sample stack
Remove the deployed stack with the following command, providing the previously chosen stack name when asked:
dotnet lambda delete-serverless