AWS Big Data Blog

Automate dynamic mapping and renaming of column names in data files using AWS Glue: Part 2

In Part 1 of this two-part post, we looked at how we can create an AWS Glue ETL job that is agnostic enough to rename columns of a data file by mapping to column names of another file. The solution focused on using a single file that was populated in the AWS Glue Data Catalog by an AWS Glue crawler.

However, for enterprise solutions, ETL developers may be required to process hundreds of files of varying schemas, even files that they might not have seen previously. Manually crawling and cataloging these tables may not be scalable and plausible. You need an automated solution that can perform this operation at a large scale with minimal effort on your part.

In this post, we look at creating an automated, scalable, and reliable solution that can process hundreds or thousands of files while still using the same AWS Glue ETL job that we created in Part 1. We do this all while creating a solution that uses serverless components and services from AWS, which can help you scale seamlessly while keeping costs low and creating a solution that is easy to manage and maintain.

For this solution, we use an event-driven processing architecture with the following components:

  • Amazon Simple Storage Service (Amazon S3) Event Notifications
  • AWS Lambda for handling events generated from Amazon S3 in a serverless manner
  • An AWS Glue crawler and the Data Catalog to automatically infer the schemas and create tables
  • AWS Glue jobs to dynamically process and rename the columns of the data file
  • S3 buckets for the landing and storage of the data files and column name files when they come in, as well as for storing processed files in the destination bucket

S3 Event Notifications enable you to receive notifications when certain object events occur in your bucket. Event-driven models like this mean that you no longer have to build or maintain server-based polling infrastructure to check for object changes, nor do you have to pay for idle time of that infrastructure when there are no changes to process. For more details on the S3 Event Notifications, see  Enabling and configuring event notifications for an S3 Bucket. It’s also important to understand how S3 Event Notifications pass the required information to Lambda and what permissions are required. For more information, see Using AWS Lambda with Amazon S3. The code uses the AWS Serverless Application Model (AWS SAM), enabling you to deploy the application in your own AWS account.

Prerequisites

For this walkthrough, you should have the following prerequisites:

  • An AWS account
  • Knowledge of working with AWS Glue crawlers
  • Knowledge of working with the Data Catalog
  • Knowledge of working with AWS Glue ETL jobs and PySpark
  • Knowledge of working with roles and policies using AWS Identity and Access Management (IAM)
  • Knowledge of working with Amazon S3
  • Knowledge of working with Lambda
  • Knowledge of working with AWS SAM
  • Optionally, knowledge of using Amazon Athena to query Data Catalog tables

Overview of solution

The following diagram showcases the overall solution architecture and steps.

The following diagram showcases the overall solution architecture and steps.

The workflow includes the following steps:

  1. A new raw data file is dropped into the source data file S3 bucket.
  2. This triggers an S3 event notification, which runs a Lambda function.
  3. The function checks if a crawler exists to crawl the data file and name file.
  4. The function creates a crawler if not already present and runs it to catalog the files.
  5. The function calls the AWS Glue job and passes the file name argument.
  6. The AWS Glue job runs on the name file and creates a file with renamed columns.

This architecture is suggested when your file uploads are happening in a staggered approach. For concurrent files uploads, please make sure to consider the AWS Glue job concurrent execution quotas and use a schedule-based approach instead of event-based approach.

Creating resources with the AWS SAM template

This AWS SAM template creates the S3 buckets, Lambda function, and IAM roles needed for the solution. Let’s walk through it.

Creating the S3 buckets

We create the buckets needed for the raw data file, name file, and renamed data file. The following code parameterizes the names of the bucket so you can change them if you need to when deploying the template:

Parameters:

  FileLandingBucketParameter:
    Type: String
    Default: 'file-landing-bucketv1'
  FileDestinationBucketParameter:
    Type: String
    Default: 'file-destination-bucketv1'

Resources:

  ## S3 buckets
  FileLandingBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Ref FileLandingBucketParameter
  DestinationBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Ref FileDestinationBucketParameter

The prefix structure of the bucket is important because it determines the naming of the tables and crawlers, and also helps in raising the right S3 event notifications. This is important when you load the files to the bucket.

For this post, I create two buckets: Bucket1 and Bucket2.

Bucket1 is the raw data file landing bucket. The S3 event notifications are enabled on this bucket for any file create event. The prefix for this bucket has the structure s3://bucket-name/Filename/File.ext; for example, s3://file-landing-bucket/Sample1/Sample1DataFile.csv or s3://file-landing/bucket/Sample2/Sample2File.Orc.

The name of the last prefix for the file is important because the crawler uses it to name the table. For this post, the tables are sample1 and sample2.

Bucket2 stores the name file and renamed data file. For this post, we assume that the name file corresponding to the uploaded data file has already been uploaded and exists in the correct path. The prefix for the name files is s3://bucket-name/FilenameNameFile/namefile.ext; for example, s3://destination-bucket/Sample1NameFile/sample1namefile.csv.

Again, the table is named sample1namefile, which is important to note.

The prefix of the destination file follows the format s3://bucket-name/FilenameRenamedFile/renamedfile.ext; for example, s3://destination-bucket/Sample1RenamedFile/sample1renamedfile.orc.

For more information about creating prefixes and folders, see How do I use folders in an S3 bucket?

Creating the Lambda function and IAM roles

Next, the template creates the Lambda function named ProcessDataFileRename, which processes the events generated when a new file is created in the landing bucket. In a later step, we walk through the Lambda function code that runs our logic. The following code is the portion of AWS SAM template to create your Lambda function. Notice the permissions attached to its role:

ProcessDataFileRenameV1:
    Type: 'AWS::Serverless::Function'
    Properties:
      Handler: app.lambda_handler
      Runtime: python3.6
      CodeUri: .
      Description: Lambda funciton to process renaming of data files using AWS Glue.
      MemorySize: 128
      Timeout: 300
      Environment:
        Variables:
          GlueServiceRole: !GetAtt GlueServiceRole.Arn
          DestinationBucket: !Ref FileDestinationBucketParameter
      Policies:
        - Statement:
          - Effect: Allow
            Action:
              - 'glue:GetCrawler'
              - 'glue:BatchGetCrawlers'
              - 'glue:StartCrawler'
              - 'glue:StartJobRun'
              - 'glue:CreateCrawler'
            Resource: '*'
          - Effect: Allow
            Action:
              - 'iam:PassRole'
            Resource: !GetAtt GlueServiceRole.Arn
      Events:
        BucketEvent1:
          Type: S3
          Properties:
            Bucket:
              Ref: FileLandingBucket
            Events:
              - 's3:ObjectCreated:*'

Also notice the timeout parameters and environment variables that have been used for the function definition, along with the event association with the file landing bucket.

Let’s now look at the function definition code in app.py. The following code is an illustrative example of the Lambda function code in Python:

#a.
import json
import boto3
import logging
import os
from urllib.parse import unquote_plus

logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

#b.

glue = boto3.client('glue')
GlueServiceRole = os.environ['GlueServiceRole']
DestinationBucket = os.environ['DestinationBukcet']

def lambda_handler(event, context):
  logger.info(json.dumps(event))
  for record in event['Records']:
#Grab the file name from the event record which triggered the lambda function & Construct the path for data file, name file and renamed file. 
        bucket = record['s3']['bucket']['name']
        key = unquote_plus(record['s3']['object']['key'])
        data_file_name = key.split('/')[0]
        s3_data_file_path = 's3://' + bucket + '/' + data_file_name +'/'
        name_file_bucket = DestinationBucket
        name_file_prefix = data_file_name+'/'+ data_file_name + 'NameFile'
        name_file_path = 's3://'+name_file_bucket+'/' + name_file_prefix 

#c.
      
#Create crawler for the data file if it does not already exist and run it.   
        try:
            crawler = glue.get_crawler(Name=data_file_name)
        except glue.exceptions.EntityNotFoundException as e:
            crawler = glue.create_crawler(
                Name=data_file_name,
                    Role= GlueServiceRole,
                    DatabaseName='sampledb',
                    Description='Crawler for data files',
                    Targets={
                        'S3Targets': [
                            {
                                'Path': s3_data_file_path,
                                'Exclusions': [
                                ]
                            },
                        ]
                    },
                    SchemaChangePolicy={
                    'UpdateBehavior': 'UPDATE_IN_DATABASE',
                    'DeleteBehavior': 'DELETE_FROM_DATABASE'
                }
                #,Configuration='{ "Version": 2.0, "CrawlerOutput": { "Partitions": { "AddOrUpdateBehavior": "InheritFromTable" } } }'
            )
            response = glue.start_crawler(
            Name=data_file_name)
        else:
            response = glue.start_crawler(
            Name=data_file_name)
#d.          
#Create crawler for the name file if it does not already exist and run it.   

        try:
            crawler = glue.get_crawler(Name=data_file_name + '_name_file')
        except glue.exceptions.EntityNotFoundException as e:
            crawler = glue.create_crawler(
                    Name=data_file_name + '_name_file',
                    Role= GlueServiceRole,
                    DatabaseName='sampledb',
                    Description='Crawler for name files',
                    Targets={
                        'S3Targets': [
                            {
                                'Path': name_file_path,
                                'Exclusions': [
                                ]
                            },
                        ]
                    },
                    SchemaChangePolicy={
                    'UpdateBehavior': 'UPDATE_IN_DATABASE',
                    'DeleteBehavior': 'DELETE_FROM_DATABASE'
                    }
                #,Configuration='{ "Version": 2.0, "CrawlerOutput": { "Partitions": { "AddOrUpdateBehavior": "InheritFromTable" } } }'
                    )
            response = glue.start_crawler(Name=data_file_name+'_name_file')
        else:
            response = glue.start_crawler(Name=data_file_name+'_name_file')

#e
#Run the agnostic Glue job to renamed the files by passing the file name argument. 

        try:
            glue.start_job_run( Arguments = {'--FileName': data_file_name, '--DestinationBucketName' : DestinationBucket })
        except Exception as e:
            print('Glue Job runtime Issue')

Let’s walk through the code.

We start by entering some boilerplate code for Python and creating an AWS Glue client. Next, the Lambda handler function grabs the file name being processed from the S3 event data passed to the function and constructs the path for the data file, name file, and renamed data file. It then checks for the existence of a crawler for that data file. If it exists, it runs it to catalog and updates the table. If no crawler exists, the function creates one and runs it.

The function repeats these steps for the name file corresponding to the data file being processed.

The IAM role being passed for creating the crawler jobs needs to have access to the AWS Glue APIs and the respective S3 buckets. Following portion of the AWS SAM template defines that role (always remember to use the principle of least privilege access):

  GlueServiceRole:
    Type: 'AWS::IAM::Role'
    Properties:
      Path: /
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: "Allow"
            Action:
              - "sts:AssumeRole"
            Principal:
              Service:
                - "lambda.amazonaws.com"
                - "glue.amazonaws.com"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
      Policies:
        - PolicyName : GlueServicePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action: 
                - 's3:*Object'
                - 's3:ListBucket'
                Resource: 
                - !GetAtt DestinationBucket.Arn 
                - !Sub 
                  - arn:aws:s3:::${FileLandingBucket}*
                  - { FileLandingBucket: !Ref FileLandingBucketParameter}

In the last step, the template runs the ETL job. This is the job we created in the Part 1 of this post. However, we have to make some modifications for it to integrate with the Lambda function. We do this later in the post. For now, note that we’re passing the data_file_name and destination bucket name as an argument to the job.

Modifying the AWS Glue ETL job

The next step is to modify the ETL job we created in Part 1 to make it work for any file and be able to scale to handle multiple files simultaneously. We pass the data_file_name and destinationbucket argument to the job; this argument helps the job identify which tables to read, which file to process, and where to dump the renamed file. To modify your ETL job, complete the following steps:

  1. On the AWS Glue console, on the Jobs page, select the job you created in Part 1.
  2. On the Action menu, choose Edit job.
  3. Choose Security configuration, script libraries, and job parameters.
  4. For Number of workers, enter 10.
  5. For Max concurrency, enter 1000.

You can choose the concurrency depending upon how many files you intend to process.

  1. For Job timeout, enter 2800.
  2. For Job parameters, enter the following key-value pairs:
    1. –FileName, Sample 1
    2. –DestinationBucketName, YourBucket

Make sure to match the parameter name as in the following Lambda function code.

  1. Choose Save.

Choose Save.

  1. On the Jobs page, select the job again.
  2. On the Action menu, choose Edit script.
  3. Modify the script to the following code. We use job parameters at line 9 and later in code to create a DynamicFrame as well to refer the the correct Data Catalog tables. The final change is in the datasink command, where we output the renamed file to the correct S3 path.
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.dynamicframe import DynamicFrame
    from awsglue.job import Job
    
    args = getResolvedOptions(sys.argv, ['DestinationBucketName', 'FileName']) 
    
    glueContext = GlueContext(SparkContext.getOrCreate())
    job = Job(glueContext)
    
    column_name_dynamicframe = glueContext.create_dynamic_frame.from_catalog(
           database = "sampledb",
           table_name = args['FileName']+'namefile')
    
    userdata_dynamicframe = glueContext.create_dynamic_frame.from_catalog(
           database = "sampledb",
           table_name = args['FileName'])
    
    #Generate the applymapping script dynamically and apply it #on our dynamicframe data file
    	
    mapping = []
    for x in range(0, len(userdata_dynamicframe.schema().fields)) :
        mapping.append((
    	userdata_dynamicframe.schema().fields[x].name,
    	column_name_dynamicframe.schema().fields[x].name
        ))		
    userdata_dynamicframe = userdata_dynamicframe.apply_mapping(mapping)
    
    
    
    datasink4 = 
    glueContext.write_dynamic_frame.from_options(
    	frame = userdata_dynamicframe, 
    	connection_type = "s3", 
    	connection_options = {"path": "s3://"+args['DestinationBucketName']			+"/"+args['FileName']+'/'+args['FileName']+'FileRenamed'}, 
    	format = "orc", 
    	transformation_ctx = "datasink4"
    )
    job.commit()
  4. Choose Save.

This concludes the creation of our solution. The solution can now process hundreds of files simultaneously.

Testing the solution

To test the solution, make sure you have the name files already in place before you upload the data files. To do so, upload the file from the Amazon S3 console directly or use the following bash command:

aws s3 cp sample2namefile.csv s3://YOUR_BUCKET2/sample2/sample2namefile/sample2namefile.csv
aws s3 cp sample3namefile.csv s3://YOUR_BUCKET2/sample3/sample3namefile/sample3namefile.csv
aws s3 cp sample2datafile.orc s3://YOUR_BUCKET1/sample2/sample2datafile.orc
aws s3 cp sample3datafile.orc s3://YOUR_BUCKET1/sample3/sample3datafile.orc

Then verify that the corresponding renamed files have been created at the following paths:

  • s3://YOUR_BUCKET2/sample2/sample2renamedfile/
  • s3://YOUR_BUCKET2/sample3/sample3renamedfile/

The following screenshot shows the new crawlers were created and ran.

The following screenshot shows the new crawlers were created and ran.

The following screenshot shows the new tables in the Data Catalog.

The following screenshot shows the new tables in the Data Catalog.

Feel free to test it for scalability with multiple file uploads.

Cleaning up

To avoid incurring future charges, delete the data in the S3 buckets.

With AWS Glue, you pay an hourly rate, billed by the second, for crawlers (discovering data) and ETL jobs (processing and loading data). If you’re not running an ETL job or crawler, you’re not charged. You can store up to a million objects in the Data Catalog for free.

With Lambda, you pay only for what you use. You’re charged based on the number of requests for your functions and the time it takes for your code to run.

Alternatively, you can delete the AWS Glue ETL job, Data Catalog tables, crawlers, and Lambda function.

Conclusion

In this post, we looked at how we can create a scalable serverless solution for handling large-scale file renaming of hundreds of files. We utilized an event-driven architecture of S3 event processing, Lambda functions, AWS Glue ETL jobs, crawlers, and Data Catalog tables.

We looked at the integration between all these different services and created a workflow solution that caters to files of any schema. We can process multiple files without needing to write separate code for every type of file. This saves development and testing effort and allows the solution to scale to meet your growing business and technology demands.

Although we focused on a use case to rename files, you can use this solution for large-scale ETL processing for any type in which an event-driven processing architecture is appropriate.

If you have any questions or comments, please leave them in the comments section.


About the Author

Divyesh SahDivyesh Sah is a Sr. Enterprise Solutions Architect in AWS focusing on financial services customers, helping them with cloud transformation initiatives in the areas of migrations, application modernization, and cloud native solutions. He has over 18 years of technical experience specializing in AI/ML, databases, big data, containers, and BI and analytics. Prior to AWS, he has experience in areas of sales, program management, and professional services.