赞
踩
熟悉Hive命令,通过编写HiveQL脚本初步掌握更高层次的ETL操作。
联合使用MapReduce+Hive,计算目标数据信息。
(选做)初步掌握UDF/UDAF等自定义精细化数据计算操作,为后续学习SparkSQL和类似计算框架类SQL使用做好准备。
Hive是一种常用的数据仓库工具,帮助不熟悉Java编程的用户,依靠熟练的SQL操作实现MR程序的编写。
Hive组件内置了解释器、编译器和优化器,能够通过预置的MR程序模板将用户编写的HiveSQL操作“翻译”成MR程序,并交给集群执行计算。
Hive本身不存储和计算数据,它完全依赖于HDFS和MapReduce,它仅仅是一种纯逻辑操作;本质上看,Hive处理的就是HDFS上的数据,可以认为是map-reduce的一种包装。
Hive也需要缓存元数据(Metastore ),且本次实验配合MySQL存储元数据;当然也可以直接使用Hive内嵌的Derby。
完成实验一,搭建好伪分布式环境
完成实验二,确保HDFS可用
完成实验三,掌握MR程序的编写方法
安装成功截图:
安装好hadoop2.7.3(Linux系统下),参考:Ubuntu下安装Hadoop
安装好hive2.3.6(Linux系统下),参考:安装Hive
cd soft/
$ nano emp.csv
输入内容如下,保存退出
7369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20
7839,KING,PRESIDENT,,1981/11/17,5000,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,,20
7900,JAMES,CLERK,7698,1981/12/3,950,,30
7902,FORD,ANALYST,7566,1981/12/3,3000,,20
7934,MILLER,CLERK,7782,1982/1/23,1300,,10
$ nano dept.csv
10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON
1.在hdfs中创建一个路径(用来存储表)
hdfs dfs -mkdir /001/hive
2.上传emp.csv和dept.csv文件到hdfs中去。
hdfs dfs -put dept.csv /001/hive
hdfs dfs -put emp.csv /001/hive
3.查看是否上传成功:
(2)创建员工表(emp+学号,如:emp001)注意:在hive命令行下输入启动hadoop
start-all.sh
进入hive:
hive
新建员工表(emp+学号,如:emp001)
create table emp001(empno int,ename string,job string,mgr int,hiredate string,sal int,comm int,deptno int) row format delimited fields terminated by ',';
(3)创建部门表(dept+学号,如:dept001)
create table dept001(deptno int,dname string,loc string) row format delimited fields terminated by ',';
(4)导入数据
load data inpath '/001/hive/emp.csv' into table emp001;
load data inpath '/001/hive/dept.csv' into table dept001;
(5)查询表数据
查看表emp001的数据:
select * from emp001;
查看表dept001的数据;
select * from dept001;
(6)根据员工的部门号创建分区,表名emp_part+学号,如:emp_part001
create table emp_part001(empno int,ename string,job string,mgr int,hiredate string,sal int,comm int)partitioned by (deptno int)row format delimited fields terminated by ',';
往分区表中插入数据:指明导入的数据的分区(通过子查询导入数据)。
insert into table emp_part001 partition(deptno=10) select empno,ename,job,mgr,hiredate,sal,comm from emp001 where deptno=10;
insert into table emp_part001 partition(deptno=20) select empno,ename,job,mgr,hiredate,sal,comm from emp001 where deptno=20;
insert into table emp_part001 partition(deptno=30) select empno,ename,job,mgr,hiredate,sal,comm from emp001 where deptno=30;
查看emp_part001表的分区情况
show partitions emp_part001;
在hive中查看分区表在HDFS中存储时对应的目录
dfs -ls /user/hive/warehouse/emp_part001;
在hdfs中也能查看分区表在HDFS中存储时对应的目录
hdfs dfs -ls /user/hive/warehouse/emp_part001;
查询某个分区的数据:
dfs -cat /user/hive/warehouse/emp_part001/deptno=10/000000_0;
dfs -cat /user/hive/warehouse/emp_part001/deptno=20/000000_0;
dfs -cat /user/hive/warehouse/emp_part001/deptno=30/000000_0;
(7)创建一个桶表,表名emp_bucket+学号,如:emp_bucket001,根据员工的职位(job)进行分桶
设置开启桶表
set hive.enforce.bucketing = true;
创建桶表
create table emp_bucket001(empno int,ename string,job string,mgr int,hiredate string,sal int,comm int,deptno int)clustered by (job) into 4 buckets row format delimited fields terminated by ',';
Hive桶表是指将数据按照指定列的哈希值分成若干个桶(Bucket),并将每个桶的数据存储在独立的文件中的表。桶表可以提高查询性能,尤其是对于大型数据集的查询。
Hive创建桶表的作用:
查询性能优化:Hive桶表可以提高查询性能,尤其是对于大型数据集的查询。当使用桶表时,数据可以被分成较小的块,使查询更加高效。此外,桶表还可以使查询更加均衡,因为桶的大小被定义为相等的大小。
数据倾斜处理:对于某些列数据倾斜的情况,使用桶表可以有效地解决数据倾斜的问题。将数据分成若干个桶可以避免在某些桶中出现数据倾斜的情况,因为数据被均匀地分布在各个桶中。
数据压缩:使用桶表可以将数据压缩到更小的单元中,从而减少存储空间。
需要注意的是,创建桶表需要考虑桶的数量和桶内数据的均衡性。桶的数量应该根据数据大小和查询需求进行调整,以确保查询性能和数据均衡性。
通过子查询插入数据:
insert into emp_bucket001 select * from emp001;
查询分桶数据
dfs -ls /user/hive/warehouse/emp_bucket001;
(8)查询员工信息:员工号 姓名 薪水
select empno,ename,sal from emp001;
(9)多表查询(hive会自动启动mapreduce程序实现多表链接查询)
select dept001.dname,emp001.ename from emp001,dept001 where emp001.deptno=dept001.deptno;
(10)做报表,根据职位给员工涨工资,把涨前、涨后的薪水显示出来
按如下规则涨薪,PRESIDENT涨1000元,MANAGER涨800元,其他人员涨400元
select empno,ename,job,sal,
case job when 'PRESIDENT' then sal+1000
when 'MANAGER' then sal+800
else sal+400
end
from emp001;
这是一条SQL查询语句,用于从名为
emp001
的表中选择员工编号(empno
)、员工姓名(ename
)、职位(job
)和薪水(sal
),并根据不同的职位添加不同的薪水补贴。具体解释如下:
SELECT empno,ename,job,sal, ...
:选择查询结果中要包含的列,分别是员工编号、员工姓名、职位和薪水。
CASE job WHEN 'PRESIDENT' THEN sal+1000 WHEN 'MANAGER' THEN sal+800 ELSE sal+400 END
:这是一个 CASE
表达式,根据不同的职位为每个员工计算新的薪水。如果职位是总裁(PRESIDENT
),则将薪水加上1000;如果职位是经理(MANAGER
),则将薪水加上800;否则,将薪水加上400。
FROM emp001
:指定查询的数据来源是名为 emp001
的表。
因此,这个查询语句将返回包含员工编号、员工姓名、职位、薪水和经过薪水补贴计算后的新薪水的查询结果集。
答:在Hive中,可以将表分为内部表和外部表,其中内部表又可以分为托管表和管理表。下面分别对这些概念进行解释: 内部表(Internal
Table):内部表是由Hive管理的表,表数据和表元数据都由Hive管理。在创建内部表时,Hive会在HDFS上为表创建一个目录,表数据存储在该目录下。内部表的元数据由Hive的元数据存储服务管理,包括表名、列名、数据类型、分区信息等。
外部表(External
Table):外部表也是由Hive管理的表,但是表数据不由Hive管理。在创建外部表时,Hive只会为表创建一个表结构,而不会在HDFS上为表创建目录。表数据存储在用户指定的位置上,可以是HDFS上的任何位置或者本地文件系统上的任何位置。外部表的元数据由Hive的元数据存储服务管理,包括表名、列名、数据类型、分区信息等。
托管表(Managed
Table):托管表是一种特殊的内部表,表数据和元数据都由Hive管理。在创建托管表时,Hive会自动在HDFS上为表创建一个目录,并将表数据存储在该目录下。当删除托管表时,Hive会同时删除表数据和元数据。托管表适合用于需要频繁操作、临时存储等场景。
管理表(Unmanaged
Table):管理表也是一种内部表,但是表数据由用户管理,Hive只管理表元数据。在创建管理表时,Hive只会在元数据中记录表数据的位置,而不会在HDFS上为表创建目录。当删除管理表时,只会删除表元数据,而不会删除表数据。管理表适合用于已经存在的数据,并且需要持久存储的场景。
这些概念之间的区别和联系如下:
1.内部表和外部表的区别在于表数据的存储方式,内部表的数据由Hive管理,而外部表的数据不由Hive管理。
2.托管表和管理表的区别在于表数据的管理方式,托管表的数据和元数据都由Hive管理,而管理表的数据由用户管理,元数据由Hive管理。
3.内部表和外部表都可以是托管表或管理表,因此,托管表和管理表是内部表和外部表的子集。
4.外部表适合用于需要和其他系统共享数据的场景,例如跨部门的数据共享等。而内部表适合用于需要频繁操作、临时存储等场景。
5.托管表适合用于需要频繁操作、修改、删除等场景。而管理表适合用于已经存在的数据,并且需要持久存储的场景。
答:LOAD DATA
INPATH命令的作用是将指定路径下的数据文件加载到指定的Hive表中。在执行该命令时,Hive会自动将数据文件移动到HDFS上的指定目录。默认情况下,数据文件将被移动到HDFS上的/user/hive/warehouse目录下,该目录是Hive表的默认存储位置。
因此,数据文件dept.csv和emp.csv应该被移动到HDFS上的/user/hive/warehouse/dept001目录下,而不是转移到本地文件系统的目录/001/hive/下。
答:我们使用了CASE表达式对job列的值进行了修改,并根据不同的job值进行了不同的加减计算。但是,这种修改是在查询时进行的,不会对表中的数据进行实际的修改。因此,再次查询数据时,表中的数据并没有发生实际的变化,仍然是原始数据。在Hive中,查询语句不会对表中的数据进行实际的修改,只是在查询时对数据进行了加工、筛选、排序等操作,生成了新的查询结果。如果要对表中的数据进行实际的修改,可以使用INSERT、UPDATE、DELETE等命令进行操作。
答:Hive使用MySQL作为元数据存储服务,将Hive表的元数据信息、数据存储位置、表分区信息、索引信息等存储在MySQL数据库中。举例子:
1.TBLS表:存储Hive中所有表的基本信息,包括表名、数据库名、表类型、表所有者、创建时间、表参数等。
2.COLUMNS_V2表:存储Hive表的列信息,包括列名、数据类型、注释、列ID等。
3.PARTITIONS表:存储Hive表的分区信息,包括分区值、分区所在的目录路径、分区的创建时间等。
4.PARTITION_KEYS表:存储Hive表的分区键信息,包括分区键的列名、数据类型等。
5.SDS表:存储Hive表的存储信息,包括存储格式、存储路径、输入输出格式、输入输出路径等。
6.SERDES表:存储Hive表的序列化/反序列化器信息,包括序列化/反序列化器类型、序列化/反序列化器参数等。
7.IDX_COLUMNS表:存储Hive表的索引信息,包括索引名称、索引列、索引类型等。
安装好hadoop2.7.3(Linux系统下)
安装好MySQL(Windows或Linux系统下)
安装好Hive(Linux系统下)参考:Hive安装配置
从搜狗实验室下载搜索数据进行分析
下载的数据包含6个字段,数据格式说明如下:
访问时间 用户ID [查询词] 该URL在返回结果中的排名 用户点击的顺序号 用户点击的URL
1.字段分隔符:字段分隔符是个数不等的空格;
2.字段个数:有些行有6个字段,有些达不到6个字段。
思路:用MapReduce做数据清洗,用Hive来分析数据。
打开搜狗实验室链接
http://www.sogou.com/labs/resource/q.php
我下载的文件:
我使用的是Xftp
hdfs dfs -put SogouQ1.txt /data
因为原始数据中有些行的字段数不为6,且原始数据的字段分隔符不是Hive表规定的逗号’,',所以需要对原始数据进行数据清洗。
通过编写MapReduce程序完成数据清洗:
a.将不满足6个字段的行删除
b.将字段分隔符由不等的空格变为逗号‘,’分隔符
首先Eclipse新建Maven工程:Zongheshiyan
修改pom.xml文件
设置主类:在一行之前添加如下语句
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.1.0</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <!-- main()所在的类,注意修改为包名+主类名 --> <mainClass>com.Zongheshiyan.App</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build>
添加依赖:在 一行之前添加如下语句
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.7.3</version> </dependency>
最终的pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com</groupId> <artifactId>Zongheshiyan</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>Zongheshiyan</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.7.3</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.1.0</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <!-- main()所在的类,注意修改为包名+主类名 --> <mainClass>com.Zongheshiyan.Main</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
3.4 编写代码
编写SogouMapper类(用来数据清洗)
package com.Zongheshiyan; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class SogouMapper extends Mapper<LongWritable, Text, Text, NullWritable> { @Override /** * 在任务开始时,被调用一次。且只会被调用一次。 */ protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); } @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { // 避免乱码 // 数据格式:20111230000005 57375476989eea12893c0c3811607bcf 奇艺高清 1 1 // http://www.qiyi.com/ // 字节数组转化为GBK编码的字符串, String data = new String(v1.getBytes(), 0, v1.getLength(), "UTF-8"); // 还有一个"\"是转义,"\s+"表示匹配一个或多个连续的空白字符 String words[] = data.split("\\s+"); // 判断没有6个字段就删除哪一行。 if (words.length != 6) { return;// 退出程序。 } // 用逗号代替空白字符。 String newdata = data.replace("\\s+", ",");// 替换成逗号 // 输出 context.write(new Text(newdata), NullWritable.get()); } @Override /** * 在任务结束时,被调用一次。且只会被调用一次。 */ protected void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); } }
Main类(用来启动一个mapreduce程序)
package com.Zongheshiyan; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * Hello world! * */ public class Main { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(Main.class); // 指定map输出 job.setMapperClass(SogouMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); // 指定reduce的输出 job.setOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); // 指定输入、输出 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 提交job,等待结束 job.waitForCompletion(true); } }
打包工程:mvn install package
在使用Xftp 7将windows下的jar包上传到linux中去。
开启hadoop
start-all.sh
输入jps看是否开启成功
jps
同时也把mr历史服务器进程打开
mr-jobhistory-daemon.sh start historyserver
mr-jobhistory-daemon.sh start historyserver 命令的作用是启动Hadoop MapReduce作业历史服务器(JobHistory Server)的守护进程。
在Hadoop集群中,JobHistory Server是一个可以收集和存储MapReduce作业历史信息的服务。启动JobHistory Server守护进程后,它将开始监听指定的端口并等待来自客户端的查询请求。您可以使用Web界面或API来查询历史信息,包括作业启动时间、完成时间、作业配置信息、输入输出路径、任务状态等等。这些历史信息对于作业调试、性能优化和资源管理非常有用。
因此,当您使用Hadoop MapReduce框架运行作业时,启动JobHistory Server守护进程是一个很重要的步骤,它可以帮助您更好地管理和分析MapReduce作业的历史信息。
hadoop jar Zongheshiyan-0.0.1-SNAPSHOT.jar /data/SogouQ1.txt /out/Sougou1
查看输出结果
查看最后10行
hdfs dfs -tail /out/Sougou1/part-r-00000
查看前10行
hdfs dfs -cat /out/Sougou1/part-r-00000 | head -n 10
4.创建hive表
进入hive命令行
hive
创建sougoulog_1表:
create table sogoulog_1(accesstime string,useID string,keyword string,no1 int,clickid int,url string) row format delimited fields terminated by ',';
5.将MapReduce清洗后的数据导入Hive sogoulog_1表中
load data inpath '/out/Sougou1/part-r-00000' into table sogoulog_1;
Loading data to table default.sogoulog_1
使用SQL查询满足条件的数据(只显示前10条)
查找用户第一个点击的网址,这个网址在搜索界面排名第二。
select * from sogoulog_1 where no1=2 and clickid=1 limit 10;
查看 sogoulog_1表结构
一天内,一共搜索关键词的个数(自动启动mapreduce)
select count(keyword) from sogoulog_1;
第一次点击的次数来看,排名越靠前,点击次数越多。
这是第一次点击排名第一的网址
select count(keyword) from sogoulog_1 where no1=1 and clickid=1;
这是第一次点击排名第二的网址
select count(keyword) from sogoulog_1 where no1=2 and clickid=1;
这是第一次点击排名第三的网址
select count(keyword) from sogoulog_1 where no1=3 and clickid=1;
从排名第一URL来看,点击顺序越小越多(首先被点到的可能性就越大)。
这是排名第一的网址第一次被点到
select count(keyword) from sogoulog_1 where no1=1 and clickid=1;
这是排名第一的网址第二次才被点到
select count(keyword) from sogoulog_1 where no1=1 and clickid=2;
这是排名第一的网址第三次才被点到
select count(keyword) from sogoulog_1 where no1=1 and clickid=3;
通过这次实验我知道了使用MapReduce对搜狗一天的数据进行数据清洗可以分为以下几个步骤:
1.数据输入 将搜狗一天的数据作为输入,可以使用Hadoop的map来一行一行的读取数据。
2.数据清洗 对于个数不等的空格,可以使用MapReduce中的Mapper来处理,将一行数据中多个空格替换成一个逗号。对于有些行有6个字段,有些达不到6个字段的情况,可以使用MapReduce中的Reducer来处理,将这些不符合要求的行过滤掉,只保留符合要求的行,然后将数据按照字段进行切分存储。
3.数据存储 将清洗后的数据按照字段分割,存储成表格,并将表格存储到Hive中,以便进行后续的数据分析。
4.数据分析 使用Hive的查询语句来统计一天内搜索关键词的个数以及分析结果第一次点击的次数。可以使用COUNT函数来计算搜索关键词的个数,使用ORDER
BY和LIMIT语句来统计排名靠前的点击次数。
总的来说,使用MapReduce对搜狗一天的数据进行数据清洗,可以大大提高数据的处理效率和准确性,并且方便进行后续的数据分析。
然后我还知道了在使用 Hive 进行数据分析时,我们需要将数据清洗后存储到 Hive 表中。这可以通过将 MapReduce 作业的输出写入
HDFS 或直接将处理后的数据上传到 Hive 表中实现。在上传数据时,我们需要指定数据的分隔符和字段的名称,以确保 Hive
正确解析数据。 其次,在编写 Hive
查询语句时,我们需要注意使用合适的聚合函数和运算符来处理数据。例如,要统计搜索关键词的个数,我们可以使用 COUNT
函数;要按点击次数排序,我们可以使用 ORDER BY 子句。此外,我们还需要使用合适的 GROUP BY
子句来按照关键词分组,以便进行聚合计算。
最后,在处理大型数据集时,需要考虑查询性能和资源利用率。为了提高查询性能,可以使用分区分桶和索引来优化查询。而为了充分利用资源,可以使用动态分区和桶化技术来减少数据的扫描量和存储空间。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。