AWS Cloud Operations Blog

Automating the discovery of unused AWS Lambda functions

In 2017 Kyle Somers explained how you can gain visibility into the execution of your AWS Lambda functions in his blog post announcing AWS CloudTrail data events for AWS Lambda. In my blog post, I’ll expand upon Kyle’s post to show you how you can combine CloudTrail data events for AWS Lambda with the power of the Amazon Athena SQL engine to answer the question, “Do I have any Lambda functions that haven’t been used in the past 30 days?”

Whether you are a large financial institution or start-up company, understanding which functions are being invoked and which are not can help you maintain an up-to-date Lambda environment and control costs and risks by removing unused functions from a production environment.

In addition to helping identify which Lambda functions have been invoked, CloudTrail Lambda data events can be used to detect and automatically act on invocations of Lambda functions across your AWS account. For example, you can meet your IT auditing and compliance requirements by validating that your functions were invoked by permitted users, roles, and services. Customers with regulatory audit and compliance requirements can maintain the same level of visibility and auditability of Lambda function invocations as they do for other AWS services.

To help us identify unused Lambda functions we’ll use a simple Python script that demonstrates an example workflow. The Python script requires Python 2.7+ or 3.3+, but before we dive right in, let’s make sure we have all the prerequisites set up.

Recording the AWS Lambda invoke API

Enabling CloudTrail data events for recording Lambda invoke API activity is a simple setup that can be added to an existing CloudTrail trail or added during the creation of a new trail within your account. To understand which functions are being used, ensure that you enable the Log all current and future functions option during the setup.

Kyle’s blog does a great job of providing step-by-step instructions. For the Python script to return accurate results based on the past 30 days, you’ll need to ensure that CloudTrail data events for AWS Lambda have been enabled for all functions for at least that period of time. Data events are charged at the rate of $0.10 per 100,000 events. See the CloudTrail pricing page for more information.

Boto3 setup and configuration

The sample Python script depends on Boto3, the AWS SDK for Python. To install Boto3 you can clone the repository and type:

pip install -r requirements.txt

Or you can install boto3 using pip:

pip install boto3

Before you can begin using Boto3, you need to set up authentication credentials. Credentials for your AWS account can be found in the IAM console. You can create a new user or use an existing user that has the required permissions described below. Go to the Users -> Security credentials -> Access keys page and copy the existing keys or generate a new set of keys for the chosen IAM user.

If you have the AWS CLI installed, then you can use it to configure your credentials file using the command:

aws configure

Alternatively, you can create the credential file yourself. By default, its location is at ~/.aws/credentials for Mac and Linux users or C:\Users\USER_NAME.aws\credentials for Windows users. Add the following lines in the file:

[default]

aws_access_key_id = YOUR_ACCESS_KEY

aws_secret_access_key = YOUR_SECRET_KEY

See the Security Credentials page for more information on getting your keys. For more information on configuring Boto3, check out the Quickstart section in the developer guide.

AWS permissions

Last, you’ll need to ensure that the IAM credentials that you are using have the appropriate permissions to list your Lambda functions and execute Athena queries.

With all the prerequisites ready to go, it’s time to configure the Python script to run in your AWS account.

import boto3
import time
import sys

'''
This script will retrieve the list of functions from the region executed, create a
CloudTrail table in Athena, run a query to identify which functions have been invoked 
in the past 30 days, and print a list of those that are inactive. This allows you to 
understand if you have any Lambda functions not currently in use.

The script assumes the following:
1. You have CloudTrail Lambda data events enabled for all functions within your account
2. You have permissions to Athena and Lambda
3. You have Python and Boto3 installed
'''

# AWS region that you want to evaluate
# The script will only work in regions where Athena is supported
# Athena region availability can be found at https://aws.amazon.com/about-aws/global-infrastructure/regional-product-services/
REGION = "us-east-1"

# S3 bucket where Athena will store query history
# This bucket will be created in the region where the script is executed if it doesn't currently exist
#ATHENA_S3_BUCKET_NAME = "s3://athena-history-bucket-demo"
ATHENA_S3_BUCKET_NAME = "s3://athena-history-bucket-demo"


# Athena table to create for CloudTrail logs
# This table will be created in the 'default' Athena database
TABLE_NAME = "cloudtrail_lambda_logs"

# Location of S3 bucket where CloudTrail logs are stored
# including CloudTrail Lambda data events
# CLOUDTRAIL_S3_BUCKET_NAME = "s3://{BucketName}/AWSLogs/{AccountID}/"
CLOUDTRAIL_S3_BUCKET_NAME = "s3://bobtrailbucket/AWSLogs/341277845616/"

CREATE_TABLE_QUERY_TEMPLATE = \
"""CREATE EXTERNAL TABLE {0} (
eventversion STRING,
userIdentity STRUCT<
  type:STRING,
  principalid:STRING,
  arn:STRING,
  accountid:STRING,
  invokedby:STRING,
  accesskeyid:STRING,
  userName:STRING,
  sessioncontext:STRUCT<
    attributes:STRUCT<
      mfaauthenticated:STRING,
      creationdate:STRING>,
    sessionIssuer:STRUCT<
      type:STRING,
      principalId:STRING,
      arn:STRING,
      accountId:STRING,
      userName:STRING>>>,
eventTime STRING,
eventSource STRING,
eventName STRING,
awsRegion STRING,
sourceIpAddress STRING,
userAgent STRING,
errorCode STRING,
errorMessage STRING,
requestParameters STRING,
responseElements STRING,
additionalEventData STRING,
requestId STRING,
eventId STRING,
resources ARRAY<STRUCT<
  ARN:STRING,accountId:
  STRING,type:STRING>>,
eventType STRING,
apiVersion STRING,
readOnly STRING,
recipientAccountId STRING,
serviceEventDetails STRING,
sharedEventID STRING,
vpcEndpointId STRING
)
PARTITIONED BY(year string)
ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '{1}';"""

# Query to add a partition for 2018 to the CloudTrail table in Athena
CREATE_PARTITON_QUERY_TEMPLATE = """
ALTER TABLE {0} add partition (year="2018")
location '{1}/CloudTrail/{2}/2018/'"""

# Query used to search for Lambda data event Invoke activities for the past 30 days
LAST_RUN_QUERY_TEMPLATE = """
select json_extract_scalar(requestparameters, '$.functionName') as function_name, Max (eventtime) as "Last Run"
from {0}
where eventname='Invoke'
and year='2018'
and from_iso8601_timestamp(eventtime) > current_timestamp - interval '1' month
and json_extract_scalar(requestparameters, '$.functionName') in ({function_arns})
group by json_extract_scalar(requestparameters, '$.functionName')"""

lambda_client = boto3.client('lambda', region_name=REGION)
athena_client = boto3.client('athena', region_name=REGION)

# Retrieve a list of the function ARNs for the specified Region
def retrieve_function_arns(lambda_client):
    function_arns = []
    retrieve_function_arns.count=0
    functions = lambda_client.list_functions()
    for fn in functions['Functions']:
        retrieve_function_arns.count +=1
        function_arns.append(str(fn['FunctionArn']))

    print("You have {} functions in the ".format(retrieve_function_arns.count)+REGION+" region\n")
    if (retrieve_function_arns.count==0):
        print("The script will now exit")
        sys.exit()
    print("Now the script will run the following Athena queries:\n")
    print("1) Create the Athena table for CloudTrail")
    print("2) Add a partition for 'year' to the new table")
    print("3) Query Athena for the Lambda functions that have been invoked in the past 30 days\n")
    time.sleep(2)
    return function_arns


def run_query(athena_client, query):
    response = athena_client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': 'default'
        },
        ResultConfiguration={
            'OutputLocation': ATHENA_S3_BUCKET_NAME+"-"+REGION,
        }
    )
    print('Query Execution ID: ' + response['QueryExecutionId'])
    execution_status = None
    while execution_status != 'SUCCEEDED':
        waiter = athena_client.get_query_execution(
            QueryExecutionId = response['QueryExecutionId'].lstrip('ID')
        )
        execution_status = waiter['QueryExecution']['Status']['State']

        if execution_status == 'FAILED':
            print("The query failed. Check the Athena history for details.")
            return

        print("Running")
        time.sleep(5)

    results = athena_client.get_query_results(
        QueryExecutionId = response['QueryExecutionId']
    )
    return results


def build_query_strings(function_arns):
    function_arns_csv = str(function_arns)[1:-1]
    create_table_query = CREATE_TABLE_QUERY_TEMPLATE.format(TABLE_NAME, CLOUDTRAIL_S3_BUCKET_NAME)
    create_partition_query = CREATE_PARTITON_QUERY_TEMPLATE.format(TABLE_NAME, CLOUDTRAIL_S3_BUCKET_NAME, REGION)
    last_run_query = LAST_RUN_QUERY_TEMPLATE.format(TABLE_NAME, function_arns = function_arns_csv)
    return create_table_query, create_partition_query, last_run_query


def get_set_of_function_arns_from_result_set(result_set):
    set_of_functions_used = set()
    get_set_of_function_arns_from_result_set.count=0
    for row in result_set[1:]:
        get_set_of_function_arns_from_result_set.count +=1
        function_arn = row['Data'][0]['VarCharValue']
        set_of_functions_used.add(function_arn)
    return set_of_functions_used


def main():
    function_arns = retrieve_function_arns(lambda_client)
    queries = build_query_strings(function_arns)
    query_results = []
    for q in queries:
	    query_results.append(run_query(athena_client, q))

    # We made sure that the last query run gets the data that we care about
    result_set = query_results[-1]['ResultSet']['Rows']

    set_of_functions_used = get_set_of_function_arns_from_result_set(result_set)

    # Compare the results from Athena to the list of existing functions and print the difference
    unusedcount=retrieve_function_arns.count-get_set_of_function_arns_from_result_set.count
    print("\nOut of the {}, there are {} functions that haven't been invoked in the past 30 days".format(retrieve_function_arns.count, unusedcount))
    difference_list = list(set(function_arns) - set_of_functions_used)
    difference_list.sort(key=str.lower)

    for stale_function_arn in difference_list:
        print(stale_function_arn)


if __name__ == '__main__':
    main()

Python variable configuration

You need to set four variables within the Python script before actually running. These variables are:

1. The AWS Region that you want to evaluate. It’s important to note that the script will only work in Regions where Athena is supported. Athena Region availability can be found at https://aws.amazon.com/about-aws/global-infrastructure/regional-product-services/. The Region designation should be set as the Region code. For example, US East (N. Virginia) is “us-east-1”.

REGION = “us-east-1”

2. Name of the Amazon S3 bucket where Amazon Athena will store the query history when running the script. This bucket will be created in the Region where the script is executed if it doesn’t currently exist, and the Region name will be appended to the name you provide. Example:

ATHENA_S3_BUCKET_NAME = “s3://athena-history-bucket-demo”

3. Name of the Athena table to create for CloudTrail logs. This table will be created in the ‘default’ Athena database. Example:

TABLE_NAME = “cloudtrail_logs”

4. Location of the Amazon S3 bucket where CloudTrail logs are stored for your CloudTrail Lambda data events. You can find this location by viewing the CloudTrail trail and copying the S3 bucket where the log files are delivered. This is in the format of s3://{BucketName}/AWSLogs/{AccountID}/. Example:

CLOUDTRAIL_S3_BUCKET_NAME = “s3://cloudtrail-logs-bucket/AWSLogs/123456789012/”

Running the script

With all the prerequisites met and the variables configured, you can now run the script. Before running it, it’s important to understand that while there is no cost for the script to create the CloudTrail table within Athena, the step of running an actual Athena query to search for Lambda invocations will incur standard Athena charges. Please visit the Athena pricing page for more information.

To run the script:

python unusedlambda.py

This performs the following actions in order:

1. Retrieves a list of the current Lambda functions found in the Region specified in the configuration file or set using the AWS CLI. The script will output the total count of functions found.

The next three steps will run a set of queries within Athena. When each query is launched you’ll see a Query Execution ID and the script will return “Running” every 5 seconds waiting for the query results to return.

2. Create an Athena table with the name specified in the ‘TABLE_NAME’ variable. This table is created using the AWS CloudTrail SerDe and specific DDL required to query AWS CloudTrail logs.

3. Create a partition in the newly created CloudTrail table for the year 2018. This limits the amount of data that Amazon Athena will need to query to return the results within the past 30 days.

4. Using the list of functions retrieved in Step 1, create and execute an Amazon Athena query that returns a list of which functions have been invoked in the past 30 days.

Note: If you run the script more than once within the same Region, using the same set of script variables, you’ll notice that the queries for Step 2 and Step 3 will return failed results. This is expected behavior as the CloudTrail table and partition already exist.

5. Finally, the script will output the difference between the list of functions that exist within the Region and the query results.

Example output:

The output represents all the functions within the designated Regions that have NOT been called in the past 30 days.

Conclusion

By combining CloudTrail data events for Lambda with Athena’s SQL engine, we can now easily automate an answer to the question, “Do I have any Lambda functions that haven’t been used in the past 30 days?”.


About the Author

Bob O’Dell is a Sr. Product Manager for AWS CloudTrail. AWS CloudTrail is a service that enables governance, compliance, operational auditing, and risk auditing of AWS accounts. Bob enjoys working with customers to understand how CloudTrail can meet their needs and continue to be an integral part of their solutions. In his spare time, he enjoys spending time adventuring through the Pacific Northwest.