亚马逊AWS官方博客

使用Sqoop实现RDS MySQL到Redshift的数据同步

希腊有一个著名的谷堆悖论。“如果1粒谷子落地不能形成谷堆,2粒谷子落地不能形成谷堆,3粒谷子落地也不能形成谷堆,依此类推,无论多少粒谷子落地都不能形成谷堆。但是,事实并非如此。”
这个悖论说的,就是告诉我们量变产生质变,需要一个明显的分割线。如果说,量是一个量化的数据,质是一个结论的话。那么,数据分析做的,就是要分析量,从而引向“定性”、”定质”。定量的了解历史的规律(“质”),从而预测未来。
近几年,大数据风靡全球,越来越多的企业利用MapReduce,Hive,Spark等计算框架和工具来为自身的业务提供帮助,在AWS上,我们也提供了诸多的服务,帮助用户能够快速地构建起适合自身需求的大数据分析架构,其中,Amazon Redshift是性能优异并且完全托管的PB级别数据仓库服务,提供了标准SQL数据库访问接口,并且可以十分方便地与现有的主流商业智能数据分析工具整合,构建企业级数据仓库。

然而,大部分企业的核心数据都存储在关系型数据库中,如何能够有效地将这部分存量数据以及后续的增量数据导入Redshift中呢?本文介绍一种使用开源的Apache Sqoop工具,帮助我们轻松实现这一过程。

配置步骤:

第一步 准备工作

1.1 修改MySQL中的表结构

为了能够实现增量同步,需要在MySQL表中增加一列时间戳,该列能够自动记录行被插入更新的时间
为了能够实现同步删除操作,需要在MySQL表中增加一列删除记号列,应用对数据库的删除通过标记该列完成,而不是通过传统的delete语句,因为通常对于曾经存在过的数据,也有分析的意义

本例需要同步的表为country,orders,user,其中country表为Mycat中的全局表,在两台RDS mysql1和mysql2中都有全部信息,orders和user表为Mycat中的分片表,信息分布在RDS mysql1和mysql2中

mycat_sequence表是用于记录其他表自增字段信息的功能表,无需同步到Redshift中分析

执行如下语句添加两列

alter table country add ifdelete boolean NOT NULL default 0;
alter table country add lastmodified TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMEST AMP;

1.2 创建EMR集群

注意勾选上Hive和Sqoop,同时目前AWS EMR最新的版本为5.4.0,其中对一些组件的版本进行了更新,不过Hive和Sqoop的版本与本文一致

注意选择相应的VPC和子网,子网需要有internet的路由方便之后ssh登入

选择登入的密钥对,Master安全组使用默认的ElasticMapReduce-master,不用修改

启动EMR集群后,修改Master节点的安全组,添加允许公网ssh访问

在EMR界面获取master节点ssh登入的信息

1.3 创建Redshift数据仓库

首先创建Redshift使用的安全组,放行所有源访问5439端口的权限

分别在cn-north-1a和cn-north-1b两个可用区中创建两个子网给Redshift使 用,由于之后会通过公网连接Redshift,这两个子网需要有到internet的路由

在Redshift中创建子网组,选上之前创建的两个子网组

创建Redshift参数组

创建Redshift集群实例

选择之前创建的参数组,VPC,子网组和安全组,开启公网访问

获取连接Redshift的JDBC驱动及连接的URL信息

驱动如果无法下载,也可以从如下连接下载

https://s3---cn-north-1.amazonaws.com.rproxy.goskope.com.cn/junyublog/RedshiftJDBC41-1.1.17.1017.jar

1.4 创建并保存access key和secret access key

之后从 S3 中同步数据到Redshift时需要提供access key和secret access key信息,这边测试时可以全部放开权限

在IAM中增加一个用户并赋予权限

下载存有access key和secret access key的CSV文件

1.5 创建S3的bucket桶

S3会作为Hive表的底层存储

第二步 创建Hive表

Hive表为RDS到Redshift数据同步的中间表,底层使用S3作为存储,另外由于Hive的表名不能是user,这里使用users

exit; 退出hive

第三步 安装MySQL JDBC驱动(可选)

下载安装JDBC驱动,最新版的EMR不需要,如果在运行Sqoop的时候报找不到驱动时需要手动安装

ssh登入EMR的master节点

wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.40.tar.gz

tar xzvf mysql-connector-java-5.1.40.tar.gz

cp mysql-connector-java-5.1.40/ mysql-connector-java-5.1.40-bin.jar /usr/bin/sqoop/lib/

第四步 修改java权限,否则执行Sqoop job会有warning和error

vim /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.121-0.b13.29.amzn1.86_64/jre/lib/security/java.policy
在grant{}中添加如下语句

permission javax.management.MBeanTrustPermission “register”;

第五步 配置Sqoop

5.1 创建Sqoop访问数据库的密码,XXXXXX 为创建RDS mysql1和mysql2时赋予的账号密码

echo –n “XXXXXX” > /home/hadoop/sqoop.password

5.2 创建并执行Sqoop任务

其中由于country表是全局表,所以这里只是从mysql1的read replica读副本中同步,而user和orders表由于是分片表,所以需要分别从mysql1和mysql2各自的读副本中同步数据
需要注意修改如下指令中的URL为自己RDS读副本的URL,同时,对于user和orders,两条sqoop job是不同的,第一条job中通过hive-overwrite参数覆盖上一次job执行后遗留在Hive表中的数据,第二条job是没有hive-overwrite参数的,否则会把上一条job从mysql1中同步的数据错误地删除

下面进行第一次同步,分别执行如下命令将RDS中的数据同步到Hive表中,第一次执行是全备,根据表中数据量,时间可能较长

sqoop job –exec mysql1_country

sqoop job –exec mysql1_user

sqoop job –exec mysql2_user

sqoop job –exec mysql1_orders

sqoop job –exec mysql2_orders

进入Hive,查看表是否同步成功

第六步 将Hive表中的数据同步到Redshift中

使用JDBC客户端连接Redshift,这里使用SQL Workbench
分别创建country,user,orders表及各自的中间表,同时将Hive存在S3中的数据同步到中间表中,其中aws_access_key_id和aws_secret_access_key为准备工作中在IAM下载的CSV中的值

查看stage表中的信息是否同步正确

通过如下事务插入更新country,users,orders表,并删除中间表

查看数据是否正确插入更新到country,users,orders表中

第七步 执行增量同步

人为对MySQL中的表进行适当的增删改操作,本例对country表执行插入操作, 对user表执行插入和更新操作,对orders表执行删除操作,注意到时间戳为操作执行时的时间

ssh登入EMR的master节点,再次运行sqoop job将MySQL中插入更新的数据同步到Hive表中
sqoop job –exec mysql1_country

sqoop job –exec mysql1_user

sqoop job –exec mysql2_user

sqoop job –exec mysql1_orders

sqoop job –exec mysql2_orders

在Sqoop执行输出中可以看到,sqoop job会记录之前执行任务的时间,并调整where语句来实现增量同步数据,所以如果需要多次测试,需要删除job(sqoop job –delete XXX)并重新创建,这样会再次全量同步

进入Hive,查看增量数据是否同步成功

使用SQL Workbench通过JDBC连接Redshift,执行如下命令将增量数据同步到中间表

执行如下事务将中间表的数据插入更新到country,users,orders表中

查看数据是否正确插入更新到country,users,orders表中

之后在Redshift中的分析语句都可以通过添加where ifdelete=false排除删除的记录,同时可以定期删除ifdelete标记为false的记录,释放存储空间

作者介绍:

余骏

AWS解决方案架构师,负责基于AWS的云计算方案架构的咨询和设计,同时致力于AWS云服务在国内的应用和推广。在加入AWS之前,他在思科中国担任系统工程师,负责方案咨询和架构设计,在企业私有云和基础网络方面有丰富经验。