AWS Security Blog

Automate Amazon Athena queries for PCI DSS log review using AWS Lambda

In this post, I will show you how to use AWS Lambda to automate PCI DSS (v3.2.1) evidence generation and daily log review to assist with your ongoing PCI DSS activities. We will specifically look at AWS CloudTrail logs stored centrally in Amazon Simple Storage Service (Amazon S3) (which is also a Well-Architected Security Pillar best practice) and use Amazon Athena to query them.

This post assumes familiarity with creating a database in Athena. If you’re new to Athena, please take a look at the Athena getting started guide and create a database before continuing. Take note of the bucket you choose for the output of Athena query results; we will use it later in this post.
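
If you prefer to script that prerequisite instead of using the console, the following is a minimal sketch using boto3. It assumes you have already created a results bucket (shown here as the <OUTPUT_LOG_BUCKET> placeholder), and the AthenaQueryResults/ prefix is simply an example path:

import boto3

#Minimal sketch: create the Athena database used throughout this post.
#Replace <YOUR_DATABASE> and <OUTPUT_LOG_BUCKET> with your own values.
athena = boto3.client('athena')

response = athena.start_query_execution(
    QueryString='CREATE DATABASE IF NOT EXISTS <YOUR_DATABASE>',
    ResultConfiguration={
        'OutputLocation': 's3://<OUTPUT_LOG_BUCKET>/AthenaQueryResults/'
    }
)
print(response['QueryExecutionId'])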

In this post, we walk through:

  • Creating a partitioned table for your AWS CloudTrail logs. In order to reduce costs and time to query results in Athena, we’ll show you how to partition your data. If you’re not already familiar with partitioning, you can learn about it in the Athena user guide.
  • Constructing SQL queries to search for PCI DSS audit log evidence. The SQL queries provided in this post relate directly to PCI DSS requirement 10. Customizing these queries to meet your responsibilities can help you prepare for a PCI DSS assessment.
  • Creating an AWS Lambda function to automate running these SQL queries daily, in order to help address the PCI DSS daily log review requirement 10.6.1.

Create and partition a table

The following code will create and partition a table for CloudTrail logs. Before you execute this query, be sure to replace the variable placeholders with the information from your database. They are:

  • <YOUR_TABLE> – the name of your Athena table
  • LOCATION – the path to your CloudTrail logs in Amazon S3. An example is included in the following code. It includes the variable placeholders:
    • <AWS_ACCOUNT_NUMBER> – your AWS account number. If using organizational CloudTrail, use the following format throughout the post for this variable: o-<orgID>/<ACCOUNT_NUMBER>
    • <LOG_BUCKET> – the bucket name where the CloudTrail logs to be queried reside

CREATE EXTERNAL TABLE <YOUR_TABLE> (
    eventVersion STRING,
    userIdentity STRUCT<
        type: STRING,
        principalId: STRING,
        arn: STRING,
        accountId: STRING,
        invokedBy: STRING,
        accessKeyId: STRING,
        userName: STRING,
        sessionContext: STRUCT<
            attributes: STRUCT<
                mfaAuthenticated: STRING,
                creationDate: STRING>,
            sessionIssuer: STRUCT<
                type: STRING,
                principalId: STRING,
                arn: STRING,
                accountId: STRING,
                userName: STRING>>>,
    eventTime STRING,
    eventSource STRING,
    eventName STRING,
    awsRegion STRING,
    sourceIpAddress STRING,
    userAgent STRING,
    errorCode STRING,
    errorMessage STRING,
    requestParameters STRING,
    responseElements STRING,
    additionalEventData STRING,
    requestId STRING,
    eventId STRING,
    resources ARRAY<STRUCT<
        arn: STRING,
        accountId: STRING,
        type: STRING>>,
    eventType STRING,
    apiVersion STRING,
    readOnly STRING,
    recipientAccountId STRING,
    serviceEventDetails STRING,
    sharedEventID STRING,
    vpcEndpointId STRING
)
COMMENT 'CloudTrail table'
PARTITIONED BY(region string, year string, month string, day string)
ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://<LOG_BUCKET>/AWSLogs/<AWS_ACCOUNT_NUMBER>/CloudTrail/'
TBLPROPERTIES ('classification'='cloudtrail');

Execute the query. You should see a message stating Query successful.

Figure 1: Query successful
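
Athena registers the table in the AWS Glue Data Catalog by default, so if you want to confirm the table programmatically rather than in the console, here is a quick optional sketch; it uses the same <YOUR_DATABASE> and <YOUR_TABLE> placeholders as above:

import boto3

#Optional check: confirm the CloudTrail table exists in the Glue Data Catalog.
#Replace the placeholders with your own database and table names.
glue = boto3.client('glue')

table = glue.get_table(DatabaseName='<YOUR_DATABASE>', Name='<YOUR_TABLE>')['Table']
print(table['Name'])
print(table['StorageDescriptor']['Location'])            #should point at your CloudTrail path in S3
print([key['Name'] for key in table['PartitionKeys']])   #region, year, month, day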

The preceding query creates a CloudTrail table and defines the partitions in your Athena database. Before you begin running queries to generate evidence, you will need to run ALTER TABLE commands to finalize the partitioning.

Be sure to update the following variable placeholders with your information:

  • <YOUR_DATABASE> – the name of your Athena database
  • <YOUR_TABLE>
  • <LOG_BUCKET>
  • <AWS_ACCOUNT_NUMBER>

Provide values for the following variables:

  • region – region of the logs to partition
  • month – month of the logs to partition
  • day – day of the logs to partition
  • year – year of the logs to partition
  • LOCATION – the path to your CloudTrail logs in Amazon S3 to partition, down to the specific day (should match the preceding values of region, month, day, and year). It includes the variable placeholders:
    • <AWS_ACCOUNT_NUMBER>
    • <LOG_BUCKET>

ALTER TABLE <YOUR_DATABASE>.<YOUR_TABLE> ADD partition (region='us-east-1', month='02', day='28', year='2020') location 's3://<LOG_BUCKET>/AWSLogs/<AWS_ACCOUNT_NUMBER>/CloudTrail/us-east-1/2020/02/28/';

After the partition has been configured, you can query logs from the date and region that was partitioned. Here’s an example for PCI DSS requirement 10.2.4 (all relevant PCI DSS requirements are described later in this post).


SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname = 'ConsoleLogin' AND responseelements LIKE '%Failure%' AND region= 'us-east-1' AND year='2020' AND month='02' AND day='28';

Create a Lambda function to save time

As you can see, this process can involve a lot of manual steps as you set up partitioning for each region and then query for each day or region. Let’s simplify the process by putting these steps into a Lambda function.

Use the Lambda console to create a function

To create the Lambda function:

  1. Open the Lambda console, choose Create function, and select the option to Author from scratch.
  2. Enter Athena_log_query as the function name, and select Python 3.8 as the runtime.
  3. Under Choose or create an execution role, select Create new role with basic Lambda permissions.
  4. Choose Create function.
  5. Once the function is created, select the Permissions tab at the top of the page and select the Execution role to view in the IAM console. It will look similar to the following figure.
     
    Figure 2: Permissions tab

Update the IAM Role to allow Lambda permissions to relevant services

  1. In the IAM console, select the policy name and choose Edit policy. Select the JSON tab and paste the following code into the window, replacing the following variables and placeholders:
    • us-east-1 – the region where your resources are located. Change only if necessary.
    • <AWS_ACCOUNT_NUMBER>
    • <YOUR_DATABASE>
    • <YOUR_TABLE>
    • <LOG_BUCKET>
    • <OUTPUT_LOG_BUCKET> – the bucket name you chose to store the query results when setting up Athena.
    
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "glue:UpdateDatabase",
                    "glue:BatchCreatePartition",
                    "glue:GetDatabase",
                    "athena:StartQueryExecution",
                    "glue:GetPartitions",
                    "glue:UpdateTable",
                    "s3:CreateBucket",
                    "s3:ListBucket",
                    "glue:GetTable",
                    "s3:ListMultipartUploadParts",
                    "s3:PutObject",
                    "s3:GetObjectAcl",
                    "s3:GetObject",
                    "athena:CancelQueryExecution",
                    "athena:StopQueryExecution",
                    "athena:GetQueryExecution",
                    "s3:GetBucketLocation",
                    "glue:UpdatePartition"
                ],
                "Resource": [
                    "arn:aws:glue:us-east-1:<AWS_ACCOUNT_NUMBER>:catalog",
                    "arn:aws:glue:us-east-1:<AWS_ACCOUNT_NUMBER>:table/<YOUR_DATABASE>/<YOUR_TABLE>",
                    "arn:aws:glue:us-east-1:<AWS_ACCOUNT_NUMBER>:database/mydatabase",
                    "arn:aws:s3:::<LOG_BUCKET>/*",
                    "arn:aws:s3:::<LOG_BUCKET>",
    				"arn:aws:s3:::<OUTPUT_LOG_BUCKET>/*",
    				"arn:aws:s3:::<OUTPUT_LOG_BUCKET>",
                    "arn:aws:athena:us-east-1:<AWS_ACCOUNT_NUMBER>:workgroup/primary"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "logs:PutLogEvents",
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream"
                ],
                "Resource": "arn:aws:logs:us-east-1:<AWS_ACCOUNT_NUMBER>:*"
            }
        ]
    }
    

    Note: Depending on the environment, this policy might not be restrictive enough and should be limited to only users needing access to the cardholder data environment and audit logs. More information about restricting IAM policies can be found in IAM JSON Policy Elements: Condition Operators.

  2. Choose Review policy and then Save changes.

Customize the Lambda function

  1. On the Lambda dashboard, choose the Configuration tab. In Basic settings, increase the function timeout to 5 minutes to ensure that the function always has time to finish running your queries, and then select Save. Best Practices for Developing on AWS Lambda has more tips for using Lambda.
  2. Paste the following code into the function editor on the Configuration tab, replacing the existing text. The code includes eight example queries to run and can be customized as needed.

    The first query will add partitions to your Amazon S3 logs so that the following seven queries will run quickly and cost-effectively.

    This code combines the partitioning with example Athena queries to assist in meeting PCI DSS logging requirements, which are explained in more detail below:

    Replace these values in the code that follows:

    • <YOUR_DATABASE>
    • <YOUR_TABLE>
    • <LOG_BUCKET>
    • <AWS_ACCOUNT_NUMBER>
    • <OUTPUT_LOG_BUCKET>
    • REGION1 – first region to partition
    • REGION2 – second region to partition*
    
    import boto3
    import datetime
    import time
    
    #EDIT THE FOLLOWING#
    #----------------------#
    
    #This should be the name of your Athena database
    ATHENA_DATABASE = "<YOUR_DATABASE>"
    
    #This should be the name of your Athena database table
    ATHENA_TABLE = "<YOUR_TABLE>"
    
    #This is the Amazon S3 bucket name you want partitioned and logs queried from:
    LOG_BUCKET = "<LOG_BUCKET>"
    
    #AWS Account number for the Amazon S3 path to your CloudTrail logs
    AWS_ACCOUNT_ID = "<AWS_ACCOUNT_NUMBER>"
    
    #This is the Amazon S3 bucket name for the Athena Query results:
    OUTPUT_LOG_BUCKET = "<OUTPUT_LOG_BUCKET>"
    
    #Define regions to partition
    REGION1 = "us-east-1"
    REGION2 = "us-west-2"
    #----------------------#
    #STOP EDITING#
    
    RETRY_COUNT = 50
    
    #Getting the current date and splitting into variables to use in queries below
    CURRENT_DATE = datetime.datetime.today()
    DATEFORMATTED = (CURRENT_DATE.isoformat())
    ATHENA_YEAR = str(DATEFORMATTED[:4])
    ATHENA_MONTH = str(DATEFORMATTED[5:7])
    ATHENA_DAY = str(DATEFORMATTED[8:10])
    
    #location for the Athena query results
    OUTPUT_LOCATION = "s3://"+OUTPUT_LOG_BUCKET+"/DailyAthenaLogs/CloudTrail/"+str(CURRENT_DATE.isoformat())
    
    #Athena Query definitions for PCI DSS requirements
    YEAR_MONTH_DAY = f'year=\'{ATHENA_YEAR}\' AND month=\'{ATHENA_MONTH}\' AND day=\'{ATHENA_DAY}\';'
    ATHENA_DB_TABLE = f'{ATHENA_DATABASE}.{ATHENA_TABLE}'
    PARTITION_STATEMENT_1 = f'partition (region="{REGION1}", month="{ATHENA_MONTH}", day="{ATHENA_DAY}", year="{ATHENA_YEAR}")'
    LOCATION_1 = f' location "s3://{LOG_BUCKET}/AWSLogs/{AWS_ACCOUNT_ID}/CloudTrail/{REGION1}/{ATHENA_YEAR}/{ATHENA_MONTH}/{ATHENA_DAY}/"'
    PARTITION_STATEMENT_2 = f'partition (region="{REGION2}", month="{ATHENA_MONTH}", day="{ATHENA_DAY}", year="{ATHENA_YEAR}")'
    LOCATION_2 = f' location "s3://{LOG_BUCKET}/AWSLogs/{AWS_ACCOUNT_ID}/CloudTrail/{REGION2}/{ATHENA_YEAR}/{ATHENA_MONTH}/{ATHENA_DAY}/"'
    SELECT_STATEMENT = "SELECT * FROM "+ATHENA_DB_TABLE+ " WHERE "
    LIKE_BUCKET = f' \'%{LOG_BUCKET}%\''
    
    
    #Query to partition selected regions
    QUERY_1 = f'ALTER TABLE {ATHENA_DB_TABLE} ADD IF NOT EXISTS {PARTITION_STATEMENT_1} {LOCATION_1} {PARTITION_STATEMENT_2} {LOCATION_2}'
    
    #Access to audit trails or CHD 10.2.1/10.2.3
    QUERY_2 = f'{SELECT_STATEMENT} requestparameters LIKE {LIKE_BUCKET} AND sourceipaddress <> \'cloudtrail.amazonaws.com\' AND sourceipaddress <> \'athena.amazonaws.com\' AND eventName = \'GetObject\' AND {YEAR_MONTH_DAY}'
    
    #Root Actions PCI DSS 10.2.2
    QUERY_3 = f'{SELECT_STATEMENT} userIdentity.sessionContext.sessionIssuer.userName LIKE \'%root%\' AND {YEAR_MONTH_DAY}'
    
    #Failed Logons PCI DSS 10.2.4
    QUERY_4 = f'{SELECT_STATEMENT} eventname = \'ConsoleLogin\' AND responseelements LIKE \'%Failure%\' AND {YEAR_MONTH_DAY}'
    
    #Privilege changes PCI DSS 10.2.5.b, 10.2.5.c
    QUERY_5 = f'{SELECT_STATEMENT} eventname LIKE \'%AddUserToGroup%\' AND requestparameters LIKE \'%Admin%\' AND {YEAR_MONTH_DAY}'
    
    # Initialization, stopping, or pausing of the audit logs PCI DSS 10.2.6
    QUERY_6 = f'{SELECT_STATEMENT} (eventname = \'StopLogging\' OR eventname = \'StartLogging\') AND {YEAR_MONTH_DAY}'
    
    #Suspicious activity PCI DSS 10.6
    QUERY_7 = f'{SELECT_STATEMENT} (eventname LIKE \'%DeleteSecurityGroup%\' OR eventname LIKE \'%CreateSecurityGroup%\' OR eventname LIKE \'%UpdateSecurityGroup%\' OR eventname LIKE \'%AuthorizeSecurityGroup%\') AND {YEAR_MONTH_DAY}'
    
    QUERY_8 = f'{SELECT_STATEMENT} eventname LIKE \'%Subnet%\' and eventname NOT LIKE \'Describe%\' AND {YEAR_MONTH_DAY}'
    
    #Defining function to generate query status for each query
    def query_stat_fun(query, response):
        client = boto3.client('athena')
        query_execution_id = response['QueryExecutionId']
        print(query_execution_id +' : '+query)
        for i in range(1, 1 + RETRY_COUNT):
            query_status = client.get_query_execution(QueryExecutionId=query_execution_id)
            query_fail_status = query_status['QueryExecution']['Status']
            query_execution_status = query_fail_status['State']
    
            if query_execution_status == 'SUCCEEDED':
                print("STATUS:" + query_execution_status)
                break
    
            if query_execution_status == 'FAILED':
                print(query_fail_status)
                raise Exception('Athena query failed: ' + str(query_fail_status))
    
            else:
                print("STATUS:" + query_execution_status)
                time.sleep(i)
        else:
            client.stop_query_execution(QueryExecutionId=query_execution_id)
            raise Exception('Maximum Retries Exceeded')
    
    def lambda_handler(event, context):
        client = boto3.client('athena')
        queries = [QUERY_1, QUERY_2, QUERY_3, QUERY_4, QUERY_5, QUERY_6, QUERY_7, QUERY_8]
        for query in queries:
            response = client.start_query_execution(
                QueryString=query,
                QueryExecutionContext={
                    'Database': ATHENA_DATABASE },
                ResultConfiguration={
                    'OutputLocation': OUTPUT_LOCATION })
            query_stat_fun(query, response)
    

    Note: More regions can be added if you have additional regions to partition. The partition and location statements can be copied and pasted to cover additional regions as needed, or generated from a list, as shown in the sketch after these steps. Additionally, you can hard code the regions for your environment into the statements.

  3. Choose Save in the top right.
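
The note above mentions copying the partition statement for additional regions. If you have many regions, one way to avoid repeating yourself is to build QUERY_1 from a list. The following is a sketch only: it reuses the ATHENA_DB_TABLE, LOG_BUCKET, AWS_ACCOUNT_ID, ATHENA_YEAR, ATHENA_MONTH, and ATHENA_DAY variables already defined in the function code above, and the REGIONS list is an example you would replace with the regions in your environment.

#Sketch: build the ALTER TABLE statement for any number of regions.
#Reuses the variables defined in the function code above.
REGIONS = ["us-east-1", "us-west-2", "eu-west-1"]

def build_partition_query(regions):
    partitions = []
    for region in regions:
        partitions.append(
            f'partition (region="{region}", month="{ATHENA_MONTH}", day="{ATHENA_DAY}", year="{ATHENA_YEAR}")'
            f' location "s3://{LOG_BUCKET}/AWSLogs/{AWS_ACCOUNT_ID}/CloudTrail/{region}/{ATHENA_YEAR}/{ATHENA_MONTH}/{ATHENA_DAY}/"'
        )
    return f'ALTER TABLE {ATHENA_DB_TABLE} ADD IF NOT EXISTS ' + ' '.join(partitions)

#For example: QUERY_1 = build_partition_query(REGIONS)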

Athena queries used to collect evidence

The queries used to gather evidence for PCI DSS are broken out from the Lambda function we created, using the partitioned date example from above. Each query is listed with its respective requirement.

Note: AWS owns the security OF the cloud, providing high levels of security in alignment with our numerous compliance programs. The customer is responsible for the security of their resources IN the cloud, keeping their content secure and compliant. The queries below are meant to be a proof of concept and should be tailored to your environment.

10.2.1/10.2.3 – Implement automated audit trails for all system components to reconstruct access to either or both cardholder data and audit trails:


"SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE requestparameters LIKE '%<LOG_BUCKET>%' AND sourceipaddress <> 'cloudtrail.amazonaws.com' AND sourceipaddress <>  'athena.amazonaws.com' AND eventName = 'GetObject' AND year='2020' AND month='02' AND day='28';"

10.2.2 – Implement automated audit trails for all system components to reconstruct all actions taken by anyone using root or administrative privileges.


"SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE userIdentity.sessionContext.sessionIssuer.userName LIKE '%root%' AND year='2020' AND month='02' AND day='28';"

10.2.4 – Implement automated audit trails for all system components to reconstruct invalid logical access attempts.


"SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname = 'ConsoleLogin' AND responseelements LIKE '%Failure%' AND year='2020' AND month='02' AND day='28';"

10.2.5.b – Verify all elevation of privileges is logged.

10.2.5.c – Verify all changes, additions, or deletions to any account with root or administrative privileges are logged:


"SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname LIKE '%AddUserToGroup%' AND requestparameters LIKE '%Admin%' AND year='2020' AND month='02' AND day='28';"

10.2.6 – Implement automated audit trails for all system components to reconstruct the initialization, stopping, or pausing of the audit logs:


"SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname = 'StopLogging' OR eventname = 'StartLogging' AND year='2020' AND month='02' AND day='28';"

10.6 – Review logs and security events for all system components to identify anomalies or suspicious activity:


"SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname LIKE '%DeleteSecurityGroup%' OR eventname LIKE '%CreateSecurityGroup%' OR eventname LIKE '%UpdateSecurityGroup%' OR eventname LIKE '%AuthorizeSecurityGroup%' AND year='2020' AND month='02' AND day='28';" 

"SELECT * FROM <YOUR_DATABASE>.<YOUR_TABLE> WHERE eventname LIKE '%Subnet%' and eventname NOT LIKE 'Describe%' AND year='2020' AND month='02' AND day='28';" 

You can use the AWS Command Line Interface (AWS CLI) to invoke the Lambda function using the following command, replacing <YOUR_FUNCTION> with the name of the Lambda function you created:


aws lambda invoke --function-name <YOUR_FUNCTION> outfile

The AWS Lambda API Reference has more information on using Lambda with the AWS CLI.

Note: The query results will be written to the Amazon S3 location defined by the OUTPUT_LOCATION variable in the Lambda function.
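
If you want to confirm that results were written, or pull them down for review, a short sketch like the following lists the current day's output files. The DailyAthenaLogs/CloudTrail/ prefix mirrors the OUTPUT_LOCATION defined in the function, and <OUTPUT_LOG_BUCKET> is the same placeholder used earlier:

import boto3
import datetime

#Sketch: list today's Athena query result files written by the Lambda function.
#The prefix mirrors the OUTPUT_LOCATION variable in the function code (date portion only).
s3 = boto3.client('s3')
prefix = 'DailyAthenaLogs/CloudTrail/' + datetime.datetime.today().isoformat()[:10]

response = s3.list_objects_v2(Bucket='<OUTPUT_LOG_BUCKET>', Prefix=prefix)
for obj in response.get('Contents', []):
    print(obj['Key'], obj['Size'])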

Use Amazon CloudWatch to run your Lambda function

You can create a rule in CloudWatch to have this function run automatically on a set schedule.

Create a CloudWatch rule

  1. From the CloudWatch dashboard, under Events, select Rules, then Create rule.
  2. Under Event Source, select the radio button for Schedule and choose a fixed rate or enter in a custom cron expression.
  3. Finally, in the Targets section, choose Lambda function and find your Lambda function from the drop down.

    The example screenshot shows a CloudWatch rule configured to invoke the Lambda function daily:
     

    Figure 3: CloudWatch rule

  4. Once the schedule is configured, choose Configure details to move to the next screen.
  5. Enter a name for your rule, make sure that State is enabled, and choose Create rule.
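
If you prefer to script the schedule instead of using the console, the following sketch creates an equivalent daily rule with boto3 and grants CloudWatch Events permission to invoke the function. The rule name and statement ID are arbitrary names chosen for this example, and <AWS_ACCOUNT_NUMBER> is the same placeholder used earlier:

import boto3

#Sketch: schedule the Athena_log_query function to run daily with a CloudWatch Events rule.
events = boto3.client('events')
lambda_client = boto3.client('lambda')

rule = events.put_rule(
    Name='Athena_log_query_daily',        #arbitrary rule name
    ScheduleExpression='rate(1 day)',
    State='ENABLED'
)

#Allow CloudWatch Events to invoke the Lambda function
lambda_client.add_permission(
    FunctionName='Athena_log_query',
    StatementId='AllowCloudWatchEventsInvoke',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule['RuleArn']
)

#Point the rule at the Lambda function
events.put_targets(
    Rule='Athena_log_query_daily',
    Targets=[{
        'Id': 'Athena_log_query',
        'Arn': 'arn:aws:lambda:us-east-1:<AWS_ACCOUNT_NUMBER>:function:Athena_log_query'
    }]
)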

Check that your function is running

You can then navigate to your Lambda function’s CloudWatch log group to see if the function is running as intended.

To locate the appropriate CloudWatch group, from your Lambda function within the console, select the Monitoring tab, then choose View logs in CloudWatch.
 

Figure 4: View logs in CloudWatch
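
If you'd rather check from a script, the following sketch reads recent events from the function's log group. It assumes the default log group name that Lambda creates (/aws/lambda/Athena_log_query) and looks back one day:

import boto3
import time

#Sketch: read the last 24 hours of log events from the function's CloudWatch Logs log group.
logs = boto3.client('logs')

response = logs.filter_log_events(
    logGroupName='/aws/lambda/Athena_log_query',
    startTime=int((time.time() - 86400) * 1000)   #start time is in milliseconds
)
for event in response['events']:
    print(event['message'].strip())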

You can take this a step further and set up an Amazon SNS notification to email you when the function is triggered.
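
One simple way to do that (one pattern among several) is to create an SNS topic with an email subscription ahead of time and publish a short summary at the end of lambda_handler. The topic ARN below is a placeholder, and the function's execution role would also need sns:Publish permission on that topic:

import boto3

#Sketch: publish a notification once the daily queries have been submitted.
#Create the topic and email subscription beforehand; the ARN below is a placeholder.
sns = boto3.client('sns')

sns.publish(
    TopicArn='arn:aws:sns:us-east-1:<AWS_ACCOUNT_NUMBER>:Athena_log_query_notifications',
    Subject='Daily PCI DSS log review queries submitted',
    Message='The Athena_log_query function ran; results are in s3://<OUTPUT_LOG_BUCKET>/DailyAthenaLogs/CloudTrail/.'
)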

Summary

In this post, we walked through partitioning an Athena table, which helps reduce the time and cost of running queries against your S3 buckets. We then constructed example SQL queries related to PCI DSS requirement 10 to assist in audit preparation. Finally, we created a Lambda function to automate running daily queries to pull PCI DSS audit log evidence from Amazon S3, to assist with the PCI DSS daily log review requirement. I encourage you to customize, add, or remove the SQL queries to best fit your needs and compliance requirements.

If you have feedback about this post, submit comments in the Comments section below.

Want more AWS Security how-to content, news, and feature announcements? Follow us on Twitter.

Author

Logan Culotta

Logan Culotta is a Security Assurance Consultant, and a current Qualified Security Assessor (QSA). Logan is part of the AWS Security Assurance team, which is also a Qualified Security Assessor Company (QSAC). He enjoys finding ways to automate compliance and security in the AWS cloud. In his free time, you can find him spending time with his family, road cycling, or cooking.