AWS Storage Blog
Automatically modify data you are querying with Amazon Athena using Amazon S3 Object Lambda
Enterprises may want to customize their data sets for different requesting applications. For example, if you run an e-commerce website, you may want to mask Personally Identifiable Information (PII) when querying your data for analytics. Although you can create and store multiple customized copies of your data, that can increase your storage cost.
You can use Amazon S3 Object Lambda to transform your data dynamically. With S3 Object Lambda, you can write your own code to process data retrieved from Amazon S3 as it is returned to an application, as shown in this other blog. Now, you can also use S3 Object Lambda to automatically modify data queried through Amazon Athena. When you use S3 Object Lambda with Athena, you can more easily customize your data for different applications without having to maintain multiple derivative copies of source data in Amazon S3.
In this post, we show how you can use S3 Object Lambda to automatically modify your Amazon S3 data as you query it through Athena. We start with some use cases of this approach and then walk through a scenario where you can use an AWS Lambda function to mask a certain column of a Comma-Separated Values (CSV) file while it is queried through Athena. Using this approach eliminates the need to create and store multiple customized copies of your data for different requesting applications, saving on storage cost and providing a more efficient solution.
Solution overview
The following diagram shows an overview of this solution.
In this solution, we use an S3 Object Lambda Access Point alias as the source location of S3 data when creating an Amazon Athena table instead of specifying an S3 bucket. This will automatically invoke your Lambda function when you run a query in Amazon Athena. As a result, S3 Object Lambda will automatically modify your data you are querying with Athena for different requesting applications.
There are many use cases that can be simplified by this approach, for example:
- Querying data using custom authorization rules
- Querying objects in data formats other than those supported by Amazon Athena
- Querying objects that are compressed using compression formats other than those supported by Amazon Athena
- Querying objects encrypted with server-side encryption with customer-provided keys (SSE-C)
Solution walkthrough
Let us walk through a scenario where you can use S3 Object Lambda to automatically modify your data when you query it using Athena. The data we are using is the following CSV file, containing the columns transaction_id, timestamp, customer_id, city, product_id, and price
. We show how an e-commerce platform can mask the “customer_id” column using S3 Object Lambda before their data is queried for analytics through Athena.
transaction_id,timestamp,customer_id,city,product_id,price
312E1C0D,1665616201,211659,Wilmington,88,58.85
0A3E5656,1666014822,96EC97,Paradise,99,49.2
6FF4AEA6,1666050199,F07463,Tampa,79,28.43
6336A8A5,1666130292,1A7684,Bear,2,4.24
CABF3186,1666283029,9643D7,Seattle,28,62.81
3557D4C0,1666439246,CA302E,Gary,67,81.5
2C4EAC07,1666439837,098096,Chicago,27,82.51
31E51899,1666599603,676477,Sterling Heights,99,45.66
199953D0,1666917172,C71282,Akron,32,25.21
194B7797,1666983230,514B15,Milwaukee,98,39.43
You can start using S3 Object Lambda with Athena by following a few steps:
- Create an AWS Lambda Function to transform data for your use case.
- Create an S3 Object Lambda Access Point.
- Analyze data with Athena using the S3 Object Lambda Access Point alias.
Step 1: Create an AWS Lambda function
- Create a Lambda function. This post uses the
Python3
runtime. - Update the code of the Lambda function you created using the following code sample.
import csv
import hashlib
from urllib import request
import boto3
s3 = boto3.client("s3")
def transform(original_object):
lines = original_object.splitlines()
table = csv.DictReader(lines, delimiter=",")
# Masking "customer_id"
mask = lambda row: [
hashlib.sha256(v.encode()).hexdigest()[:10] if k == "customer_id" else v
for k, v in row.items()
]
transformed_table = [mask(row) for row in table]
transformed_object = "\n".join([",".join(row) for row in transformed_table])
return transformed_object
def lambda_handler(event, context):
print(event)
object_get_context = event["getObjectContext"]
request_route = object_get_context["outputRoute"]
request_token = object_get_context["outputToken"]
s3_url = object_get_context["inputS3Url"]
# Get object from S3
res = request.urlopen(s3_url)
headers = res.info()
content_type = headers["Content-Type"]
original_object = res.read().decode("utf-8")
# Transform object
transformed_object = transform(original_object)
# Write object back to S3 Object Lambda
s3.write_get_object_response(
Body=transformed_object,
RequestRoute=request_route,
RequestToken=request_token,
ContentType=content_type,
)
return {"status_code": 200}
We use this preceding Lambda function to mask the “customer_id” column in the dataset. This Lambda function has a handler function, and the request information from S3 Object Lambda is stored in the event object. If you are using S3 Object Lambda for the first time, then we recommend that you visit the user guide to learn about the format of the event context. Looking at the code of the function, there are three main sections:
- First, we use the
inputS3Url
property of the input event to download the original object. Since the value is a presigned URL, the function does not need permissions to read from Amazon S3. - Second, we transform data using the steps in the
transform
function. We hash and mask the “customer_id“ column in the CSV file. You can update this part of code to customize the behavior of the function for your use case. - Third, we use the
WriteGetObjectResponse
API to send the result of the transformation back to S3 Object Lambda. When you use theWriteGetObjectResponse
API, the transformed object can be much larger than the maximum size of the response returned by a Lambda function. This is because theWriteGetObjectResponse
API supports chunked transfer encoding to implement a streaming data transfer. The Lambda function only needs to return the status code (200 OK in this case), eventual errors, and optionally customize the metadata of the returned object as described in the S3GetObject
API. To use theWriteGetObjectResponse
API, we must update Lambda Execution Role. We can attach an AWS managed policy named AmazonS3ObjectLambdaExecutionRolePolicy.
If you want to restrict access to your Amazon S3 data to only S3 Object Lambda, then add the “Condition” shown in the following to your AWS Identity and Access Management (IAM) policy. We also recommend that you visit the user guide to learn about configuring IAM policies for S3 Object Lambda.
"Condition": {
"ForAnyValue:StringEquals": {
"aws:CalledVia": [
"s3-object-lambda.amazonaws.com"
]
}
}
Step 2: Create an S3 Object Lambda Access Point
Now that we have our Lambda function set up, let us create an S3 Access Point, which we use to create an S3 Object Lambda Access Point.
- In the Amazon S3 console, create an S3 Access Point.
- Then, create an S3 Object Lambda Access Point using the Supporting Access Point you just created.
3. During the configuration of the S3 Object Lambda Access Point, select the latest version of the preceding Lambda function that you created.
4. In the additional features section, select the option Lambda function supports requests using range GetObject, as shown in the following screenshot. This is because Athena uses range queries to optimally retrieve the object from Amazon S3.
Step 3: Analyze data with Athena using the S3 Object Lambda Access Point alias
Next, let us use the S3 Object Lambda Access Point in Athena to analyze the data.
- Open the S3 Object Lambda Access Point console. Then select the preceding S3 Object Lambda Access Point name that you created. In the Properties tab, you can see the Object Lambda Access Point alias. Copy this value to use in the following step.
Now, we use Athena to query the CSV file that we introduced earlier. But first, we create a table that Athena can use.
2. Open the Athena console. If this is your first time using the Athena console in your current AWS Region, then choose the Explore the query editor option to open the query editor. Choose Edit Settings to set up a query result location in Amazon S3. For each query that you run, Athena automatically stores the query results and metadata information in this query result location.
3. Execute the following query by selecting Run after replacing OBJECT_LAMBDA_ACCESS_POINT_ALIAS
with the value that you copied earlier.
CREATE EXTERNAL TABLE sample_sales (
`transaction_id` string,
`timestamp` int,
`customer_id` string,
`city` string,
`product_id` bigint,
`price` double
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES ('field.delim' = ',')
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://OBJECT_LAMBDA_ACCESS_POINT_ALIAS/'
TBLPROPERTIES (
'classification' = 'csv'
);
4. Finally, execute the following Athena query by selecting Run.
SELECT * FROM sample_sales limit 10;
This is the result we got. The “customer_id” column that was present in the original CSV file was hashed by S3 Object Lambda, as shown in the following figure.
Cleaning up
To avoid incurring future charges, you can delete the CSV file, Lambda function, Supporting Access Point, S3 Object Lambda Access Point, and Athena table created in this post when they are no longer needed.
Conclusion
In this post, we showed how you can dynamically transform your data using S3 Object Lambda when you query with Athena. Specifically, we walked through an example of dynamic masking on a CSV file where we masked the “customer_id” column. With S3 Object Lambda, you no longer need to store multiple derivative copies of your data for different requesting applications. You can transform your data on demand and save on storage cost of derivative copies. This post describes the steps for a specific example, but it can be extended easily. For example, you can add additional transformational logic to filter out rows with data more than a week old based on the “timestamp” column. To do this, you can modify your Lambda function to implement this transformation.
To learn more about S3 Object Lambda and Athena, visit the S3 Object Lambda User Guide and the Athena User Guide. For pricing information, visit the Amazon S3 pricing page and the Amazon Athena pricing page.
Thanks for reading this post. If you have any comments or questions, feel free to leave them in the comments section.