AWS Machine Learning Blog

How to schedule jobs and parameterize your datasets in Amazon SageMaker Data Wrangler

Data is transforming every field and every business. However, with data growing faster than most companies can keep track of, collecting data and getting value out of that data is a challenging thing to do. A modern data strategy can help you create better business outcomes with data. AWS provides the most complete set of services for the end-to-end data journey to help you unlock value from your data and turn it into insight.

Data scientists can spend up to 80% of their time preparing data for machine learning (ML) projects. This preparation process is largely undifferentiated and tedious work, and can involve multiple programming APIs and custom libraries. Amazon SageMaker Data Wrangler helps data scientists and data engineers simplify and accelerate tabular and time series data preparation and feature engineering through a visual interface. You can import data from multiple data sources, such as Amazon Simple Storage Service (Amazon S3), Amazon Athena, Amazon Redshift, or even third-party solutions like Snowflake or Databricks, and process your data with over 300 built-in data transformations and a library of code snippets, so you can quickly normalize, transform, and combine features without writing any code. You can also bring your custom transformations in PySpark, SQL, or Pandas.

This post demonstrates how you can schedule your data preparation jobs to run automatically. We also explore the new Data Wrangler capability of parameterized datasets, which allows you to specify the files to be included in a data flow by means of parameterized URIs.

Solution overview

Data Wrangler now supports importing data using a parameterized URI. This allows for further flexibility because you can now import all datasets matching the specified parameters, which can be of type String, Number, Datetime, and Pattern, in the URI. Additionally, you can now trigger your Data Wrangler transformation jobs on a schedule.

In this post, we create a sample flow with the Titanic dataset to show how you can start experimenting with these two new Data Wrangler features. To download the dataset, refer to Titanic – Machine Learning from Disaster.

Prerequisites

To get all the features described in this post, you need to be running the latest kernel version of Data Wrangler. For more information, refer to Update Data Wrangler. Additionally, you need to be running Amazon SageMaker Studio JupyterLab 3. To view the current version and update it, refer to JupyterLab Versioning.

File structure

For this demonstration, we follow a simple file structure that you must replicate in order to reproduce the steps outlined in this post.

  1. In Studio, create a new notebook.
  2. Run the following code snippet to create the folder structure that we use (make sure you’re in the desired folder in your file tree):
    !mkdir titanic_dataset
    !mkdir titanic_dataset/datetime_data
    !mkdir titanic_dataset/datetime_data/2021
    !mkdir titanic_dataset/datetime_data/2022
    
    !mkdir titanic_dataset/datetime_data/2021/01 titanic_dataset/datetime_data/2021/02 titanic_dataset/datetime_data/2021/03 
    !mkdir titanic_dataset/datetime_data/2021/04 titanic_dataset/datetime_data/2021/05 titanic_dataset/datetime_data/2021/06
    !mkdir titanic_dataset/datetime_data/2022/01 titanic_dataset/datetime_data/2022/02 titanic_dataset/datetime_data/2022/03 
    !mkdir titanic_dataset/datetime_data/2022/04 titanic_dataset/datetime_data/2022/05 titanic_dataset/datetime_data/2022/06
    
    !mkdir titanic_dataset/datetime_data/2021/01/01 titanic_dataset/datetime_data/2021/02/01 titanic_dataset/datetime_data/2021/03/01 
    !mkdir titanic_dataset/datetime_data/2021/04/01 titanic_dataset/datetime_data/2021/05/01 titanic_dataset/datetime_data/2021/06/01
    !mkdir titanic_dataset/datetime_data/2022/01/01 titanic_dataset/datetime_data/2022/02/01 titanic_dataset/datetime_data/2022/03/01 
    !mkdir titanic_dataset/datetime_data/2022/04/01 titanic_dataset/datetime_data/2022/05/01 titanic_dataset/datetime_data/2022/06/01
    
    !mkdir titanic_dataset/train_1 titanic_dataset/train_2 titanic_dataset/train_3 titanic_dataset/train_4 titanic_dataset/train_5
    !mkdir titanic_dataset/train titanic_dataset/test
  3. Copy the train.csv and test.csv files from the original Titanic dataset to the folders titanic_dataset/train and titanic_dataset/test, respectively.
  4. Run the following code snippet to populate the folders with the necessary files:
    import os
    import math
    import pandas as pd
    batch_size = 100
    
    #Get a list of all the leaf nodes in the folder structure
    leaf_nodes = []
    
    for root, dirs, files in os.walk('titanic_dataset'):
        if not dirs:
            if root != "titanic_dataset/test" and root != "titanic_dataset/train":
                leaf_nodes.append(root)
                
    titanic_df = pd.read_csv('titanic_dataset/train/train.csv')
    
    #Create the mini batch files
    for i in range(math.ceil(titanic_df.shape[0]/batch_size)):
        batch_df = titanic_df[i*batch_size:(i+1)*batch_size]
        
        #Place a copy of each mini batch in each one of the leaf folders
        for node in leaf_nodes:
            batch_df.to_csv(node+'/part_{}.csv'.format(i), index=False)

We split the train.csv file of the Titanic dataset into nine different files, named part_x.csv, where x is the number of the part. Part 0 has the first 100 records, part 1 the next 100, and so on until part 8, which holds the remaining records. Every leaf folder of the file tree contains a copy of the nine parts of the training data, except for the train and test folders, which contain train.csv and test.csv.
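
The arithmetic behind the nine parts can be checked with a short sketch. It assumes the standard Titanic train.csv with 891 rows (an assumption about your copy of the file); the helper name batch_ranges is ours and is used only for illustration.

```python
import math


def batch_ranges(n_rows, batch_size=100):
    """Return (part_index, first_row, end_row_exclusive) for each mini batch."""
    n_parts = math.ceil(n_rows / batch_size)
    return [
        (i, i * batch_size, min((i + 1) * batch_size, n_rows))
        for i in range(n_parts)
    ]


# 891 is an assumed row count for train.csv; adjust it to match your file.
ranges = batch_ranges(891)
for part, start, end in ranges:
    print(f"part_{part}.csv: rows {start} to {end - 1} ({end - start} records)")
```

Under that assumption, the last file (part_8.csv) is shorter than the others because the row count is not a multiple of the batch size.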

Parameterized datasets

Data Wrangler users can now specify parameters for the datasets imported from Amazon S3. Dataset parameters are specified in the resource’s URI, and their values can be changed dynamically, which gives you more flexibility in selecting the files to import. Parameters can be of four data types:

  • Number – Can take the value of any integer
  • String – Can take the value of any text string
  • Pattern – Can take the value of any regular expression
  • Datetime – Can take the value of any of the supported date/time formats
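
To make the idea concrete, the following is a minimal sketch of how a parameterized URI could be resolved into a concrete path. It is only an illustration and not Data Wrangler's actual implementation: the render_uri helper, the example-bucket name, and the exact placeholder handling are assumptions.

```python
import re

# Matches placeholders written as {{parameter_name}}.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_uri(template, defaults, overrides=None):
    """Replace each {{name}} in template with its override or default value."""
    values = {**defaults, **(overrides or {})}

    def substitute(match):
        name = match.group(1)
        if name not in values:
            raise KeyError(f"no value for parameter {name!r}")
        return str(values[name])

    return PLACEHOLDER.sub(substitute, template)


# Hypothetical bucket and path, used only for illustration.
template = "s3://example-bucket/titanic_dataset/train/{{filename_param}}"
defaults = {"filename_param": "train.csv"}

default_uri = render_uri(template, defaults)
overridden_uri = render_uri(template, defaults, {"filename_param": "part_0.csv"})
print(default_uri)
print(overridden_uri)
```

Keeping the defaults and the overrides in separate dictionaries mirrors the idea that a flow stores default values while an individual job can replace them at run time.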

In this section, we provide a walkthrough of this new feature. This is available only after you import your dataset to your current flow and only for datasets imported from Amazon S3.

  1. From your data flow, choose the plus (+) sign next to the import step and choose Edit dataset.
  2. The preferred (and easiest) method of creating new parameters is by highlighting a section of your URI and choosing Create custom parameter on the drop-down menu. You need to specify four things for each parameter you want to create:
    1. Name
    2. Type
    3. Default value
    4. Description


    Here we have created a String type parameter called filename_param with a default value of train.csv. Now you can see the parameter name enclosed in double curly braces, replacing the portion of the URI that we previously highlighted. Because the default value for this parameter is train.csv, we now see the file train.csv listed on the import table.

  3. When we try to create a transformation job, on the Configure job step, we now see a Parameters section, where we can see a list of all of our defined parameters.
  4. Choosing the parameter gives us the option to change the parameter’s value, in this case, changing the input dataset to be transformed according to the defined flow.
    Assuming we change the value of filename_param from train.csv to part_0.csv, the transformation job now takes part_0.csv (provided that a file with the name part_0.csv exists under the same folder) as its new input data.
  5. Additionally, if you attempt to export your flow to an Amazon S3 destination (via a Jupyter notebook), you now see a new cell containing the parameters that you defined.
    Note that the parameters take their default values, but you can change them by replacing the values in the parameter_overrides dictionary (while leaving the keys of the dictionary unchanged).

    Additionally, you can create new parameters from the Parameters UI.
  6. Open it up by choosing the parameters icon ({{}}) located next to the Go option; both of them are located next to the URI path value.
    A table opens with all the parameters that currently exist on your flow file (filename_param at this point).
  7. You can create new parameters for your flow by choosing Create Parameter.

    A pop-up window opens to let you create a new custom parameter.
  8. Here, we have created a new example_parameter as Number type with a default value of 0. This newly created parameter is now listed in the Parameters table. Hovering over the parameter displays the options Edit, Delete, and Insert.
  9. From within the Parameters UI, you can insert one of your parameters to the URI by selecting the desired parameter and choosing Insert.
    This adds the parameter to the end of your URI. You need to move it to the desired section within your URI.
  10. Change the parameter’s default value, apply the change (from the modal), choose Go, and choose the refresh icon to update the preview list using the selected dataset based on the newly defined parameter’s value.
    Let’s now explore other parameter types. Assume we now have a dataset split into multiple parts, where each file has a part number.
  11. If we want to dynamically change the file number, we can define a Number parameter as shown in the following screenshot.
    Note that the selected file is the one that matches the number specified in the parameter.
    Now let’s demonstrate how to use a Pattern parameter. Suppose we want to import all the part_1.csv files in all of the folders under the titanic-dataset/ folder. Pattern parameters can take any valid regular expression; there are some regex patterns shown as examples.
  12. Create a Pattern parameter called any_pattern to match any folder or file under the titanic-dataset/ folder with default value .* (dot asterisk).
    Notice that the wildcard is not a single * (asterisk) but also has a dot.
  13. Highlight the titanic-dataset/ part of the path and create a custom parameter. This time we choose the Pattern type.
    This pattern selects all the files called part_1.csv from any of the folders under titanic-dataset/.
    A parameter can be used more than once in a path. In the following example, we use our newly created parameter any_pattern twice in our URI to match any of the part files in any of the folders under titanic-dataset/.
    Finally, let’s create a Datetime parameter. Datetime parameters are useful when we’re dealing with paths that are partitioned by date and time, like those generated by Amazon Kinesis Data Firehose (see Dynamic Partitioning in Kinesis Data Firehose). For this demonstration, we use the data under the datetime-data folder.
  14. Select the portion of your path that is a date/time and create a custom parameter. Choose the Datetime parameter type.
    When choosing the Datetime data type, you need to fill in more details.
  15. First of all, you must provide a date format. You can choose any of the predefined date/time formats or create a custom one.
    For the predefined date/time formats, the legend provides an example of a date matching the selected format. For this demonstration, we choose the format yyyy/MM/dd.
  16. Next, specify a time zone for the date/time values.
    For example, the current date may be January 1, 2022, in one time zone, but may be January 2, 2022, in another time zone.
  17. Finally, you can select the time range, which lets you select the range of files that you want to include in your data flow.
    You can specify your time range in hours, days, weeks, months, or years. For this example, we want to get all the files from the last year.
  18. Provide a description of the parameter and choose Create.
    If you’re using multiple datasets with different time zones, the time is not converted automatically; you need to preprocess each file or source to convert it to one time zone.
    The selected files are all the files under the folders corresponding to last year’s data.
  19. Now if we create a data transformation job, we can see a list of all of our defined parameters, and we can override their default values so that our transformation jobs pick the specified files.
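
The Pattern and Datetime steps above can be pictured with a short sketch that filters a list of object keys. This is an illustration under stated assumptions and not how Data Wrangler resolves parameters internally: the helper names, the sample keys, and the explicit start and end dates are hypothetical.

```python
import re
from datetime import date


def match_pattern(keys, prefix, pattern, suffix):
    """Keep keys of the form prefix + <text matching pattern> + suffix."""
    regex = re.compile(re.escape(prefix) + "(?:" + pattern + ")" + re.escape(suffix))
    return [key for key in keys if regex.fullmatch(key)]


def keys_in_range(keys, prefix, start, end):
    """Keep keys whose yyyy/MM/dd partition after prefix lies in [start, end]."""
    partition = re.compile(re.escape(prefix) + r"(\d{4})/(\d{2})/(\d{2})/")
    selected = []
    for key in keys:
        found = partition.match(key)
        if found and start <= date(*map(int, found.groups())) <= end:
            selected.append(key)
    return selected


# Hypothetical keys that follow the folder layout created earlier in this post.
keys = [
    "titanic_dataset/train_1/part_1.csv",
    "titanic_dataset/train_2/part_1.csv",
    "titanic_dataset/train_2/part_2.csv",
    "titanic_dataset/datetime_data/2021/03/01/part_1.csv",
    "titanic_dataset/datetime_data/2022/03/01/part_1.csv",
]

# Pattern parameter with value .* placed between the prefix and the file name.
pattern_hits = match_pattern(keys, "titanic_dataset/", ".*", "/part_1.csv")

# Datetime parameter in yyyy/MM/dd format, restricted to calendar year 2022.
date_hits = keys_in_range(
    keys, "titanic_dataset/datetime_data/", date(2022, 1, 1), date(2022, 12, 31)
)
print(pattern_hits)
print(date_hits)
```

Note that .* also matches nested folders, so the pattern selects part_1.csv files inside the date-partitioned folders as well as those directly under the train_x folders.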

Schedule processing jobs

You can now schedule processing jobs to automate running the data transformation jobs and exporting your transformed data to either Amazon S3 or Amazon SageMaker Feature Store. You can schedule the jobs with the time and periodicity that suits your needs.

Scheduled processing jobs use Amazon EventBridge rules to schedule the job’s run. Therefore, as a prerequisite, you have to make sure that the AWS Identity and Access Management (IAM) role being used by Data Wrangler, namely the Amazon SageMaker execution role of the Studio instance, has permissions to create EventBridge rules.

Configure IAM

Proceed with the following updates on the IAM SageMaker execution role corresponding to the Studio instance where the Data Wrangler flow is running:

  1. Attach the AmazonEventBridgeFullAccess managed policy.
  2. Attach a policy to grant permission to create a processing job:
    {
    	"Version": "2012-10-17",
    	"Statement": [
    		{
    			"Effect": "Allow",
    			"Action": "sagemaker:StartPipelineExecution",
    			"Resource": "arn:aws:sagemaker:Region:AWS-account-id:pipeline/data-wrangler-*"
    		}
    	]
    }
  3. Grant EventBridge permission to assume the role by adding the following trust policy:
    {
    	"Effect": "Allow",
    	"Principal": {
    		"Service": "events.amazonaws.com"
    	},
    	"Action": "sts:AssumeRole"
    }

Alternatively, if you’re using a different role to run the processing job, apply the policies outlined in steps 2 and 3 to that role. For details about the IAM configuration, refer to Create a Schedule to Automatically Process New Data.
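
If you prefer to prepare these documents programmatically, the following sketch builds the permissions policy from step 2 and appends the EventBridge statement from step 3 to an existing trust policy. The function names, the Region, the account ID, and the sample trust policy are placeholders that we introduce for illustration. The sketch only builds JSON; applying it to a role (for example, with the IAM PutRolePolicy and UpdateAssumeRolePolicy operations) is left out.

```python
import copy
import json


def pipeline_policy(region, account_id):
    """Permissions policy that allows starting Data Wrangler pipelines."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "sagemaker:StartPipelineExecution",
                "Resource": f"arn:aws:sagemaker:{region}:{account_id}:pipeline/data-wrangler-*",
            }
        ],
    }


def add_eventbridge_trust(trust_policy):
    """Return a copy of trust_policy that also trusts events.amazonaws.com."""
    statement = {
        "Effect": "Allow",
        "Principal": {"Service": "events.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }
    updated = copy.deepcopy(trust_policy)
    if statement not in updated["Statement"]:
        updated["Statement"].append(statement)
    return updated


# Placeholder trust policy of a SageMaker execution role (assumed shape).
existing_trust = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "sagemaker.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

policy = pipeline_policy("us-east-1", "111122223333")  # placeholder values
new_trust = add_eventbridge_trust(existing_trust)
print(json.dumps(policy, indent=2))
print(json.dumps(new_trust, indent=2))
```

Appending to the existing trust policy, rather than replacing it, keeps the statement that lets SageMaker assume the role. This matters because updating a role's trust policy overwrites the whole document.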

Create a schedule

To create a schedule, have your flow opened in the Data Wrangler flow editor.

  1. On the Data Flow tab, choose Create job.
  2. Configure the required fields and choose Next, 2. Configure job.
  3. Expand Associate Schedules.
  4. Choose Create new schedule.

    The Create new schedule dialog opens, where you define the details of the processing job schedule.
    The dialog offers great flexibility to help you define the schedule. You can have, for example, the processing job running at a specific time or every X hours, on specific days of the week.
    The periodicity can be granular to the level of minutes.
  5. Define the schedule name and periodicity, then choose Create to save the schedule.
  6. You have the option to start the processing job right away along with the scheduling, which takes care of future runs, or leave the job to run only according to the schedule.
  7. You can also define an additional schedule for the same processing job.
  8. To finish the schedule for the processing job, choose Create.
    You see a “Job scheduled successfully” message. Additionally, if you chose to leave the job to run only according to the schedule, you see a link to the EventBridge rule that you just created.

If you choose the schedule link, a new tab in the browser opens, showing the EventBridge rule. On this page, you can make further modifications to the rule and track its invocation history. To stop your scheduled processing job from running, delete the event rule that contains the schedule name.

The EventBridge rule shows a SageMaker pipeline as its target, which is triggered according to the defined schedule, and the processing job invoked as part of the pipeline.
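
EventBridge rules express their schedule as either a rate or a cron expression. The following sketch shows how schedules like the ones described earlier (every X hours, or at a specific time on specific days of the week) map to that syntax. The helper names are hypothetical, and the expressions that Data Wrangler actually generates for your schedule may differ.

```python
def rate_expression(value, unit):
    """Build an EventBridge rate expression such as rate(5 minutes)."""
    if unit not in ("minute", "hour", "day"):
        raise ValueError("unit must be 'minute', 'hour', or 'day'")
    if value < 1:
        raise ValueError("value must be a positive integer")
    plural = "" if value == 1 else "s"  # EventBridge expects a singular unit for 1
    return f"rate({value} {unit}{plural})"


def weekly_cron(hour, minute, days):
    """Build a cron expression for a fixed UTC time on the given weekdays.

    Field order: minutes hours day-of-month month day-of-week year.
    The day-of-month field is '?' because the day-of-week field is set.
    """
    return f"cron({minute} {hour} ? * {','.join(days)} *)"


every_five_minutes = rate_expression(5, "minute")
every_hour = rate_expression(1, "hour")
weekday_mornings = weekly_cron(9, 30, ["MON", "FRI"])
print(every_five_minutes, every_hour, weekday_mornings)
```

Cron expressions in EventBridge rules are evaluated in UTC, so account for your local offset when you read the rule on the EventBridge console.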

To track the runs of the SageMaker pipeline, you can go back to Studio, choose the SageMaker resources icon, choose Pipelines, and choose the pipeline name you want to track. You can now see a table with all current and past runs and status of that pipeline.

You can see more details by double-clicking a specific entry.
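
You can also retrieve the same run history programmatically. The following sketch wraps the SageMaker ListPipelineExecutions operation as exposed by a boto3 client. The summarize_runs helper, the fake client, and the pipeline name are assumptions made for illustration, and the fake client exists only so that the sketch runs without AWS credentials.

```python
from datetime import datetime, timezone


def summarize_runs(sagemaker_client, pipeline_name, max_results=10):
    """Return (execution ARN, status, start time) for the most recent runs."""
    response = sagemaker_client.list_pipeline_executions(
        PipelineName=pipeline_name,
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=max_results,
    )
    return [
        (s["PipelineExecutionArn"], s["PipelineExecutionStatus"], s["StartTime"])
        for s in response["PipelineExecutionSummaries"]
    ]


class FakeSageMakerClient:
    """Stand-in client returning hypothetical data, for running offline."""

    def list_pipeline_executions(self, **kwargs):
        return {
            "PipelineExecutionSummaries": [
                {
                    "PipelineExecutionArn": "arn:aws:sagemaker:us-east-1:111122223333:pipeline/example-pipeline/execution/abc123",
                    "PipelineExecutionStatus": "Succeeded",
                    "StartTime": datetime(2022, 1, 1, 12, 0, tzinfo=timezone.utc),
                }
            ]
        }


runs = summarize_runs(FakeSageMakerClient(), "example-pipeline")
for arn, status, started in runs:
    print(status, started.isoformat(), arn)

# With real credentials (requires boto3), pass a real client instead:
#   import boto3
#   runs = summarize_runs(boto3.client("sagemaker"), "<your pipeline name>")
```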

Clean up

When you’re not using Data Wrangler, it’s recommended to shut down the instance on which it runs to avoid incurring additional fees.

To avoid losing work, save your data flow before shutting Data Wrangler down.

  1. To save your data flow in Studio, choose File, then choose Save Data Wrangler Flow. Data Wrangler automatically saves your data flow every 60 seconds.
  2. To shut down the Data Wrangler instance, in Studio, choose Running Instances and Kernels.
  3. Under RUNNING APPS, choose the shutdown icon next to the sagemaker-data-wrangler-1.0 app.
  4. Choose Shut down all to confirm.

Data Wrangler runs on an ml.m5.4xlarge instance. This instance disappears from RUNNING INSTANCES when you shut down the Data Wrangler app.

After you shut down the Data Wrangler app, it has to restart the next time you open a Data Wrangler flow file. This can take a few minutes.

Conclusion

In this post, we demonstrated how you can use parameters to import your datasets using Data Wrangler flows and create data transformation jobs on them. Parameterized datasets allow for more flexibility on the datasets you use and allow you to reuse your flows. We also demonstrated how you can set up scheduled jobs to automate your data transformations and exports to either Amazon S3 or Feature Store, at the time and periodicity that suits your needs, directly from within Data Wrangler’s user interface.

To learn more about using data flows with Data Wrangler, refer to Create and Use a Data Wrangler Flow and Amazon SageMaker Pricing. To get started with Data Wrangler, see Prepare ML Data with Amazon SageMaker Data Wrangler.


About the authors

David Laredo is a Prototyping Architect for the Prototyping and Cloud Engineering team at Amazon Web Services, where he has helped develop multiple machine learning prototypes for AWS customers. He has been working in machine learning for the last 6 years, training and fine-tuning ML models and implementing end-to-end pipelines to productionize those models. His areas of interest are NLP, ML applications, and end-to-end ML.

Givanildo Alves is a Prototyping Architect with the Prototyping and Cloud Engineering team at Amazon Web Services, helping clients innovate and accelerate by showing the art of the possible on AWS, having already implemented several prototypes around artificial intelligence. He has a long career in software engineering and previously worked as a Software Development Engineer at Amazon.com.br.

Adrian Fuentes is a Program Manager with the Prototyping and Cloud Engineering team at Amazon Web Services, innovating for customers in machine learning, IoT, and blockchain. He has over 15 years of experience managing and implementing projects and 1 year of tenure on AWS.