AWS Big Data Blog

Introducing shared VPC support on Amazon MWAA

In this post, we demonstrate automating deployment of Amazon Managed Workflows for Apache Airflow (Amazon MWAA) using customer-managed endpoints in a VPC, providing compatibility with shared, or otherwise restricted, VPCs.

Data scientists and engineers have made Apache Airflow a leading open source tool to create data pipelines due to its active open source community, familiar Python development as Directed Acyclic Graph (DAG) workflows, and extensive library of pre-built integrations. Amazon MWAA is a managed service for Airflow that makes it easy to run Airflow on AWS without the operational burden of having to manage the underlying infrastructure. For each Airflow environment, Amazon MWAA creates a single-tenant service VPC, which hosts the metadatabase that stores states and the web server that provides the user interface. Amazon MWAA further manages Airflow scheduler and worker instances in a customer-owned and managed VPC, in order to schedule and run tasks that interact with customer resources. Those Airflow containers in the customer VPC access resources in the service VPC via a VPC endpoint.

Many organizations choose to centrally manage their VPC using AWS Organizations, allowing a VPC in an owner account to be shared with resources in a different participant account. However, because creating a new route outside of a VPC is considered a privileged operation, participant accounts can’t create endpoints in owner VPCs. Furthermore, many customers don’t want to extend the security privileges required to create VPC endpoints to all users provisioning Amazon MWAA environments. In addition to VPC endpoints, customers also wish to restrict data egress via Amazon Simple Queue Service (Amazon SQS) queues, and Amazon SQS access is a requirement in the Amazon MWAA architecture.

Shared VPC support for Amazon MWAA adds the ability for you to manage your own endpoints within your VPCs, adding compatibility to shared and otherwise restricted VPCs. Specifying customer-managed endpoints also provides the ability to meet strict security policies by explicitly restricting VPC resource access to just those needed by your Amazon MWAA environments. This post demonstrates how customer-managed endpoints work with Amazon MWAA and provides examples of how to automate the provisioning of those endpoints.

Solution overview

Shared VPC support for Amazon MWAA allows multiple AWS accounts to create their Airflow environments into shared, centrally managed VPCs. The account that owns the VPC (owner) shares the two private subnets required by Amazon MWAA with other accounts (participants) that belong to the same organization from AWS Organizations. After the subnets are shared, the participants can view, create, modify, and delete Amazon MWAA environments in the subnets shared with them.

When users specify the need for a shared, or otherwise policy-restricted, VPC during environment creation, Amazon MWAA will first create the service VPC resources, then enter a pending state for up to 72 hours, with an Amazon EventBridge notification of the change in state. This allows owners to create the required endpoints on behalf of participants based on endpoint service information from the Amazon MWAA console or API, or programmatically via an AWS Lambda function and EventBridge rule, as in the example in this post.

After those endpoints are created on the owner account, the endpoint service in the single-tenant Amazon MWAA VPC will detect the endpoint connection event and resume environment creation. Should there be an issue, you can cancel environment creation by deleting the environment during this pending state.

This feature also allows you to remove the create, modify, and delete VPCE privileges from the AWS Identity and Access Management (IAM) principal creating Amazon MWAA environments, even when not using a shared VPC, because that permission will instead be imposed on the IAM principal creating the endpoint (the Lambda function in our example). Furthermore, the Amazon MWAA environment will provide the SQS queue Amazon Resource Name (ARN) used by the Airflow Celery Executor to queue tasks (the Celery Executor Queue), allowing you to explicitly enter those resources into your network policy rather than having to provide a more open and generalized permission.

In this example, we create the VPC and Amazon MWAA environment in the same account. For shared VPCs across accounts, the EventBridge rule and Lambda function would exist in the owner account, and the Amazon MWAA environment would be created in the participant account. See Sending and receiving Amazon EventBridge events between AWS accounts for more information.

Prerequisites

You should have the following prerequisites:

  • An AWS account
  • An AWS user in that account, with permissions to create VPCs, VPC endpoints, and Amazon MWAA environments
  • An Amazon Simple Storage Service (Amazon S3) bucket in that account, with a folder called dags

Create the VPC

We begin by creating a restrictive VPC using an AWS CloudFormation template, in order to simulate creating the necessary VPC endpoint and modifying the SQS endpoint policy. If you want to use an existing VPC, you can proceed to the next section.

  1. Download the CloudFormation template referenced in Option three: Creating an Amazon VPC network without Internet access.
  2. Extract the file cfn-vpc-private-bjs.yml from the downloaded ZIP archive.
  3. Now we edit our CloudFormation template to restrict access to Amazon SQS. In cfn-vpc-private-bjs.yml, edit the SqsVpcEndoint section to appear as follows:
   SqsVpcEndoint:
     Type: AWS::EC2::VPCEndpoint
     Properties:
       ServiceName: !Sub "com.amazonaws.${AWS::Region}.sqs"
       VpcEndpointType: Interface
       VpcId: !Ref VPC
       PrivateDnsEnabled: true
       SubnetIds:
        - !Ref PrivateSubnet1
        - !Ref PrivateSubnet2
       SecurityGroupIds:
        - !Ref SecurityGroup
       PolicyDocument:
        Statement:
         - Effect: Allow
           Principal: '*'
           Action: '*'
           Resource: []
Bash

This additional policy document entry prevents Amazon SQS egress to any resource not explicitly listed.

Now we can create our CloudFormation stack.

  1. On the AWS CloudFormation console, choose Create stack.
  2. Select Upload a template file.
  3. Choose Choose file.
  4. Browse to the file you modified.
  5. Choose Next.
  6. For Stack name, enter MWAA-Environment-VPC.
  7. Choose Next until you reach the review page.
  8. Choose Submit.

Create the Lambda function

We have two options for self-managing our endpoints: manual and automated. In this example, we create a Lambda function that responds to the Amazon MWAA EventBridge notification. You could also use the EventBridge notification to send an Amazon Simple Notification Service (Amazon SNS) message, such as an email, to someone with permission to create the VPC endpoint manually.

First, we create a Lambda function to respond to the EventBridge event that Amazon MWAA will emit.

  1. On the Lambda console, choose Create function.
  2. For Name, enter mwaa-create-lambda.
  3. For Runtime, choose Python 3.11.
  4. Choose Create function.
  5. For Code, in the Code source section, for lambda_function, enter the following code:
    import boto3
    import json
    import logging
    
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    def lambda_handler(event, context):
        if event['detail']['status']=="PENDING":
            detail=event['detail']
            name=detail['name']
            celeryExecutorQueue=detail['celeryExecutorQueue']
            subnetIds=detail['networkConfiguration']['subnetIds']
            securityGroupIds=detail['networkConfiguration']['securityGroupIds']
            databaseVpcEndpointService=detail['databaseVpcEndpointService']
    
            # MWAA does not need to store the VPC ID, but we can get it from the subnets
            client = boto3.client('ec2')
            response = client.describe_subnets(SubnetIds=subnetIds)
            logger.info(response['Subnets'][0]['VpcId'])  
            vpcId=response['Subnets'][0]['VpcId']
            logger.info("vpcId: " + vpcId)       
            
            webserverVpcEndpointService=None
            if detail['webserverAccessMode']=="PRIVATE_ONLY":
                webserverVpcEndpointService=event['detail']['webserverVpcEndpointService']
            
            response = client.describe_vpc_endpoints(
                VpcEndpointIds=[],
                Filters=[
                    {"Name": "vpc-id", "Values": [vpcId]},
                    {"Name": "service-name", "Values": ["*.sqs"]},
                    ],
                MaxResults=1000
            )
            sqsVpcEndpoint=None
            for r in response['VpcEndpoints']:
                if subnetIds[0] in r['SubnetIds'] or subnetIds[0] in r['SubnetIds']:
                    # We are filtering describe by service name, so this must be SQS
                    sqsVpcEndpoint=r
                    break
            
            if sqsVpcEndpoint:
                logger.info("Found SQS endpoint: " + sqsVpcEndpoint['VpcEndpointId'])
    
                logger.info(sqsVpcEndpoint)
                pd = json.loads(sqsVpcEndpoint['PolicyDocument'])
                for s in pd['Statement']:
                    if s['Effect']=='Allow':
                        resource = s['Resource']
                        logger.info(resource)
                        if '*' in resource:
                            logger.info("'*' already allowed")
                        elif celeryExecutorQueue in resource: 
                            logger.info("'"+celeryExecutorQueue+"' already allowed")                
                        else:
                            s['Resource'].append(celeryExecutorQueue)
                            logger.info("Updating SQS policy to " + str(pd))
            
                            client.modify_vpc_endpoint(
                                VpcEndpointId=sqsVpcEndpoint['VpcEndpointId'],
                                PolicyDocument=json.dumps(pd)
                                )
                        break
            
            # create MWAA database endpoint
            logger.info("creating endpoint to " + databaseVpcEndpointService)
            endpointName=name+"-database"
            response = client.create_vpc_endpoint(
                VpcEndpointType='Interface',
                VpcId=vpcId,
                ServiceName=databaseVpcEndpointService,
                SubnetIds=subnetIds,
                SecurityGroupIds=securityGroupIds,
                TagSpecifications=[
                    {
                        "ResourceType": "vpc-endpoint",
                        "Tags": [
                            {
                                "Key": "Name",
                                "Value": endpointName
                            },
                        ]
                    },
                ],           
            )
            logger.info("created VPCE: " + response['VpcEndpoint']['VpcEndpointId'])
                
            # create MWAA web server endpoint (if private)
            if webserverVpcEndpointService:
                endpointName=name+"-webserver"
                logger.info("creating endpoint to " + webserverVpcEndpointService)
                response = client.create_vpc_endpoint(
                    VpcEndpointType='Interface',
                    VpcId=vpcId,
                    ServiceName=webserverVpcEndpointService,
                    SubnetIds=subnetIds,
                    SecurityGroupIds=securityGroupIds,
                    TagSpecifications=[
                        {
                            "ResourceType": "vpc-endpoint",
                            "Tags": [
                                {
                                    "Key": "Name",
                                    "Value": endpointName
                                },
                            ]
                        },
                    ],                  
                )
                logger.info("created VPCE: " + response['VpcEndpoint']['VpcEndpointId'])
    
        return {
            'statusCode': 200,
            'body': json.dumps(event['detail']['status'])
        }
    Python
  6. Choose Deploy.
  7. On the Configuration tab of the Lambda function, in the General configuration section, choose Edit.
  8. For Timeout, increate to 5 minutes, 0 seconds.
  9. Choose Save.
  10. In the Permissions section, under Execution role, choose the role name to edit the permissions of this function.
  11. For Permission policies, choose the link under Policy name.
  12. Choose Edit and add a comma and the following statement:
    {
    		"Sid": "Statement1",
    		"Effect": "Allow",
    		"Action": 
    		[
    			"ec2:DescribeVpcEndpoints",
    			"ec2:CreateVpcEndpoint",
    			"ec2:ModifyVpcEndpoint",
                "ec2:DescribeSubnets",
    			"ec2:CreateTags"
    		],
    		"Resource": 
    		[
    			"*"
    		]
    }
    Bash

The complete policy should look similar to the following:

{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Effect": "Allow",
			"Action": "logs:CreateLogGroup",
			"Resource": "arn:aws:logs:us-east-1:112233445566:*"
		},
		{
			"Effect": "Allow",
			"Action": [
				"logs:CreateLogStream",
				"logs:PutLogEvents"
			],
			"Resource": [
				"arn:aws:logs:us-east-1:112233445566:log-group:/aws/lambda/mwaa-create-lambda:*"
			]
		},
		{
			"Sid": "Statement1",
			"Effect": "Allow",
			"Action": [
				"ec2:DescribeVpcEndpoints",
				"ec2:CreateVpcEndpoint",
				"ec2:ModifyVpcEndpoint",
               	"ec2:DescribeSubnets",
				"ec2:CreateTags"
			],
			"Resource": [
				"*"
			]
		}
	]
}
Bash
  1. Choose Next until you reach the review page.
  2. Choose Save changes.

Create an EventBridge rule

Next, we configure EventBridge to send the Amazon MWAA notifications to our Lambda function.

  1. On the EventBridge console, choose Create rule.
  2. For Name, enter mwaa-create.
  3. Select Rule with an event pattern.
  4. Choose Next.
  5. For Creation method, choose User pattern form.
  6. Choose Edit pattern.
  7. For Event pattern, enter the following:
    {
      "source": ["aws.airflow"],
      "detail-type": ["MWAA Environment Status Change"]
    }
    Bash
  8. Choose Next.
  9. For Select a target, choose Lambda function.

You may also specify an SNS notification in order to receive a message when the environment state changes.

  1. For Function, choose mwaa-create-lambda.
  2. Choose Next until you reach the final section, then choose Create rule.

Create an Amazon MWAA environment

Finally, we create an Amazon MWAA environment with customer-managed endpoints.

  1. On the Amazon MWAA console, choose Create environment.
  2. For Name, enter a unique name for your environment.
  3. For Airflow version, choose the latest Airflow version.
  4. For S3 bucket, choose Browse S3 and choose your S3 bucket, or enter the Amazon S3 URI.
  5. For DAGs folder, choose Browse S3 and choose the dags/ folder in your S3 bucket, or enter the Amazon S3 URI.
  6. Choose Next.
  7. For Virtual Private Cloud, choose the VPC you created earlier.
  8. For Web server access, choose Public network (Internet accessible).
  9. For Security groups, deselect Create new security group.
  10. Choose the shared VPC security group created by the CloudFormation template.

Because the security groups of the AWS PrivateLink endpoints from the earlier step are self-referencing, you must choose the same security group for your Amazon MWAA environment.

  1. For Endpoint management, choose Customer managed endpoints.
  2. Keep the remaining settings as default and choose Next.
  3. Choose Create environment.

When your environment is available, you can access it via the Open Airflow UI link on the Amazon MWAA console.

Clean up

Cleaning up resources that are not actively being used reduces costs and is a best practice. If you don’t delete your resources, you can incur additional charges. To clean up your resources, complete the following steps:

  1. Delete your Amazon MWAA environment, EventBridge rule, and Lambda function.
  2. Delete the VPC endpoints created by the Lambda function.
  3. Delete any security groups created, if applicable.
  4. After the above resources have completed deletion, delete the CloudFormation stack to ensure that you have removed all of the remaining resources.

Summary

This post described how to automate environment creation with shared VPC support in Amazon MWAA. This gives you the ability to manage your own endpoints within your VPC, adding compatibility to shared, or otherwise restricted, VPCs. Specifying customer-managed endpoints also provides the ability to meet strict security policies by explicitly restricting VPC resource access to just those needed by their Amazon MWAA environments. To learn more about Amazon MWAA, refer to the Amazon MWAA User Guide. For more posts about Amazon MWAA, visit the Amazon MWAA resources page.


About the author

John Jackson has over 25 years of software experience as a developer, systems architect, and product manager in both startups and large corporations and is the AWS Principal Product Manager responsible for Amazon MWAA.