AWS Storage Blog
Automate object processing in Amazon S3 directory buckets with S3 Batch Operations and AWS Lambda
Data, the lifeblood of any modern organization, is rarely static. For high-performance applications and workloads, enterprises need the ability to run operations on massive amounts of data, including modifying the data as necessary for each use case, to further accelerate processing. This could include adding a watermark to uploaded images, changing the bitrate of audio files for optimized playback on different device types, or ETL processing where individual files need to be extracted, transformed, and then loaded.
Recently, we launched the Amazon S3 Express One Zone storage class, which provides up to 10x faster performance than Amazon S3 Standard. For years, customers have successfully used Amazon S3 to trigger AWS Lambda functions for large-scale data processing with their own custom file-processing logic. For data already in Amazon S3, the S3 Batch Operations feature can automate the Lambda processing of millions or billions of objects.
In this blog, we demonstrate how you can use S3 Batch Operations to automate large-scale object processing with AWS Lambda for data in S3 Express One Zone. We walk you through the steps to configure S3 Batch Operations with AWS Lambda to modify and copy objects from one S3 directory bucket to another. Performance-critical applications that need consistent single-digit millisecond request latency for data in S3 Express One Zone can use S3 Batch Operations to further accelerate processing times.
S3 Express One Zone and S3 Batch Operations
Amazon S3 Express One Zone is a new high-performance, single-Availability Zone S3 storage class that is purpose-built to deliver consistent single-digit millisecond data access for your most latency-sensitive applications. With S3 Express One Zone’s fully elastic storage and virtually unlimited scale, you don’t need to plan or provision capacity or throughput requirements in advance. While you have always been able to choose a specific AWS Region to store your S3 data, with S3 Express One Zone you can select a specific AWS Availability Zone within an AWS Region to store your data. You can choose to co-locate your storage and compute resources in the same Availability Zone to further optimize performance, which helps lower compute costs and run workloads faster. With S3 Express One Zone, data is stored in a different bucket type—an Amazon S3 directory bucket—which supports hundreds of thousands of requests per second.
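If you want to create a directory bucket programmatically rather than in the S3 console, the following is a minimal sketch using the AWS SDK for Python (boto3, a recent version); the bucket name, Availability Zone ID, and Region here are hypothetical placeholders for your own values.

import boto3

# Hypothetical names: directory bucket names must end with --<az-id>--x-s3
az_id = 'use1-az5'
bucket_name = f'my-express-demo--{az_id}--x-s3'

s3 = boto3.client('s3', region_name='us-east-1')

# Create the directory bucket in a specific Availability Zone
s3.create_bucket(
    Bucket=bucket_name,
    CreateBucketConfiguration={
        'Location': {'Type': 'AvailabilityZone', 'Name': az_id},
        'Bucket': {'Type': 'Directory', 'DataRedundancy': 'SingleAvailabilityZone'}
    }
)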
With S3 Batch Operations, you can perform large-scale batch operations on a list of specific S3 objects. To set up an S3 Batch Operations job, you do not need to write code or deploy a service; instead, you create a job in a few clicks and let S3 do the rest. A single job can perform a specified operation on billions of objects containing exabytes of data. S3 Batch Operations tracks job progress, sends notifications, and stores a detailed completion report of all actions, providing a fully managed, auditable, and serverless experience. You can use S3 Batch Operations through the AWS Management Console, AWS Command Line Interface (AWS CLI), AWS SDKs, or REST API.
Solution overview and walkthrough
You can use S3 Batch Operations to invoke a Lambda function on large sets of S3 Express One Zone objects to give you more options to process your data. When you create the S3 Batch Operations job, you provide a manifest file located in an S3 general purpose bucket that contains the list of all objects in S3 Express One Zone to process. You can create this manifest file by exporting S3 ListObjectsV2 results to a CSV file. You can then configure the Lambda action to perform on those objects. When the job starts, Amazon S3 invokes the Lambda function for each object in the manifest.
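As a rough sketch of that manifest step, assuming a hypothetical source directory bucket and a hypothetical general purpose bucket to hold the manifest, you could list the objects with boto3 and write the Bucket,Key rows to a CSV file yourself:

import csv
import boto3

# Hypothetical names for illustration
source_bucket = 'my-source--use1-az5--x-s3'    # S3 Express One Zone directory bucket
manifest_bucket = 'my-general-purpose-bucket'  # general purpose bucket for the manifest
manifest_key = 'manifests/batch-manifest.csv'

s3 = boto3.client('s3')

# Write a CSV manifest with one Bucket,Key row per object (no header row)
with open('/tmp/batch-manifest.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=source_bucket):
        for obj in page.get('Contents', []):
            writer.writerow([source_bucket, obj['Key']])

# Upload the manifest to the general purpose bucket for S3 Batch Operations to read
s3.upload_file('/tmp/batch-manifest.csv', manifest_bucket, manifest_key)

Note that the CSV manifest does not include a header row; each row contains only the bucket name and object key.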
To use S3 Batch Operations, S3 Express One Zone, and AWS Lambda together, you must package an updated AWS SDK version with your Lambda function and use the new invocation payload schema. Here is what we cover next:
- Payload and AWS Lambda function code
- Creating an S3 Batch Operations job that invokes an AWS Lambda function
- Packaging the AWS Lambda function
1. Payload and AWS Lambda function code
S3 Batch Operations needs the required permissions to invoke a Lambda function. Use this guide to help you get started using S3 Batch Operations with Lambda. You can also use the directions listed here to assist you in creating an S3 Batch Operations job.
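As one hedged example of what that role could look like, the following boto3 sketch creates a role that S3 Batch Operations can assume and grants it permission to invoke your function, read the manifest, and write the completion report; the role name, function ARN, and bucket names are placeholders you would replace with your own.

import json
import boto3

iam = boto3.client('iam')

# Placeholder values for illustration
role_name = 'batch-ops-lambda-invoke-role'
function_arn = 'arn:aws:lambda:us-east-1:123456789012:function:s3batchops-function'
manifest_bucket = 'my-general-purpose-bucket'
report_bucket = 'my-general-purpose-bucket'

# Allow S3 Batch Operations to assume the role
trust_policy = {
    'Version': '2012-10-17',
    'Statement': [{
        'Effect': 'Allow',
        'Principal': {'Service': 'batchoperations.s3.amazonaws.com'},
        'Action': 'sts:AssumeRole'
    }]
}
iam.create_role(RoleName=role_name, AssumeRolePolicyDocument=json.dumps(trust_policy))

# Allow the job to invoke the function, read the manifest, and write the report
permissions = {
    'Version': '2012-10-17',
    'Statement': [
        {'Effect': 'Allow', 'Action': 'lambda:InvokeFunction', 'Resource': function_arn},
        {'Effect': 'Allow', 'Action': 's3:GetObject',
         'Resource': f'arn:aws:s3:::{manifest_bucket}/*'},
        {'Effect': 'Allow', 'Action': 's3:PutObject',
         'Resource': f'arn:aws:s3:::{report_bucket}/*'}
    ]
}
iam.put_role_policy(RoleName=role_name, PolicyName='batch-ops-permissions',
                    PolicyDocument=json.dumps(permissions))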
S3 Batch Operations has introduced a new payload schema for Lambda to help simplify your experience when you use S3 Batch Operations and Lambda together. This includes the addition of UserArguments and InvocationSchemaVersion in the Create Job request. To invoke a Lambda function to perform custom actions on objects in directory buckets, you must specify 2.0 as the value for InvocationSchemaVersion.
Example request and response
For each object listed in your manifest, S3 Batch Operations invokes Lambda with a payload containing information about each of the objects in the manifest. The following is a JSON example of a request for the Lambda function. The userArguments object and the invocationSchemaVersion value of 2.0 are new in this schema.
{
    "invocationSchemaVersion": "2.0",
    "invocationId": "Jr3s8KZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZzatx7Ruy",
    "job": {
        "id": "ry77cd60-61f6-4a2b-8a21-d07600c874gf",
        "userArguments": {
            "MyDestinationBucket": "<destination-directory-bucket-name>",
            "MyDestinationBucketRegion": "<destination-directory-bucket-region>",
            "MyDestinationPrefix": "copied/",
            "MyDestinationObjectKeySuffix": "_new_suffix"
        }
    },
    "tasks": [
        {
            "taskId": "y5R3a2lkZ29lc2hlurcS",
            "s3Key": "<S3 Object Key>",
            "s3VersionId": null,
            "s3Bucket": "<source-directory-bucket-name>"
        }
    ]
}
The result code and result string are included in the S3 Batch Operations job report. Note that the result string will be truncated to 1,024 characters in the job report.
Example Lambda function for S3 Batch Operations
The following example Python Lambda function renames the S3 object key using data from the UserArguments and then copies each object to a new directory bucket. The return payload to S3 Batch Operations contains the result code and result string for each object that is processed.
Save this Lambda function in lambda_function.py on your local computer.
import boto3
from urllib import parse
from botocore.exceptions import ClientError

def lambda_handler(event, context):
    # Parse job parameters from S3 Batch Operations
    invocationId = event['invocationId']
    invocationSchemaVersion = event['invocationSchemaVersion']

    # Parse user arguments
    userArguments = event['job']['userArguments']
    destinationBucket = userArguments['MyDestinationBucket']
    destinationBucketRegion = userArguments['MyDestinationBucketRegion']
    destinationPrefix = userArguments['MyDestinationPrefix']
    destinationObjectKeySuffix = userArguments['MyDestinationObjectKeySuffix']

    # Prepare results
    results = []

    s3Client = boto3.client('s3', region_name=destinationBucketRegion)

    for task in event['tasks']:
        taskId = task['taskId']

        # Parse Amazon S3 Key, Key Version, and Bucket Name
        s3Key = parse.unquote(task['s3Key'], encoding='utf-8')
        s3VersionId = task['s3VersionId']
        s3Bucket = task['s3Bucket']

        # Construct CopySource with VersionId
        copySrc = {'Bucket': s3Bucket, 'Key': s3Key}
        if s3VersionId is not None:
            copySrc['VersionId'] = s3VersionId

        try:
            # Prepare result code and string
            resultCode = None
            resultString = None

            # Construct New Key
            newKey = destinationPrefix + s3Key + destinationObjectKeySuffix

            # Copy Object to New Bucket
            response = s3Client.copy_object(
                Bucket=destinationBucket,
                CopySource=copySrc,
                Key=newKey
            )

            # Mark as succeeded
            resultCode = 'Succeeded'
            # The resultString will be truncated by Batch Operations
            resultString = str(response)
        except ClientError as e:
            # For all handled exceptions, mark as a temporary failure
            # and S3 Batch Operations will mark the task for retry. If
            # any other exceptions are received, mark as permanent failure.
            print(e)
            errorCode = e.response['Error']['Code']
            errorMessage = e.response['Error']['Message']
            if errorCode == 'RequestTimeout':
                resultCode = 'TemporaryFailure'
                resultString = 'Retry request to Amazon S3 due to RequestTimeout.'
            elif errorCode == 'SlowDown':
                resultCode = 'TemporaryFailure'
                resultString = 'Retry request to Amazon S3 due to SlowDown.'
            elif errorCode == '503 Service Unavailable':
                resultCode = 'TemporaryFailure'
                resultString = 'Retry request to Amazon S3 due to 503 Service Unavailable.'
            elif errorCode == 'ServiceUnavailable':
                resultCode = 'TemporaryFailure'
                resultString = 'Retry request to Amazon S3 due to ServiceUnavailable.'
            elif errorCode == 'InternalServerError':
                resultCode = 'TemporaryFailure'
                resultString = 'Retry request to Amazon S3 due to InternalServerError.'
            elif errorCode == 'InternalError':
                resultCode = 'TemporaryFailure'
                resultString = 'Retry request to Amazon S3 due to InternalError.'
            else:
                resultCode = 'PermanentFailure'
                resultString = '{}: {}'.format(errorCode, errorMessage)
        except Exception as e:
            print(e)
            # Catch all other exceptions to permanently fail the task
            resultCode = 'PermanentFailure'
            resultString = 'Exception: {}'.format(e)
        finally:
            results.append({
                'taskId': taskId,
                'resultCode': resultCode,
                'resultString': resultString
            })

    return {
        'invocationSchemaVersion': invocationSchemaVersion,
        'treatMissingKeysAs': 'PermanentFailure',
        'invocationId': invocationId,
        'results': results
    }
2. Creating an S3 Batch Operations job that invokes an AWS Lambda function
When creating an S3 Batch Operations job to invoke a Lambda function, you must provide the following:
- The ARN of your Lambda function (which might include the function alias or a specific version number)
- An AWS Identity and Access Management (IAM) role with permission to invoke the function
- The operation parameter LambdaInvoke
For more information, see the documentation on creating an S3 Batch Operations job and the operations supported by S3 Batch Operations.
The following example creates an S3 Batch Operations job that invokes a Lambda function using the AWS CLI and includes the new UserArguments. You must use version 2.14.2 of the AWS CLI or later.
aws s3control create-job \
    --account-id <AccountID> \
    --operation '{"LambdaInvoke": {"FunctionArn":"<Lambda Function ARN>","InvocationSchemaVersion":"2.0", "UserArguments": {"MyDestinationBucket": "<Destination Bucket Name>", "MyDestinationBucketRegion": "<Destination Bucket Region>", "MyDestinationPrefix": "copied/","MyDestinationObjectKeySuffix": "_new_suffix"}}}' \
    --manifest '{"Spec":{"Format":"S3BatchOperations_CSV_20180820","Fields": ["Bucket","Key"]},"Location":{"ObjectArn":"<S3 Manifest File ARN>", "ETag":"<S3 Manifest File Entity Tag>"}}' \
    --priority 1 \
    --role-arn <Batch Operations Role ARN> \
    --no-confirmation-required \
    --report '{"Bucket":"<Report Bucket ARN>","Format":"Report_CSV_20180820","Enabled":true,"Prefix":"ReportPrefix","ReportScope":"AllTasks"}'
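After the job is created, you can check its progress from the SDK as well. This is a small sketch using the boto3 s3control client; the account ID is a placeholder and the job ID is the JobId value returned by the create-job call above.

import boto3

s3control = boto3.client('s3control')

# Placeholder account ID; job_id is the JobId returned by create-job
account_id = '123456789012'
job_id = '<JobId from the create-job response>'

response = s3control.describe_job(AccountId=account_id, JobId=job_id)
job = response['Job']

# Status moves through states such as Preparing, Ready, Active, and Complete
print(job['Status'])
print(job.get('ProgressSummary', {}))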
3. Packaging the AWS Lambda function
To integrate Lambda with the new S3 directory buckets, you must use version 1.33.13 of the AWS SDK for Python or later. One way to do this is to package the SDK together with your function code as a .zip file and upload it to Lambda. The function must also have the necessary permissions to copy objects to the destination bucket. You can follow this guide on packaging your Lambda functions.
Save these policy files on your local computer:
Example trust-policy.json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "lambda.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
Example s3express-policy-for-lambda.json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Statement1",
            "Effect": "Allow",
            "Action": [
                "s3express:CreateSession"
            ],
            "Resource": [
                "<Source Directory Bucket ARN>",
                "<Destination Directory Bucket ARN>"
            ]
        }
    ]
}
Run the following commands from the same folder that contains the Lambda function and the policy files.
## Create IAM role and policy
aws iam create-role --role-name lambda-ex --assume-role-policy-document file://trust-policy.json

# Attach a policy
aws iam attach-role-policy --role-name lambda-ex --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

# Attach the policy to communicate with S3 directory buckets
aws iam put-role-policy --role-name lambda-ex --policy-name s3express-policy-for-lambda --policy-document file://s3express-policy-for-lambda.json

## Package the code
mkdir package
pip install --target ./package boto3
cd package
zip -r ../package.zip .
cd ..
zip package.zip lambda_function.py

## Create the function
aws lambda create-function --function-name s3batchops-function \
    --zip-file fileb://package.zip --handler lambda_function.lambda_handler --runtime python3.11 \
    --role arn:aws:iam::123456789012:role/lambda-ex
Cleaning up
Perform the following steps to clean up your environment and avoid incurring future unwanted charges.
- Delete the Lambda function s3batchops-function.
- Delete the IAM role and policy for the Lambda function (lambda-ex and s3express-policy-for-lambda, respectively).
- Delete the IAM role for S3 Batch Operations.
Conclusion
In this post, we demonstrated how you can use AWS Lambda with S3 Batch Operations to process a large number of objects stored in the high-performance S3 Express One Zone storage class and then copy those objects between S3 directory buckets. To accomplish this, we demonstrated how you can use the updated payload schema in S3 Batch Operations to simplify integrating it with AWS Lambda.
You can use the examples provided in this post, and expand on them to perform customized operations on S3 objects. Using Lambda and the S3 Batch Operations UserArguments gives you a powerful, yet simple, method to perform large-scale operations on Amazon S3 objects. Accelerate your high-performance applications running on S3 Express One Zone even further with efficient, automated batch processing of objects.
Thanks for reading this post. If you have any comments or questions, feel free to leave them in the comments section.