Sqoop 指南
Sqoop 是一个将数据在关系数据库(如 MySQL/Oracle 等)和大数据产品(如 Hadoop/Hive/HBase等)之间导入/导出的有效工具。本文介绍了如何使用青云 Sqoop 服务将数据在 MySQL 和 Hadoop/Hive/HBase 间导入/导出的常用场景和具体方法。其它关系数据库如 Oracle/SQL Server 在 Sqoop 中的使用方式与 MySQL 类似,关于 Sqoop 使用的所有详细信息,可参阅 Sqoop 官方网站 。
创建 Sqoop 主机
要使用Sqoop 服务,首先您需要在映像市场中使用 BigData Client 映像创建一台主机,并将该主机加入和大数据集群(如 Hadoop/Hive/HBase等)相同的私有网络。创建成功后的主机已安装好 JDK/MySQL/Hadoop/Hive/HBase/Sqoop 等软件及客户端以供使用。另外,青云的主机支持纵向在线伸缩,还提供了监控告警等功能,使得 Sqoop 服务的使用和管理更加方便。
配置 Sqoop 服务
- 如果要将数据在 MySQL 和 Hadoop 间导入/导出,请先参考 Hadoop 指南——创建 Hadoop Client主机 配置 Hadoop 客户端
- 如果要将数据在 MySQL 和数据仓库 Hive 间导入/导出,请先参考 Hive 指南——创建 Hive 主机 配置 Hive 客户端
- 如果要将数据在 MySQL 和列式数据库 HBase 间导入/导出,请先参考 HBase 指南——创建 HBase Client 主机 配置 HBase 客户端
测试 Sqoop 服务
测试场景准备 0: 在新创建的 Sqoop 主机中启动 MySQL 服务
注解 如果您的 MySQL 在远端,请保证大数据集群中的主机可以远程访问到 MySQL,例如,执行 grant all privileges on . to ‘your_user’@’%’ identified by ‘your_password’ with grant option; flush privileges; 来使得 your_user/your_password 可以远程访问 MySQL 服务。
# 切换到 ubuntu 用户
su ubuntu
# 启动 MySQL 服务
service mysql.server start
Starting MySQL
. *
# 检测 MySQL 服务状态
service mysql.server status
* MySQL running (1298)
测试场景 1:将数据从关系数据库 MySQL 全量导入到 HDFS 中
# 创建数据表 my_blog 并插入测试数据
mysql> create table my_blog(id int not null primary key auto_increment, title varchar(64) not null, mtime timestamp, comment varchar(255));
mysql> insert into my_blog values(null, 'first one', now(), 'hello world');
mysql> insert into my_blog values(null, 'second one', now(), 'my future');
mysql> insert into my_blog values(null, 'third one', now(), 'my story');
mysql> select * from my_blog;
+----+------------+---------------------+-------------+
| id | title | mtime | comment |
+----+------------+---------------------+-------------+
| 1 | first one | 2016-10-11 11:58:43 | hello world |
| 2 | second one | 2016-10-11 11:59:21 | my future |
| 3 | third one | 2016-10-11 11:59:40 | my story |
+----+------------+---------------------+-------------+
# 将 my_blog 表中的数据导入到 HDFS 分布式文件系统中,其中 username 和 password 是连接 MySQL 的测试用户名和密码,实际使用中请自行更改
sqoop-import --connect jdbc:mysql://192.168.120.8/test --username root --password p12cHANgepwD --table my_blog
# 查看导入成功后的文件数据
hadoop fs -cat my_blog/*
1,first one,2016-10-11 11:58:43.0,hello world
2,second one,2016-10-11 11:59:21.0,my future
3,third one,2016-10-11 11:59:40.0,my story
测试场景 2:将数据从关系数据库 MySQL 增量导入到 HDFS 中
这个测试分为 append 追加模式和 lastmodified 更新模式,append 模式用在数据只是增加但没有更新的场景, lastmodified 模式用在数据不但有增加且有更新的情况。其中,append 模式会对相应字段大于 last-value 指定的值之后的记录进行追加导入,lastmodified 模式追加的是相应字段在 last-value 指定的日期之后被更新的记录。
2.1 使用 append 模式的方法如下:
# 模拟往 my_blog 表中新增数据行
insert into my_blog values(null, 'forth one', now(), 'another story');
mysql> select * from my_blog;
+----+------------+---------------------+---------------+
| id | title | mtime | comment |
+----+------------+---------------------+---------------+
| 1 | first one | 2016-10-11 11:58:43 | hello world |
| 2 | second one | 2016-10-11 11:59:21 | my future |
| 3 | third one | 2016-10-11 11:59:40 | my story |
| 4 | forth one | 2016-10-11 12:15:08 | another story |
+----+------------+---------------------+---------------+
# 将 my_blog 表中 id 字段值大于 3 的新增行追加到 HDFS 文件中
sqoop-import --connect jdbc:mysql://192.168.120.8/test --username root --password p12cHANgepwD --table my_blog --check-column id --incremental append --last-value 3
# 查看导入成功后的文件数据
hadoop fs -cat my_blog/*
1,first one,2016-10-11 11:58:43.0,hello world
2,second one,2016-10-11 11:59:21.0,my future
3,third one,2016-10-11 11:59:40.0,my story
4,forth one,2016-10-11 12:15:08.0,another story
如果不想每次执行增量导入时都手动去查寻 last-value 的值,那可以使用 sqoop job 的方式执行增量导入,此时 sqoop 会自动保存上次导入成功后的 last-value 值,使用方式如下:
# 创建一个导入数据的 job 并保存
sqoop job -create myjob2 -- import --connect jdbc:mysql://192.168.120.8/test --username root --password p12cHANgepwD --table my_blog -check-column id --incremental append --last-value 4
# 模拟新增数据
mysql> insert into my_blog values(null, 'fifth one', now(), 'aha moment');
# 执行以上保存的 job 就会开始导入数据
sqoop job --exec myjob2
# 查看结果
hadoop fs -cat my_blog/*
1,first one,2016-10-11 11:58:43.0,hello world
2,second one,2016-10-11 11:59:21.0,my future
3,third one,2016-10-11 11:59:40.0,my story
4,forth one,2016-10-11 12:15:08.0,another story
5,fifth one,2016-10-11 12:58:48.0,aha moment
# 模拟再新增加数据
mysql> insert into my_blog values(null, 'sixth one', now(), 'have a rest');
# 重复执行以上保存的 job 即可将新增数据导入
sqoop job --exec myjob2
# 查看导入后的结果
hadoop fs -cat my_blog/*
1,first one,2016-10-11 11:58:43.0,hello world
2,second one,2016-10-11 11:59:21.0,my future
3,third one,2016-10-11 11:59:40.0,my story
4,forth one,2016-10-11 12:15:08.0,another story
5,fifth one,2016-10-11 12:58:48.0,aha moment
6,sixth one,2016-10-11 13:34:12.0,have a rest
2.2 使用 lastmodified 模式的方法如下:
# 查询出数据行的最近更新时间,作为接下来的数据导入命令的 last-value 参数的值
mysql> select max(mtime) from my_blog;
+---------------------+
| max(mtime) |
+---------------------+
| 2016-10-11 13:34:12 |
+---------------------+
# 创建一个导入数据的 job 并保存,其中 merge-key 是新老数据在做合并时需要参考使用的表字段,last-value 表示自从上次导入后 check-column 这列的最大值, target-dir 表示 HDFS 中将要接收导入数据的文件夹
sqoop job -create myjob3 -- import --connect jdbc:mysql://192.168.120.8/test --username root --password p12cHANgepwD --table my_blog --incremental lastmodified --merge-key id --check-column mtime --last-value '2016-10-11 13:34:12' --target-dir my_blog
# 模拟更新老数据
mysql> update my_blog set mtime=now(),comment='my brand new future' where id=2;
# 模拟新增数据
mysql> insert into my_blog values(null, 'seventh one', now(), 'exciting start');
mysql> select * from my_blog;
+----+-------------+---------------------+---------------------+
| id | title | mtime | comment |
+----+-------------+---------------------+---------------------+
| 1 | first one | 2016-10-11 11:58:43 | hello world |
| 2 | second one | 2016-10-11 14:30:25 | my brand new future |
| 3 | third one | 2016-10-11 11:59:40 | my story |
| 4 | forth one | 2016-10-11 12:15:08 | another story |
| 5 | fifth one | 2016-10-11 12:58:48 | aha moment |
| 6 | sixth one | 2016-10-11 13:34:12 | have a rest |
| 7 | seventh one | 2016-10-11 14:31:12 | exciting start |
+----+-------------+---------------------+---------------------+
# 执行保存的 job 即可导入增量的数据
sqoop job --exec myjob3
# 查看更新和增加的新数据
hadoop fs -cat my_blog/*
1,first one,2016-10-11 11:58:43.0,hello world
2,second one,2016-10-11 14:30:25.0,my brand new future
3,third one,2016-10-11 11:59:40.0,my story
4,forth one,2016-10-11 12:15:08.0,another story
5,fifth one,2016-10-11 12:58:48.0,aha moment
6,sixth one,2016-10-11 13:34:12.0,have a rest
7,seventh one,2016-10-11 14:31:12.0,exciting start
测试场景 3:将数据从 HDFS 导出到 MySQL中
# 在 MySQL 中创建一个表用来存放从 HDFS 中导出的数据
mysql> create table my_blog_back(id int not null primary key auto_increment, title varchar(64) not null, mtime timestamp, comment varchar(255));
# 将 HDFS 中 export-dir 指定文件夹的数据导出到 MySQL 的 my_blog_back 表中
sqoop export --connect jdbc:mysql://192.168.120.8/test --username root --password p12cHANgepwD --table my_blog_back -export-dir /user/root/my_blog
# 查看成功导出的数据
mysql> select * from my_blog_back;
+----+-------------+---------------------+---------------------+
| id | title | mtime | comment |
+----+-------------+---------------------+---------------------+
| 1 | first one | 2016-10-11 11:58:43 | hello world |
| 2 | second one | 2016-10-11 14:30:25 | my brand new future |
| 3 | third one | 2016-10-11 11:59:40 | my story |
| 4 | forth one | 2016-10-11 12:15:08 | another story |
| 5 | fifth one | 2016-10-11 12:58:48 | aha moment |
| 6 | sixth one | 2016-10-11 13:34:12 | have a rest |
| 7 | seventh one | 2016-10-11 14:31:12 | exciting start |
+----+-------------+---------------------+---------------------+
测试场景 4:当 Hive 中的表为 internal 类型时,将数据从 MySQL 全量导入到 Hive 中
# 模拟创建 Hive 表存放的数据库
hive> create database test;
# 如果 HDFS 中的存在 my_blog 目录,需要删除,因为 Hive 导入时会使用
hadoop fs -rm -r -skipTrash my_blog
# 执行数据的导入,其中 hive-dabase 是 Hive 表 my_blog 所在的数据库
sqoop import --connect jdbc:mysql://192.168.120.8/test --username root --password p12cHANgepwD --table my_blog --hive-database test -hive-import -hive-table my_blog
# 查看成功导入后的数据
hive> select * from my_blog;
OK
1 first one 2016-10-11 11:58:43.0 hello world
2 second one 2016-10-11 14:30:25.0 my brand new future
3 third one 2016-10-11 11:59:40.0 my story
4 forth one 2016-10-11 12:15:08.0 another story
5 fifth one 2016-10-11 12:58:48.0 aha moment
6 sixth one 2016-10-11 13:34:12.0 have a rest
7 seventh one 2016-10-11 14:31:12.0 exciting start
测试场景 5:当 Hive 中的表为 internal 类型时,将数据从 MySQL 增量导入到 Hive 中
# 查询出数据行的最近更新时间,作为接下来的数据导入命令的 last-value 参数的值
mysql> select max(mtime) from my_blog;
+---------------------+
| max(mtime) |
+---------------------+
| 2016-10-11 14:31:12 |
+---------------------+
# 创建导入数据的 job 并保存,其中 target-dir 表示 Hive 表数据在 HDFS 中的存储位置,fields-terminated-by 表示 Hive 表中的每个字段是以什么符号作为结束
sqoop job --create myjob15 -- import --connect jdbc:mysql://192.168.120.8/test --username root --password p12cHANgepwD --table my_blog --hive-database test -hive-table my_blog --incremental lastmodified --merge-key id --check-column mtime --last-value '2016-10-11 14:31:12' --target-dir /user/hive/warehouse/test.db/my_blog --fields-terminated-by '\001'
# 模拟更新老数据和增加新数据
mysql> update my_blog set mtime=now(),comment='share the same change' where id=3 or id=4;
mysql> insert into my_blog values(null, 'eighth one', now(), 'higher value change');
mysql> select * from my_blog;
+----+-------------+---------------------+-----------------------+
| id | title | mtime | comment |
+----+-------------+---------------------+-----------------------+
| 1 | first one | 2016-10-11 11:58:43 | hello world |
| 2 | second one | 2016-10-11 14:30:25 | my brand new future |
| 3 | third one | 2016-10-11 16:37:48 | share the same change |
| 4 | forth one | 2016-10-11 16:37:48 | share the same change |
| 5 | fifth one | 2016-10-11 12:58:48 | aha moment |
| 6 | sixth one | 2016-10-11 13:34:12 | have a rest |
| 7 | seventh one | 2016-10-11 14:31:12 | exciting start |
| 8 | eighth one | 2016-10-11 16:38:23 | higher value change |
+----+-------------+---------------------+-----------------------+
# 执行以上保存的 job
sqoop job --exec myjob15
# 在 Hive 中查看导入的结果
hive> select * from my_blog;
OK
1 first one 2016-10-11 11:58:43.0 hello world
2 second one 2016-10-11 14:30:25.0 my brand new future
3 third one 2016-10-11 16:37:48.0 share the same change
4 forth one 2016-10-11 16:37:48.0 share the same change
5 fifth one 2016-10-11 12:58:48.0 aha moment
6 sixth one 2016-10-11 13:34:12.0 have a rest
7 seventh one 2016-10-11 14:31:12.0 exciting start
8 eighth one 2016-10-11 16:38:23.0 higher value change
测试场景 6:当 Hive 中的表为 external 类型时,将数据从 MySQL 全量导入到 Hive 中
# 模拟在 Hive 中创建一个外部表
hive> create external table my_exblog (
> id string,
> title string,
> mtime string,
> comment string
> )
> ROW FORMAT DELIMITED
> FIELDS TERMINATED BY ','
> LOCATION '/user/hive/warehouse/test.db/my_exblog';
# 创建并保存一个导入数据的 job, 其中各项参数的含义可参照之前测试场景中的命令
sqoop job --create myjob16 -- import --connect jdbc:mysql://192.168.120.8/test --username root --password p12cHANgepwD --table my_blog --incremental lastmodified --merge-key id --check-column mtime --last-value '0' --target-dir /user/hive/warehouse/test.db/my_exblog
# 执行之上保存的 job
sqoop job --exec myjob16
# 查看导入后的数据
hive> select * from my_exblog;
OK
1 first one 2016-10-11 17:09:51.0 share the newer change
2 second one 2016-10-11 14:30:25.0 my brand new future
3 third one 2016-10-11 16:37:48.0 share the same change
4 forth one 2016-10-11 16:37:48.0 share the same change
5 fifth one 2016-10-11 17:09:51.0 share the newer change
6 sixth one 2016-10-11 13:34:12.0 have a rest
7 seventh one 2016-10-11 14:31:12.0 exciting start
8 eighth one 2016-10-11 16:38:23.0 higher value change
9 nineth one 2016-10-11 17:24:37.0 vip treatment
测试场景 7:当 Hive 中的表为 external 类型时,将数据从 MySQL 增量导入到 Hive 中
# 模拟更新老数据和插入新数据
mysql> update my_blog set mtime=now(),comment='external changes' where id=4 or id=9;
mysql> insert into my_blog values(null, 'tenth one', now(), 'external one');
mysql> insert into my_blog values(null, 'tenth one', now(), 'external two');
mysql> select * from my_blog;
+----+-------------+---------------------+------------------------+
| id | title | mtime | comment |
+----+-------------+---------------------+------------------------+
| 1 | first one | 2016-10-11 17:09:51 | share the newer change |
| 2 | second one | 2016-10-11 14:30:25 | my brand new future |
| 3 | third one | 2016-10-11 16:37:48 | share the same change |
| 4 | forth one | 2016-10-12 12:06:18 | external changes |
| 5 | fifth one | 2016-10-11 17:09:51 | share the newer change |
| 6 | sixth one | 2016-10-11 13:34:12 | have a rest |
| 7 | seventh one | 2016-10-11 14:31:12 | exciting start |
| 8 | eighth one | 2016-10-11 16:38:23 | higher value change |
| 9 | nineth one | 2016-10-12 12:06:18 | external changes |
| 10 | tenth one | 2016-10-12 12:06:41 | external one |
| 11 | tenth one | 2016-10-12 12:06:45 | external two |
+----+-------------+---------------------+------------------------+
# 执行保存过的 job
sqoop job --exec myjob16
# 查看增量导入后的数据
hive> select * from my_exblog;
OK
1 first one 2016-10-11 17:09:51.0 share the newer change
10 tenth one 2016-10-12 12:06:41.0 external one
11 tenth one 2016-10-12 12:06:45.0 external two
2 second one 2016-10-11 14:30:25.0 my brand new future
3 third one 2016-10-11 16:37:48.0 share the same change
4 forth one 2016-10-12 12:06:18.0 external changes
5 fifth one 2016-10-11 17:09:51.0 share the newer change
6 sixth one 2016-10-11 13:34:12.0 have a rest
7 seventh one 2016-10-11 14:31:12.0 exciting start
8 eighth one 2016-10-11 16:38:23.0 higher value change
9 nineth one 2016-10-12 12:06:18.0 external changes
测试场景 8:将数据从 Hive 导出到 MySQL 中
# 导出前,需要在 MySQL 中创建相应结构的表
mysql> create table my_exblog_back(id int not null primary key auto_increment, title varchar(64) not null, mtime timestamp, comment varchar(255));
# 执行导出任务, 其中 export-dir 表示要导出的 Hive 表在 HDFS 中的存储路径
sqoop export --connect jdbc:mysql://192.168.120.8/test --username root --password p12cHANgepwD --table my_exblog_back -export-dir /user/hive/warehouse/test.db/my_exblog
# 查看导出成功后的数据
mysql> select * from my_exblog_back;
+----+-------------+---------------------+------------------------+
| id | title | mtime | comment |
+----+-------------+---------------------+------------------------+
| 1 | first one | 2016-10-11 17:09:51 | share the newer change |
| 2 | second one | 2016-10-11 14:30:25 | my brand new future |
| 3 | third one | 2016-10-11 16:37:48 | share the same change |
| 4 | forth one | 2016-10-12 12:06:18 | external changes |
| 5 | fifth one | 2016-10-11 17:09:51 | share the newer change |
| 6 | sixth one | 2016-10-11 13:34:12 | have a rest |
| 7 | seventh one | 2016-10-11 14:31:12 | exciting start |
| 8 | eighth one | 2016-10-11 16:38:23 | higher value change |
| 9 | nineth one | 2016-10-12 12:06:18 | external changes |
| 10 | tenth one | 2016-10-12 12:06:41 | external one |
| 11 | tenth one | 2016-10-12 12:06:45 | external two |
+----+-------------+---------------------+------------------------+
测试场景 9:将数据从关系数据库 MySQL 导入到列式数据库 HBase 中
# 在 HBase 中创建表和字段簇
bin/hbase shell
hbase(main):001:0> create 'my_blog','content'
# 执行导入数据的任务, 其中 hbase-row-key 表示将要以 MySQL 表中的什么字段作为 HBase 表中的 row-key
sqoop-import --connect jdbc:mysql://192.168.120.19/test --username root --password p12cHANgepwD --table my_blog --hbase-table my_blog --column-family content --hbase-row-key id