AWS for Industries

Simplify Monte Carlo Simulations with AWS Serverless services

Learn how to execute Monte Carlo Simulations and Machine Learning data processing at any scale with AWS Step Functions Distributed Map and AWS Lambda.

Organizations across the financial services industry, including those providing insurance products, rely on Monte Carlo simulations and machine learning feature engineering processes for product development and risk analysis. These processes often require executing business logic across billions of individual business records, such as policies, for compliance and modeling needs. Many organizations rely on internal custom orchestration systems or big data frameworks to coordinate the parallel processing of their business logic across multiple compute nodes. However, maintaining and operating these orchestration systems can require significant effort from development resources or even dedicated internal teams. Additionally, organizations often manage large clusters of compute resources for executing business logic at scale, requiring significant operational and infrastructure investments.

With AWS Step Functions and AWS Lambda, customers can now perform Monte Carlo simulations and machine learning data processing tasks on billions of items across thousands of parallel execution environments without managing orchestration tools or compute clusters. AWS Step Functions provides fully managed serverless orchestration capabilities, including error handling and retries, with easy-to-configure massively parallel task orchestration. AWS Lambda allows customers to focus only on the code that adds business value by executing business logic in a fully managed, highly scalable environment while only paying for the milliseconds their code is running.

AWS Step Functions’ Distributed Map feature allows customers to execute business logic on AWS Lambda, Amazon Elastic Container Service (Amazon ECS), and almost any other compute platform. This solution utilizes the native AWS Lambda integration for scalability and simplicity, while the GitHub repository includes additional compute options using Amazon ECS on AWS Fargate and Amazon ECS on EC2 Spot, allowing customers to optimize the compute and cost structure to suit their specific workload needs.

Solution Overview

For this example use case, we will perform a default prediction for each loan in a commercial portfolio under a potential Federal Reserve rate scenario. The solution could easily be modified to execute several different potential rate scenarios over the dataset in parallel or in sequence. The example solution deploys a simulated dataset with a default of 500,000 loans to demonstrate the scalability of the solution while minimizing the cost of services. The solution is capable of processing billions of items or objects.

The solution provides end-to-end orchestration for processing billions of records with your simulation or transformation logic, using the AWS Step Functions Distributed Map feature for orchestration and business logic executed in AWS Lambda to process data stored in Amazon Simple Storage Service (Amazon S3).

The AWS Step Functions State Machine utilizes an S3 Inventory report for the S3 bucket containing the source records to be processed. Step Functions processes the inventory report and manages the batching, distribution, and retries when assigning records to worker instances for record processing. The solution includes an S3 Inventory pre-processing step allocating up to 100 million S3 objects to each Distributed Map instance within the State Machine. In the example dataset, each S3 object is a CSV file containing a single loan record.

An AWS Lambda function contains the example simulation logic as Python code. The Distributed Map feature provides each invocation of the Lambda function with a configurable batch of S3 objects for processing. The example code within the Lambda function reads each entry from the list, applies the specified business logic to the record contents, and writes the output to a new S3 object. After processing all of the objects provided, the Lambda function returns a success or failure status to the Distributed Map feature.

The AWS Step Functions Distributed Map feature provides error handling and retries for each batch in the event of an error within the Lambda execution.

Prerequisites

  • An AWS Account
  • Access to AWS Services
    • AWS IAM with access to create policies and roles
    • AWS Lambda
    • AWS Step Functions with access to create State Machines
    • Amazon CloudWatch with access to create Log Groups

Getting started

  1. Clone the solution from GitHub to your workstation
  2. From the AWS CloudFormation Console, choose Create Stack
  3. In the Specify template section, choose Upload a template file
  4. Choose Choose file and navigate to the directory you cloned the repository to
  5. Navigate into the cloudformation directory and stack you want to deploy, choose the main.yml file
  6. On the next page, review the parameters. Here you can adjust settings such as concurrency or how many records to generate.
  7. Choose Next
  8. Review the checkboxes at the bottom of the page. If you consent, check the boxes and choose Submit
  9. Once deployed, the stacks will create two AWS Step Functions State Machines. First run the *-datagen-* workflow to generate the source data, then run the *-dataproc-* workflow to process the data (see the sketch below for starting these workflows programmatically)
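Optionally, the workflows can be started from the command line or an SDK instead of the console. The following is a minimal boto3 sketch; the state machine ARN is a placeholder and should be replaced with the ARN of the *-datagen-* or *-dataproc-* State Machine created by the stack.

import json
import boto3

sfn = boto3.client('stepfunctions')

# Placeholder ARN - copy the actual *-datagen-* or *-dataproc-* ARN from the
# Step Functions console or the CloudFormation stack outputs
state_machine_arn = 'arn:aws:states:us-east-1:123456789012:stateMachine:example-datagen-workflow'

response = sfn.start_execution(
    stateMachineArn=state_machine_arn,
    input=json.dumps({})  # empty input shown for illustration; supply the execution input your deployment expects
)
print(response['executionArn'])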

Solution Detail

For this solution we populate an S3 bucket with our simulated source dataset, composed of individual CSV files, each of which contains the loan details of a single commercial loan. An S3 Inventory report is also created for the S3 bucket, providing metadata including the full S3 bucket path for each object. Because S3 Inventory reports are created asynchronously, the solution’s example template creates a simulated S3 Inventory report in the S3 bucket so we can get started immediately.
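For context, an S3 Inventory manifest.json points to one or more inventory CSV files. A simplified illustration is shown below; the bucket names, keys, and sizes are placeholders, and real reports contain additional fields such as checksums and timestamps.

# Simplified illustration of an S3 Inventory manifest; values are placeholders
inventory_manifest_example = {
    "sourceBucket": "example-loan-data-bucket",
    "destinationBucket": "arn:aws:s3:::example-inventory-bucket",
    "fileFormat": "CSV",
    "fileSchema": "Bucket, Key, Size",
    "files": [
        {
            "key": "inventory/data/inventory-0001.csv.gz",
            "size": 2048
        }
    ]
}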

A single four-step AWS Step Functions State Machine orchestrates inventory processing, worker management, work distribution, and retry logic for a dataset of almost any size.

AWS Step Functions State Machine

1. Inventory Partitioning and Sampling

The S3 Inventory Partitioning step provides the ability to process more than one hundred million individual S3 objects, scale processing concurrency beyond ten thousand, and apply object selection logic such as sampling or filters. If your use case does not require any of these capabilities, this step can be omitted from your implementation.

The step utilizes the Step Functions optimized Lambda Invoke integration to invoke the inventory partitioning AWS Lambda function. Step Functions passes the pre-configured input payload containing configuration and source S3 Inventory information to the Lambda function within the launch event. For most use cases you will provide the inventory location at execution time rather than pre-configuring this information within the workflow.
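Based on the fields the partitioning function reads, an illustrative execution input might look like the following; the bucket, key, and prefix values are placeholders.

# Illustrative execution input for the partitioning step; values are placeholders
partitioning_input_example = {
    "inventory": {
        "bucket": "example-inventory-bucket",      # bucket holding the S3 Inventory report
        "key": "inventory/manifest.json",          # inventory manifest to partition
        "output_prefix": "partitioned-inventory/"  # prefix where new manifests are written
    },
    "workshop_variables": {
        "input_sampling": 1  # 1 processes every record; N keeps every Nth record
    }
}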

The Inventory Partition Lambda function reads the S3 Inventory report manifest file and creates multiple new manifest files, each referencing inventory CSV files comprising up to 100 million records. Each new inventory manifest file can then be processed in parallel in the following Distributed Map step. If your use case includes filtering or sampling requirements for the dataset, you may also re-process the inventory CSV files to include only those objects relevant to the specific use case.

The example Python code below accepts the original S3 manifest file, which references one or more comma-separated values (CSV) files containing the dataset object inventory. The example code demonstrates both simple partitioning and sampling of the source dataset. The example partitioning logic creates manifest.json files for each inventory CSV file, ensuring that each Distributed Map execution receives fewer than one hundred million records and enabling processing concurrency beyond ten thousand. Your specific use case may call for different partitioning logic. The example sampling code iterates through the inventory CSVs, creating new inventory CSVs containing references to only the dataset records that meet the sampling criteria.

import boto3
import json
import csv
import pandas as pd
import io

s3_client = boto3.client('s3')
s3_resource = boto3.resource('s3')

def lambda_handler(event, context):
    bucket_v = event['inventory']['bucket']
    manifest_key_v = event['inventory']['key']
    new_manifest_key_prefix = event['inventory']['output_prefix']
    input_sampling = event['workshop_variables']['input_sampling']
    original_manifest = s3_client.get_object(Bucket=bucket_v, Key=manifest_key_v)
    original_manifest_json = json.loads(original_manifest['Body'].read())
    print(original_manifest_json)
    bucket = s3_resource.Bucket(bucket_v)
    df_batch_inventory = pd.DataFrame()
    output_manifest_manifest = {
        'files': []
    }
    output_manifest_manifest['bucket'] = bucket_v 
    
    # Record Counting Variables
    total_records = 0
    output_records = 0
    
    #If not sampling the input (sampling = 1) then we can just re-write manifest.json files only
    manifest_counter = 1
    if input_sampling == 1:
        for file in original_manifest_json['files']:
            inventory_manifest = {
                'files': []
            }
            inventory_manifest['sourceBucket'] = original_manifest_json['sourceBucket']
            inventory_manifest['destinationBucket'] = original_manifest_json['destinationBucket']
            inventory_manifest['fileFormat'] = original_manifest_json['fileFormat']
            inventory_manifest['fileSchema'] = original_manifest_json['fileSchema']
            inventory_manifest['files'].append({
                'key': file['key'],
                'size': file['size']
                })
            inventory_manifest_json = json.dumps(inventory_manifest)
            s3_resource.Object(bucket_v, new_manifest_key_prefix + 'manifest--{}.json'.format(manifest_counter)).put(Body=inventory_manifest_json)
            output_manifest_manifest['files'].append({
                    'key': new_manifest_key_prefix + 'manifest--{}.json'.format(manifest_counter),
                    'bucket': bucket_v
                    })
            manifest_counter += 1
    #If sampling or filtering the input dataset, we read and process the inventory CSVs and create modified versions for processing
    else:
        im = 1
        for file in original_manifest_json['files']:
            obj = s3_resource.Object(bucket_v,file['key'])
            print(obj.key)
            obj_data = io.BytesIO(obj.get()['Body'].read())
            if str(file['key']).find("csv.gz") != -1:
                df_temp = pd.read_csv(obj_data, compression='gzip', names=['Bucket', 'Key', 'Size'], header=None)
            else:
                df_temp = pd.read_csv(obj_data, compression=None, names=['Bucket', 'Key', 'Size'], header=None)
            total_records += len(df_temp)
            print("Current observed record count: " + format(total_records))
            df_batch_inventory = pd.concat([df_batch_inventory,df_temp])
            if len(df_batch_inventory) > 950000:
                inventory_manifest = {
                    'files': []
                }
                inventory_manifest['sourceBucket'] = original_manifest_json['sourceBucket']
                inventory_manifest['destinationBucket'] = original_manifest_json['destinationBucket']
                inventory_manifest['fileFormat'] = original_manifest_json['fileFormat']
                inventory_manifest['fileSchema'] = original_manifest_json['fileSchema']
                if input_sampling != 1:
                    df_batch_inventory = df_batch_inventory[::input_sampling]
                csv_buffer = io.StringIO()
                output_records += len(df_batch_inventory)
                print("Output records this batch: " + format(len(df_batch_inventory)))
                print("Total output records to this point: " + format(output_records))
                df_batch_inventory.to_csv(csv_buffer, index=False, header=False)
                csv_tmp_name = new_manifest_key_prefix + 'inventory-' + format(im) + '.csv'
                s3_resource.Object(bucket_v, csv_tmp_name).put(Body=csv_buffer.getvalue())
                inventory_manifest['files'].append({
                    'key': csv_tmp_name,
                    'size': len(csv_buffer.getvalue())
                    })
                print(inventory_manifest)
                inventory_manifest_json = json.dumps(inventory_manifest)
                s3_resource.Object(bucket_v, new_manifest_key_prefix + 'manifest--{}.json'.format(im)).put(Body=inventory_manifest_json)
                output_manifest_manifest['files'].append({
                        'key': new_manifest_key_prefix + 'manifest--{}.json'.format(im),
                        'bucket': bucket_v
                        })
                im += 1
                df_batch_inventory = pd.DataFrame(None)
        # Write out any remaining sampled records that did not fill a complete batch
        if len(df_batch_inventory) > 0:
            inventory_manifest = {
                'files': [],
                'sourceBucket': original_manifest_json['sourceBucket'],
                'destinationBucket': original_manifest_json['destinationBucket'],
                'fileFormat': original_manifest_json['fileFormat'],
                'fileSchema': original_manifest_json['fileSchema']
            }
            df_batch_inventory = df_batch_inventory[::input_sampling]
            csv_buffer = io.StringIO()
            output_records += len(df_batch_inventory)
            print("Output records in final batch: " + format(len(df_batch_inventory)))
            print("Total output records: " + format(output_records))
            df_batch_inventory.to_csv(csv_buffer, index=False, header=False)
            csv_tmp_name = new_manifest_key_prefix + 'inventory-' + format(im) + '.csv'
            s3_resource.Object(bucket_v, csv_tmp_name).put(Body=csv_buffer.getvalue())
            inventory_manifest['files'].append({
                'key': csv_tmp_name,
                'size': len(csv_buffer.getvalue())
                })
            inventory_manifest_json = json.dumps(inventory_manifest)
            s3_resource.Object(bucket_v, new_manifest_key_prefix + 'manifest--{}.json'.format(im)).put(Body=inventory_manifest_json)
            output_manifest_manifest['files'].append({
                    'key': new_manifest_key_prefix + 'manifest--{}.json'.format(im),
                    'bucket': bucket_v
                    })
    return {
        'statusCode': 200,
        'body': output_manifest_manifest
    }

2. Inline Map Orchestration

The Inline Map step accepts the list of manifest files from the Inventory Partition Lambda function and distributes the manifest files to the following step, with a maximum concurrency of up to 40 Processing Distributed Map child workflows. For this example the concurrency of this step is set to 2, which provides a maximum aggregate processing capacity of 20,000 concurrent executions within 2 parallel Distributed Map sub-workflows.
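As a rough illustration, the concurrency of the Inline Map step is a single field in the state machine definition. The sketch below mirrors the Amazon States Language JSON as a Python dictionary; the state names and JSON paths are placeholders rather than the solution's exact definition.

# Hypothetical sketch of the Inline Map state; state names and paths are placeholders
inline_map_state = {
    "Type": "Map",
    "ItemsPath": "$.body.files",  # list of partitioned manifest files from the previous step
    "MaxConcurrency": 2,          # number of Distributed Map sub-workflows run in parallel
    "ItemProcessor": {
        "ProcessorConfig": {"Mode": "INLINE"},
        "StartAt": "Processing DMap",
        "States": {}              # the Distributed Map state described in the next section goes here
    },
    "End": True
}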

3. Distributed Map Processing

The Processing DMap step utilizes the AWS Step Functions Distributed Map feature to read the S3 Inventory manifest provided by the main workflow and processes the referenced S3 Inventory files. For this use case, each line of the S3 Inventory file contains metadata referencing an object in S3 containing a single customer loan record. The Distributed Map feature creates batches of 100 loan files per our configuration. Each Distributed Map child workflow supports up to ten thousand concurrent workers. For this example the runtime concurrency is set to 500, which allows the two concurrent Distributed Map processes to utilize the 1,000 default AWS account Lambda concurrency quota in a single AWS Region.
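For reference, the batching and concurrency described above map to a handful of fields on the Distributed Map state. The sketch below again mirrors the Amazon States Language JSON as a Python dictionary; the label, paths, and state names are placeholders rather than the solution's exact definition.

# Hypothetical sketch of the Distributed Map state; labels and paths are placeholders
distributed_map_state = {
    "Type": "Map",
    "Label": "ProcessingDMap",
    "MaxConcurrency": 500,  # concurrent Lambda workers per Distributed Map run
    "ItemReader": {         # read the S3 Inventory manifest passed in by the parent workflow
        "Resource": "arn:aws:states:::s3:getObject",
        "ReaderConfig": {"InputType": "MANIFEST"},
        "Parameters": {"Bucket.$": "$.bucket", "Key.$": "$.key"}
    },
    "ItemBatcher": {
        "MaxItemsPerBatch": 100,  # 100 loan objects handed to each Lambda invocation
        "BatchInput": {"workshop_variables.$": "$.workshop_variables"}
    },
    "ItemProcessor": {
        "ProcessorConfig": {"Mode": "DISTRIBUTED", "ExecutionType": "STANDARD"},
        "StartAt": "Lambda Worker",
        "States": {}  # the Lambda Invoke task described in the next section goes here
    },
    "End": True
}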

Because Step Functions Distributed Map and AWS Lambda can scale to massive concurrency very quickly, it is advisable to configure backoff and retries within the workflow to allow downstream systems to scale to meet the processing needs. We utilize the Step Functions Distributed Map retry feature to implement graceful backoff without any code. In this example we have configured retry logic for S3 bucket throughput scaling events and Lambda quota contention scenarios. Each new S3 bucket initially allocates a single partition supporting 5,500 read and 3,500 write requests per second, which auto-scales based on usage patterns. To allow S3 to auto-scale to meet our workload's write demands, we configure the base retry delay, maximum retries, and backoff within the Step Functions Lambda integration. The same pattern is applicable to Lambda burst concurrency scaling and any other downstream systems used to support high-concurrency data processing.
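As a sketch, the retry behavior described above can be declared with the standard Retry fields on the Lambda Invoke task inside the Distributed Map item processor. The error names and values below are illustrative, not the solution's exact settings.

# Illustrative Retry configuration for the Lambda Invoke task; values are examples only
lambda_invoke_retry = [
    {
        # Custom error raised by the worker function when S3 returns a 503 SlowDown response
        "ErrorEquals": ["SlowDown"],
        "IntervalSeconds": 5,  # base delay before the first retry
        "MaxAttempts": 6,
        "BackoffRate": 2       # exponential backoff while S3 partitions scale
    },
    {
        # Lambda throttling while burst concurrency scales up
        "ErrorEquals": ["Lambda.TooManyRequestsException"],
        "IntervalSeconds": 2,
        "MaxAttempts": 6,
        "BackoffRate": 2
    }
]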

Distributed Map Processing

4. Business Logic Data Process

The Lambda Worker step consists of a single Lambda function which accepts a JSON payload from Distributed Map containing a batch of loan objects stored in S3. The Lambda function processes each loan by reading the contents of the S3 object and applying the loan default prediction logic for this simulation scenario. After processing the entire batch, the Lambda function writes a new CSV file to S3 containing the loan details and default prediction value for each loan in the batch. The example demonstrates both individual output files and output files containing an entire batch of loans to facilitate more efficient reads for analytics and ML workloads.

Step Functions error handling features remove the need to implement catch and retry logic within the Lambda function code. Errors within the function execution are raised to Distributed Map, which then executes the configured retry and error processing, including custom errors.

import boto3
import botocore
import json
import csv
import os
import pandas as pd
from io import StringIO
from botocore.config import Config
import random
from random import randint
import math

config = Config(retries = dict(max_attempts = 2, mode = 'standard'))
region = os.getenv('REGION')
count = os.getenv('RECORDCOUNT')
s3 = boto3.client('s3', region_name=region)
s3_resource = boto3.resource('s3', config=config)

results = ["NO", "NO", "NO", "NO", "YES", "NO", "NO", "NO", "NO", "NO", "NO", "NO", "NO", "NO", "NO", "NO", "NO", "NO", "NO", "NO"]

def calculate_net_consumption(df):
  df['net_consumption'] = df['use [kW]'] - df['gen [kW]']
  return df
    
def calculate_default(df):
  df['WillDefault'] = results[randint(0,19)]
  return df
    
def handle_processing_errors(error):
  print("botocore Error Caught")
  if error.response['Error']['Code'] == 'SlowDown':
      print("Client SlowDown Error")
      # Raise a custom error for S3 503 SlowDown responses so Distributed Map can retry with backoff
      class SlowDown(Exception):
        pass
      raise SlowDown('Reduce S3 Requests')
  # Re-raise any other client error so it surfaces to Distributed Map for its configured retry handling
  raise error


def lambda_handler(event, context):
  output_prefix = event['BatchInput']['workshop_variables']['output_prefix']
  batch_output_files = event['BatchInput']['workshop_variables']['batch_output_files']
  number_of_output_buckets = len(event['BatchInput']['workshop_variables']['output_bucket'])
  input_sampling = event['BatchInput']['workshop_variables']['input_sampling']
  # Pick a random output bucket for this execution
  output_bucket = event['BatchInput']['workshop_variables']['output_bucket'][(random.randint(0, number_of_output_buckets -1))]
  df_batch = pd.DataFrame()
  #Load the full batch into the dataframe
  for item in event['Items']:
    #Only process source CSV files, skipping folders and other metadata object entries
    if str(item['Key']).find(".csv") == -1:
        print("Not a CSV object, skipping: " + item['Key'])
        continue
    try:
        obj = s3.get_object(Bucket=item['Bucket'], Key=item['Key'])
        s3_data = obj['Body'].read().decode('utf-8')
    except botocore.exceptions.ClientError as error:
        handle_processing_errors(error)
    df = pd.read_csv(StringIO(s3_data))
    #df = calculate_net_consumption(df)
    df = calculate_default(df)
    df_batch = pd.concat([df_batch,df])
    #Just using the last file name when batching
    file_name = item['Key'].split('/')[-1]
  #Split DataFrame into chunks based on the configured output batching size and write each of those out as an output file
  output_rows_per_file = event['BatchInput']['workshop_variables']['output_rows_per_file']
  list_df = [df_batch[i:i+output_rows_per_file] for i in range(0,len(df_batch),output_rows_per_file)]
  # Iterate through the list_df array and write out the output files to S3
  for i in range(len(list_df)):
      #Write the chunk to a csv file
      csv_buffer = StringIO()
      list_df[i].to_csv(csv_buffer, index=False)
      output_key = output_prefix + "/" + file_name + "_" + str(i) + ".csv"
      print("Writing out to s3: " + output_key)
      try:
          s3_resource.Object(output_bucket, output_key).put(Body=csv_buffer.getvalue())
      except botocore.exceptions.ClientError as error:
          handle_processing_errors(error)
  return {
    'statusCode': 200,
    'body': 'ok'
  }

Cleaning up

Delete any example resources you no longer need to avoid incurring future costs. You can destroy the entire stack of resources by deleting the stack from AWS CloudFormation, either from the console as described below or programmatically as sketched after these steps.

  1. Navigate to the AWS CloudFormation Console and choose the deployed stack
  2. Choose Delete and follow the prompts
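Alternatively, a minimal boto3 sketch for deleting the stack programmatically; the stack name is a placeholder for whatever you named the stack at deployment.

import boto3

cloudformation = boto3.client('cloudformation')

# Placeholder stack name; use the name you chose when creating the stack
stack_name = 'monte-carlo-dmap-example'

cloudformation.delete_stack(StackName=stack_name)

# Optionally wait for deletion to complete
waiter = cloudformation.get_waiter('stack_delete_complete')
waiter.wait(StackName=stack_name)
print("Stack " + stack_name + " deleted")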

Conclusion

This blog post presents an example solution for simplifying Monte Carlo simulation and machine learning data processing implementations on AWS using AWS Step Functions Distributed Map. The solution demonstrates how the Distributed Map feature allows you to focus on developing the business logic required for distributed data processing use cases while removing the need to build and maintain complex orchestration solutions and distributed execution environments. Future blog posts will demonstrate additional business logic execution options, including AWS Fargate Spot and Amazon EC2 Spot.

For more AWS Financial Services resources, visit AWS Financial Services.

For more serverless learning resources, visit Serverless Land.

Josh Ragsdale

Josh Ragsdale is a Senior Enterprise Solutions Architect at AWS. He focuses on helping his customers with business transformations while adapting to a cloud operating model at very large scale. He enjoys cycling and spending time with his family outdoors.

Michael Haught

Michael Haught is a Senior Technical Account Manager with AWS Enterprise Support. Michael has more than 10 years of experience in designing and implementing AWS solutions and more than 20 years in infrastructure. He specializes in DevOps and container technologies.

Uma Ramadoss

Uma Ramadoss is a specialist Solutions Architect at Amazon Web Services, focused on the Serverless platform. She is responsible for helping customers design and operate event-driven cloud-native applications and modern business workflows using services like Lambda, EventBridge, Step Functions, and Amazon MWAA.