当前位置:   article > 正文

全方位揭秘!大数据从0到1的完美落地之sqoop导入实战_大数据导入依赖

大数据导入依赖

企业微信截图_16865733749322

Sqoop导入实战

Sqoop-import

案例1

表没有主键,需要指定map task的个数为1个才能执行

Sqoop导入原理:

​ Sqoop默认是并行的从数据库源导入数据。您可以使用-m或–num-mappers参数指定用于执行导入的map任务(并行进程)的数量。每个参数都取一个整数值,该整数值对应于要使用的并行度。默认情况下,使用四个任务。一些数据库可以通过将这个值增加到8或16来改善性能。

​ 默认情况下,Sqoop将标识表中的主键id列用作拆分列。从数据库中检索分割列的高值和低值,map任务操作整个范围的大小均匀的组件。譬如ID的范围是0-800,那么Sqoop默认运行4个进程,通过执行 SELECT MIN(id), MAX(id) FROM emp找出id的范围,然后把4个任务的id设置范围是(0-200),(200-400),(400-600),(600-800)

但是当一个表没有主键时,上面的切分就无法进行,Sqoop导入时就会出错,这时候可以通过-m把mapper的数量设为1,只有一个Mapper在运行,这时候就不需要切分,也可以避免主键不存在时候报错的问题.

#错误信息
ERROR tool.ImportTool: Import failed: No primary key could be found for table emp. Please specify one with --split-by or perform a sequential import with '-m 1'.

  • 1
  • 2
  • 3

导入代码:

[root@qianfeng01 sqoop-1.4.7]# bin/sqoop import --connect jdbc:mysql://localhost:3306/qfdb \
--username root --password 123456 \
--table emp -m 1
  • 1
  • 2
  • 3
DBMS-HDFS

案例2

表没有主键,使用–split-by指定执行split的字段

问题同上,如果表没有主键,那么还有个办法就是手工指定要拆分的列,通过--split-by来指定

[root@qianfeng01 sqoop-1.4.7]# bin/sqoop import --connect jdbc:mysql://localhost:3306/qfdb \
--username root --password 123456 \
--table emp \
--split-by empno \
--delete-target-dir \
--target-dir hdfs://qianfeng01:8020/sqoopdata/emp
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
-- 出错
Caused by: java.sql.SQLException: null,  message from server: "Host 'qianfeng01' is not allowed to connect to this MySQL server"
  • 1
  • 2

解决方案:

连接MySql

[root@qianfeng01 sqoop-1.4.7]# mysql -uroot -p
  • 1

(执行下面的语句 .:所有库下的所有表 %:任何IP地址或主机都可以连接)

mysql> GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'mysql' WITH GRANT OPTION;
	FLUSH PRIVILEGES;
  • 1
  • 2

案例3:条件导入(增量导入)

需要导入的数据不是全部的,而是带条件导入

[root@qianfeng01 sqoop-1.4.7]# bin/sqoop import --connect jdbc:mysql://localhost:3306/qfdb \
--username root --password 123456 \
--table emp \
--split-by empno \
--where 'empno > 7777' \
--target-dir hdfs://qianfeng01:8020/sqoopdata/emp
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

案例4:部分字段导入

要导入的数据,不想包含全部字段,只需要部分字段

注意:这种跟where差不多,使用时更灵活一些

[root@qianfeng01 sqoop-1.4.7] bin/sqoop import --connect jdbc:mysql://localhost:3306/qfdb \
--username root --password 123456 \
--split-by empno \
--query 'select empno,ename,job from emp where empno > 7777 and $CONDITIONS' \
--target-dir hdfs://qianfeng01:8020/sqoopdata/7
  • 1
  • 2
  • 3
  • 4
  • 5
DBMS-Hive

案例5:将数据导入到Hive中

[root@qianfeng01 sqoop-1.4.7]# bin/sqoop import --connect jdbc:mysql://localhost:3306/qfdb 
--username root 
--password 123456
--table emp 
--hive-import 
-m 1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
DBMS-HBase

把数据导入到HBase中

hbase中创建表:
create 'mysql2hbase','info'

# 方法一:
[root@qianfeng01 sqoop-1.4.7]# sqoop import  --connect jdbc:mysql://qianfeng01:3306/qfdb \
--username root \
--password 123456 \
--table emp \
--hbase-table mysql2hbase \
--column-family info \
--hbase-create-table \
--hbase-row-key empno \
-m 1 \


注意:如果使用的是Hbase2.X版本以上,那么需要添加依赖(1.6版本的依赖),不然会出现如下错误

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.HBaseAdmin.<init>(Lorg/apache/hadoop/conf/Configuration;)V

下载安装包:https://archive.apache.org/dist/hbase/1.6.0/
操作方式:将1.6版本的Hbase的依赖lib全部拉去到Sqoop对应文件夹lib下面,再次执行上面的命令

测试:
hbase(main):008:0> scan 'mysql2hbase'
ROW                                      COLUMN+CELL
 1                                       column=info:hobby, timestamp=1585852383291, value=1
 1                                       column=info:profile, timestamp=1585852383291, value=\xE6\xBC\x94\xE5\x91\x98
 1                                       column=info:uname, timestamp=1585852383291, value=bingbing
 2                                       column=info:hobby, timestamp=1585852383291, value=2
 2                                       column=info:profile, timestamp=1585852383291, value=\xE6\xBC\x94\xE5\x91\x98
 2                                       column=info:uname, timestamp=1585852383291, value=feifei
 3                                       column=info:hobby, timestamp=1585852383291, value=1
 3                                       column=info:profile, timestamp=1585852383291, value=\xE5\x94\xB1\xE6\xAD\x8C
 3                                       column=info:uname, timestamp=1585852383291, value=\xE5\x8D\x8E\xE4\xBB\x94
3 row(s) in 2.2770 seconds


# 方法二:
hbase(main):004:0> create 'mysql2hbase11','info'
[root@qianfeng01 sqoop-1.4.7]# sqoop import  --connect jdbc:mysql://qianfeng01:3306/qfdb \
--username root \
--password 123456 \
--table emp \
--hbase-table mysql2hbase11 \
--delete-target-dir \
--column-family info \
--hbase-create-table \
--hbase-row-key empno \
-m 1 \
--hbase-bulkload 

运行后在结尾处有结果(Trying to load hfile):
s20/04/03 10:41:11 WARN mapreduce.LoadIncrementalHFiles: Skipping non-directory hdfs://qianfeng01:8020/user/root/user_info/_SUCCESS
h20/04/03 10:41:12 INFO hfile.CacheConfig: CacheConfig:disabled
a20/04/03 10:41:12 INFO mapreduce.LoadIncrementalHFiles: Trying to load hfile=hdfs://qianfeng01:8020/user/root/emp/info/1aef7d02d1a646008f18d49cbb23f20f first=1 last=3


注:
-- hbase-bulkload 不用输入路径,会自己默认导出到某目录,然后完成后自行装载数据到hbase表中;
-m 需要再--hbase-bulkload之前出现

# 测试:
hbase(main):004:0> scan 'mysql2hbase1'
ROW                                      COLUMN+CELL
 1                                       column=info:hobby, timestamp=1585881667767, value=1
 1                                       column=info:profile, timestamp=1585881667767, value=\xE6\xBC\x94\xE5\x91\x98
 1                                       column=info:uname, timestamp=1585881667767, value=bingbing
 2                                       column=info:hobby, timestamp=1585881667767, value=2
 2                                       column=info:profile, timestamp=1585881667767, value=\xE6\xBC\x94\xE5\x91\x98
 2                                       column=info:uname, timestamp=1585881667767, value=feifei
 3                                       column=info:hobby, timestamp=1585881667767, value=1
 3                                       column=info:profile, timestamp=1585881667767, value=\xE5\x94\xB1\xE6\xAD\x8C
 3                                       column=info:uname, timestamp=1585881667767, value=\xE5\x8D\x8E\xE4\xBB\x94
3 row(s) in 0.6170 seconds

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
增量导入数据
使用场景
  1. 经常被操作不断产生数据的表,建议增量。
  2. 当某表基数很大,但是变化很小,也建议增量
使用方式A
  1. query where : 能精确锁定数据范围

  2. incremental : 增量,最后记录值来做的

query where方式

通过查询具体日期的方式进行导入

新建一个脚本文件

mysql中的表格:
 CREATE TABLE qfdb.sales_order(
	orderid INT PRIMARY KEY,
	order_date DATE
	)
[root@qianfeng01 sqoop-1.4.7] vi ./import.sh
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

写入以下内容:

#!/bin/bash
# yesterday=`date -d "1 days ago" "+%Y-%m-%d"`
yesterday=$1
sqoop import --connect jdbc:mysql://qianfeng01:3306/qfdb \
--username root \
--password 123456 \
--query "select * from sales_order where DATE(order_date) = '${yesterday}' and \$CONDITIONS" \
--delete-target-dir \
--target-dir /user/hive/warehouse/sales_order/dt=${yesterday} \
-m 1 \
--fields-terminated-by '\t' 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

执行

[root@qianfeng01 sqoop-1.4.7]# bash import.sh 2019-02-01
  • 1

通过下面HDFS可以快速查询到结果:

 [root@qianfeng01 sqoop-1.4.7]# hdfs dfs -cat /user/hive/warehouse/sales_order/dt=2019-01-01/pa*
  • 1
increment的append方式
#将会手动维护last-value 
[root@qianfeng01 sqoop-1.4.7]# sqoop import --connect jdbc:mysql://qianfeng01:3306/qfdb \
--username root \
--password 123456 \
--table sales_order \
--driver com.mysql.jdbc.Driver \
--target-dir /user/hive/warehouse/sales_order1/dt=2019-12-30 \
--split-by order_id \
-m 1 \
--check-column order_number \
--incremental append \
--last-value 800 \
--fields-terminated-by '\t'
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
注意:--last-value 80000 \  从80000开始检查,如果后面有新的数据就会进行增量导入,如果没有新的数据会提示下面的信息
21/12/12 01:52:16 INFO tool.ImportTool: Incremental import based on column order_date
21/12/12 01:52:16 INFO tool.ImportTool: No new rows detected since last import.
  • 1
  • 2
  • 3

使用下面命令查看:

[root@qianfeng01 sqoop-1.4.7]# hdfs dfs -cat /user/hive/warehouse/sales_order1/dt=2019-12-30/pa*
  • 1
导入填充空值数据
[root@qianfeng01 ~]# sqoop import --connect jdbc:mysql://localhost:3306/qfdb --username root --password 123456 --table emp --delete-target-dir --target-dir hdfs://qianfeng01:9820/sqoopdata/emp --null-string '\\N' --null-non-string '0'
  • 1

关键参数

--null-string '\\N'  ## 遇到空字符串会填充\N字符
--null-non-string '0' # 遇到空数字会填充0
  • 1
  • 2

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/401853
推荐阅读
相关标签
  

闽ICP备14008679号