亚马逊AWS官方博客

使用 AWS CDK 加速中国区 OpenSearch Domain 部署

背景介绍

自从Amazon Opensearch Service登陆中国北京和宁夏区,越来越多的企业借助其特性进行交互式日志分析、实时应用程序监控、网站搜索等。由于企业对Opensearch Service的依赖日益加深,快速部署Opensearch Service的需求也愈发增多。IaC(Infrastructure as Code)这一概念以及相关工具的出现很好的满足了企业对快速部署和管理云上资源的这一需求。本文着重介绍如何使用AWS CDK进行快速部署Opensearch Domain。

部署架构

下面给出一种具备通用性的部署架构。其中Tooling (DevOps) Account是专门用于部署工具的账号,比如部署Amazon CodePipeline 服务或者Amazon CodeBuild 服务。Deployment Account是用于部署企业应用的基础设置,比如Amazon OpenSearch Domain。

先决条件

本文假定以下前提:

  • 对AWS CDK(AWS Cloud Development Kit)有一定的了解
  • 对Python编程语言有一定的基础
  • 具备一个AWS中国区的账号

实施步骤

出于演示的目的,以下实施步骤是对上述的部署架构做了简化后得出的。您可以通过Amazon EC2、Cloud 9或者本地电脑构建CDK应用环境,以下操作均在Mac笔记本电脑中执行,如果您的用户不是root,在下列各命令前需要加上sudo。在安装CDK前,请确保您已经安装了Node.js并更新到了最新版本。CDK支持TypeScript,Java,Python等多种语言,本文将使用Python语言。请参考此文档构建CDK应用环境。

  1. 创建代码目录,例如opensearch_domain
mkdir opensearch_domain
  1. 进入代码目录并执行cdk init命令初始化代码工程目录
cdk init -l python --generate-only

这时候得到的代码文件目录结构如下图所示:

  1. 进入目录opensearch_domain并编辑opensearch_domain_stack.py
  • 第一步:首先添加创建VPC相关的代码。这里一共创建四个子网,两个公有子网和两个私有子网,其中,私有子网通过NAT Gateway与外部通讯。
        vpc = ec2.Vpc(
            self,
            id="vpc",
            max_azs=2,
            cidr="10.0.8.0/24",
            subnet_configuration=[
                ec2.SubnetConfiguration(
                    subnet_type=ec2.SubnetType.PUBLIC, name="Public", cidr_mask=26
                ),
                ec2.SubnetConfiguration(
                    subnet_type=ec2.SubnetType.PRIVATE_WITH_NAT, name="Private", cidr_mask=26
                ),
            ],
            nat_gateways=1,
        )
  • 第二步:创建OpenSearch的安全组,KMS key以及opensearch service linked role。这个service linked role主要的作用是代表OpenSearch服务与其他服务进行交互,例如OpenSearch服务使用该角色输入日志到对应的CloudWatch日志组。
        # create security group for our domain
        db_security_group = ec2.SecurityGroup(
            self,
            "OpenSearchSecurityGroup",
            vpc=vpc,
            allow_all_outbound=False,
        )
        # create encryption key for our domain
        key = aws_kms.Key(
            self,
            id="ESKMSKey",
            removal_policy=RemovalPolicy.DESTROY,
            alias="OpensearchDomainKey",
        )
        iam.CfnServiceLinkedRole(
            self, "ServiceLinkedForElasticSearch", aws_service_name="es.amazonaws.com"
        )
  • 第三步:创建三种类型的CloudWatch日志组分别接收OpenSearch服务的三种不同的日志。它们分别是应用日志(Application Logs),搜索慢速日志(Search Slow Logs) 以及索引慢速日志(Indexing Slow Logs)。
        # create log groups for our domain
        logGroup_map = {"AppLogs": "", "SearchLogs": "",      "IndexLogs": ""}
        for log_kind in logGroup_map.keys():
            logGroup_map[log_kind] = logs.LogGroup(
                self,
                f"ES{log_kind}",
                log_group_name=f"ES{log_kind}",
                removal_policy=RemovalPolicy.DESTROY,
                retention=logs.RetentionDays.ONE_MONTH,
            )
  • 第四步:创建一个Lambda函数为OpenSearch服务进行赋权,授予OpenSearch服务足够的权限以输出相关的日志到上一步所创建的三个日志组中。
    • 在当前目录创建一个名为custom_resource的目录
    • 在目录custom_resource里面创建两个文件分别为__init__.py以及loggroup_resource_policy.py
    • 编辑文件loggroup_resource_policy.py并在其中加入如下代码。这段代码是Lambda函数的主体,主要作用是为OpenSearch授权以操作CloudWatch日志组。
import json
import boto3
import os
from botocore.exceptions import ClientError

logs_client = boto3.client("logs")


def PutPolicy(policy_name: str):
    loggroup_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {"Service": "es.amazonaws.com"},
                "Action": [
                    "logs:PutLogEvents",
                    "logs:PutLogEventsBatch",
                    "logs:CreateLogStream",
                ],
                "Resource": "*",
            }
        ],
    }
    logs_client.put_resource_policy(
        policyName=policy_name, policyDocument=json.dumps(loggroup_policy)
    )
    return


def DeletePolicy(policy_name: str):
    logs_client.delete_resource_policy(policyName=policy_name)
    return


def handler(event, context):
    AWS_ACCOUNT = os.environ.get('AWS_ACCOUNT')
    AWS_REGION = os.environ.get('AWS_REGION')
    print("Received:", event)
    PolicyName = event["ResourceProperties"]["Properties"]["PolicyName"]
    try:
        if event["RequestType"] == "Delete":
            DeletePolicy(PolicyName)
        if event["RequestType"] == "Create":
            PutPolicy(PolicyName)
    except ClientError as e:
        print("Unexpected error: %s" % e)
        raise e
    return {
        'PhysicalResourceId': f'LogGroupResourcePolicy{AWS_ACCOUNT}{AWS_REGION}{PolicyName}'
    }
  • 第五步:回到文件opensearch_domain_stack.py添加创建Lambda函数的代码。
        # create a lambda to put resource policy
        # Grant Lambda function permission to manuiplate log groups
        role_inline_policy = iam.Policy(
            self,
            "LambdaRolePolicy",
            statements=[
                iam.PolicyStatement(
                    actions=[
                        "logs:*",
                    ],
                    resources=["*"],
                )
            ],
        )
        # Create an iam role for this Lambda function
        lambda_role = iam.Role(
            self,
            "LambdaRole",
            inline_policies={
                "LambdaRoleInlinePolicy": role_inline_policy.document
            },
            assumed_by=iam.ServicePrincipal("lambda.amazonaws.com"),
        )
        lambda_role.add_managed_policy(
            iam.ManagedPolicy.from_aws_managed_policy_name(
                "service-role/AWSLambdaBasicExecutionRole"
            )
        )
        lambda_role.add_managed_policy(
            iam.ManagedPolicy.from_aws_managed_policy_name(
                "service-role/AWSLambdaVPCAccessExecutionRole"
            )
        )
        # Specify the code path for this Lambda
        loggroup_resource_policy_path = os.path.realpath(
            os.path.join(
                os.path.dirname(__file__),
                "custom_resource",
            )
        )
        # Create this Lambda function
        resource_policy_handler = _lambda.Function(
            self,
            "logGroupResourcePolicyHandler",
            role=lambda_role,
            handler="loggroup_resource_policy.handler",
            code=_lambda.Code.from_asset(
                path=loggroup_resource_policy_path,
            ),
            memory_size=512,
            timeout=Duration.seconds(900),
            environment={"LOG_LEVEL": "DEBUG"},
            vpc=vpc,
            runtime=_lambda.Runtime.PYTHON_3_9,
        )
  • 第六步:加入调用该Lambda函数的代码。
        resource_policy_handler_cr = cr.Provider(
            self,
            "ResourcePolicyHandlerProvider",
            on_event_handler=resource_policy_handler,
            log_retention=logs.RetentionDays.FIVE_DAYS,
        )
        resource_policy_handler_cr.node.add_dependency(resource_policy_handler)
        logGroup_policy = CfnCustomResource(
            self,
            "ResourcePolicyHandlerCR",
            service_token=resource_policy_handler_cr.service_token,
        )
        logGroup_policy.add_property_override(
            property_path="Properties.PolicyName",
            value="OpensearchLoggroupPolicy"
        )
  • 第七步:最后就可以创建opensearch domain了。这里创建了一个具有三个Master节点,两个data节点的opensearch集群。集群所在的子网是私有子网,并使用了上面创建的KMS密钥对集群中的静态数据进行加密。同时配置了上面创建的CloudWatch日志组和使用IAM进行访问控制。
        domain = opensearch.CfnDomain(
            self,
            id="SampleOpenSearchDomain",
            engine_version="OpenSearch_1.0",
            cluster_config=opensearch.CfnDomain.ClusterConfigProperty(
                dedicated_master_count=3,
                dedicated_master_enabled=True,
                dedicated_master_type="r5.xlarge.search",
                instance_count=2,
                instance_type="r5.xlarge.search",
                warm_enabled=False,
                zone_awareness_config=opensearch.CfnDomain.ZoneAwarenessConfigProperty(
                    availability_zone_count=2
                ),
                zone_awareness_enabled=True,
            ),
            vpc_options=opensearch.CfnDomain.VPCOptionsProperty(
                security_group_ids=[db_security_group.security_group_id],
                subnet_ids=[subnet.subnet_id for subnet in vpc.private_subnets]
            ),
            ebs_options=opensearch.CfnDomain.EBSOptionsProperty(
                ebs_enabled=True, volume_size=20, volume_type="gp2"
            ),
            encryption_at_rest_options=opensearch.CfnDomain.EncryptionAtRestOptionsProperty(
                enabled=True, kms_key_id=key.key_id
            ),
            node_to_node_encryption_options=opensearch.CfnDomain.NodeToNodeEncryptionOptionsProperty(
                enabled=True
            ),
            domain_endpoint_options=opensearch.CfnDomain.DomainEndpointOptionsProperty(
                enforce_https=True,
                tls_security_policy="Policy-Min-TLS-1-0-2019-07",
                custom_endpoint_enabled=False,
            ),
            log_publishing_options={
                "ES_APPLICATION_LOGS": opensearch.CfnDomain.LogPublishingOptionProperty(
                    enabled=True,
                    cloud_watch_logs_log_group_arn=logGroup_map[
                        "AppLogs"
                    ].log_group_arn,
                ),
                "SEARCH_SLOW_LOGS": opensearch.CfnDomain.LogPublishingOptionProperty(
                    enabled=True,
                    cloud_watch_logs_log_group_arn=logGroup_map[
                        "SearchLogs"
                    ].log_group_arn,
                ),
                "INDEX_SLOW_LOGS": opensearch.CfnDomain.LogPublishingOptionProperty(
                    enabled=True,
                    cloud_watch_logs_log_group_arn=logGroup_map[
                        "IndexLogs"
                    ].log_group_arn,
                ),
            },
            # we choose to use iam to control access
            access_policies=iam.PolicyDocument(
                statements=[
                    iam.PolicyStatement(
                        effect=iam.Effect.ALLOW,
                        actions=["es:*"],
                        resources=["*"],
                        principals=[iam.AnyPrincipal()],
                    )
                ]
            ),
        )
        domain.add_depends_on(logGroup_policy)
  1. 保存并退出文件opensearch_domain_stack.py的编辑。设置目标账号的密钥,这里建议使用环境变量进行设置,主要使用以下命令进行设置。(注意需要将正确的密钥替换上去后才可以执行)
export AWS_ACCESS_KEY_ID=<your_access_key_id>
export AWS_SECRET_ACCESS_KEY=<your_access_key_secret>
export AWS_SESSION_TOKEN=<your_session_token>
  1. 执行下面命令检查代码并查看能否正常生成对应的CloudFormation的模版。
cdk synth
  1. 执行下面命令创建opensearch domain。
cdk deploy

创建成功之后会见到如下截图的信息:

进入AWS控制台并搜索OpenSearch Service,进入Amazon OpenSearch Service 页面并点击sample-opensearch-domain,应该可以看到类似如下截图的结果。

总结

本文详细描述了如何利用AWS CDK逐步构建OpenSearch Domain。其中有两个比较重要的知识点值得关注。

  1. 在使用CDK去创建资源的时候,如果遇到没有具体资源没有相对应的API覆盖(如上述的为OpenSearch创建CloudWatch日志组的Policy),可以考虑通过Lambda函数封装Boto3的API,然后通过CDK的custom_resource调用Lambda,从而成功创建所需要的目标资源。
  2. 在中国区创建OpenSearch Domain暂时只能使用接口CfnDomain。若是使用接口aws_opensearchservice.Domain,则会出现类似“Access denied for operation. You don’t have permissions to integrate with Cognito“的错误。原因是CDK产生的CloudFormation模版里面默认包含了Cognito相关的参数(即使没有在CDK代码中显式指定),但是中国区的OpenSearch与Cognito的集成尚在建设当中,故而出现上述错误。而使用aws_opensearchservice.CfnDomain的话,就相当于直接编写CloudFormattion模板,可以控制Cognito相关的参数不出现在CDK生成的CloudFormation模版中,从而避免了上述错误。

本篇作者

梁宇

亚马逊云科技专业服务团队DevOps顾问,主要负责DevOps技术实施。尤为热衷云原生服务及其相关技术。在工作之余,他喜欢运动,以及和家人一起旅游。

罗圣杰

亚马逊云科技专业服务团队大数据架构师。负责基于AWS 的数据仓库和数据湖的解决方案咨询,架构与交付,擅长无服务计算,数据迁移,多云数据集成,数仓规划和数据服务的架构设计和落地。

刘娟池

亚马逊云科技专业服务团队高级大数据顾问。负责Dataall项目从Global到中国区的落地实施。在基于AWS的数据治理、数据分析、大数据应用等领域有多个项目咨询和成功交付经验。在进入AWS之前服务过金融、地产、销售制造等企业级用户,在数据集成、分析应用方面有10+的工作经验。