亚马逊AWS官方博客

利用 Amazon S3 inventory, Amazon EMR, 和 Amazon Athena 来触发针对预先存在的对象的跨区域复制

基于Amazon Simple Storage Service (Amazon S3)服务,您可以通过跨区域复制功能(CRR)来自动异步地拷贝分布在不同AWS区域桶中的对象。CRR是一个桶级别的配置,它能满足您在合规方面的要求,通过在不同区域存储备份数据以最大限度地帮助您减少潜在风险。CRR可以复制源存储桶中的所有对象,或者通过前缀和标签来选择其中的一个子集进行复制。在您启用CRR之前就已经预先存在的对象(pre-existingobjects) 是不会被复制的。同样的,如果是所使用的IAM角色复制权限不足或者存储桶政策授权不到位(当存储桶属于不同的AWS帐号),也可能无法完成对象的复制(failed objects)。在与客户合作过程中,我们发现大量基于上述原因而没能复制的对象。在本文中,我们会给您展示如何针对这些pre-existing和failed objects(早于CRR启用就已经存在的和复制失败的对象)进行跨区域复制。

 

方法论

从大的方向上来说,我们的策略是执行copy-in-place来实现pre-existing和failed objects的复制,利用Amazon S3 API 在这些对象之上进行复制,保留标签、接入控制列表(ACL)、元数据和压缩密钥。该操作也会在对象上重新设置复制状态(Replication_Status)标签。

具体来说我们通过以下来实现:

  • 通过Amazon S3 inventory 来识别copy in place的对象。这些对象没有复制状态,或者状态显示为失败。
  • 通过Amazon AthenaAWS Glue把S3 inventory文件提取成表。
  • 通过Amazon EMR 来执行Apache Spark任务以查询AWS Glue生成的表,并执行copy-in-place。

对象过滤

为了减少问题的出现(我们已经见过存储了数十亿对象的桶!)并杜绝S3 list操作,我们采用了Amazon S3 inventory服务。该服务在桶级别上启用,会提供一个S3对象的报告。Inventory文件包含对象的复制状态:PENDING, COMPLETED, FAILED,或REPLICA。Pre-existing objects在inventory中没有复制状态。

 

交互分析

为了简化使用S3 inventory创建的文件的过程,我们在AWS Glue Data Catalog中创建了一个表。您可以通过Amazon Athena来查询该表并分析对象,也可以利用它在Amazon EMR的Spark任务运行时识别出copy in place的对象。

 

Copy-in-place的执行

我们通过在Amazon EMR上运行一个Spark任务来执行针对S3对象的并发的copy-in-place。该步骤可以增大同时复制的运行规模,与使用单线程应用的连续复制相比,在进行大量对象的复制时性能会更好。

 

帐号设置

作为示例,我们为这次演示专门创建了三个S3存储桶。如果您想跟着一起操作的话,您需要以不同的名字先创建您自己的存储桶。我们分别将源存储桶和目标桶命名为crr-preexisting-demo-source 和crr-preexisting-demo-destination,源桶中还有预先存在的和复制状态为失败的对象。我们还将S3 inventory文件存储于名叫crr-preexisting-demo-inventory的第三个桶。

基本设置见以下图表:

您可以用任何桶来存储inventory,但是桶政策必须包含以下声明(需修改Resource和aws:SourceAccount来与之匹配)

  {
      "Version": "2012-10-17",
      "Id": "S3InventoryPolicy",
      "Statement": [
          {
              "Sid": "S3InventoryStatement",
              "Effect": "Allow",
              "Principal": {
                  "Service": "s3.amazonaws.com"
              },
              "Action": "s3:PutObject",
              "Resource": "arn:aws:s3:::crr-preexisting-demo-inventory/*",
              "Condition": {
                  "StringEquals": {
                      "s3:x-amz-acl": "bucket-owner-full-control",
                      "aws:SourceAccount": "111111111111"
                  }
              }
         }
    ]
}

在本示例中,我们上载了六个对象到crr-preexisting-demo-source。我们添加了三个在CRR被启用之前预先存在的对象(preexisting-*.txt) ,以及三个由于CRR IAM角色的许可被移除而导致的CRR复制失败的对象(failed-*.txt)。

启用 S3 inventory

您需要在Amazon S3 console中完成以下操作,来启用源桶中的S3 inventory :

在源桶的Management标签处选择inventory

选择Add new,并完成下图设置,选择CSV格式,勾选Replication status。不想了解如何创建inventory的详细信息,请参考Amazon S3 Console User Guide中的How Do I Configure Amazon S3 Inventory?

启用S3 inventory后,请等待inventory文件的送达,第一份报告会在48小时以内送到。如果您正跟着演示操作的话,请确保在进行下一步之前inventory报告已被送达。以下是inventory文件的样例:

你也可以看到对象的Overview的标签上的S3 console。预先存在的对象没有一个复制状态,但是复制失败的对象会显示如下:

通过Amazon Athena来注册AWS Glue Data Catalog中的表

为了能够用SQL来查询inventory文件,首先您需要在AWS Glue Data Catalog中创建一个外部表。点击https://console.aws.amazon.com以打开Amazon Athena console,在Query Editor标签上运行如下SQL声明。该声明会把这个外部表注册在AWS Glue Data Catalog中。

CREATE EXTERNAL TABLE IF NOT EXISTS
crr_preexisting_demo (
	`bucket` string,
	 key string,
	 replication_status string
)
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED
	FIELDS TERMINATED BY ','
	ESCAPED BY '\\'
	LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION 's3://crr-preexisting-demo-inventory/crr-preexisting-demo-source/crr-preexisting-demo/hive';

 

创建完这个表格以后,您需要通过在表中添加partition metadata以使AWS Glue Data Catalog能察觉到任何现存的数据和partitions。请使用Metastore Consistency Check功能来扫描和添加partition metadata到AWS Glue Data Catalog中。

MSCK REPAIR TABLE crr_preexisting_demo;

如您想进一步了解这么做的原因,请参考Amazon Athena User Guide中的MSCK REPAIR TABLEdata partitioning文献。

现在表和partitions都已经在Data Catalog中注册,您就可以用Amazon Athena来查询inventory文件了。

SELECT * FROM crr_preexisting_demo where dt='2019-02-24-04-00';

 

查询结果如下:

查询结果会显示S3 inventory中的某一个特定送达日期的所有行。现在您可以启动EMR cluster来复制(copy in place)预先存在的和之前复制失败的对象了。

注意:如果您的目的是解决之前复制失败的问题,在进行下一步之前请确保你已经纠正了导致失败的因素(IAM的权限或S3桶政策)

 

创建一个EMR cluster用于拷贝对象

为了并行copy in place,请在Amazon EMR上运行Spark job。我们写了一个bash脚本(详见:this GitHub repository)用于促进EMR cluster的创建和EMR步骤提交。请您先克隆GitHub repo来运行该脚本,然后参考以下启动EMR cluster:

$ git clone https://github.com/aws-samples/amazon-s3-crr-preexisting-objects
$ ./launch emr.sh

注意:运行bash脚本会产生AWS费用。默认会创建两个Amazon EC2实例,一个m4.xlarge 和一个m4.2xlarge。由于启用了自动终止,当cluster完成in-place复制后会自动终止。脚本会执行以下任务:

  1. 创建默认EMR角色(EMR_EC2_DefaultRole和EMR_DefaultRole).
  2. 上传用于bootstrap 动作和steps的文件至Amazon S3(我们采用crr-preexisting-demo-inventory来存储这些文件)
  3. 使用create-cluster 创建带Apache Spark的EMR集群。

完成cluster的配置之后:

  1. 通过bootstrap action安装boto3awscli
  2. 会执行两个步骤,先复制Spark应用到master node上,然后再运行该应用。

以下是Spark应用中的重点部分。您可以点击amazon-s3-crr-preexisting-objectsrepo在Github上找到本例的完整编码。

在此,我们从通过AWS Glue Data Catalog注册的表中挑选了replication_status为”FAILED” or “”的记录。

query = """
         SELECT bucket, key
         FROM {}
         WHERE dt = '{}'
         AND (replication_status = '""'
         OR replication_status = '"FAILED"')
         """.format(inventory_table, inventory_date)
							
print('Query: {}'.format(query))

crr_failed = spark.sql(query)

针对之前查询中返回的每个key,我们调用copy_object功能

def copy_object(self, bucket, key, copy_acls):
         dest_bucket = self._s3.Bucket(bucket)
         dest_obj = dest_bucket.Object(key)

         src_bucket = self._s3.Bucket(bucket)
         src_obj = src_bucket.Object(key)
		 
	 # Get the S3 Object's Storage Class, Metadata,
	 # and Server Side Encryption
         storage_class, metadata, sse_type, last_modified = \
             self._get_object_attributes(src_obj)
			 
	 # Update the Metadata so the copy will work
     metadata['forcedreplication'] = runtime

     # Get and copy the current ACL
	 if copy_acls:
              src_acl = src_obj.Acl()
              src_acl.load()		  
              dest_acl = {
                  'Grants': src_acl.grants,
                  'Owner': src_acl.owner
              }

             params = {
	         'CopySource': {
                     'Bucket': bucket,
	             'Key': key
                 },
                 'MetadataDirective': 'REPLACE',
		 'TaggingDirective': 'COPY',
		 'Metadata': metadata,
		 'StorageClass': storage_class
             }

       # Set Server Side Encryption
	   if sse_type == 'AES256':
           params['ServerSideEncryption'] = 'AES256'
	   elif sse_type == 'aws:kms':
           kms_key = src_obj.ssekms_key_id
           params['ServerSideEncryption'] = 'aws:kms'
           params['SSEKMSKeyId'] = kms_key

      # Copy the S3 Object over the top of itself,
	  # with the Storage Class, updated Metadata,
	  # and Server Side Encryption
      result = dest_obj.copy_from(**params)
				 
	  # Put the ACL back on the Object
	  if copy_acls:
          dest_obj.Acl().put(AccessControlPolicy=dest_acl)
					 
	     return {
		     'CopyInPlace': 'TRUE',
             'LastModified': str(result['CopyObjectResult']['LastModified'])
             }
 
        
注意:在Spark应用添加了一个forcedreplication key到对象的metadata中。之所以这么做是因为Amazon S3不允许您在没有改变对象或其metadata的情况下执行copy in place  

通过在Amazon Athena中运行一个查询来验证EMR工作是否成功

Spark应用将结果输出到S3。您可以用Amazon Athena创建另一个外部表并用AWS Glue Data Catalog来注册。然后用Athena来查询表以确保此次copy-in-place操作是成功的。
CREATE EXTERNAL TABLE IF NOT EXISTS
crr_preexisting_demo_results (
  `bucket` string,
  key string,
  replication_status string,
  last_modified string
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
  STORED AS TEXTFILE
LOCATION 's3://crr-preexisting-demo-inventory/results';

SELECT * FROM crr_preexisting_demo_results;
 
       

查询结果在控制台上显示如下:


虽然这表明此次copy-in-place操作是成功的,CRR仍然需要复制对象。接下来的inventory文件显示对象复制状态为COMPLETED。您也可以在console上验证preexisting-*.txt and failed-*.txt是否为COMPLETED状态。


值得提醒的是,因为CRR要求存储桶开启了多版本的功能,copy-in-place操作会产生对象的另一个版本,对此您可以用S3 lifecycle policies来管理过期的版本。

 

结论

在本文中,我们展示了如何用Amazon S3 inventory, Amazon Athena, AWS Glue Data Catalog和Amazon EMR来对预先存在的和之前复制失败的对象进行规模化的copy-in-place。

注意:Amazon S3 batch operations是复制对象的备选方案。区别在于S3 batch operations 不会检查每一个对象目前的属性和设置对象的ACLs和存储级别,以及对每个对象逐个进行加密。如想了解更多相关信息,请参考Amazon S3 Console User Guide中的Introduction to Amazon S3 Batch Operations

本篇作者

Michael Sambol

AWS 高级顾问,他获得了佐治亚理工学院的计算机科学硕士学位。Michael 喜欢锻炼、打网球、旅游和看西部电影

Chauncy McCaughey

AWS 高级数据架构师,他目前在做的业余项目是利用驾驶习惯和交通模式的统计分析来了解自己是如何做到总是开在慢车道的

校译作者

陈昇波

亚马逊AWS解决方案架构师,负责基于AWS的云计算方案架构的咨询和设计,同时致力于AWS云服务在国内的应用和推广。现致力于网络安全和大数据分析相关领域的研究。在加入AWS之前,在爱立信东北亚区担任产品经理,负责产品规划和方案架构设计和实施,在用户体验管理以及大数据变现等服务方面有丰富经验。