AWS Open Source Blog

Getting started with the open source data science tool Metaflow on AWS

Data science is hard. Customers face business challenges today at a scale larger and more complex than ever before, and data scientists bring unique skills to the table to help solve some of those problems. The concept is simple: Data scientists use large amounts of data to break a problem down into pieces that machines can analyze, and then use the appropriate algorithms and tooling to gain insight into possible solutions. The reality, however, isn’t nearly as simple. Going from “experiment” to “production” is often a fragmented process, and ensuring that inputs and results are consistent between iterations can be challenging. Additionally, the scope of an experiment inevitably outgrows the capacity of a data scientist’s local toolsets, which leads to other challenges. How and when should data scientists use the power of the cloud to accelerate their processes? How can they ensure consistency and repeatability between their local and scaled experiments?

At AWS re:Invent 2019, Netflix took to the stage to announce the open-sourcing of Metaflow, their answer to some of those questions. In this post, we explain how to use Metaflow and AWS to make data science pipelines not only portable to the cloud, but also seamlessly scalable and inspectable.

In his well-received talk, “More Data Science with Less Engineering,” Netflix’s Ville Tuulos tells this story through the eyes of a typical data scientist. A data scientist using Python or R may initially explore a small subset of data and a Jupyter Notebook to experiment with solutions. Although this part of the process consists of mature and popular tooling for model creation, it invariably reaches a point in which data scientists must start using updated data and iterating across different versions. Suddenly, the data scientist is faced with new challenges outside their core skill set, such as data extraction, transformation, storage, and version management. Additionally, once the process becomes large and complex enough to run on servers instead of a single laptop, the data scientist now must consider scaling, packaging, containerization, infrastructure management, and repeatability across independent runs. After those hurdles, there’s also the challenge of aggregating and analyzing the results, which may span across dozens of different systems and hundreds of iterations. As a result, preparing and training a machine-learning model ends up being a tiny fraction of the data scientist’s work, despite it being the core piece required to solve those business challenges.

Metaflow’s promise is allowing those oft-repeated and fragmented pieces of a machine-learning project to be streamlined and executed in a human-friendly way. Additionally, when the time comes to move a project away from local prototyping onto an elastically scalable cloud infrastructure, Metaflow makes the process of using the power of AWS as easy as a running a single command—no infrastructure knowledge required.

The decoupling of engineering and data science allows data scientists the flexibility to use independent data management and processing logic to feed training and evaluation across any number of modeling ecosystems. Using Metaflow, they can easily create projects that not only seamlessly transition between environments, but also use whichever tools best fit the job, ranging all the way from simple localized NumPy prototypes to large-scale distributed training and inference using platforms such as Amazon Sagemaker. Metaflow also tackles the challenge of parsing results by providing a user-friendly API that data scientists can invoke from a Jupyter notebook to evaluate the historical results of their executions independent of where they ran.

Getting started with Metaflow

At a high level, the Metaflow interface allows us to wrap our favorite bits of Python—or R as of July 2020—in simple functions or decorators, and then execute them within the context of Metaflow. Metaflow thinks about its execution logic in the context of a directed acyclic graph (DAG), which is a sequence of steps with a clear direction and dependency structure. In this post, we will explore the idea in Python using @step decorators. To follow along, grab your closest *nix or Mac device, use your favorite Python package manager (pip, conda) to install Metaflow, and copy the following code into a Python file. We’ll use basicflow.py in our example.

from metaflow import FlowSpec, step

class BasicFlow(FlowSpec):
  @step
  def start(self):
    """
    This is the 'start' step. All flows must have a step named 'start'.
    """
    print("Metaflow says: Hi!")
    self.next(self.end)
        
  @step
  def end(self):
    """
    This is the 'end' step. All flows must have an 'end' step.
    """
    print("My flow is all done.")
    
if __name__ == '__main__':
  BasicFlow()

As the preceding code shows, isolating Python logic in Metaflow is as simple as decorating it with @step and then specifying the next step in the flow with self.next. This fully functional flow has two simple steps with standalone print statements. When executed with python basicflow.py run, the following output is displayed:

Workflow starting (run-id 40):
[40/start/413 (pid 52979)] Task is starting.
[40/start/413 (pid 52979)] Metaflow says: Hi!
[40/start/413 (pid 52979)] Task finished successfully.
[40/end/414 (pid 53085)] Task is starting.
[40/end/414 (pid 53085)] My flow is all done.
[40/end/414 (pid 53085)] Task finished successfully.
Done!

We’ve effectively created one of the DAGs we defined earlier that has two nodes where end depends on start.

Diagram explaining how nodes within the DAG (directed acyclic graph) have "ends" that depend on their "starts".

DAG abstractions are a natural way to think about data science workflows, as they allow for independent implementations with dependencies specific to each step. Although the one defined above is quite simple, it’s common for flows to fan out into multiple executions using different parameters, such as the following hypothetical example from Netflix’s tech blog:

Diagram illustrating a complex flow resulting from multiple executions.

Metaflow allows us to define steps and their respective relationships, as well as specifying the underlying software dependencies for each step. Additionally, although having a workflow structured in this way comes with its own fair share of benefits, it provides the added advantage of making pipelines portable. Clearly defined software dependencies and points of isolation result in flows that can be recreated on any compatible execution environment. Fortunately, Metaflow comes with the tools necessary to make that transition simpler and seamless. Expanding these use cases isn’t possible without scalable storage, compute, and automation solutions, however, so let’s learn how Metaflow can use AWS services.

Infrastructure

AWS enables cloud execution of Metaflow pipelines with minimal user interaction. Although distributed runs on the cloud unlock the flexibility of defining resources per step that outweigh the capabilities of consumer systems, some infrastructure is required to glue the executions together and make them behave as one holistic flow. Fortunately, an AWS CloudFormation Template is provided that creates the underlying components needed. This includes a number of highly available services we’ll cover next, tightened security policies, dedicated IAM roles, and optional parameters that enable unique use cases, such as Internet-disabled environments. Once complete, it’s a simple matter of executing metaflow configure aws and filling in the required items.

To continue following along, find the template on GitHub. The AWS resources provisioned by the template have the potential to cost substantially less than a comparable on-premises deployment of Metaflow, but as always, be cognizant of potential costs you might incur by provisioning them.

Before we go further, however, let’s explore what AWS provides to Metaflow and why.

Storage

For steps in a flow to be truly independent without losing their interoperability, Metaflow needs a way to share objects between them. For example, a dataset generated in Step 1 is only valuable when it’s accessible in later steps that leverage it. Without Metaflow, it falls to the data scientist to persist the dataset appropriately and pass it between standalone bits of logic. With Metaflow, however, it’s much simpler. Take the following example:

from metaflow import FlowSpec, step

class SharedFlow(FlowSpec):
  @step
  def start(self):
    self.dataset = ['This', 'is', 'a', 'list']
    print('Dataset created')
    self.next(self.end)
        
  @step
  def end(self):
    print(self.dataset)
    
if __name__ == '__main__':
  SharedFlow()

Our result is unsurprising in that Metaflow generates its dataset in one step, but prints it in another:

Workflow starting (run-id 42):
[42/start/419 (pid 42185)] Task is starting.
[42/start/419 (pid 42185)] Dataset created
[42/start/419 (pid 42185)] Task finished successfully.
[42/end/420 (pid 42432)] Task is starting.
[42/end/420 (pid 42432)] ['This', 'is', 'a', 'list']
[42/end/420 (pid 42432)] Task finished successfully.
Done!

What’s happening under the covers, however, is that by storing our dataset within a step prepended by self, we have indicated to Metaflow that dataset is a variable that should be made available to other steps. In the context of a distributed execution, this ensures that Metaflow persists the Python variable to a high-performance storage platform. When configured for AWS, this platform is Amazon Simple Storage Service (Amazon S3), which enables Metaflow to take advantage of all the flexibility of Amazon S3’s tiering, lifecycle management, and redundancy. There’s the added benefit of enabling Metaflow’s artifacts to remain available to users after execution is complete, meaning that data scientists can define any variables they choose that can be introspected and aggregated for as long as the data is persisted in Amazon S3. This gives them the ability to effectively go back in time, replaying or resuming any flows in their entire execution history. We’ll show this in action a little later, but let’s first understand how Metaflow performs its computations in the cloud.

Compute

Our examples up until now have mostly leveraged the capabilities of our local machines for testing. This is actually a fairly common scenario for data scientists in the early stages of prototyping. When datasets and processing requirements get larger and more complex, however, data scientists find themselves needing to migrate their workflows to higher-horsepower environments, usually in the cloud. Often these are scaled-up instances running any number of machine learning tools (Jupyter, for example), but it falls to the users to migrate their workflows, and prototype functionality doesn’t always easily translate 1:1 into a different infrastructure.

The foundation built with Metaflow already includes many of the components necessary for an easy migration. The missing piece is a compute platform with the ability to easily package and execute the code in our flows. Metaflow’s role as a library-agnostic toolchain presents a unique requirement, however. One step in our flow could be processing data with PySpark, whereas the next could be fitting a model with Tensorflow. Additionally, although a step in the graph does have the potential to run for a long time, the eventual objective is for each step to “complete” in order to move on to the next. This creates the following requirements for our compute platform:

  • Provide an easy interface for packaging and extracting Python and R code into light execution environments.
  • Support non-specific libraries of varying types.
  • Provide run-to-completion functionality with long timeouts and easy cleanup.
  • Allow for sequencing of executions.
  • Be cost-effective for staggered and/or unpredictable workloads.

AWS Batch helps us meet these requirements!  Put simply, it is a set of batch-management capabilities that enables developers, scientists, and engineers to run hundreds of thousands of batch computing jobs on AWS. It dynamically provisions compute resources on Amazon Elastic Compute Cloud (Amazon EC2) based on demand, and it provides an efficient queue-based interface to Amazon Elastic Container Service (Amazon ECS). This feature set allows Metaflow to package its dependencies for use in a container and sequence the executions on AWS. Additionally, the flexibility of AWS Batch’s Compute Environments allows for organization or workload-specific configurations that can accommodate high memory or CPU demands, GPUs, or bursty workloads. Compatibility with spot instances also allows organizations to tune their Metaflow infrastructure to be as cost-efficient as possible.

The quickest way to get started with Metaflow and AWS Batch is to simply add the —with batch to the end of your Metaflow run command. Assuming you’ve gone through Metaflow’s AWS configuration process and named your file basicflow.py as suggested previously, you can see it in action with python basicflow.py run —with batch:

Workflow starting (run-id 44):
[44/start/425 (pid 83919)] Task is starting.
[44/start/425 (pid 83919)] [70359ac8-21e8-42d5-86c1-0d8c499ad7ff] Task is starting (status SUBMITTED)...
[44/start/425 (pid 83919)] [70359ac8-21e8-42d5-86c1-0d8c499ad7ff] Task is starting (status RUNNABLE)...
[44/start/425 (pid 83919)] [70359ac8-21e8-42d5-86c1-0d8c499ad7ff] Task is starting (status STARTING)...
[44/start/425 (pid 83919)] [70359ac8-21e8-42d5-86c1-0d8c499ad7ff] Task is starting (status RUNNING)...
[44/start/425 (pid 83919)] [70359ac8-21e8-42d5-86c1-0d8c499ad7ff] Setting up task environment.
[44/start/425 (pid 83919)] [70359ac8-21e8-42d5-86c1-0d8c499ad7ff] Downloading code package.
[44/start/425 (pid 83919)] [70359ac8-21e8-42d5-86c1-0d8c499ad7ff] Code package downloaded.
[44/start/425 (pid 83919)] [70359ac8-21e8-42d5-86c1-0d8c499ad7ff] Task is starting.
[44/start/425 (pid 83919)] [70359ac8-21e8-42d5-86c1-0d8c499ad7ff] Dataset created
[44/start/425 (pid 83919)] [70359ac8-21e8-42d5-86c1-0d8c499ad7ff] Task finished with exit code 0.
[44/start/425 (pid 83919)] Task finished successfully.
[44/end/426 (pid 84996)] Task is starting.
[44/end/426 (pid 84996)] [c5c99933-6fcb-49f2-b2c0-f4f457e0c2a6] Task is starting (status SUBMITTED)...
[44/end/426 (pid 84996)] [c5c99933-6fcb-49f2-b2c0-f4f457e0c2a6] Task is starting (status RUNNABLE)...
[44/end/426 (pid 84996)] [c5c99933-6fcb-49f2-b2c0-f4f457e0c2a6] Task is starting (status STARTING)...
[44/end/426 (pid 84996)] [c5c99933-6fcb-49f2-b2c0-f4f457e0c2a6] Task is starting (status RUNNING)...
[44/end/426 (pid 84996)] [c5c99933-6fcb-49f2-b2c0-f4f457e0c2a6] Setting up task environment.
[44/end/426 (pid 84996)] [c5c99933-6fcb-49f2-b2c0-f4f457e0c2a6] Downloading code package.
[44/end/426 (pid 84996)] [c5c99933-6fcb-49f2-b2c0-f4f457e0c2a6] Code package downloaded.
[44/end/426 (pid 84996)] [c5c99933-6fcb-49f2-b2c0-f4f457e0c2a6] Task is starting.
[44/end/426 (pid 84996)] [c5c99933-6fcb-49f2-b2c0-f4f457e0c2a6] ['This', 'is', 'a', 'list']
[44/end/426 (pid 84996)] [c5c99933-6fcb-49f2-b2c0-f4f457e0c2a6] Task finished with exit code 0.
[44/end/426 (pid 84996)] Task finished successfully.
Done!

Our output looks more or less the same as our previous runs with the minor addition of a few lines telling us the status of our task moving from SUBMITTED to RUNNING. If we look at the AWS console during execution, we can see the jobs running in AWS Batch, and even view the logs as they’re executing. We’ve successfully transitioned our workload to the cloud and enabled each step in our flow to execute in its own container with configurable resources.

Screenshot of the AWS Console during the execution.

Screenshot showing each step of the flow of executions.

Scheduling

Although these examples illustrate a framework for Metaflow usage, most data science pipelines we’d choose to build will have more stringent demands. AWS Batch can help us manage those demands from the perspective of resources, but what about time? Parsing a dataset can be a multi-hour process in itself, as can model fitting and evaluation. Additionally, although manually triggering flows in AWS Batch is effective for prototyping, many pipelines will need incremental improvements over time, and others will never be entirely finished in that they continue to improve regularly with fresh data. How do we ensure that a replicable flow we create with Metaflow can be effectively transitioned to production, allowing it to be triggered and executed without the need of a dedicated machine for orchestration or manual intervention?

An August 2020 release of Metaflow 2.1 provides an answer in AWS Step Functions. Step Functions is a serverless orchestrator that enables centralized execution of sequential, checkpoint-aware workflows on a variety of AWS services, including Amazon ECS, AWS Lambda, and AWS Batch. Additionally, Step Functions provides a highly scalable and entirely serverless experience that ensures that scheduling of complex, long-running flows will just work, with no infrastructure management required. Metaflow’s graph-based model is a good fit for Step Functions in that it’s event-driven, dependent on successful end-to-end executions, and makes use of other AWS-managed services that are well-integrated with Step Functions. Netflix’s tech blog goes into more detail about the value provided to Metaflow’s model.

We can see this in action by using our existing BasicFlow and running python basicflow.py step-functions create. The run should be significantly shorter than the previous ones and include output similar to the following:

Deploying SharedFlow to AWS Step Functions...
It seems this is the first time you are deploying SharedFlow to AWS Step Functions.

A new production token generated.

The namespace of this production flow is
  production:sharedflow-0-12345
To analyze results of this production flow add this line in your notebooks:
  namespace("production:sharedflow-0-12345")
If you want to authorize other people to deploy new versions of this flow to AWS Step Functions, they need to call
  step-functions create —authorize sharedflow-0-12345
when deploying this flow to AWS Step Functions for the first time.
See "Organizing Results" at https://docs.metaflow.org/ for more information about production tokens.

Workflow SharedFlow pushed to AWS Step Functions successfully.

What will trigger execution of the workflow:
  No triggers defined. You need to launch this workflow manually.

The first thing we see is that Step Functions-based flows exist in a separate namespace. We’ll explore more about namespaces a little later, so for now make note of the namespace listed in your output, as everyone’s will be slightly different. In the meantime, we can visit the AWS Management Console and view our flow visually in AWS Step Functions.

Execution is now as simple as running python basicflow.py step-functions trigger. Then we can follow along in the console as Step Functions executes our flow on AWS Batch, no human interaction required.

Diagram illustrating Step Functions flow on AWS Batch.

Iterating is also simple with Step Functions and Metaflow. For example, if we make a small modification to our dataset in basicflow.py such as self.dataset = ['This', 'is', 'a', 'list', 'in', 'stepfunctions'], we can run step-functions create and trigger again to kick off another execution.

Additionally, for incrementally executed flows, we can add schedules for flow executions using a variety of external event triggers provided by Amazon EventBridge. Examples are big data processing job completions, data migrations, and more. We can also create schedules directly within Metaflow using the @schedule decorator, allowing them to be executed automatically on a schedule of our choice. Although these example flows are simple, this can scale to flows with hundreds or thousands of nodes that run for tens of hours at a time.

Persistence

At this point we’ve run a number of test flows and iterated several times across our simple use case. In a practical scenario, however, the number of iterations and adjustments across a single set of pipelines is likely to be substantially higher before a pipeline is deemed ready to serve customers. This experiment tracking is key to ensuring that models get consistently better and more intuitive over time. Because Metaflow persists the artifacts from every run to Amazon S3, users need a simple way to map flow executions to the specific artifacts they generate. Fortunately, Metaflow provides a human-readable API that does exactly that in its Metadata Service, and it delivers the added benefit of allowing users to access those artifacts directly as in-memory objects. Depending on our configuration, that service can live on our local system or in the cloud, allowing global accessibility—with namespace separation—to organizations or companies running Metaflow collaboratively.

An external Metadata service is already provided to AWS Metaflow users through the CloudFormation template covered previously, so let’s assume our previous examples were using that service. An easy way for data scientists to interact with their flows is through a Jupyter notebook, so if Metaflow and Jupyter are currently installed, all of the tools needed to begin this process are already available. To make things easier, we’ll explore Metaflow’s object structure alongside the code we will need.

Metaflow’s highest abstraction is the Flow. A Flow is the core object that wraps all of the individual executions. In our examples, we explored BasicFlow and SharedFlow. We ran them multiple times using a variety of different engines, but those are the Flows. To start introspecting, let’s import some libraries and run quick “getting started” code in Jupyter.

## Import the basics for viewing flows
from metaflow import get_metadata, Metaflow, Flow

## Show the metadata service currently used by Metaflow's config
print(get_metadata())

## Print the flows in the current namespace (your user)
print(Metaflow().flows)

The initial lines import the basic tools necessary to view the flows available to our user. The results should looks similar to the following, in that the Metadata service location matches what was configured with metaflow configure aws, and there should be flows that match what was run in the previous sections:

service@https://123456.execute-api.us-west-2.amazonaws.com/api/
[Flow('BasicFlow'), Flow('SharedFlow')]

Now that we have an idea of the Flows we can view, we can look at the Runs for one or all of those flows:

for run in (Flow('SharedFlow')):
    print(run)

A Run literally corresponds to a time that we as a user executed our flow. For example, if we ran SharedFlow from our local system three times, we should expect to see output similar to the following:

Run('SharedFlow/1')
Run('SharedFlow/2')
Run('SharedFlow/3')

Now, if we want to look at all the nodes that were executed in our SharedFlow graph, we can run the following to inspect the Steps in the last successful Run of this particular Flow:

for step in (Flow('SharedFlow').latest_successful_run):
    print(step)

The results should look similar to the following:

Step('SharedFlow/3/end')
Step('SharedFlow/3/start')

Lastly, we know in our SharedFlow example, we passed a dataset from the start step to the end step. We should be able to view all available Data Objects in the Step by doing the following. Note that to interface directly with a Step, we must import it from Metaflow:

from metaflow import Step
print(Step('SharedFlow/3/end').task.data)

Results should show an object that contains a name—which is simply a Metaflow-provided reference to the name of the Flow—and all other objects passed between your steps, including dataset:

<MetaflowData: dataset, name>

Let’s take a look at our dataset by doing the following:

print(Step('SharedFlow/3/end').task.data.dataset)

The result should be a the exact same list we created in our SharedFlow example:

['This', 'is', 'a', 'list']

As mentioned in previously, however, functions scheduled by Step Functions are designed to be shared by an organization, and as a result don’t exist in our user’s namespace. Let’s put everything together and see how we can do the same thing in the Run we scheduled via Step Functions previously (replace the Step Functions namespace below with the one you made note of earlier):

## Same as previous import with the addition of 'namespace'
from metaflow import get_metadata, namespace, Metaflow, Flow, Step

## Change your namespace to the one generated by 'step-functions create'
namespace('production:sharedflow-0-iwuv')

## It's assumed that 'SharedFlow' will still be there, but it's worth checking
print(Metaflow().flows)

## Lastly, we'll use some Python string formatting to stitch everything together into one line
print(Step('{}/end'.format(Flow('SharedFlow').latest_successful_run.pathspec)).task.data.dataset)

The results should look like the following:

[Flow('SharedFlow')]
['This', 'is', 'a', 'list', 'in', 'stepfunctions']

Clean up

Cleaning up our AWS environment should be fairly simple.  In the AWS CloudFormation console, delete the stack created at the beginning of this tutorial.  That will delete all resources created, with the exception of the S3 bucket (in the event that organizations want to replace their infrastructure without losing their assets).  To clean up the remainder, go to the Amazon S3 console and delete the bucket with the same name as the stack you created.

Conclusion

With that, we explained how to use Metaflow and AWS to make data science pipelines portable to the cloud and seamlessly scalable and inspectable. For more information, visit Metaflow’s website to explore features and learn how to get started using Metaflow in your own organizations.

Rob Hilton

Rob Hilton

Rob is a seasoned technologist and cloud evangelist with nearly 20 years of experience in a wide array of software and datacenter ecosystems. He is an open-source crusader and long-time Kubernaut, and his day-job at AWS sees him partnering with massively scaled cloud-native organizations to design and build the reference architectures of tomorrow. He also flies airplanes.