赞
踩
人工智能AI:Keras PyTorch MXNet TensorFlow PaddlePaddle 深度学习实战(不定时更新)
- 2.Hive 基本操作:
- 1.创建数据库
- 创建数据库 实际是在hdfs文件系统中 /user/hive/warehouse目录下 创建一个文件夹“数据库名.db”
- 默认创建在 <name>hive.metastore.warehouse.dir</name>所配置的 <value>/user/hive/warehouse</value>目录下
- 1.连接 Hive:
- cd /root/hive/bin
- ./hive 或 ./beeline -u jdbc:hive2://NODE1:10000 -n root
- 2.执行 show databases; 默认只有一个 数据库: default
- 3.创建新的数据库:create database [if not exists] 数据库名;
- 4.进入 http://192.168.25.100:50070 或 http://node1:50070
- 如果点击 user目录显示以下信息表示当前用户没有权限访问该目录,修改权限:hadoop fs -chmod -R 777 /user(可以直接关闭HDFS权限检查来解决访问的权限问题)
5.进入 /user/hive/warehouse,即能看到所创建的 数据库(文件夹):
- 6.数据库相关操作
- 1.创建数据库:create database [if not exists] 数据库名;
- 2.显示所有数据库:show databases;
- 3.删除数据库:(drop database 数据库名)
- drop database [if exists] 数据库名 [restrict|cascade];
- 默认情况下,hive不允许删除含有表的数据库,要先将数据库中的表清空才能drop,否则会报错
- restrict:默认是restrict,表示有限制的
- cascade:加入cascade关键字,可以强制删除一个数据库
- 强制删除一个数据库:drop database if exists users cascade;
- 4.切换数据库:use 数据库名;
-
-
- 2.在指定的数据库下创建表,并且往表(文件夹)中 存储结构化数据的文件
- 1.在指定的数据库下创建表:首先执行“use 数据库名;”,才然后能在指定数据库下创建表,
- 实际上是在“/user/hive/warehouse/数据库名.db”目录下创建一个名为“表名”的文件夹。
- 2.然后往“/user/hive/warehouse/数据库名.db/表名”目录中存入结构化数据的文件,
- 创建表的语句中必须指定“对应结构化数据的”分隔符。
-
- 1.创建表:
- 1.在创建表之前,必须先指定在哪个数据库下创建表:use 数据库名;
- 2.该表必须映射指定数据库(文件夹)下的结构化数据的文件。
- 在创建表的时候 必须加上 结构化数据的文件中对应的分割符,那么结构化数据才能映射到表中。
- 3.格式:create table 表名(id int, 字段名 类型) [partitioned by (分区字段名 类型)] row format delimited fields terminated by '分隔符';
- 例子:create table nagisa(name string,num1 int,num2 int) row format delimited fields terminated by ',';
- 4.显示当前数据库下的所有表:show tables;
- 5.复制一张表:只复制表结构,不复制表数据
- create table 表名 like 表名;
- 2.删除表:drop table 表名
-
- 3.三种方式把 结构化数据的文件 存到 数据库表中:方式一 和 方式二 作用相同
- 1.方式一:把 linux的结构化数据的文件 存到 hdfs文件系统目录下的某表中
- 格式:LOAD DATA local INPATH 'linux本地的结构化数据文件所在的绝对路径' [overwrite] INTO TABLE 表名;
- 属性解析:
- 1.有LOCAL 表示从本地文件系统加载(文件会被拷贝到HDFS中)
- 2.无LOCAL 表示从HDFS中加载数据(注意:文件直接被移动,而不是拷贝,并且文件名都不带改的)
- 3.没有OVERWRITE 会直接APPEND,有OVERWRITE 表示是否覆盖表中数据(或指定分区的数据)
-
- 2.方式二:把 linux本地的结构化数据的文件 存到 hdfs文件系统目录下的某表中
- 把 linux的结构化数据的文件 存到 hdfs文件系统目录下的 “/user/hive/warehouse/数据库名.db/表名”的文件夹中,相当于存到 数据库表中,
- 格式:hadoop fs -put linux本地的结构化数据文件所在路径 hdfs文件系统中“表”所在路径
- hadoop fs -put /root/结构化数据文件名 /user/hive/warehouse/数据库名.db/表名
- 例子:hadoop fs -put /root/1.txt /user/hive/warehouse/rimengshe.db/nagisa
- 3.方式三:
- 把 hdfs中文件夹中所有数据文件 导入到 某hive表中 或 某分区的hive中
- 格式一:load data inpath 'hdfs文件系统下绝对路径' [overwrite] into table 表名 partition(分区字段名='分区字段值');
- 格式二:load data inpath 'hdfs文件系统下绝对路径' [overwrite] into table 表名;
- 例子:load data inpath '/weblog/preprocessed/' overwrite into table ods_weblog_origin partition(datestr='20130918');
- 属性解析:
- 1.有LOCAL 表示从本地文件系统加载(文件会被拷贝到HDFS中)
- 2.无LOCAL 表示从HDFS中加载数据(注意:文件直接被移动,而不是拷贝,并且文件名都不带改的)
- 3.没有OVERWRITE 会直接APPEND,有OVERWRITE 表示是否覆盖表中数据(或指定分区的数据)
-
- 4.两种方式 把查询的数据导入到表中:
- 1.create table 表名 as select ...from 表名
- 2.insert into table 表名 select ...from 表名
- insert overwrite table 表名 select ...from 表名 # overwrite 会覆盖表中原有的所有数据,没有overwrite 有into 表示追加
- 例子:insert into table 表名 select ...from (多行多列select ... from ...) a join 表名 b on a.字段=b.字段 and a.字段=b.字段
- group by a.字段,a.字段 having a.字段=xx值 and a.字段 is not null and 聚合函数 order by a.字段 asc,a.字段 desc;
- 3.insert into table 表名 partition(分区字段名='分区字段值') select ...from 表名
- insert overwrite table 表名 partition(分区字段名='分区字段值') select ...from 表名 # overwrite 会覆盖表中原有的所有数据,没有overwrite 有into 表示追加
- 3.查询表中数据:
- “表”映射“结构化数据的文件中的”结构化数据
- 格式:select * from 表名;
- select * from 表名 where 分区字段名='分区字段值';
- select 表名.* from 表名 where 表名.分区字段名='分区字段值';
- 4.创建表时指定 分区字段:
- 1.单分区建表语句:
- 创表语句中只带有一个分区字段,那么hdfs文件系统中的完整路径是:/user/hive/warehouse/数据库名.db/表名/分区字段名1=分区字段值
- create table 表名(id int, 字段名 类型) partitioned by (分区字段名 类型) row format delimited fields terminated by '分隔符';
-
- 2.双分区建表语句:
- 创表语句中带有两个分区字段。
- 那么hdfs文件系统中的完整路径是:/user/hive/warehouse/数据库名.db/表名/分区字段名1=分区字段值/分区字段名2=分区字段值
- create table 表名(id int, 字段名 类型) partitioned by (分区字段名1 类型, 分区字段名2 类型) row format delimited fields terminated by '分隔符';
-
- 3.partitioned by (分区字段名 类型):
- 1.分区字段是虚拟的字段,分区字段不存在于结构化数据的文件中,但分区字段存在于表结构中。
- 2.创建表的同时带有分区字段的话,那么该表被称为分区表,并且会在hdfs文件系统中的“/user/hive/warehouse/数据库名.db/表名”目录下
- 创建名为“分区字段名=分区字段值”文件夹。
- 3.分区字段的作用:
- “分区字段名=分区字段值”用于作为where子句中的查询条件。
- 辅助查询,缩小查询范围,加快数据的检索速度和对数据按照一定的规格和条件进行管理。
-
- 4.把结构化数据的文件存储到单分区表中:
- 即把结构化数据的文件存储到“/user/hive/warehouse/数据库名.db/表名/分区字段名1=分区字段值/分区字段名2=分区字段值”的目录下
- 1.第一种方式:LOAD DATA local INPATH '结构化数据文件所在的绝对路径' INTO TABLE 表名 partition(分区字段名='分区字段值');
- 2.第二种方式:hdfs dfs -put /root/xx.txt /user/hive/warehouse/数据库名.db/表名/分区字段名=分区字段值
-
- 5.把结构化数据的文件存储到双分区表中:
- 即把结构化数据的文件存储到“/user/hive/warehouse/数据库名.db/表名/分区字段名1=分区字段值”的目录下
- 1.第一种方式:LOAD DATA local INPATH '结构化数据文件所在的绝对路径' INTO TABLE 表名 partition(分区字段名1='分区字段值', 分区字段名2='分区字段值');
- 2.第二种方式:hdfs dfs -put /root/xx.txt /user/hive/warehouse/数据库名.db/表名/分区字段名1=分区字段值/分区字段名2=分区字段值
-
- 6.分区字段的查询:
- “分区字段名=分区字段值”用于作为where子句中的查询条件。
- 如:select * from 表名 where 分区字段名='分区字段值';
- select 表名.* from 表名 where 表名.分区字段名='分区字段值';
-
- 7.查看表中所有的分区:
- show partitions 表名;
-
-
- 4.创建表时指定 分隔符:
- 1.格式:create table 表名(id int, 字段名 类型) partitioned by (分区字段名 类型) row format delimited fields terminated by '分隔符';
- 2.row format delimited fields terminated by '分隔符':
- 对应的是结构化数据中的分隔符。如果结构化数据中带有分隔符的话,那么创建表时也必须指定分隔符。
- 因为只有这样,结构化数据才能映射到表中,否则表中显示的数据都为null。
-
- 3.复杂类型的数据表指定分隔符
- ROW FORMAT DELIMITED 每行中的分隔格式
- [FIELDS TERMINATED BY char] 以某字符来分割每个字段值
- [COLLECTION ITEMS TERMINATED BY char] Array数组中以某字符作为每个元素的分割符/以某字符来分割每个Map集合
- [MAP KEYS TERMINATED BY char] Map集合中以某字符来作为键值对的分割符
- [LINES TERMINATED BY char] 以某字符来分割每行
-
- 1.结构化数据如下:
- zhangsan beijing,shanghai,tianjin,hangzhou
- wangwu shanghai,chengdu,wuhan,haerbin
-
- 那么该表中创建分隔符的语句是:
- create table 表名(name string,字段名 array<string>) row format delimited fields terminated by '\t' collection items terminated by ',';
- 参数:
- 1.字段名 array<string>:表示该字段的类型是array数组,该数组中的每个元素的类型是string,每个元素值是字符串
- 2.row format delimited fields terminated by '\t':每个字段值之间的分隔符是'\t'
- 3.collection items terminated by ',':
- collection items 可把字段值数据转化为 Array数组 / Map集合。
- 此处字段值中的分隔符是',',那么由','进行分割出来的每个值作为数组中的元素值。
-
- 2.结构化数据如下:
- 1,zhangsan,唱歌:非常喜欢-跳舞:喜欢-游泳:一般般
- 2,lisi,打游戏:非常喜欢-篮球:不喜欢
-
- 那么该表中创建分隔符的语句是:
- create table t_map(id int, name string, 字段名 map<string,string>)
- row format delimited fields terminated by ','
- collection items terminated by '-'
- map keys terminated by ':' ;
-
- 参数:
- 1.字段名 map<string,string>:表示该字段的类型是map集合,该map集合的key和value分别都是string
- 2.map keys terminated by ':'
- Map集合中的键值对数据以':'作为分隔符,分割出key和value。
- 3.row format delimited fields terminated by ',':每个字段值之间的分隔符是','
- 4.collection items terminated by '-':
- collection items 可把字段值数据转化为数组/Map集合。
- 此处字段值中的分隔符是'-',那么由'-'进行分割出来的每个值作为Map集合中的键值对。
-
-
- 3.本地模式:
- # 设置本地模式(仅需当前机器)执行查询语句,不设置的话则需要使用yarn集群(多台集群的机器)执行查询语句
- # 本地模式只推荐在开发环境开启,以便提高查询效率,但在生产上线环境下应重新设置为使用yarm集群模式
- set hive.exec.mode.local.auto=true;
-
-
- 4.bVisualizer 连接Hive软件的使用:
- show databases;
- use rimengshe;
- show tables;
- select * from nagisa;
- # 设置本地模式(仅需当前机器)执行查询语句,不设置的话则需要使用yarn集群(多台集群的机器)执行查询语句
- # 本地模式只推荐在开发环境开启,以便提高查询效率,但在生产上线环境下应重新设置为使用yarm集群模式
- set hive.exec.mode.local.auto=true;
- select count(*) from nagisa;
-
-
- 5.分桶字段、分桶表:
- 1.分桶表 创建之前 需要开启分桶功能
- 2.分桶表(分簇表)创建的时候 分桶字段必须是表中已经存储(存在)的字段
- 3.分桶表数据的导入方式insert+select语句,实际是插入的数据来自查询的结果,查询的时候执行(Yarn集群)MR程序,如果是spark集群整合了hive的话,
- 则执行insert+select语句时执行的不再是(Yarn集群)MR程序,执行的而是spark job,效率比(Yarn集群)MR程序还高。
- 4.分桶的规则原理:Hive 采用对分桶字段值进行哈希取值操作,然后该值除以桶的个数 即求出的余数 决定该条记录存放在哪个桶中,即把该行数据放到哪个分组中。
- 5.分桶表和分桶字段的使用:针对join的连接字段进行分桶操作,即把join的连接字段作为分桶字段
- 6.分桶的好处:
- 如果多行数据中的join连接字段值都相同的话,那么通过分桶操作后(join的连接字段作为分桶字段),根据分桶的规则原理,
- 该多行数据都会被一起放进同一个分组(分桶)中,那么当两张表进行join连接操作时,只针对表对应的“列值(join连接字段值)相同的”分组数据(分桶数据)进行连接,
- 不需要对全表数据查找判断,减少join的操作量,从而提高效率。
-
-
- 1.分桶设置
- 1.查看分桶设置:
- set hive.enforce.bucketing; # 查看是否开启分桶机制
- set mapreduce.job.reduces; # 查看分桶数量
-
- 2.设置开启分桶(注意每次重新连接都需要重新开启分桶,因为每次断开连接后配置都会失效)
- set hive.enforce.bucketing = true;
- set mapreduce.job.reduces = 分桶数量;
- # 设置分桶数量:代表整张表数据会被分为多少份,同时“/user/hive/warehouse/数据库名.db/表名”的文件夹下便会生成多少份文件
-
- 2.分桶表 和 分桶字段的创建:
- 1.按照分桶字段值的哈希值,把表数据分为N组:
- create table 表名(字段1 类型, 字段2 类型)
- clustered by(分桶字段)
- into 分桶数量 buckets
- row format delimited fields terminated by '分隔符';
-
- 例子:
- 1.先执行:use 数据库名;
- 2.分桶表 和 分桶字段的创建:同时自动创建出“/user/hive/warehouse/stu_buck”文件夹
- create table stu_buck(Sno int,Sname string,Sex string,Sage int,Sdept string)
- clustered by(Sno)
- into 4 buckets
- row format delimited fields terminated by ',';
- 3.show tables;
- 4.把结构化数据文件导入到表中所在的路径下:格式一 和 格式二 作用相同
- 格式一:LOAD DATA local INPATH '结构化数据文件所在的绝对路径' INTO TABLE 表名;
- 格式二:hadoop fs -put 本地结构化数据文件所在路径 hdfs文件系统中“表”所在路径;
- 此处例子:LOAD DATA local INPATH '/root/hivedata/students.txt' INTO TABLE stu_buck;
- 5.查看到表中 已经显示出结构化数据:表已经关联了结构化数据文件
- select * from stu_buck;
-
- 6.对表中数据进行分桶操作(分组操作):
- 1.格式:insert overwrite table 分桶表名 select * from 分桶表名 cluster by(分桶字段);
- 注意:对分桶字段进行分组的同时,每个分组内的分桶字段还会进行从小到大的排序
- 2.例子:insert overwrite table stu_buck select * from stu_buck cluster by(Sno);
- 3.错误用法:select * from 分桶表名 cluster by(分桶字段) sort by(排序字段)
- 原因:cluster 和 sort 在对表中数据进行分桶操作时 不能一起使用,但在创建表时 cluster 和 sort 可以一起使用。
- 2.创建分桶表和分桶字段的时候,同时创建排序字段:
- 1.格式:create table 表名(字段 类型) clustered by(分桶字段) sorted by(排序字段 ASC/DESC) into 分桶数 buckets row format delimited fields terminated by '分隔符';
- 例子:create table stu_buck1(Sno int,Sname string,Sex string,Sage int,Sdept string) clustered by(Sno) sorted by(Sage ASC) into 4 buckets row format delimited fields terminated by ',';
- 注意:cluster 和 sort 在对表中数据进行分桶操作时 不能一起使用,但在创建表时 cluster 和 sort 可以一起使用。
-
- 2.把结构化数据文件导入到表中所在的路径下:格式一 和 格式二 作用相同
- 格式一:LOAD DATA local INPATH '结构化数据文件所在的绝对路径' INTO TABLE 表名;
- 格式二:hadoop fs -put 本地结构化数据文件所在路径 hdfs文件系统中“表”所在路径;
- 此处例子:LOAD DATA local INPATH '/root/hivedata/students.txt' INTO TABLE stu_buck1;
-
- 4.对表中数据进行分桶操作(分组操作),同时对分桶表中的某字段进行排序:
- 格式:insert overwrite table 分桶表名 select * from 分桶表名 distribute by(分桶字段名) sort by(排序字段名 asc);
- 例子:insert overwrite table stu_buck1 select * from stu_buck1 distribute by(Sno) sort by(Sage asc);
- 3.全局排序:对表中的数据按照某字段进行排序,表中数据并且不分组(不分桶)
- select * from 表名 order by(排序字段);
-
- 6.内部表、外部表
- 1.内部表:
- 1.创建内部表的格式:create table 表名(字段1 类型, 字段2 类型) row format delimited fields terminated by '分隔符';
- 内部表默认创建在 <name>hive.metastore.warehouse.dir</name>所配置的 <value>/user/hive/warehouse</value>目录下。
- 2.创建内部表的结果:创建出“/user/hive/warehouse/数据库名.db/表名”的文件夹
- 3.把结构化数据文件导入到内部表中所在的路径下:格式一 和 格式二 作用相同
- 格式一:LOAD DATA local INPATH '结构化数据文件所在的绝对路径' INTO TABLE 表名;
- 格式二:hadoop fs -put 本地结构化数据文件所在路径 hdfs文件系统中“表”所在路径;
- 4.删除内部表(drop table 内部表名)时:
- 不仅把表和表中元数据删除,还会把内部表所在的hdfs目录下的数据文件删除
- 5.创建内部表的例子:
- create table student(Sno int,Sname string,Sex string,Sage int,Sdept string) row format delimited fields terminated by ',';
-
- 2.外部表
- 1.创建外部表的格式:
- 1.格式一:create external table 表名(字段1 类型, 字段2 类型) row format delimited fields terminated by '分隔符';
- 在使用external 创建外部表时,不使用location,那么外部表默认创建在 <name>hive.metastore.warehouse.dir</name>所配置的
- <value>/user/hive/warehouse</value>目录下。
- 2.格式二:create external table 表名(字段1 类型, 字段2 类型) row format delimited fields terminated by '分隔符' location 'hdfs文件系统下的目录的绝对路径';
- 在使用external 创建外部表时,同时使用location指定一个hdfs文件系统下的目录的绝对路径的话,表示把外部表创建在此,
- 并且结构化数据的文件也要放到该location指定的目录下。即该location指定的目录下存放该外部表对应的结构化数据的文件,
- 用于外部表关联该目录下的结构化数据的文件。
-
- 2.location 'hdfs文件系统下的目录的绝对路径' 的作用:
- 1.在使用external 创建外部表时,同时可以使用location,也可以不使用location。
- 2.在使用external 创建外部表时,同时使用location指定一个hdfs文件系统下的目录的绝对路径的话,表示把外部表创建在此,
- 并且结构化数据的文件也要放到该location指定的目录下。即该location指定的目录下存放该外部表对应的结构化数据的文件,
- 用于外部表关联该目录下的结构化数据的文件。
- 3.在使用external 创建外部表时,不使用location的话,那么外部表默认创建在 <name>hive.metastore.warehouse.dir</name>所配置的
- <value>/user/hive/warehouse</value>目录下。
-
- 3.把结构化数据文件导入到外部表中所在的路径下:格式一 和 格式二 作用相同
- 格式一:LOAD DATA local INPATH '结构化数据文件所在的绝对路径' INTO TABLE 表名;
- 格式二:hadoop fs -put 本地结构化数据文件所在路径 hdfs文件系统中“表”所在路径;
- 例子:load data inpath '/stu' into table stu_ext;
- 4.删除外部表(drop table 外部表名)时:
- 仅会把表和表中元数据删除,但不会把外部表所在的hdfs目录下的数据文件删除
- 5.创建外部表的例子:
- 1.hadoop fs -mkdir /stu # hdfs文件系统下的目录:存放该表对应的结构化数据的文件
- 2.hadoop fs -put /root/hivedata/students.txt /stu # 把 结构化数据的文件 存到 hdfs文件系统下的目录下
- 3.create external table stu_ext(Sno int,Sname string,Sex string,Sage int,Sdept string) row format delimited fields terminated by ',' location '/stu';
- # location '/stu' 表示 该外部表关联/stu目录下的 结构化数据的文件
- 4.select * from stu_ext; # 查询外部表:显示出 /stu 目录下的结构化数据
-
-
- 7.多重插入:
- 1.创建出3张表:
- 表1:create table source_table (id int, name string) row format delimited fields terminated by ',';
- 表2:create table test_insert1 (id int) row format delimited fields terminated by ',';
- 表3:create table test_insert2 (name string) row format delimited fields terminated by ',';
- 2.把 source_table表1 中的 id字段/列的值 拷贝到 test_insert1表2 中的id字段/列上,把 source_table表1中的 name字段/列的值 拷贝到 test_insert2表3 中的name字段/列上
- from source_table insert overwrite table test_insert1 select id insert overwrite table test_insert2 select name;
-
-
- 8.动态分区插入
- 1.开启动态分区
- set hive.exec.dynamic.partition=true; #是否开启动态分区功能,默认false关闭。
- set hive.exec.dynamic.partition.mode=nonstrict;
- #动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示允许所有的分区字段都可以使用动态分区。
-
- 2.需求:将dynamic_partition_table中的数据按照时间(day),插入到目标表d_p_t的相应分区中。
- 实现:
- 1.创建dynamic_partition_table表:create table dynamic_partition_table(day string,ip string) row format delimited fields terminated by ",";
- 2.结构化数据的文件dynamic_partition_table.txt:
- 2015-05-10,ip1
- 2015-05-10,ip2
- 2015-06-14,ip3
- 2015-06-14,ip4
- 2015-06-15,ip1
- 2015-06-15,ip2
- 3.把结构化数据的文件中的结构化数据 导入到 dynamic_partition_table表中
- load data local inpath '/root/hivedata/dynamic_partition_table.txt' into table dynamic_partition_table;
- 4.创建目标表d_p_t:create table d_p_t(ip string) partitioned by (month string,day string);
- 5.动态插入操作:
- 格式:insert overwrite table 表名 partition (分区字段1, 分区字段2) select ip,substr(字段,start,end) as 分区字段1, 分区字段2 字段 from 表名;
- insert overwrite table d_p_t partition (month, day) select ip,substr(day,1,7) as month, day from dynamic_partition_table;
- 分析:
- dynamic_partition_table表中 ip字段值 赋值给 d_p_t表中的 ip字段
- dynamic_partition_table表中 day字段值 中的 第一个字符到第七个字符 进行拷贝 到 d_p_t表中的 month分区字段中
- dynamic_partition_table 表中 day字段值 赋值给 d_p_t表中的 day分区字段中
-
- 9.把select查询结果数据 导出到 本地文件中
- 1.将查询结果 保存到 指定的本地文件系统下的目录中
- insert overwrite local directory '本地文件系统中目录的绝对路径' select * from 表名;
-
- 2.将查询结果 保存到 指定的hdfs文件系统下的目录中
- insert overwrite directory 'hdfs文件系统目录的绝对路径' select * from 表名;
-
- 3.使用sqoop 可把 hive数据 导出到 MySQL,也可把 MySQL数据 导出到 hive中
-
- 10.增加/删除分区
- 1.准备例子:
- drop table t_partition;
- create table t_partition(id int,name string)
- partitioned by (dt string)
- row format delimited fields terminated by ',';
-
- 2.增加分区:
- 1.alter table 表名 add partition (分区字段名='分区字段值') 即创建 /user/hive/warehouse/数据库名.db/表名/分区字段名1=分区字段值
- 2.alter table 表名 add partition (分区字段名1='分区字段值',分区字段名2='分区字段值')
- 即创建 /user/hive/warehouse/数据库名.db/表名/分区字段名1=分区字段值/分区字段名2=分区字段值
- 3.alter table t_partition add partition (分区字段名='分区字段值') location 'hdfs://NODE1:9000/XX';
- 给外部表添加分区,并使用location 指定分区所在的目录
-
- 3.删除分区
- alter table 表名 drop partition (分区字段名='分区字段值');
- 执行删除分区时,同时把分区目录和分区目录下的数据同时删除
- 注意区别于load data时候添加分区 会移动数据 会创建分区目录
-
-
- 11.join连接:
- 1.准备数据
- 1./root/hivedata/a.txt 要插入到 table a
- 1,a
- 2,b
- 3,c
- 4,d
- 7,y
- 8,u
- 2./root/hivedata/b.txt 要插入到 table b
- 2,bb
- 3,cc
- 7,yy
- 9,pp
- 3.建表:
- create table a(id int,name string) row format delimited fields terminated by ',';
- create table b(id int,name string) row format delimited fields terminated by ',';
-
- 4.导入数据:
- load data local inpath '/root/hivedata/a.txt' into table a;
- load data local inpath '/root/hivedata/b.txt' into table b;
-
- 2.各种join连接语句
- left join(左联接):返回包括左表中的所有记录和右表中联结字段相等的记录
- right join(右联接):返回包括右表中的所有记录和左表中联结字段相等的记录
- inner join(等值连接):只返回两个表中联结字段相等的行
-
- 1.select * from 表1 inner join 表2 on 表1.字段名=表2.字段名;
- select * from a inner join b on a.id=b.id;
- +-------+---------+-------+---------+--+
- | a.id | a.name | b.id | b.name |
- +-------+---------+-------+---------+--+
- | 2 | b | 2 | bb |
- | 3 | c | 3 | cc |
- | 7 | y | 7 | yy |
- +-------+---------+-------+---------+--+
-
- 2.left join
- select * from a left join b on a.id=b.id;
- +-------+---------+-------+---------+--+
- | a.id | a.name | b.id | b.name |
- +-------+---------+-------+---------+--+
- | 1 | a | NULL | NULL |
- | 2 | b | 2 | bb |
- | 3 | c | 3 | cc |
- | 4 | d | NULL | NULL |
- | 7 | y | 7 | yy |
- | 8 | u | NULL | NULL |
- +-------+---------+-------+---------+--+
-
- 3.right join
- select * from a right join b on a.id=b.id;
- +-------+---------+-------+---------+--+
- | a.id | a.name | b.id | b.name |
- +-------+---------+-------+---------+--+
- | 2 | b | 2 | bb |
- | 3 | c | 3 | cc |
- | 7 | y | 7 | yy |
- | NULL | NULL | 9 | pp |
- +-------+---------+-------+---------+--+
-
- 4.select * from a full outer join b on a.id=b.id;
- +-------+---------+-------+---------+--+
- | a.id | a.name | b.id | b.name |
- +-------+---------+-------+---------+--+
- | 1 | a | NULL | NULL |
- | 2 | b | 2 | bb |
- | 3 | c | 3 | cc |
- | 4 | d | NULL | NULL |
- | 7 | y | 7 | yy |
- | 8 | u | NULL | NULL |
- | NULL | NULL | 9 | pp |
- +-------+---------+-------+---------+--+
-
- 5.hive中的特别join:left semi join
- select * from a left semi join b on a.id = b.id;
- 相当于 select a.id,a.name from a where a.id in (select b.id from b); 在hive中效率极低
- +-------+---------+--+
- | a.id | a.name |
- +-------+---------+--+
- | 2 | b |
- | 3 | c |
- | 7 | y |
- +-------+---------+--+
-
- 6.select a.id,a.name from a join b on (a.id = b.id);
- 7.cross join:慎用
- 返回两个表的笛卡尔积结果,不需要指定关联键。
- select a.*,b.* from a cross join b;
-
-
- 12.内置jason函数:get_json_object
- 1.get_json_object(string json_string, string path)
- 1.第一个参数填写json对象变量名,即string json_string表示JSON格式值的字段名
- 2.第二个参数使用$表示json变量标识,即string path表示JSON格式值中的key,可以用“.”读取对象 或用“[]”读取数组;
- 如果输入的json字符串无效,那么返回NULL。 每次只能返回一个数据项。
- 2.例子1:
- test表中的其中一个字段名data ,data字段值的数据结构如下:
- data =
- {
- "store":
- {
- "fruit":[{"weight":8,"type":"apple"}, {"weight":9,"type":"pear"}],
- "bicycle":{"price":19.95,"color":"red"}
- },
- "email":"amy@only_for_json_udf_test.net",
- "owner":"amy"
- }
- 1.获取键值对中的的value(获取单值):hive> select get_json_object(data, '$.owner') from test;
- 结果:amy
-
- 2.获取键值对中的的value(获取JSON对象,使用“.”):hive> select get_json_object(data, '$.store.bicycle.price') from test;
- 结果:19.95
-
- 3.获取键值对中的的value(获取数组值,使用“[]”)
- hive> select get_json_object(data, '$.store.fruit[0]') from test;
- 结果:{"weight":8,"type":"apple"}
-
- 3.例子2:select get_json_object(line,'$.movie') as moive, get_json_object(line,'$.rate') as rate from rat_json limit 10;
-
-
- 13.transform
- 1.先加载rating.json文件到hive的一个原始表 rat_json
- create table rat_json(line string) row format delimited;
- load data local inpath '/root/hivedata/rating.json' into table rat_json;
-
- 2.需要解析json数据成四个字段,插入一张新的表 t_rating
- 1.drop table if exists t_rating;
- 2.create table t_rating(movieid string, rate int, timestring string, uid string)
- row format delimited fields terminated by '\t';
- 3.insert overwrite table t_rating
- select get_json_object(line,'$.movie') as moive,
- get_json_object(line,'$.rate') as rate,
- get_json_object(line,'$.timeStamp') as timestring,
- get_json_object(line,'$.uid') as uid
- from rat_json limit 10;
-
- 3.使用transform+python的方式去转换unixtime为weekday
- 先编辑一个python脚本文件
- ########python######代码
- vi weekday_mapper.py
- #!/bin/python
- import sys
- import datetime
-
- for line in sys.stdin:
- line = line.strip()
- movieid, rating, unixtime,userid = line.split('\t')
- weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
- print '\t'.join([movieid, rating, str(weekday),userid])
-
- 保存文件
- 然后,将文件加入hive的classpath:hive>add FILE /root/hivedata/weekday_mapper.py;
- create table u_data_new as select
- transform (movieid, rate, timestring,uid)
- using 'python weekday_mapper.py' as (movieid, rate, weekday,uid)
- from t_rating;
- select distinct(weekday) from u_data_new limit 10;
-
-
- 14.用户自定义函数 UDF
- 1.添加 hive-exec-1.2.1.jar 和 hadoop-common-2.7.4.jar 依赖
- <dependencies>
- <dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- <version>1.2.1</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>2.7.4</version>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>2.2</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
- 2.写一个 java 类,继承 UDF,并重载 evaluate 方法
- public class ItcastFunc extends UDF
- {
- //evaluate就是hive自定义函数实现具体业务逻辑的地方
- //重载的evaluate函数
- public String evaluate(String input)
- {
- if(input==null){return null;}
- return input.toLowerCase();
- }
-
- //重载的evaluate函数
- public int evaluate(int a,int b)
- {
- return a+b;
- }
- }
-
- 3.使用package命令打包项目为一个jar包:会在 \项目名\target 目录下生成一个jar包
- 4.启动 mysql 版的 Hive:
- 本地连接方式:cd /root/hive/bin
- ./hive
- 5.在Hive客户端下:
- 执行 hive> add jar xxx.jar; 表示把项目所打成的jar包上传到hive的classpath路径下
- 例子:add jar /root/hivedata/HiveUDF-1.0-SNAPSHOT.jar;
-
- 6.创建临时函数的同时并与项目(jar包)中的 class类 所关联
- 执行hive> create temporary function 临时函数名 as 'class类的全限定名(包名.类名)';
- 例子:create temporary function ItcastFunc as 'cn.itcast.hive.udf.ItcastFunc';
-
- 7.执行函数临时函数并传入参数,即等于执行class类中的对应的evaluate函数。
- 如果class类中定义了多个重载的evaluate函数的话,那么根据传入的参数数量和参数类型调用对应的重载函数。
- 执行 hive> select 临时函数名('ABC'); # 输出结果为 abc
- 执行 hive> select 临时函数名(2,3); # 输出结果为 5
- 例子:select ItcastFunc('ABC'); # 输出结果为 abc
- 例子:select ItcastFunc(2,3); # 输出结果为 5
-
- 或者也可以吧hql中查询出的结果传入到临时函数中进行处理返回新的结果:
- 执行 hive> select 临时函数名(name), age from 表名; # 表示把查询出的name字段值 传入到临时函数中进行处理返回新的结果
-
- 8.由于创建的临时函数 仅在当前打开的hive交互客户端下有效,当断开连接或打开一个hive交互客户端窗口的话,
- 是无法使用之前创建的临时函数,此时仍然需要重新临时函数。
- 1.两者分别是什么?
- Apache Hive是一个构建在Hadoop基础设施之上的数据仓库。通过Hive可以使用HQL语言查询存放在HDFS上的数据。
- HQL是一种类SQL语言,这种语言最终被转化为Map/Reduce. 虽然Hive提供了SQL查询功能,但是Hive不能够进行交互查询--因为它只能够在Haoop上批量的执行Hadoop。
- Apache HBase是一种Key/Value系统,它运行在HDFS之上。和Hive不一样,Hbase的能够在它的数据库上实时运行,而不是运行MapReduce任务。
- HBase被分区为表格,表格又被进一步分割为列簇。列簇必须使用schema定义,列簇将某一类型列集合起来(列不要求schema定义)。
- 例如,“message”列簇可能包含:“to”, ”from” “date”, “subject”, 和”body”. 每一个 key/value对在Hbase中被定义为一个cell,
- 每一个key由row-key,列簇、列和时间戳。在Hbase中,行是key/value映射的集合,这个映射通过row-key来唯一标识。Hbase利用Hadoop的基础设施,
- 可以利用通用的设备进行水平的扩展。
-
- 2.两者的特点
- Hive帮助熟悉SQL的人运行MapReduce任务。因为它是JDBC兼容的,同时,它也能够和现存的SQL工具整合在一起。
- 运行Hive查询会花费很长时间,因为它会默认遍历表中所有的数据。虽然有这样的缺点,一次遍历的数据量可以通过Hive的分区机制来控制。
- 分区允许在数据集上运行过滤查询,这些数据集存储在不同的文件夹内,查询的时候只遍历指定文件夹(分区)中的数据。这种机制可以用来,
- 例如,只处理在某一个时间范围内的文件,只要这些文件名中包括了时间格式。
- HBase通过存储key/value来工作。它支持四种主要的操作:增加或者更新行,查看一个范围内的cell,获取指定的行,删除指定的行、列或者是列的版本。
- 版本信息用来获取历史数据(每一行的历史数据可以被删除,然后通过Hbase compactions就可以释放出空间)。
- 虽然HBase包括表格,但是schema仅仅被表格和列簇所要求,列不需要schema。Hbase的表格包括增加/计数功能。
-
- 3.限制
- Hive目前不支持更新操作。另外,由于hive在hadoop上运行批量操作,它需要花费很长的时间,通常是几分钟到几个小时才可以获取到查询的结果。
- Hive必须提供预先定义好的schema将文件和目录映射到列,并且Hive与ACID不兼容。
- HBase查询是通过特定的语言来编写的,这种语言需要重新学习。类SQL的功能可以通过Apache Phonenix实现,但这是以必须提供schema为代价的。
- 另外,Hbase也并不是兼容所有的ACID特性,虽然它支持某些特性。最后但不是最重要的--为了运行Hbase,Zookeeper是必须的,
- zookeeper是一个用来进行分布式协调的服务,这些服务包括配置服务,维护元信息和命名空间服务。
-
- 4.应用场景
- Hive适合用来对一段时间内的数据进行分析查询,例如,用来计算趋势或者网站的日志。Hive不应该用来进行实时的查询。因为它需要很长时间才可以返回结果。
- Hbase非常适合用来进行大数据的实时查询。Facebook用Hbase进行消息和实时的分析。它也可以用来统计Facebook的连接数。
-
- 5.总结
- Hive和Hbase是两种基于Hadoop的不同技术--Hive是一种类SQL的引擎,并且运行MapReduce任务,Hbase是一种在Hadoop之上的NoSQL 的Key/vale数据库。
- 当然,这两种工具是可以同时使用的。就像用Google来搜索,用FaceBook进行社交一样,Hive可以用来进行统计查询,HBase可以用来进行实时查询,
- 数据也可以从Hive写到Hbase,设置再从Hbase写回Hive。
- 1.ORC File文件结构
- 1.ORC的全称是(Optimized Row Columnar),ORC文件格式是一种Hadoop生态圈中的列式存储格式,它的产生早在2013年初,最初产生自Apache Hive,
- 用于降低Hadoop数据存储空间和加速Hive查询速度。和Parquet类似,它并不是一个单纯的列式存储格式,仍然是首先根据行组分割整个表,在每一个行组内进行按列存储。
- ORC文件是自描述的,它的元数据使用Protocol Buffers序列化,并且文件中的数据尽可能的压缩以降低存储空间的消耗,
- 目前也被Spark SQL、Presto等查询引擎支持,但是Impala对于ORC目前没有支持,仍然使用Parquet作为主要的列式存储格式。
- 2015年ORC项目被Apache项目基金会提升为Apache顶级项目。
-
- 2.ORC具有以下一些优势:
- 1.ORC是列式存储,有多种文件压缩方式,并且有着很高的压缩比。
- 2.文件是可切分(Split)的。因此,在Hive中使用ORC作为表的文件存储格式,不仅节省HDFS存储资源,查询任务的输入数据量减少,使用的MapTask也就减少了。
- 3.提供了多种索引,row group index、bloom filter index。
- 4.ORC可以支持复杂的数据结构(比如Map等)
-
- 2.列式存储
- 1.由于OLAP查询的特点,列式存储可以提升其查询性能,但是它是如何做到的呢?
- 这就要从列式存储的原理说起,从图1中可以看到,相对于关系数据库中通常使用的行式存储,在使用列式存储时每一列的所有元素都是顺序存储的。
- 由此特点可以给查询带来如下的优化:
- 1.查询的时候不需要扫描全部的数据,而只需要读取每次查询涉及的列,这样可以将I/O消耗降低N倍,另外可以保存每一列的统计信息(min、max、sum等),
- 实现部分的谓词下推。
- 2.由于每一列的成员都是同构性的,可以针对不同的数据类型使用更高效的数据压缩算法,进一步减小I/O。
- 3.由于每一列的成员的同构性,可以使用更加适合CPU pipeline的编码方式,减小CPU的缓存失效。
- 2.需要注意的是,ORC在读写时候需要消耗额外的CPU资源来压缩和解压缩,当然这部分的CPU消耗是非常少的。
- 3.数据模型
- 1.和Parquet不同,ORC原生是不支持嵌套数据格式的,而是通过对复杂数据类型特殊处理的方式实现嵌套格式的支持。
- 2.对于如下的hive表:
- CREATE TABLE `orcStructTable`(
- `name` string,
- `course` struct<course:string,score:int>,
- `score` map<string,int>,
- `work_locations` array<string>
- )
-
- 在ORC的结构中包含了复杂类型列和原始类型,前者包括LIST、STRUCT、MAP和UNION类型,后者包括BOOLEAN、整数、浮点数、字符串类型等,
- 其中STRUCT的孩子节点包括它的成员变量,可能有多个孩子节点,MAP有两个孩子节点,分别为key和value,LIST包含一个孩子节点,
- 类型为该LIST的成员类型,UNION一般不怎么用得到。每一个Schema树的根节点为一个Struct类型,所有的column按照树的中序遍历顺序编号。
- ORC只需要存储schema树中叶子节点的值,而中间的非叶子节点只是做一层代理,它们只需要负责孩子节点值得读取,只有真正的叶子节点才会读取数据,
- 然后交由父节点封装成对应的数据结构返回。
-
-
-
- 4.文件结构
- 1.和Parquet类似,ORC文件也是以二进制方式存储的,所以是不可以直接读取,ORC文件也是自解析的,它包含许多的元数据,这些元数据都是同构ProtoBuffer进行序列化的。
- 2.ORC的文件结构如下图,其中涉及到如下的概念:
- 1.ORC文件:保存在文件系统上的普通二进制文件,一个ORC文件中可以包含多个stripe,每一个stripe包含多条记录,这些记录按照列进行独立存储,
- 对应到Parquet中的row group的概念。
- 2.文件级元数据:包括文件的描述信息PostScript、文件meta信息(包括整个文件的统计信息)、所有stripe的信息和文件schema信息。
- 3.stripe:一组行形成一个stripe,每次读取文件是以行组为单位的,一般为HDFS的块大小,保存了每一列的索引和数据。
- 4.stripe元数据:保存stripe的位置、每一个列的在该stripe的统计信息以及所有的stream类型和位置。
- 5.row group:索引的最小单位,一个stripe中包含多个row group,默认为10000个值组成。
- 6.stream:一个stream表示文件中一段有效的数据,包括索引和数据两类。索引stream保存每一个row group的位置和统计信息,数据stream包括多种类型的数据,
- 具体需要哪几种是由该列类型和编码方式决定。
- 3.在ORC文件中保存了三个层级的统计信息,分别为文件级别、stripe级别和row group级别的,他们都可以用来根据Search ARGuments(谓词下推条件)判断是否可以跳过某些数据,
- 在统计信息中都包含成员数和是否有null值,并且对于不同类型的数据设置一些特定的统计信息。
- 1.file level
- 在ORC文件的末尾会记录文件级别的统计信息,会记录整个文件中columns的统计信息。这些信息主要用于查询的优化,
- 也可以为一些简单的聚合查询比如max, min, sum输出结果。
- 2.stripe level
- ORC文件会保存每个字段stripe级别的统计信息,ORC reader使用这些统计信息来确定对于一个查询语句来说,需要读入哪些stripe中的记录。
- 比如说某个stripe的字段max(a)=10,min(a)=3,那么当where条件为a >10或者a <3时,那么这个stripe中的所有记录在查询语句执行时不会被读入。
- 3.row level
- 为了进一步的避免读入不必要的数据,在逻辑上将一个column的index以一个给定的值(默认为10000,可由参数配置)分割为多个index组。
- 以10000条记录为一个组,对数据进行统计。Hive查询引擎会将where条件中的约束传递给ORC reader,这些reader根据组级别的统计信息,
- 过滤掉不必要的数据。如果该值设置的太小,就会保存更多的统计信息,用户需要根据自己数据的特点权衡一个合理的值
-
-
-
- 5.数据访问
- 1.读取ORC文件是从尾部开始的,第一次读取16KB的大小,尽可能的将Postscript和Footer数据都读入内存。文件的最后一个字节保存着PostScript的长度,它的长度不会超过256字节,
- PostScript中保存着整个文件的元数据信息,它包括文件的压缩格式、文件内部每一个压缩块的最大长度(每次分配内存的大小)、Footer长度,以及一些版本信息。
- 在Postscript和Footer之间存储着整个文件的统计信息(上图中未画出),这部分的统计信息包括每一个stripe中每一列的信息,主要统计成员数、最大值、最小值、是否有空值等。
- 2.接下来读取文件的Footer信息,它包含了每一个stripe的长度和偏移量,该文件的schema信息(将schema树按照schema中的编号保存在数组中)、整个文件的统计信息以及
- 每一个row group的行数。
- 3.处理stripe时首先从Footer中获取每一个stripe的其实位置和长度、每一个stripe的Footer数据(元数据,记录了index和data的的长度),整个striper被分为index和data两部分,
- stripe内部是按照row group进行分块的(每一个row group中多少条记录在文件的Footer中存储),row group内部按列存储。
- 每一个row group由多个stream保存数据和索引信息。每一个stream的数据会根据该列的类型使用特定的压缩算法保存。
- 在ORC中存在如下几种stream类型:
- PRESENT:每一个成员值在这个stream中保持一位(bit)用于标示该值是否为NULL,通过它可以只记录部位NULL的值
- DATA:该列的中属于当前stripe的成员值。
- LENGTH:每一个成员的长度,这个是针对string类型的列才有的。
- DICTIONARY_DATA:对string类型数据编码之后字典的内容。
- SECONDARY:存储Decimal、timestamp类型的小数或者纳秒数等。
- ROW_INDEX:保存stripe中每一个row group的统计信息和每一个row group起始位置信息。
- 4.在初始化阶段获取全部的元数据之后,可以通过includes数组指定需要读取的列编号,它是一个boolean数组,如果不指定则读取全部的列,
- 还可以通过传递SearchArgument参数指定过滤条件,根据元数据首先读取每一个stripe中的index信息,然后根据index中统计信息以及SearchArgument参数确定
- 需要读取的row group编号,再根据includes数据决定需要从这些row group中读取的列,通过这两层的过滤需要读取的数据只是整个stripe多个小段的区间,
- 然后ORC会尽可能合并多个离散的区间尽可能的减少I/O次数。然后再根据index中保存的下一个row group的位置信息调至该stripe中第一个需要读取的row group中。
- 5.ORC文件格式只支持读取指定字段,还不支持只读取特殊字段类型中的指定部分。
- 6.使用ORC文件格式时,用户可以使用HDFS的每一个block存储ORC文件的一个stripe。对于一个ORC文件来说,stripe的大小一般需要设置得比HDFS的block小,
- 如果不这样的话,一个stripe就会分别在HDFS的多个block上,当读取这种数据时就会发生远程读数据的行为。如果设置stripe的只保存在一个block上的话,
- 如果当前block上的剩余空间不足以存储下一个strpie,ORC的writer接下来会将数据打散保存在block剩余的空间上,直到这个block存满为止。
- 这样,下一个stripe又会从下一个block开始存储。
- 7.由于ORC中使用了更加精确的索引信息,使得在读取数据时可以指定从任意一行开始读取,更细粒度的统计信息使得读取ORC文件跳过整个row group,
- ORC默认会对任何一块数据和索引信息使用ZLIB压缩,因此ORC文件占用的存储空间也更小,这点在后面的测试对比中也有所印证。
-
-
-
- 6.文件压缩
- 1.ORC文件使用两级压缩机制,首先将一个数据流使用流式编码器进行编码,然后使用一个可选的压缩器对数据流进行进一步压缩。
- 一个column可能保存在一个或多个数据流中,可以将数据流划分为以下四种类型:
- 1.Byte Stream:字节流保存一系列的字节数据,不对数据进行编码。
- 2.Run Length Byte Stream:字节长度字节流保存一系列的字节数据,对于相同的字节,保存这个重复值以及该值在字节流中出现的位置。
- 3.Integer Stream:整形数据流保存一系列整形数据。可以对数据量进行字节长度编码以及delta编码。具体使用哪种编码方式需要根据整形流中的子序列模式来确定。
- 4.Bit Field Stream:比特流主要用来保存boolean值组成的序列,一个字节代表一个boolean值,在比特流的底层是用Run Length Byte Stream来实现的。
- 2.接下来会以Integer和String类型的字段举例来说明。
- 1.Integer:对于一个整形字段,会同时使用一个比特流和整形流。比特流用于标识某个值是否为null,整形流用于保存该整形字段非空记录的整数值。
- 2.String:
- 对于一个String类型字段,ORC writer在开始时会检查该字段值中不同的内容数占非空记录总数的百分比不超过0.8的话,就使用字典编码,
- 字段值会保存在一个比特流,一个字节流及两个整形流中。比特流也是用于标识null值的,字节流用于存储字典值,
- 一个整形流用于存储字典中每个词条的长度,另一个整形流用于记录字段值。
- 如果不能用字典编码,ORC writer会知道这个字段的重复值太少,用字典编码效率不高,ORC writer会使用一个字节流保存String字段的值,
- 然后用一个整形流来保存每个字段的字节长度。
- 在ORC文件中,在各种数据流的底层,用户可以自选ZLIB, Snappy和LZO压缩方式对数据流进行压缩。编码器一般会将一个数据流压缩成一个个小的压缩单元,
- 在目前的实现中,压缩单元的默认大小是256KB。
-
-
-
- 7.Hive + ORC建立数据仓库
- 1.在建Hive表的时候我们就应该指定文件的存储格式。所以你可以在Hive SQL语句里面指定用ORC File这种文件格式,如下:
- 1.CREATE TABLE ... STORED AS ORC
- hive创建表时stored as orc:指定hive表存储文件的格式。一般默认为txt格式。
- 2.ALTER TABLE ... [PARTITION partition_spec] SET FILEFORMAT ORC
- 3.SET hive.default.fileformat=Orc
-
- 2.所有关于ORC File的参数都是在Hive SQL语句的TBLPROPERTIES字段里面出现,他们是:
- Key Default Notes
- orc.compress ZLIB 高级压缩(one of NONE, ZLIB, SNAPPY)
- orc.compress.size 262,144 每个压缩块中的字节数
- orc.stripe.size 268435456 每个stripe的字节数
- orc.row.index.stride 10,000 索引条目之间的行数(must be >= 1000)
- orc.create.index true 是否创建行索引
-
-
-
- 8.Java操作ORC:用JAVA在本地生成ORC文件
- 到https://orc.apache.org官网下载orc源码包,然后编译获取orc-core-1.3.0.jar、orc-mapreduce-1.3.0.jar、orc-tools-1.3.0.jar,将其加入项目中。
- 大多情况下,还是建议在Hive中将文本文件转成ORC格式,这种用JAVA在本地生成ORC文件,属于特殊需求场景。
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
- import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
- import org.apache.orc.CompressionKind;
- import org.apache.orc.OrcFile;
- import org.apache.orc.TypeDescription;
- import org.apache.orc.Writer;
-
- public class TestORCWriter
- {
- public static void main(String[] args) throws Exception
- {
- Path testFilePath = new Path("/tmp/test.orc");
- Configuration conf = new Configuration();
- TypeDescription schema = TypeDescription.fromString("struct<field1:int,field2:int,field3:int>");
- Writer writer = OrcFile.createWriter(testFilePath, OrcFile.writerOptions(conf).setSchema(schema).compress(CompressionKind.SNAPPY));
- VectorizedRowBatch batch = schema.createRowBatch();
- LongColumnVector first = (LongColumnVector) batch.cols[0];
- LongColumnVector second = (LongColumnVector) batch.cols[1];
- LongColumnVector third = (LongColumnVector) batch.cols[2];
-
- final int BATCH_SIZE = batch.getMaxSize();
- // add 1500 rows to file
- for (int r = 0; r < 15000000; ++r) {
- int row = batch.size++;
- first.vector[row] = r;
- second.vector[row] = r * 3;
- third.vector[row] = r * 6;
- if (row == BATCH_SIZE - 1) {
- writer.addRowBatch(batch);
- batch.reset();
- }
- }
- if (batch.size != 0) {
- writer.addRowBatch(batch);
- batch.reset();
- }
- writer.close();
- }
- }
-
-
-
- 9.更高的压缩比,更好的性能–使用ORC文件格式优化Hive
- 1.Hive从0.11版本开始提供了ORC的文件格式,ORC文件不仅仅是一种列式文件存储格式,最重要的是有着很高的压缩比,并且对于MapReduce来说是可切分(Split)的。
- 因此,在Hive中使用ORC作为表的文件存储格式,不仅可以很大程度的节省HDFS存储资源,而且对数据的查询和处理性能有着非常大的提升,因为ORC较其他文件格式压缩比高,
- 查询任务的输入数据量减少,使用的Task也就减少了。
-
- 2.需要注意的是,ORC能很大程序的节省存储和计算资源,但它在读写时候需要消耗额外的CPU资源来压缩和解压缩,当然这部分的CPU消耗是非常少的。
- 对性能提升的另一个方面是通过在ORC文件中为每一个字段建立一个轻量级的索引,来判定一个文件中是否满足WHERE子句中的过滤条件。
- 比如:当执行HQL语句”SELECT COUNT(1) FROM 表名 WHERE id = 0”时候,先从ORC文件的metadata(元数据)中读取索引信息,
- 快速定位到id=0所在的offsets,如果从索引信息中没有发现id=0的信息,则直接跳过该文件。
- 10.创建ORC表、查看ORC的文件元数据
- 1.创建ORC表:
- 1.CREATE TABLE 表名lxw1234_orc1 (
- id INT,
- name STRING
- ) stored AS ORC; # hive创建表时stored as orc:指定hive表存储文件的格式。一般默认为txt格式。
-
- 2.通过INSERT + SELECT 把查询数据插入到ORC表lxw1234_orc1中
- INSERT overwrite TABLE 表名lxw1234_orc1
- SELECT CAST(siteid AS INT) AS id, pcid
- FROM 表名lxw1234_text
- limit 10;
- 注意:CAST( 字段 AS 目标数据类型)函数的参数是一个表达式,它包括用AS关键字分隔的源值和目标数据类型。
- 以下例子用于将文本字符串'12'转换为int整型:SELECT CAST('12' AS int)
-
- 3.查询ORC表lxw1234_orc1:SELECT * FROM lxw1234_orc1 ORDER BY id;
- 查询数据如下:
- 139 89578071000037563815CC
- 139 E811C27809708556F87C79
- 633 82E0D8720C8D1556C75ABA
- 819 726B86DB00026B56F3F151
- 1134 8153CD6F059210539E4552
- 1154 5E26977B0EEE5456F7E7FB
- 1160 583C0271044D3D56F95436
- 1351 FA05CFDD05622756F953EE
- 1351 16A5707006C43356F95392
- 1361 3C17A17C076A7E56F87CCC
-
- 2.查看ORC的文件元数据
- 1.ORC表lxw1234_orc1对应的HDFS文件为:/hivedata/warehouse2/lxw1234_orc1/000000_0
- 新版本的Hive中提供了更详细的查看ORC文件信息的工具 orcfiledump。
- 执行命令 格式:./hive –orcfiledump -j -p ORC表对应的HDFS文件的绝对路径
- 例子:./hive –orcfiledump -j -p /hivedata/warehouse2/lxw1234_orc1/000000_0
- 返回一段JSON,将其格式化后:
- 11.ORC查询优化
- 1.经过上面ORC文件的元数据 了解了一个 ORC文件会被分成多个stripe,而且ORC文件的元数据中有每个字段的统计信息(min/max,hasNull等等),
- 这就为ORC的查询优化做好了基础准备。假如我的查询过滤条件为 WHERE id = 0 在Map Task读到一个ORC文件时,首先从ORC文件的统计信息中看看id字段的min/max值,
- 如果0不包含在内,那么这个文件就可以直接跳过了。
- 2.基于这点,还有一个更有效的优化手段是在数据入库的时候,根据id字段排序后入库,这样尽量能使id=0的数据位于同一个文件甚至是同一个stripe中,
- 那么在查询时候,只有负责读取该文件的Map Task需要扫描文件,其他的Map Task都会跳过扫描,大大节省Map Task的执行时间。
- 海量数据下,使用SORT BY 排序字段id ASC/DESC可能不太现实,另一个有效手段是使用DISTRIBUTE BY 分桶字段id SORT BY 排序字段id ASC/DESC;
-
- 3.使用下面的HQL构造一个较大的ORC表进行ORC查询优化:
- 1.# hive创建表时stored as orc:指定hive表存储文件的格式。一般默认为txt格式。
- CREATE TABLE 表名lxw1234_orc2 stored AS ORC
- AS SELECT CAST(siteid AS INT) AS id, pcid
- FROM 表名lxw1234_text DISTRIBUTE BY 分桶字段id SORT BY 排序字段id ASC/DESC;
-
- 2.该分桶字段+排序字段的语句保证相同的id位于同一个ORC文件中,并且是排序的。
- SELECT DISTINCT INPUT__FILE__NAME FROM ORC表lxw1234_orc2 WHERE id = 0;
- id=0的数据只存在于这一个ORC文件(hdfs://cdh5/hivedata/warehouse2/lxw1234_orc2/000000_0)中,而这个ORC表lxw1234_orc2有 33个ORC文件。
- 3.也可以通过命令查看文件的统计信息:
- 格式:./hive –orcfiledump -j -p ORC表对应的HDFS文件的绝对路径
- 例子:./hive –orcfiledump -j -p hdfs://cdh5/hivedata/warehouse2/lxw1234_orc2/000000_0
- 查询出的文件信息:
- 该文件中id的最小值为0,最大值为1155。因此,对于HQL查询”SELECT COUNT(1) FROM ORC表lxw1234_orc2 WHERE id = 0”,
- 优化器在执行时候,只会扫描这一个文件,其他文件都应该跳过。
- 4.在验证之前,先介绍一个参数:hive.optimize.index.filter,是否自动使用索引,默认为false(不使用);
- 如果不设置该参数为true,那么ORC的索引当然也不会使用。
- 在Hive中执行 set hive.optimize.index.filter=true; # 设置该参数为true,启用ORC的索引
- 执行查询:SELECT COUNT(1) FROM ORC表lxw1234_orc2 WHERE id = 0;
- 查看日志信息:该查询一共有13个MapTask,找到包含/hivedata/warehouse2/lxw1234_orc2/000000_0的MapTask进行查看,然后查看其它MapTask,均没有扫描记录的日志。
- 不使用索引,再执行一次:
- 设置不使用索引,HIVE中执行:set hive.optimize.index.filter=false;
- 执行查询:SELECT COUNT(1) FROM ORC表lxw1234_orc2 WHERE id = 0;
- 再查看日志时,每个MapTask中都有扫描记录的日志,说明每个MapTask都对自己的分片进行了扫描。
- 两次执行,MapTask的执行时间也能说明问题。
- 由此可见,Hive中的ORC不仅仅有着高压缩比,很大程序的节省存储空间和计算资源,而且在其上还做了许多优化(这里仅仅介绍了row_index)。
- 如果使用Hive作为大数据仓库,强烈建议主要使用ORC文件格式作为表的存储格式。
- 1.目的:将上网日志导入到hive中,要求速度快,压缩高,查询快,表易维护。推荐使用ORC格式的表存储数据
- 2.思路:因为在hive指定 RCFILE(升级版为ORC FILE)格式的表,不能直接load数据到RCFILE(升级版为ORC FILE)格式的表中,
- 只能通过对text file表进行insert + select插入“查询数据”到 RCFILE(升级版为ORC FILE)格式的表中。
- 考虑先建立text File格式内部临时表tmp_testp,使用hdfs fs -put命令向tmp_testp表路径拷贝数据(不是load),再建立ORC格式外部表http_orc,
- 使用insert + select命令把text File格式内部临时表tmp_test表数据 导入到 ORC格式外部表http_orc中,最后删除掉临时表数据。过程消耗的时间。
-
- 3.优化方案:如何提高hdfs文件上传效率
- 1.文件不要太大(测试用文件从200m到1G不均),启动多个客户端并行上传文件
- 2.考虑减少hive数据副本为2
- 3.优化mapReduce及hadoop集群,提高I/O,减少内存使用
-
- 3.执行:
- 1.建立text File格式的内部临时表,使表的location关联到一个日志文件的文件夹下:
- create table IF NOT EXISTS tmp_testp(p_id INT,tm BIGINT,idate BIGINT,phone BIGINT)
- partitioned by (dt string)
- row format delimited fields terminated by '\,'
- location '/hdfs/incoming';
-
- 2.通过hdfs上传文件124G文件,同时手动建立分区映射关系来导入数据。
- 给text File格式的内部临时表 添加分区:ALTER TABLE tmp_testp ADD PARTITION(dt='2013-09-30');
- 把源数据文件上传到内部临时表中的分区文件夹中:hadoop fs -put /hdfs/incoming/*d /hdfs/incoming/dt=2013-09-30
- 记录耗时:上传速度缓慢,内存消耗巨大
- 12:44 - 14:58 =两小时14分钟
- Mem: 3906648k total, 3753584k used, 153064k free, 54088k buffers
- 内存利用率96%
-
- 3.测试text File格式的内部临时表是否可以直接读取数据
- select * from tmp_testp where dt='2013-09-30';
-
- 4.建立ORC格式的外部表
- create external table IF NOT EXISTS http_orc(p_id INT,tm BIGINT,idate BIGINT,phone BIGINT )
- partitioned by (dt string)
- row format delimited fields terminated by '\,'
- stored as orc; # hive创建表时stored as orc:指定hive表存储文件的格式。一般默认为txt格式。
-
- 5.通过insert + select 方式将text File格式的内部临时表中数据 导入到 ORC格式的外部表中
- insert overwrite table http_orc partition(dt='2013-09-30') select p_id,tm,idate,phone from tmp_testp where dt='2013-09-30';
- 记录耗时:Time taken: 3511.626 seconds = 59分钟,
- 注意:insert这一步,可以选择字段导入到orc表中,达到精简字段,多次利用临时表建立不同纬度分析表的效果,不需要提前处理原始log文件,
- 缺点是上传到hdfs原始文件时间太长
-
- 6.计算ORC表压缩率:
- HDFS Read: 134096430275 HDFS Write: 519817638 SUCCESS
- 压缩率:519817638/134096430275=0.386% 哎呀,都压缩没了
-
- 7.删除text File格式的内部临时表,保证hdfs中只存一份ORC压缩后的文件
- drop table tmp_testp;
-
- 8.简单测试一下ORC压缩表操作看看,ORC压缩表与txtFile不压缩表的性能对比
- 1.ORC格式的外部表执行:select count(*) from http_orc;
- 结果:469407190 Time taken: 669.639 seconds, Fetched: 1 row(s)
-
- 2.text File格式的内部临时表执行:select count(*) from tmp_testp;
- 结果:469407190 Time taken: 727.944 seconds, Fetched: 1 row(s)
-
- 3.结论:ORC效果不错,比text File效果好一点点。平均每s上传文件:124G / (2hour14min+59min)= 11M/s。可以清楚看到向hdfs上传文件浪费了大量时间。
- 1.hive表的源文件存储格式有几类:
- 1.TEXT FILE
- 默认格式,建表时不指定默认为这个格式,导入数据时会直接把数据文件拷贝到hdfs上不进行处理。源文件可以直接通过hadoop fs -cat 查看
-
- 2.SEQUENCE FILE
- 1.一种Hadoop API提供的二进制文件,使用方便、可分割、可压缩等特点。
- SEQUENCEFILE将数据以<key,value>的形式序列化到文件中。序列化和反序列化使用Hadoop 的标准的Writable 接口实现。
- key为空,用value 存放实际的值,这样可以避免map 阶段的排序过程。
- 2.三种压缩选择:NONE, RECORD, BLOCK。 Record压缩率低,一般建议使用BLOCK压缩。
- 使用时设置参数:
- 1.SET hive.exec.compress.output=true;
- 2.SET io.seqfile.compression.type=BLOCK; # 可以是 NONE/RECORD/BLOCK
- 3.create table test2(str STRING) STORED AS SEQUENCEFILE;
- 3.RC FILE
- 1.一种行列存储相结合的存储方式。
- 首先,其将数据按行分块,保证同一个record在一个块上,避免读一个记录需要读取多个block。其次,块数据列式存储,有利于数据压缩和快速的列存取。
- 理论上具有高查询效率(但hive官方说效果不明显,只有存储上能省10%的空间,所以不好用,可以不用)。
- 2.RCFile结合行存储查询的快速和列存储节省空间的特点
- 1.同一行的数据位于同一节点,因此元组重构的开销很低;
- 2.块内列存储,可以进行列维度的数据压缩,跳过不必要的列读取。
- 3.查询过程中,在IO上跳过不关心的列。实际过程是,在map阶段从远端拷贝仍然拷贝整个数据块到本地目录,也并不是真正直接跳过列,
- 而是通过扫描每一个row group的头部定义来实现的。但是在整个HDFS Block 级别的头部并没有定义每个列从哪个row group起始到哪个row group结束。
- 所以在读取所有列的情况下,RCFile的性能反而没有SequenceFile高。
-
- 4.ORC FILE(RCFILE的升级版)
- hive给出的新格式,属于RCFILE的升级版。
-
- 2.自定义格式
- 用户的数据文件格式不能被当前 Hive 所识别的,通过实现inputformat和outputformat来自定义输入输出格式
-
- 2.只有TEXT FILE表能直接加载数据,必须本地load数据,和external外部表直接加载运路径数据一样,都只能用TEXT FILE表。
- 更深一步,hive默认支持的压缩文件(hadoop默认支持的压缩格式),也只能用TEXT FILE表直接读取。其他格式不行。可以通过TEXT FILE表加载后insert到其他表中。
- 换句话说,Sequence File、RC File、ORC File表不能直接加载数据,数据要先导入到TEXT FILE表,再通过insert + select 把TEXT FILE表 导入到 Sequence File、RC File、ORC File表。
- Sequence File、RC File、ORC File表的源文件不能直接查看,而是在hive中用select看。
- RCFile源文件可以用 hive --service rcfilecat /xxxxxxxxxxxxxxxxxxxxxxxxxxx/000000_0查看,但是格式不同,很乱。
-
- 3.ORC FILE格式(RCFILE的升级版)
- ORC是RCfile的升级版,性能有大幅度提升,而且数据可以压缩存储,压缩比和Lzo压缩差不多,比text文件压缩比可以达到70%的空间。而且读性能非常高,可以实现高效查询。
-
- 4.ORC FILE表的创建:
- 1.方式一
- 1.create table if not exists test_orc( advertiser_id string, ad_plan_id string, cnt BIGINT )
- partitioned by (day string, type TINYINT COMMENT '0 as bid, 1 as win, 2 as ck', hour TINYINT)
- STORED AS ORC;
- 2.alter table test_orc set serdeproperties('serialization.null.format' = '');
- 1.在hive里面默认的情况下会使用’/N’来表示null值,但是这样的表示并不符合我们平时的习惯。
- 所以需要通过serialization.null.format的设置来修改表的默认的null表示方式。
- 2.例子1:没有指定serialization.null.format,在查询时NULL值数据被转写成’/N’显示
- CREATE TABLE 表名(id int,name STRING) STORED AS TEXTFILE;
- 查看表对应的hdfs的文件中数据:hadoop fs -cat /xx/表名/attempt_201105020924_0011_m_000000_0
- id name
- /N mary
- 101 tom
-
- 3.例子2:指定serialization.null.format,在查询时NULL值没有被转写成’/N’,而是以空字符串显示NULL值
- CREATE TABLE 表名(id int,name STRING)
- ROW FORMAT SERDE ‘org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe’
- WITH SERDEPROPERTIES ( field.delim’='/t’, ‘escape.delim’='//’, ‘serialization.null.format’='' )
- STORED AS TEXTFILE;
- 查看表对应的hdfs的文件中数据:hadoop fs -cat /xx/表名/attempt_201105020924_0011_m_000000_0
- id name
- mary
- 101 tom
-
- 3.hive中空值判断基本分两种
- 1.NULL 与 \N
- 1.hive在底层数据中如何保存和标识NULL,是由 alter table name SET SERDEPROPERTIES('serialization.null.format' = '\N') 参数控制的。
- 在hive里面默认的情况下会使用’/N’来表示null值。
- 2.设置 alter table name SET SERDEPROPERTIES('serialization.null.format' = '\N');
- 则底层数据保存的是'NULL',通过查询显示的是'\N'
- 这时如果查询为空值的字段可通过 语句:a is null 或者 a='\\N'
- 3.设置 alter tablename SET SERDEPROPERTIES('serialization.null.format' = 'NULL');
- 则底层数据保存的是'NULL',通过查询显示的是'NULL'
- 这时如果查询为空值的字段可通过 语句:a is null 或者 a='NULL'
- 4.设置 alter tablename SET SERDEPROPERTIES('serialization.null.format' = '');
- 则底层数据保存的是'NULL',通过查询显示的是空字符串''
- 这时如果查询为空值的字段可通过 语句:a is null
- 2.空字符串'' 与 length(xx)=0
- 空字符串'' 表示的是字段不为null且为空字符串,此时用 a is null 是无法查询这种值的,必须通过 a=''或者 length(a)=0 查询
-
- 3.查看结果
- hive> show create table test_orc;
- CREATE TABLE `test_orc`(
- `advertiser_id` string,
- `ad_plan_id` string,
- `cnt` bigint)
- PARTITIONED BY (
- `day` string,
- `type` tinyint COMMENT '0 as bid, 1 as win, 2 as ck',
- `hour` tinyint)
- ROW FORMAT DELIMITED
- NULL DEFINED AS ''
- STORED AS INPUTFORMAT
- 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
- OUTPUTFORMAT
- 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
- LOCATION
- 'hdfs://namenode/hivedata/warehouse/pmp.db/test_orc'
- TBLPROPERTIES (
- 'last_modified_by'='pmp_bi',
- 'last_modified_time'='1465992624',
- 'transient_lastDdlTime'='1465992624')
-
-
- 2.方式二
- 1.drop table test_orc;
- 2.create table if not exists test_orc( advertiser_id string, ad_plan_id string, cnt BIGINT )
- partitioned by (day string, type TINYINT COMMENT '0 as bid, 1 as win, 2 as ck', hour TINYINT)
- ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
- with serdeproperties('serialization.null.format' = '') # 表示查询出NULL值数据时显示为空字符串
- STORED AS ORC;
- 3.查看结果
- hive> show create table test_orc;
- CREATE TABLE `test_orc`(
- `advertiser_id` string,
- `ad_plan_id` string,
- `cnt` bigint)
- PARTITIONED BY (
- `day` string,
- `type` tinyint COMMENT '0 as bid, 1 as win, 2 as ck',
- `hour` tinyint)
- ROW FORMAT DELIMITED
- NULL DEFINED AS ''
- STORED AS INPUTFORMAT
- 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
- OUTPUTFORMAT
- 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
- LOCATION
- 'hdfs://namenode/hivedata/warehouse/pmp.db/test_orc'
- TBLPROPERTIES (
- 'transient_lastDdlTime'='1465992726')
-
-
- 3.方式三
- 1.drop table test_orc;
- 2.create table if not exists test_orc( advertiser_id string, ad_plan_id string, cnt BIGINT )
- partitioned by (day string, type TINYINT COMMENT '0 as bid, 1 as win, 2 as ck', hour TINYINT)
- ROW FORMAT DELIMITED NULL DEFINED AS '' # 表示查询出NULL值数据时显示为空字符串
- STORED AS ORC;
- 3.查看结果
- hive> show create table test_orc;
- CREATE TABLE `test_orc`(
- `advertiser_id` string,
- `ad_plan_id` string,
- `cnt` bigint)
- PARTITIONED BY (
- `day` string,
- `type` tinyint COMMENT '0 as bid, 1 as win, 2 as ck',
- `hour` tinyint)
- ROW FORMAT DELIMITED
- NULL DEFINED AS ''
- STORED AS INPUTFORMAT
- 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
- OUTPUTFORMAT
- 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
- LOCATION
- 'hdfs://namenode/hivedata/warehouse/pmp.db/test_orc'
- TBLPROPERTIES (
- 'transient_lastDdlTime'='1465992916')
- 1.数据仓库分层架构
- 1.按照数据流入流出的过程, 数据仓库架构可分为三层:源数据、 数据仓库、数据应用。
- 2.数据仓库的数据来源于不同的源数据,并提供多样的数据应用,数据自下而上流入数据仓库后向上层开放应用,而数据仓库只是中间集成化数据管理的一个平台。
- 3.源数据层(ODS):此层数据无任何更改, 直接沿用外围系统数据结构和数据,不对外开放;为临时存储层,是接口数据的临时存储区域,为后一步的数据处理做准备。
- 4.数据仓库层(DW):也称为细节层,DW 层的数据应该是一致的、准确的、干净的数据,即对源系统数据进行了清洗(去除了杂质)后的数据。
- 5.数据应用层(DA 或 APP):前端应用直接读取的数据源;根据报表、专题分析需求而计算生成的数据。
- 6.数据仓库从各数据源获取数据及在数据仓库内的数据转换和流动都可以认为是 ETL(抽取 Extra, 转化 Transfer, 装载 Load)的过程,
- ETL 是数据仓库的流水线,也可以认为是数据仓库的血液,它维系着数据仓库中数据的新陈代谢,而数据仓库日常的管理和维护工作的大部分精力就是保持 ETL 的正常和稳定。
- 7.为什么要对数据仓库分层?
- 用空间换时间,通过大量的预处理来提升应用系统的用户体验(效率),因此数据仓库会存在大量冗余的数据;
- 不分层的话,如果源业务系统的业务规则发生变化将会影响整个数据清洗过程,工作量巨大。
- 通过数据分层管理可以简化数据清洗的过程, 因为把原来一步的工作分到了多个步骤去完成,相当于把一个复杂的工作拆成了多个简单的工作,
- 把一个大的黑盒变成了一个白盒,每一层的处理逻辑都相对简单和容易理解,这样我们比较容易保证每一个步骤的正确性,当数据发生错误的时候,
- 往往我们只需要局部调整某个步骤即可。
--------------------------------------------------------------------------------------------------------------
- 我的项目的HIVE数据仓库分层:
- 1.源数据层(ODS):存储源数据文件
- 2.数据仓库层(DW):建立动态分区表
- 格式:insert overwrite table 表名 partition (分区字段1, 分区字段2) select ip,substr(字段,start,end) as 分区字段1, 分区字段2 字段 from 表名;
- 例子:insert overwrite table d_p_t partition (month, day) select ip,substr(day,1,7) as month, day from dynamic_partition_table;
- 分析:dynamic_partition_table表中 ip字段值 赋值给 d_p_t表中的 ip字段
- dynamic_partition_table表中 day字段值 中的 第一个字符到第七个字符 进行拷贝 到 d_p_t表中的 month分区字段中
- dynamic_partition_table 表中 day字段值 赋值给 d_p_t表中的 day分区字段中
- 3.数据应用层(DA 或 APP):
- 1.创建分桶表(分桶字段) + 分区表(分区字段)
- create table 表名(字段1 类型, 字段2 类型)
- clustered by(分桶字段)
- into 分桶数量 buckets
- partitioned by (分区字段名 类型, 分区字段名 类型)
- row format delimited fields terminated by '分隔符';
-
- 2.对 分桶表+分区表 中的数据 进行分桶操作(分组操作),此时表中的分桶字段数据才具有分桶特性
- insert overwrite table 分桶表名 select * from 分桶表名 cluster by(分桶字段);
-
- 3.创建分桶表(分桶字段) + 分区表(分区字段) + 排序字段
- create table 表名(字段 类型)
- clustered by(分桶字段)
- sorted by(排序字段 ASC/DESC)
- into 分桶数 buckets
- partitioned by (分区字段名 类型, 分区字段名 类型)
- row format delimited fields terminated by '分隔符';
-
- 4.对表中数据进行分桶操作(分组操作),同时对分桶表中的某字段进行排序,此时表中的分桶字段数据才具有分桶特性,排序字段数据才具有排序特性
- insert overwrite table 分桶表名 select * from 分桶表名 distribute by(分桶字段名) sort by(排序字段名 ASC/DESC);
--------------------------------------------------------------------------------------------------------------
- 1.数据仓库准备工作:为什么要对数据仓库分层?
- 1.用空间换时间,通过大量的预处理来提升应用系统的用户体验(效率),因此数据仓库会存在大量冗余的数据;
- 2.如果不分层的话,如果源业务系统的业务规则发生变化将会影响整个数据清洗过程,工作量巨大
- 3.通过数据分层管理可以简化数据清洗的过程,因为把原来一步的工作分到了多个步骤去完成,相当于把一个复杂的工作拆成了多个简单的工作,
- 把一个大的黑盒变成了一个白盒,每一层的处理逻辑都相对简单和容易理解,这样我们比较容易保证每一个步骤的正确性,当数据发生错误的时候,
- 往往我们只需要局部调整某个步骤即可。
-
- 2.数据仓库标准上可以分为四层: ODS(临时存储层)、PDW(数据仓库层)、MID(数据集市层)、APP(应用层)
- 1.ODS 层:
- 为临时存储层,是接口数据的临时存储区域,为后一步的数据处理做准备。一般来说 ODS 层的数据和源系统的数据是同构的,
- 主要目的是简化后续数据加工处理的工作。从数据粒度上来说 ODS 层的数据粒度是最细的。ODS 层的表通常包括两类,一个用于存储当前需要加载的数据,
- 一个用于存储处理完后的历史数据。历史数据一般保存 3-6 个月后需要清除,以节省空间。但不同的项目要区别对待,如果源系统的数据量不大,
- 可以保留更长的时间,甚至全量保存;
-
- 2.PDW 层:
- 为数据仓库层,PDW 层的数据应该是一致的、准确的、干净的数据,即对源系统数据进行了清洗(去除了杂质)后的数据。
- 这一层的数据一般是遵循数据库第三范式的,其数据粒度通常和 ODS 的粒度相同。在 PDW 层会保存 BI 系统中所有的历史数据,例如保存 10 年的数据
-
- 3.MID 层:
- 为数据集市层,这层数据是面向主题来组织数据的,通常是星形或雪花结构的数据。从数据粒度来说,这层的数据是轻度汇总级的数据,
- 已经不存在明细数据了。从数据的时间跨度来说,通常是 PDW 层的一部分,主要的目的是为了满足用户分析的需求,而从分析的角度来说,
- 用户通常只需要分析近几年(如近三年的数据)的即可。从数据的广度来说,仍然覆盖了所有业务数据。
-
- 4.APP 层:
- 为应用层,这层数据是完全为了满足具体的分析需求而构建的数据,也是星形或雪花结构的数据。从数据粒度来说是高度汇总的数据。
- 从数据的广度来说,则并不一定会覆盖所有业务数据,而是 MID 层数据的一个真子集,从某种意义上来说是 MID 层数据的一个重复。
- 从极端情况来说,可以为每一张报表在 APP 层构建一个模型来支持,达到以空间换时间的目的数据仓库的标准分层只是一个建议性质的标准,
- 实际实施时需要根据实际情况确定数据仓库的分层,不同类型的数据也可能采取不同的分层方法。
-
- 3.这里我们采用的是京东的数据仓库分层模式,是根据标准的模型演化而来。
- 数据仓库分层:
- BDM:缓冲数据,源数据的直接映像
- FDM:基础数据层,数据拉链处理、分区处理
- GDM:通用聚合
- ADM:高度聚合
- 先把数据从源数据库中抽取加载到 BDM 层中,然后 FDM 层根据 BDM 层的数据按天分区
-
================Java API 操作 Hive==============
- 1.创建一个maven项目,pom.xml文件配置如下
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>com.bigdata.hadoop</groupId>
- <artifactId>hive</artifactId>
- <version>1.0-SNAPSHOT</version>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-jdbc</artifactId>
- <version>2.3.0</version>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.9</version>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.5.1</version>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </project>
-
-
- 2.创建测试类HiveJDBC,代码如下
- 官网参考:https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients
-
- package com.bigdata.hadoop.hive;
-
- import org.junit.After;
- import org.junit.Before;
- import org.junit.Test;
-
- import java.sql.*;
-
- /*
- JDBC 操作 Hive(注:JDBC 访问 Hive 前需要先启动HiveServer2)
- 启动本地路径下的hive/bin中的hiveserver2服务器,供外部的linux远程连接访问当前linux下的hive
- 1.前台模式:
- cd /root/hive/bin
- ./hiveserver2
- 使用 jps 命令查看如果显示有RunJar,表示hiveserver2服务器启动成功
- 2.后台模式:
- cd /root/hive/bin
- nohup ./hiveserver2 1>/var/log/hiveserver.log 2>/var/log/hiveserver.err &
- 然后会返回hiveserver2服务器的进程号
- 、
- */
- public class HiveJDBC {
-
- private static String driverName = "org.apache.hive.jdbc.HiveDriver";
- private static String url = "jdbc:hive2://NODE1:10000/数据库名"; //连接url,可以使用IP,也可以使用主机名,端口默认为10000
- private static String user = "root"; //连接的用户名(注:不是登录hive的用户名,是hive所在服务器登录用户名)
- private static String password = ""; //密码,可以不用输入
-
- private static Connection conn = null;
- private static Statement stmt = null;
- private static ResultSet rs = null;
-
- // 加载驱动、创建连接
- @Before
- public void init() throws Exception {
- Class.forName(driverName);
- conn = DriverManager.getConnection(url,user,password);
- stmt = conn.createStatement();
- }
-
- // 创建数据库
- @Test
- public void createDatabase() throws Exception {
- String sql = "create database hive_jdbc_test";
- System.out.println("Running: " + sql);
- stmt.execute(sql);
- }
-
- // 查询所有数据库
- @Test
- public void showDatabases() throws Exception {
- String sql = "show databases";
- System.out.println("Running: " + sql);
- rs = stmt.executeQuery(sql);
- while (rs.next()) {
- System.out.println(rs.getString(1));
- }
- }
-
- // 创建表
- @Test
- public void createTable() throws Exception {
- String sql = "create table emp(\n" +
- "empno int,\n" +
- "ename string,\n" +
- "job string,\n" +
- "mgr int,\n" +
- "hiredate string,\n" +
- "sal double,\n" +
- "comm double,\n" +
- "deptno int\n" +
- ")\n" +
- "row format delimited fields terminated by '\\t'";
- System.out.println("Running: " + sql);
- stmt.execute(sql);
- }
-
- // 查询所有表
- @Test
- public void showTables() throws Exception {
- String sql = "show tables";
- System.out.println("Running: " + sql);
- rs = stmt.executeQuery(sql);
- while (rs.next()) {
- System.out.println(rs.getString(1));
- }
- }
-
- // 查看表结构
- @Test
- public void descTable() throws Exception {
- String sql = "desc emp";
- System.out.println("Running: " + sql);
- rs = stmt.executeQuery(sql);
- while (rs.next()) {
- System.out.println(rs.getString(1) + "\t" + rs.getString(2));
- }
- }
-
- // 加载数据
- @Test
- public void loadData() throws Exception {
- String filePath = "/home/hadoop/data/emp.txt";
- String sql = "load data local inpath '" + filePath + "' overwrite into table emp";
- System.out.println("Running: " + sql);
- stmt.execute(sql);
- }
-
- // 查询数据
- @Test
- public void selectData() throws Exception {
- String sql = "select * from emp";
- System.out.println("Running: " + sql);
- rs = stmt.executeQuery(sql);
- System.out.println("员工编号" + "\t" + "员工姓名" + "\t" + "工作岗位");
- while (rs.next()) {
- System.out.println(rs.getString("empno") + "\t\t" + rs.getString("ename") + "\t\t" + rs.getString("job"));
- }
- }
-
- // 统计查询(会运行mapreduce作业)
- @Test
- public void countData() throws Exception {
- String sql = "select count(1) from emp";
- System.out.println("Running: " + sql);
- rs = stmt.executeQuery(sql);
- while (rs.next()) {
- System.out.println(rs.getInt(1) );
- }
- }
-
- // 删除数据库
- @Test
- public void dropDatabase() throws Exception {
- String sql = "drop database if exists hive_jdbc_test";
- System.out.println("Running: " + sql);
- stmt.execute(sql);
- }
-
- // 删除数据库表
- @Test
- public void deopTable() throws Exception {
- String sql = "drop table if exists emp";
- System.out.println("Running: " + sql);
- stmt.execute(sql);
- }
-
- // 释放资源
- @After
- public void destory() throws Exception {
- if ( rs != null) {
- rs.close();
- }
- if (stmt != null) {
- stmt.close();
- }
- if (conn != null) {
- conn.close();
- }
- }
- }
- show databases;
- use rimengshe;
- show tables;
- select * from nagisa;
- set hive.exec.mode.local.auto=true; # 设置本地模式(仅需当前机器)执行查询语句,不设置的话则需要使用yarn集群(多台集群的机器)执行查询语句
- select count(*) from nagisa;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。