赞
踩
(1)Hadoop是一个由Apache基金会所开发的分布式系统基础架构
(2)主要解决,海量数据的存储和海量数据的分析计算问题
(3)广义上来说,HADOOP通常是指一个更广泛的概念——HADOOP生态圈
(1)Apache版本最原始(最基础)的版本,对于入门学习最好
(2)Cloudera在大型互联网企业中用的较多
(3)Hortonworks文档较好
(1)高可靠性:Hadoop底层维护多个数据副本,所以即使Hadoop某个计算元素或存储出现故障,也不会导致 数据的丢失
(2)高扩展性:在集群间分配任务数据,可方便的扩展数以千计的节点
(3)高效性:在MapReduce的思想下,Hadoop是并行工作的,以加快任务处理速度
(4)高容错性:能够自动将失败的任务重新分配
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ltcMtLMq-1595805237758)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\1595471424057.png)]
一个高可靠、高吞吐量的分布式文件系统
一个分布式的离线并行计算框架
作业调度与集群资源管理的框架
RDBMS | Hadoop | |
---|---|---|
格式 | 写数据时要求 | 读数据时要求 |
速度 | 读数据速度快 | 写数据速度快 |
数据监管 | 标准结构化 | 任意结构数据 |
数据处理 | 有限的处理能力 | 强大的处理能力 |
数据类型 | 结构化数据 | 结构化、半结构化、非结构化 |
应用场景 | 交互式OLAP分析ACID事务处理企业业务系统 | 处理非结构化数据 海量数据存储计算 |
前期准备:
(1)准备一台虚拟机,操作系统 centos7.0
(2)配置虚拟机的静态 ip
vi /etc/sysconfig/network-scripts/ifcfg-eno16777736 ----------------------------------------------------- TYPE=Ethernet BOOTPROTO=none DEFROUTE=yes IPV4_FAILURE_FATAL=no IPV6INIT=yes IPV6_AUTOCONF=yes IPV6_DEFROUTE=yes IPV6_FAILURE_FATAL=no NAME=eno16777736 UUID=f19fae49-46da-4b52-b704-6e1ec4c0470e ONBOOT=yes HWADDR=00:0C:29:EC:14:1F IPADDR0=虚拟机地址 PREFIX0=24 GATEWAY0=网关地址 IPV6_PEERDNS=yes IPV6_PEERROUTES=yes DNS1=114.114.114.114 DNS2=8.8.8.8
(3)修改主机名,例如为hadoop1
vi /etc/hosts
-----------------------------------------------------
虚拟机地址 hadoop1
(4)关防火墙:systemctl stop firewalld
(5)创建 hadoop 用户,并配置 Hadoop 用户具有 root 权限
(6)创建指定自己的软件安装包存放路径与安装路径
(7)设置免密登录
ssh-keygen
#3次回车
#拷贝密钥
ssh-copy-id hadoop1
(7)确保jdk安装成功
(8)安装hadoop并配置环境变量
下载安装包、上传、解压、设置软链接、在/etc/profile中设置
export HADOOP_HOME=安装路径
export PATH=
P
A
T
H
:
PATH:
PATH:HADOOP_HOME/bin
export PATH=
P
A
T
H
:
PATH:
PATH:HADOOP_HOME/sbin
再 source /etc/profile 一下
测试hadoop是否安装成功: hadoop version
一个演示示例
mkdir input
cp etc/hadoop/*.xml input
bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.14.2.jar grep input output 'dfs[a-z.]+'
cat output/*
export JAVA_HOME=/opt/install/jdk1.8.0_171
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop1:9000</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
hadoop1
bin/hdfs namenode -format
cp etc/hadoop/mapred-site.xml.template etc/hadoop/mapred-site.xml
--------------------------
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
cp etc/hadoop/mapred-site.xml.template etc/hadoop/yarn-site.xml
--------------------------
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
存放启动或停止hadoop相关服务的脚本
总启动HDFS:sbin/start-dfs.sh start-all.sh
分别启动: hadoop-daemons.sh start|stop datanode/namenode/
yarn-daemon.sh start |stop resourcemanager|nodemanager
start-dfs.sh
start-yarn.sh
存放对hadoop相关服务(HDFS,YARN)进行操作的脚本
hadoop的配置文件目录,存放hadoop的配置文件
Hadoop核心全局配置文件,可以其他配置文件中引用该文件中定义的属性,如在hdfs-site.xml及mapred-site.xml中会引用该文件的属性;该文件的模板文件存在于$HADOOP_HOME/src/core/core-default.xml,可将模板文件复制到conf目录,再进行修改。
Hadoop环境变量
HDFS配置文件,该模板的属性继承于core-site.xml;该文件的模板文件存$HADOOP_HOME/src/hdfs/hdfs-default.xml,可将模板文件复制到conf目录,再进行修改
yarn的配置文件,该模板的属性继承于core-site.xml;该文件的模板文件存于$HADOOP_HOME/src/mapred/mapredd-default.xml,可将模板文件复制到conf目录,再进行修改
用于设置所有的slave的名称或IP,每行存放一个。如果是名称,那么设置的slave名称必须在/etc/hosts有IP映射配置
该目录下存放的是Hadoop运行时依赖的jar包,Hadoop在执行时会把lib目录下面的jar全部加到classpath中。
该目录存放的是Hadoop运行的日志,查看日志对寻找Hadoop运行错误非常有帮助
存放hadoop的依赖jar包和文档,文档可以被删除掉
存放hadoop的本地库(对数据进行压缩解压缩功能)
随着数据量越来越大,在一个操作系统管辖的范围内存不下了,那么就分配到更多的操作系统管理的磁盘中,但是不方便管理和维护,迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统。HDFS只是分布式文件管理系统中的一种。HDFS的设计适合一次写入,多次读出的场景,且不支持文件的修改。适合用来做数据分析,并不适合用来做网盘应用。
关键词:文件管理系统、分布式、目录树结构
Hadoop1与Hadoop2的区别Hadoop1.x 主要由 HDFS 和 MapReduce 构成
Hadoop2.x 主要由 HDFS、YARN、MapReduce 构成
在 Hadoop1.x 时代,Hadoop 中的 MapReduce 同时处理业务逻辑运算和资源的调度,耦合性较大。还有一个比较大的问题是 Hadoop1.x 只能运行 MapReduce程序。
在 Hadoop2.x 时代,分离了 MapReduce 部分功能,将资源调度和运算分开,增加了 Yarn。Yarn 只负责资源的调度,MapReduce 只负责运算。而且 Yarn 不仅仅能运行 MapReduce 程序,还可以运行后面会学习Spark 应用程序。可以说Yarn 目前发展成为一个通用的资源调度框架。很多计算框架都支持在 Yarn 上运行。
数据自动保存多个副本。它通过增加副本的形式,提高容错性。某一个副本丢失以后,它可以自动恢复。
数据规模:能达PB级别 文件规模:百万级别
能保证数据的一致性
通过多副本机制,提高可靠性
存储大量小文件的话,它会占用 NameNode大量的内存来存储文件、目录和块信息。这样是不可取的,因为NameNode的内存总是有限的。小文件存储的寻道时间会超过读取时间,它违反了HDFS的设计目标。
一个文件只能有一个写,不允许多个线程同时写。仅支持数据 append(追加),不支持文件的随机修改。
Master,它是一个主管、管理者。也叫 HDFS 的元数据节点。集群中只能有一个活动的 NameNode 对外提供服务。它管理诸如数据块的映射、副本分配等信息,又叫元数据信息,这些信息存储于内存而不是磁盘中。
HDFS 很方便的一点就是对于用户来说很友好,用户不考虑细节的话,看到的目录结构和我们使用 Window 和Linux 文件系统很像。
一个文件对应的块的名字以及块被存储在哪些数据节点(datanode)
每一个文件备份有多少,以及存储在哪里
就是 Slave。实际存储数据块的节点,NameNode 下达命令,DataNode 执行实际的操作
并非 NameNode 的热备。当 NameNode 挂掉的时候,它并不能马上替换 NameNode 并提供服务
secondary namenode的具体工作
Secondary NameNode 的工作与 HDFS 设计是相关的,主要针对元数据设计的。它维护了两种文件 Fsimage 和 Edits,Fsimage 镜像文件,是元数据在某个时间段的快照,Edits 记录了生成快照之后的一些列操作。HDFS 在最初格式化启动时,创建 Edits 和 Fsimage 文件,并在内存中维护一版元数据信息,这时候,Fsimage和内存中的元数据信息是相同的。后续每一次客户端操作时,会先记录客户端执行的操作,这个操作是记录 Edits 在文件中的,然后再更新内存中对应的目录树结构,比如用户删除一个文件,会先在 Edits 文件中记录一个 delete 操作,然后在内存中真正删除改文件。也就是说,内存中的元数据信息是完整的。前面生成的快照 Fsimage 只是元数据的一部分,执行完 Edits 文件中相关操作才能与内存中元数据相同。为什么要这么设计呢?首先,为什么不直接更新 Fsimage,而是要新添加 Edits 文件。这里就需要明确Fsimage里面存的是元数据目录树信息,其实是一个内存对象序列化后的内容。要更新这个文件,首先得反序列化对象加载到内存中,在实际工作,这个文件是很大,序列化和反序列化过程会很繁重,速度会很慢。而 Edits 文件只需要 append操作记录即可。这样既保证了元数据不会丢失,也提高了性能。
SecondaryNameNode 具体干什么事情?当 HDFS 运行一段时间后,需要重启动时,需要将 Fsimage 加载到内存中,并把 Eidts 文件中的操作执行一遍,才是完整的元数据信息。假如操作记录比较频繁或者长时间没有重启过,Edits 文件会很大。重启的时候合并 Fsimage+Edits文件的操作也是很耗时的,增加了启动时间。SecondaryNameNode 是一个独立的进程,定期(满足一定条件)会将 Fsimage+Edits 合并成一个新的Fsimage,减少 HDFS 重启时间。
自己编写的代码+Hadoop API
文件上传 HDFS 的时候,Client 将文件切分成一个一个的Block,然后进行存储。
HDFS 中的文件在物理上是分块存储(block),块的大小可以通过配置参数( dfs.blocksize)来规定,默认大小在 Hadoop2.x 版本中是 128M,Hadoop1.x 版本中是 64M。HDFS 的块比磁盘的块大,其目的是为了最小化寻址开销。如果块设置得足够大,从磁盘传输数据的时间会明显大于定位这个块开始位置所需的时间。因而,传输一个由多个块组成的文件的时间取决于磁盘传输速率。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-offZUijk-1595805237761)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\1595490052431.png)]
默认副本数为3,分别子本地机架节点,同一个机架不同节点,不同机架通过配置可以自己调整。
hdfs dfs
[-appendToFile … ] 把本地文件内容追加到hdfs中的指定文件
[-cat [-ignoreCrc] …]
[-checksum …]
[-chgrp [-R] GROUP PATH…]
[-chmod [-R] <MODE[,MODE]… | OCTALMODE> PATH…]
[-chown [-R] [OWNER][:[GROUP]] PATH…]
[-copyFromLocal [-f] [-p] [-l] … ]
[-copyToLocal [-p] [-ignoreCrc] [-crc] … ]
[-count [-q] [-h]
[-cp [-f] [-p | -p[topax]] … ]
[-createSnapshot []]
[-deleteSnapshot ]
[-df [-h] [
[-du [-s] [-h]
[-expunge]
[-get [-p] [-ignoreCrc] [-crc] … ]
[-getfacl [-R]
[-getfattr [-R] {-n name | -d} [-e en]
[-getmerge [-nl] ]
[-help [cmd …]]
[-ls [-d] [-h] [-R] [
[-mkdir [-p]
[-moveFromLocal … ] 上传文件(剪切)
[-moveToLocal ] 下载文件
[-mv … ] 剪切文件
[-put [-f] [-p] [-l] … ] 上传文件(复制)
[-renameSnapshot ]
[-rm [-f] [-r|-R] [-skipTrash] …]
[-rmdir [–ignore-fail-on-non-empty]
windows下安装hadoop,配置环境变量,创建Maven工程,导入依赖
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0-cdh5.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-cdh5.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0-cdh5.14.2</version>
</dependency>
创建包名:com.kgc.hdfs,创建 TestHadoop 类,使用 Junit 方式测试
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.Test; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; public class TestHadoop { @Test public void testMkdirs() throws URISyntaxException, IOException, InterruptedException { //1.创建配置 Configuration conf = new Configuration(); //2.获取文件系统 FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"),conf,"root"); //3.调用API操作 fs.mkdirs(new Path("/output")); //4.关闭资源 fs.close(); } @Test public void testCopyFromLocalFile() throws URISyntaxException, IOException, InterruptedException { // 1.创建配置文件 Configuration conf = new Configuration(); // 2.获取文件系统 FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), conf,"root"); // 3.调用API操作 fs.copyFromLocalFile(new Path("E:\\ideaProjects\\hadoopLearn\\data\\mobile.txt"), new Path("/hdfs/mobile2.txt")); // 4.关闭资源 fs.close(); } @Test public void testCopyToLocalFile() throws URISyntaxException, IOException, InterruptedException { // 1.创建配置文件 Configuration conf = new Configuration(); // 2.获取文件系统 FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), conf,"root"); // 3.调用API操作 fs.copyToLocalFile(new Path("/hdfs/mobile2.txt"),new Path("E:\\ideaProjects\\hadoopLearn\\data")); // 4.关闭资源 fs.close(); } @Test public void testDelete() throws URISyntaxException, IOException, InterruptedException { // 1.创建配置文件 Configuration conf = new Configuration(); // 2.获取文件系统 FileSystem fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), conf,"root"); // 3.调用API操作 // b: 代表是否递归删除 fs.delete(new Path("/hdfs"),true); // 4.关闭资源 fs.close(); } }
客户端通过 Distributed FileSystem 模块向 NameNode 请求上传文件,NameNode 检查目标文件是否已存在,父目录是否存在,然后告知客户端。
确定可以上传,则客户端请求第一个 block 上传到哪几个 datanode 服务器上。
假定分别为 dn1、dn2、dn3
客户端通过 FSDataOutputStream 模块请求 dn1 上传数据,dn1 收到请求会继续调用 dn2,然后 dn2 调用 dn3,将这个通信管道建立完成。
客户端开始往 dn1 上传第一个 block(先从磁盘读取数据放到一个本地内存缓存),以 packet(64KB)为单位,dn1 收到一个 packet 就会传给 dn2,dn2传给 dn3;dn1 每传一个 packet 会放入一个应答队列等待应答。
当一个 block 传输完成之后,客户端再次请求 NameNode 上传第二个 block的服务器(重复执行 3-7 步)。每个block所上传的节点位置是不同的。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-G0KRvGj3-1595805237763)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\1595493612887.png)]
首先调用 FileSystem.open()方法,获取到 DistributedFileSystem 实例。DistributedFileSystem 向 Namenode 发起 RPC(远程过程调用)请求获得文件的开始部分或全部 block 列表,对于每个返回的块,都包含块所在的DataNode 地址。这些 DataNode 会按照 Hadoop 定义的集群拓扑结构得出客户端的距离,然后再进行排序。如果客户端本身就是一个 DataNode,那么他将从本地读取文件。
DistributedFileSystem 会向客户端 client 返回一个支持文件定位的输入流对象 FSDataInputStream,用于客户端读取数据。FSDataInputStream 包含一个 DFSInputStream 对象,这个对象用来管理 DataNode 和NameNode 之间的 I/O。客户端调用 read()方法,DFSInputStream 就会找出离客户端最近的datanode 并连接datanode。
DFSInputStream 对象中包含文件开始部分的数据块所在的 DataNode 地址,首先它会连接包含文件第一个块最近 DataNode。随后,在数据流中重复调用 read()函数,直到这个块全部读完为止。如果第一个 block 块的数据读完,就会关闭指向第一个 block 块的 datanode 连接,接着读取下一个 block 块。
如果第一批 block 都读完了,DFSInputStream 就会去 NameNode 拿下一批blocks 的 location,然后继续读,如果所有的 block 块都读完,这时就会关闭掉所有的流。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hHgWQc6v-1595805237765)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\1595493628101.png)]
1.第一次启动namenode格式化后,创建fsimage和edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
2.客户端对元数据进行增删改的请求
3.namenode记录操作日志,更新滚动日志。
4.namenode在内存中对数据进行增删改
1.Secondary NameNode询问namenode是否需要checkpoint。直接带回namenode是否检查结果。
2.Secondary NameNode请求执行checkpoint。
3.namenode滚动正在写的edits日志
4.将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode
5.Secondary NameNode加载编辑日志和镜像文件到内存,并合并。
6.生成新的镜像文件fsimage.chkpoint
7.拷贝fsimage.chkpoint到namenode
8.namenode将fsimage.chkpoint重新命名成fsimage
namenode被格式化之后,将在/opt/install/hadoop/data/tmp/dfs/name/current目录中产生如下文件:
fsimage_0000000000000000000
fsimage_0000000000000000000.md5
seen_txid
VERSION
HDFS文件系统元数据的一个永久性的检查点,其中包含HDFS文件系统的所有目录和文件idnode的序列化信息。
存放HDFS文件系统的所有更新操作的路径,文件系统客户端执行的所有写操作首先会被记录到edits文件中。
保存的是一个数字,就是最后一个edits_的数字。
每次Namenode启动的时候都会将fsimage文件读入内存,并从00001开始到seen_txid中记录的数字依次执行每个edits里面的更新操作,保证内存中的元数据信息是最新的、同步的,可以看成Namenode启动的时候就将fsimage和edits文件进行了合并。
Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架;Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。
它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的 PC 机器运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。 就是因为这个特点使得 MapReduce 编程变得非常流行。
当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。
MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上面上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由 Hadoop 内部完成的。
适合 PB 级以上海量数据的离线处理。这里加红字体离线处理,说明它适合离线处理而不适合在线处理。比如像毫秒级别的返回一个结果,MapReduce 很难做到。
MapReduce 无法像 Mysql 一样,在毫秒或者秒级内返回结果。
流式计算的输入数据时动态的,而 MapReduce 的输入数据集是静态的,不能动态变化。这是因为 MapReduce 自身的设计特点决定了数据源必须是静态的。
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce 并不是不能做,而是使用后,每个MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。
1.分布式的运算程序往往需要分成至少2个阶段。
2.第一个阶段的maptask并发实例,完全并行运行,互不相干。
3.第二个阶段的reduce task并发实例互不相干,但是他们的数据依赖于上一个阶段的所有maptask并发实例的输出。
4.MapReduce编程模型只能包含一个map阶段和一个reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个mapreduce程序,串行运行。
一个完整的mapreduce程序在分布式运行时有三类实例进程:
负责整个程序的过程调度及状态协调
负责map阶段的整个数据处理流程
负责reduce阶段的整个数据处理流程
用户编写的程序分成三个部分:Mapper,Reducer,Driver
需要明确两点:
1.一个记录调用一次 map()方法。
2.相同的 key 调用一次 reduce()方法
(1)用户自定义的 Mapper 要继承框架提供的 Mapper 类。
(2)Mapper 的输入数据是 KV 键值对的形式(KV 的类型可自定义)。
(3)对数据的处理逻辑写在 Mapper 类中 map()方法中。
(4)Mapper 的输出数据是 KV 键值对的形式(KV 的类型可自定义)。
(5)map()方法(maptask 进程)每一个<K,V>数据执行一次。
(1)用户自定义的 Reducer 要继承框架提供的 Reducer 父类。
(2)Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV。
(3)Reducer 的业务逻辑写在 reduce()方法中。
(4)每一组相同 k 的<k,Iterator>组调用一次 reduce()方法。
整个程序需要编写一个 Drvier 来进行提交,将自定义 Mapper 和 Reducer 类
组合成一个 job,并提交 job 对象
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是硬盘的持久化数据,转换成内存中的对象。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-e9bKBhgr-1595805237768)(C:\Users\Administrator\Desktop\常用数据序列化类型.png)]
自定义 bean 对象要想序列化传输,必须实现序列化接口,需要注意以下几项:
public FlowBean() {
super();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
@Override
public int compareTo(FlowBean o) {
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
首先mapreduce会根据要运行的大文件来进行split,每个输入分片(input split)针对一个map任务,输入分片(InputSplit)存储的并非数据本身,而是一个分片长度和一个记录数据位置的数组。输入分片(InputSplit)通常和HDFS的block(块)关系很密切,假如我们设定HDFS的块的大小是128MB,我们运行的大文件是128x10MB,MapReduce会分为10个MapTask,每个MapTask都尽可能运行在block(块)所在的DataNode上,体现了移动计算不移动数据的思想。
map阶段就是执行自己编写的Mapper类中的map函数,Map过程开始处理,MapTask会接受输入分片,通过不断的调用map()方法对数据进行处理。处理完毕后,转换为新的<KEY,VALUE>键值对输出。
shuffle阶段主要负责将map端生成的数据传递给reduce端,因此shuffle分为在map端的过程和在reduce端的执行过程。具体过程如下:
(1)MapTask收集map()方法的输出<KEY,VALUE>对,放到内存缓冲区(称为环形缓冲区)中,其中环形缓冲区的大小默认是100MB。
(2)环形缓冲区到达一定阈值(环形缓冲区大小的80%)时,会将缓冲区中的数据溢出本地磁盘文件,这个过程中可能会溢出多个文件。
(3)多个溢出文件会被合并成大的溢出文件。
(4)在溢出过程中,及合并的过程中,都要调用Partitioner进行分区和针对key进行排序sort。
(5)合并成大文件后,shuffle的过程也就结束了,后面进入reducetask的逻辑运算过程。
Reduce对Shffule阶段传来的数据进行最后的整理合并。Reduce根据自己的分区号,去各个maptask机器上取相应的结果分区数据。ReduceTask会取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序),从文件中取出一个一个的键值对group,调用用户自定义的reduce()方法,生成最终的输出文件。
注意:
Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。缓冲区的大小可以通过参数调整,参数:io.sort.mb 默认100M。
InputFormat 的主要功能就是确定每一个 map 任务需要读取哪些数据以及如何读取数据的问题,每一个 map 读取哪些数据由 InputSplit(数据切片)决定,如何读取数据由 RecordReader 来决定。InputFormat 中就有获取 InputSplit 和RecordReader 的方法。
FileInputFormat(常用)、DBInputFormat
1.找到你数据存储的目录
2.开始遍历处理(规划切片)目录下的每一个文件
3.遍历第一个文件hello.txt
(1)获取文件大小fs.sizeOf(hello.txt);
(2)计算切片大小computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
(3)默认情况下,切片大小=blocksize
(4)开始切片,形成第1个切片:ss.txt—0:128M 第2个切片ss.txt—128:256M 第3个切片hello.txt—256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)
(5)将切片信息写到一个切片规划文件中
(6)整个切片的核心过程在getSplit()方法中完成。
(7)数据切片只是在逻辑上对输入数据进行分片,并不会再磁盘上将其切分成分片进行存储。InputSplit只记录了分片的元数据信息,比如起始位置、长度以及所在的节点列表等。
注意:block是HDFS上物理上存储的存储的数据,切片是对数据逻辑上的划分。
4.提交切片规划文件到yarn上,yarn上的MrAppMaster就可以根据切片规划文件计算开启maptask个数。
(1)简单地按照文件的内容长度进行切片
(2)切片大小,默认等于block大小
(3)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
FileSplit inputSplit = (FileSplit) context.getInputSplit();
maptask的并行度决定map阶段的任务处理并发度,进而影响到整个job的处理速度。那么,mapTask并行任务是否越多越好呢?
决定机制:一个job的map阶段MapTask并行度(个数),由客户端提交job时的切片个数决定。
MapTask通过RecordReader,从输入InputSplit中解析出一个个key/value
该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。
在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。
即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。 当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。 在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。 让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。
Mapreduce确保每个reducer的输入都是按键排序的。系统执行排序的过程(即将map输出作为输入传给reducer)称为Shuffle。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xURMUSNn-1595805237769)(C:\Users\Administrator\Desktop\Shuffle结构图.png)]
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
默认分区是根据key的hashCode对reduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。
(1)自定义类继承Partitioner,重写getPartition()方法
public class ProvincePartitioner extends Partitioner<Text, FlowBean> { @Override public int getPartition(Text key, FlowBean value, int numPartitions) { // 1 获取电话号码的前三位 String preNum = key.toString().substring(0, 3); int partition = 4; // 2 判断是哪个省 if ("136".equals(preNum)) { partition = 0; }else if ("137".equals(preNum)) { partition = 1; }else if ("138".equals(preNum)) { partition = 2; }else if ("139".equals(preNum)) { partition = 3; } return partition; } }
(2)在job驱动中,设置自定义partitioner
job.setPartitionerClass(CustomPartitioner.class);
(3)自定义partition后,要根据自定义partitioner的逻辑设置相应数量的reduce task
job.setNumReduceTasks(5);
如果reduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
如果1<reduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;
如果reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000;
例如:假设自定义分区数为5,则
job.setNumReduceTasks(1);会正常运行,只不过会产生一个输出文件
job.setNumReduceTasks(2);会报错
job.setNumReduceTasks(6);大于5,程序会正常运行,会产生空文件
排序是MapReduce框架中最重要的操作之一。Map Task和Reduce Task均会对数据(按照key)进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
对于Map Task,它会将处理的结果暂时放到一个缓冲区中,当缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次排序,并将这些有序数据写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行一次合并,以将这些文件合并成一个大的有序文件。
对于Reduce Task,它从每个Map Task上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则放到磁盘上,否则放到内存中。如果磁盘上文件数目达到一定阈值,则进行一次合并以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据写到磁盘上。当所有数据拷贝完毕后,Reduce Task统一对内存和磁盘上的所有数据进行一次合并。
对reduce阶段的数据根据某一个或几个字段进行分组。
1.combiner是MR程序中Mapper和Reducer之外的一种组件
2.combiner组件的父类就是Reducer
3.combiner和reducer的区别在于运行的位置
Combiner是在每一个maptask所在的节点运行
Reducer是接收全局所有Mapper的输出结果
4.combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量
5.combiner能够应用的前提是不能影响最终的业务逻辑,而且,combiner的输出kv应该跟reducer的输入kv类型要对应起来
Mapper
3 5 7 ->(3+5+7)/3=5
2 6 ->(2+6)/2=4
Reducer
(3+5+7+2+6)/5=23/5 不等于 (5+4)/2=9/2
6.自定义Combiner实现步骤:
(1)自定义一个combiner继承Reducer,重写reduce方法
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int count = 0;
for(IntWritable v :values){
count = v.get();
}
context.write(key, new IntWritable(count));
}
}
(2)在job驱动类中设置:
job.setCombinerClass(WordcountCombiner.class);
reducetask的并行度同样影响整个job的执行并发度和执行效率,但与maptask的并发数由切片数决定不同,Reducetask数量的决定是可以直接手动设置:
//默认值是1,手动设置为4
job.setNumReduceTasks(4);
(1)reducetask=0 ,表示没有reduce阶段,输出文件个数和map个数一致。
(2)reducetask默认值就是1,所以输出文件个数为一个。
(3)如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜
(4)reducetask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个reducetask。
(5)具体多少个reducetask,需要根据集群性能而定。
(6)如果分区数不是1,但是reducetask为1,是否执行分区过程。答案是:不执行分区过程。因为在maptask的源码中,执行分区的前提是先判断reduceNum个数是否大于1。不大于1肯定不执行。
(1)Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
(2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
(3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
(4)Reduce阶段:reduce()函数将计算结果写到HDFS上。
默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toString()方法把它们转换为字符串。
原理:
Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行合并就ok了。
该方法的缺点:
这里主要分析一下reduce join的一些不足。之所以会存在reduce join这种方式,是因为整体数据被分割了,每个map task只处理一部分数据而不能够获取到所有需要的join字段,因此我们可以充分利用mapreduce框架的特性,让他按照join key进行分区,将所有join key相同的记录集中起来进行处理,所以reduce join这种方式就出现了。这种方式的缺点很明显就是会造成map和reduce端也就是shuffle阶段出现大量的数据传输,效率很低。
使用场景:一张表十分小、一张表很大
使用方法:
在提交作业的时候先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表进行join (比如放到Hash Map等等容器中)。然后扫描大表,看大表中的每条记录的join key/value值是否能够在内存中找到相同join key的记录,如果有则直接输出结果。
数据倾斜原因:
如果是多张表的操作都是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜。
解决方案:
在map端缓存多张表,提前处理业务逻辑,这样增加map端业务,减少reduce端数据的压力,尽可能的减少数据倾斜。
具体办法:采用distributedcache
(1)在mapper的setup阶段,将文件读取到缓存集合中
(2)在驱动函数中加载缓存。
job.addCacheFile(new URI("file:/d:/mapjoincache/hello.txt"));// 缓存普通文件到task运行节点
在编写mapreduce程序时,需要考虑的几个方面:
默认使用的实现类是:TextInputFormat 。TextInputFormat的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为value返回。CombineTextInputFormat可以把多个小文件合并成一个切片处理,提高处理效率。用户还可以自定义InputFormat。
用户根据业务需求实现其中三个方法:map() setup() cleanup ()
有默认实现 HashPartitioner,逻辑是根据key的哈希值和numReduces来返回一个分区号;key.hashCode()&Integer.MAXVALUE % numReduces如果业务上有特别的需求,可以自定义分区。
当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中的compareTo()方法。
部分排序:对最终输出的没一个文件进行内部排序。
全排序:对所有数据进行排序,通常只有一个Reduce。
二次排序:排序的条件有两个。
Combiner合并可以提高程序执行效率,减少io传输。但是使用时必须不能影响原有的业务处理结果。
reduceTask拿到输入数据(一个partition的所有数据)后,首先需要对数据进行分组,其分组的默认原则是key相同,然后对每一组kv数据调用一次reduce()方法,并且将这一组kv中的第一个kv的key作为参数传给reduce的key,将这一组数据的value的迭代器传给reduce()的values参数。
YARN(Yet Another Resource Negotiator)核心思想是将资源管理和任务的监控和调度分离
通用的资源管理系统,可为不同的应用(MapReduce、Spark、Flink等)提供统一的资源管理和调度它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。
组成:Resource Scheduler、Application Manager
功能:
处理客户端请求、
监控NodeManager、
启动和监控ApplicationMaste,进行必要的重启、
整个系统的资源分配和调度
本节点上的资源管理和任务管理
定时向ResourceManager汇报本节点上的资源使用情况和各个Container的运行情况
接收和处理来自ResourceManager的Container启动和停止的各种命令
处理来自ApplicationMaster的指令,比如启动MapTask和ReduceTask指令
每个应用程序对应一个ApplicationMaster,负责单个应用程序的管理
功能:
负责数据切分
为应用程序向ResourceManager申请资源(Container),并分配内部任务(MapTask和ReduceTask)
与NodeManager通信来启动/停止任务,Task都是运行在Container中的
负责任务的监控和容错,当某些Task运行出错,进行容错处理
Container是YARN中的资源抽象,封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等
Container类似于一个虚拟机,可以在上面执行任务
(1)client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。
(2)client向ResourceManager申请一个作业Id。
(3)ResourceManager给Client返回该job资源的提交路径(HDFS路径)和作业Id,每一个作业都有一个唯一的Id。
(4)Client发送jar包、切片信息和配置文件到指定的资源提交路径。
(5)Client提交完资源后,向ResourceManager申请运行MrAppMaster(针对该job的ApplicationMaster)。
(6)当ResourceManager收到Client的请求后,将该job添加到容量调度器(Resouce Scheduler)中。
(7)某一个空闲的NodeManager领取到该job。
(8)该NodeManager创建Container,并产生MrAppMaster。
(9)下载Client提交的资源到本地,根据分片信息生成MapTask和ReduceTask。
(10)MrAppMaster向ResouceManager申请运行多个MapTask任务资源。
(11)ResourceManager将运行MapTask任务分配给空闲的多个NodeManager,NodeManager分别领取任务并创建容器(Container)。
(12)MrAppMaster向两个接收到任务的NodeManager发送程序启动脚本,每个接收到任务的NodeManager启动MapTask,MapTask对数据进行处理,并分区排序。
(13)MrAppMaster等待所有MapTask运行完毕后,向ResourceManager申请容器(Container),运行ReduceTask。
(14)程序运行完毕后,MrAppMaster会向ResourceManager申请注销自己。
(15)进度和状态更新
YARN中的任务将其进度和状态(包括counter)返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新, 展示给用户。可以使用YARN WebUI查看任务执行状态。
除了向应用管理器请求作业进度外, 客户端每5分钟都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后, 应用管理器和container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。
FIFO:先进先出调度器
Capacity Scheduler:容量调度器
Fair Scheduler:公平调度器
提交任务:hadoop jar
查看正在运行的任务:yarn application -list
杀掉正在运行的任务:yarn application -kill 任务id
查看节点列表:yarn node -list
查看节点状态:yarn node -status 节点ID
Zookeeper是一个开源的分布式的,为分布式应用提供协调服务的Apache项目
Zookeeper=文件系统+通知机制
Zookeeper从设计模式上来看是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册。一旦数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应,从而实现集群中类似Master/Slave管理模式。
# 1.解压安装包 tar zxvf zookeeper-3.4.5-cdh5.14.2.tar.gz -C /opt/install/ # 2.修改配置文件 cd /opt/install/zookeeper-3.4.5-cdh5.14.2/conf cp zoo_sample.cfg zoo.cfg vi zoo.cfg ------------------ # 数据存放目录 dataDir=/opt/install/zookeeper-3.4.5-cdh5.14.2/zkData # zookeeper集群 server.2=hadoop2:2888:3888 server.3=hadoop3:2888:3888 server.4=hadoop4:2888:3888 ------------------ # 3.创建数据存放文件夹 cd /opt/install/zookeeper-3.4.5-cdh5.14.2/ mkdir zkData # 4.分发 scp -r zookeeper-3.4.5-cdh5.14.2/ root@hadoop3:$PWD scp -r zookeeper-3.4.5-cdh5.14.2/ root@hadoop4:$PWD # 5.接下来需要在每个节点上的zkData目录下创建一个myid文件,里边写一个数字,数值不能重复 cd /opt/install/zookeeper-3.4.5-cdh5.14.2/zkData/ vi myid
启动zookeeper
bin/zkServer.sh start
启动客户端
bin/zkCli.sh
zode:
ZooKeeper数据模型的结构与Linux文件系统很像,整体上可以看作是一棵树,树的每个节点称做一个znode。每一个znode默认能够存储1MB的数据,每个znode都可以通过其路径唯一标识。
节点类型:
PERSISTENT:持久化节点,默认类型
PERSISTENT_SEQUENTIAL:持久化顺序编号节点
EPHEMERAL:临时节点
EPHEMERAL_SEQUENTIAL:临时顺序编号节点
负责进行投票的发起和决议,更新系统状态。
用于接收客户端请求并向客户端返回结果,在选主过程中参与投票。
可以接收客户端连接,将写请求转发给Leader节点。但Observer不参加投票过程,只同步Leader状态。Observer的目的是为了扩展系统,提高读取速度。
Leader选举是保证分布式数据一致性的关键所在。当Zookeeper集群中的一台服务器出现以下两种情况之一时,需要进入Leader选举:
1.服务器初始化启动 2.服务器运行期间无法和Leader保持连接
若进行Leader选举,则至少需要两台机器,这里选取3台机器组成的服务器集群为例。
<1>初始化启动时:
在集群初始化阶段,当有一台服务器ZK1启动时,其单独无法进行和完成Leader选举,当第二台服务器ZK2启动时,此时两台机器可以相互通信,每台机器都试图找到Leader,于是进入Leader选举过程。选举过程开始,过程如下:
(1) 每个Server发出一个投票。由于是初始情况,ZK1和ZK2都会将自己作为Leader服务器来进行投票,每次投票会包含所推举的服务器的myid和ZXID,使用**(myid, ZXID)**来表示,此时ZK1的投票为(1, 0),ZK2的投票为(2, 0),然后各自将这个投票发给集群中其他机器。
(2) 接受来自各个服务器的投票。集群的每个服务器收到投票后,首先判断该投票的有效性,如检查是否是本轮投票、是否来自LOOKING状态的服务器。
(3) 处理投票。针对每一个投票,服务器都需要将别人的投票和自己的投票进行比较,规则如下
· 优先检查ZXID。ZXID比较大的服务器优先作为Leader。
· 如果ZXID相同,那么就比较myid。myid较大的服务器作为Leader服务器。
对于ZK1而言,它的投票是(1, 0),接收ZK2的投票为(2, 0),首先会比较两者的ZXID,均为0,再比较myid,此时ZK2的myid最大,于是ZK2胜。ZK1更新自己的投票为(2, 0),并将投票重新发送给ZK2。
(4) 统计投票。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到相同的投票信息,对于ZK1、ZK2而言,都统计出集群中已经有两台机器接受了(2, 0)的投票信息,此时便认为已经选出ZK2作为Leader。
(5) 改变服务器状态。一旦确定了Leader,每个服务器就会更新自己的状态,如果是Follower,那么就变更为FOLLOWING,如果是Leader,就变更为LEADING。当新的Zookeeper节点ZK3启动时,发现已经有Leader了,不再选举,直接将直接的状态从LOOKING改为FOLLOWING
<2>运行期Leader宕机时
假设正在运行的有ZK1、ZK2、ZK3三台服务器,当前Leader是ZK2,若某一时刻Leader挂了,此时便开始Leader选举。
集群中半数以上机器存活,集群可用
基于Hadoop的数据仓库解决方案
将结构化的数据文件映射为数据库表,提供类sql的查询语言HQL(HiveQuery Language)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2HXJRMqY-1595805237771)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\1595643675184.png)]
CLI(Hive Shell)、JDBC/ODBC(Java 访问 Hive)、WEBUI(浏览器访问 Hive)
元数据包括:表名、表所属的数据库(默认是 default)、表的拥有者、列/分区字段、表的类型(是否是外部表)、表的数据所在目录等;默认存储在自带的 derby 数据库中,推荐使用 MySQL 存储 Metastore,可以在安装的时候,进行相应配置,以使用 MySQL 存储元数据。
Hive 底层是使用 HDFS 进行存储,使用 MapReduce 进行计算。
(1)解析器(SQL Parser):将 SQL 字符串转换成抽象语法树 AST,这一步
一般都用第三方工具库完成,比如 antlr;对 AST 进行语法分析,比如表是否存
在、字段是否存在、SQL 语义是否有误。
(2)编译器(Physical Plan):将 AST 编译生成逻辑执行计划。
(3)优化器(Query Optimizer):对逻辑执行计划进行优化。
(4)执行器(Execution):把逻辑执行计划转换成可以运行的物理计划。
对于 Hive 来说,一般常用的为 MR/TEZ/Spark。
类型 | 示例 | 类型 | 示例 |
---|---|---|---|
TINYINT | 10 | SMALLINT | 10 |
INT | 10 | BIGINT | 100L |
FLOAT | 1.342 | DOUBLE | 1.234 |
DECIMAL | 3.14 | BINARY | 1010 |
BOOLEAN | TRUE | STRING | ’Book’ or "Book" |
CHAR | ’YES’ or "YES" | VARCHAR | ’Book’ or "Book" |
DATE | ’2013-01-31’ | TIMESTAMP | ’2020-01-31 00:13:00.345’ |
类型 | 格式 | 定义 | 示例 |
---|---|---|---|
ARRAY | [‘Apple’,‘Orange’,'Mongo‘] | ARRAY | a[0] = 'Apple’ |
MAP | {‘A’:‘Apple’,‘O’:‘Orange’} | MAP<string,string> | b[‘A’] = 'Apple’ |
STRUCT | {‘Apple’,2} | STRUCTfruit:string,weight:int | c.weight = 2 |
数据结构 | 描述 | 逻辑关系 | **物理存储(**HDFS) |
---|---|---|---|
Database | 数据库 | 表的集合 | 文件夹 |
Table | 表 | 行数据的集合 | 文件夹 |
Partition | 分区 | 用于分割数据 | 文件夹 |
Buckets | 分桶 | 用于分布数据 | 文件 |
Row | 行 | 行记录 | 文件中的行 |
Columns | 列 | 列记录 | 每行中指定的位置 |
Views | 视图 | 逻辑概念,可跨越多张表 | 不存储数据 |
Index | 索引 | 记录统计数据信息 | 文件夹 |
nohup hive --service hiveserver2 &
beeline -u jdbc:hive2://localhost:1000
默认数据库路径:/home/hadoop/hive/warehouse
create database if not exists myhivebook;
use myhivebook;
show databases;
describe database default;
alter database myhivebook set owner user hive;
select current_database();
--库非空情况下强制删除
drop database if exists myhivebook cascade;
--如果库空则可直接删除
drop database myhivebook;
1.HDFS中为所属数据库目录下的子文件夹
2.数据完全由Hive管理,删除表(元数据)会删除数据
3.内部表又称管理表。Hive 默认情况下会将这些表的数据存储在由配置项hive.metastore.warehouse.dir(例如,/home/hadoop/hive/warehouse)所定义的目录的子目录下。 当我们删除一个管理表时,Hive 也会删除这个表中数据。
4.如果新创建表 student 在默认数据库 default 下,创建时默认路径为:/home/hadoop/hive/warehouse/student
5.如果是在其他数据库下,比如 test1 数据库,创建表时默认存储路径为:
/home/hadoop/hive/warehouse/test1.db/student
1.数据保存在指定位置的HDFS路径中
2.Hive不完全管理数据,删除表(元数据)不会删除数据
创建内部表:
create table if not exists student(id int, name string)
row format delimited fields terminated by '\t'
stored as textfile
location '/home/hadoop/hive/warehouse/student';
创建外部表:
CREATE EXTERNAL TABLE IF NOT EXISTS employee_external (
name string,
work_place ARRAY<string>,
sex_age STRUCT<sex:string,age:int>,
skills_score MAP<string,int>,
depart_title MAP<STRING,ARRAY<STRING>>
)
COMMENT 'This is an external table'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
STORED AS TEXTFILE/SEQUENCEFILE/RCFILE
LOCATION '/home/hadoop/hive/warehouse/employee';
CTAS – as select方式建表
CREATE TABLE ctas_employee as SELECT * FROM employee;
CTE
CREATE TABLE cte_employee AS
WITH
r1 AS (SELECT name FROM r2 WHERE name = 'Michael'),
r2 AS (SELECT name FROM employee WHERE sex_age.sex= 'Male'),
r3 AS (SELECT name FROM employee WHERE sex_age.sex= 'Female')
SELECT * FROM r1 UNION ALL SELECT * FROM r3;
like(复制表结构,不复制数据)
CREATE TABLE employee_like LIKE employee;
desc formatted student;
desc student;
DROP TABLE IF EXISTS employee [With PERGE];
--With PERGE直接删除(可选),否则会放到 .Trash目录
TRUNCATE TABLE employee;
ALTER TABLE employee RENAME TO new_employee;--修改表名
ALTER TABLE c_employee SET TBLPROPERTIES ('comment'='New name, comments');
ALTER TABLE employee_internal SET SERDEPROPERTIES ('field.delim' = '$');
ALTER TABLE c_employee SET FILEFORMAT RCFILE; -- 修正表文件格式
-- 修改表的列的操作
ALTER TABLE employee_internal CHANGE old_name new_name STRING; -- 修改列名
ALTER TABLE c_employee ADD COLUMNS (work string); -- 添加列
ALTER TABLE c_employee REPLACE COLUMNS (name string); -- 替换列
LOAD DATA LOCAL INPATH '/home/dayongd/Downloads/employee.txt'
OVERWRITE INTO TABLE employee;
-- 加LOCAL关键字,表示原始文件位于Linux本地,执行后为拷贝数据
LOAD DATA LOCAL INPATH '/home/dayongd/Downloads/employee.txt'
OVERWRITE INTO TABLE employee_partitioned PARTITION (year=2014, month=12);
LOAD DATA INPATH '/tmp/employee.txt'
-- 没有LOCAL关键字,表示文件位于HDFS文件系统中,执行后为直接移动数据
表只对当前 session 有效,session 退出后,表自动删除如果创建的临时表表名已存在,那么当前 session 引用到该表名时实际用的是临时表,只有 drop 或 rename 临时表名才能使用原始表临时表限制:不支持分区字段和创建索引。
分区表实际上就是对应一个 HDFS 文件系统上的独立的文件夹,该文件夹下是该分区所有的数据文件。Hive 中的分区就是分目录,把一个大的数据集根据业务需要分割成小的数据集。在查询时通过 WHERE 子句中的表达式选择查询所需要的指定的分区,这样的查询效率会提高很多。
create table dept_partition( deptno int, dname string, loc string)
partitioned by (month string)
row format delimited fields terminated by '\t';
load data local inpath '/opt/datas/dept.txt' into table dept_partition
partition(month='201907');
load data local inpath '/opt/datas/dept.txt' into table dept_partition
partition(month='201908');
load data local inpath '/opt/datas/dept.txt' into table dept_partition
partition(month='201909');
select * from dept_partition where month='201909';
select * from dept_partition where month='201909'
union all
select * from dept_partition where month='201908'
union all
select * from dept_partition where month='201907';
alter table dept_partition
add partition(month='201905') partition(month='201904');
alter table dept_partition drop partition (month='201905'), partition
(month='201906');
show partitions dept_partition;
desc formatted dept_partition;
create table dept_partition2(deptno int, dname string, loc string)
partitioned by (month string, day string)
row format delimited fields terminated by '\t';
当使用静态分区时,在向分区表中插入数据时,我们需要指定具体分区列的值。此外,hive 还支持动态提供分区值(即在插入数据时,不指定具体的分区列值,而是仅仅指定分区字段)。
(1)前提
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
(2)创建
create table dynamic_people(id int,name string,age int,start_date date)
partitioned by (year string,month string)
row format delimited
fields terminated by ‘,’;
(3)插入数据(只能通过insert方式)
insert into dynamic_people
partition(year,month)
select id, name, age, start_date, year(start_date), month(start_date) from people;
(4)查询
select * from dynamic_people where year = 2018;
select * from dynamic_people where year = 2017 and month = 11;
(1)创建
create table test_bucket(id int, name string, age int)
clustered by (age) into 4 buckets
row format delimited
fields terminated by ’ ';
(2)插入数据
insert into table test_bucket
select id, name, age
from people;
1.通过隐藏子查询、连接和函数来简化查询的逻辑结构
2.只保存定义,不存储数据
3.如果删除或更改基础表,则查询视图将失败
4.视图是只读的,不能插入或装载数据
支持 CTE, ORDER BY, LIMIT, JOIN,等
CREATE VIEW view_name AS SELECT statement;
SHOW TABLES;
SHOW CREATE TABLE view_name;
DROP view_name;
ALTER VIEW view_name SET TBLPROPERTIES (‘comment’ = ‘This is a view’);
ALTER VIEW view_name AS SELECT statement;
与表生成函数结合使用,将函数的输入和输出连接
OUTER关键字:即使output为空也会生成结果
select name,work_place,loc from employee lateral view outer explode(split(null,',')) a as loc;
支持多层级
select name,wps,skill,score from employee
lateral view explode(work_place) work_place_single as wps
lateral view explode(skills_score) sks as skill,score;
通常用于规范化行或解析JSON
(1)select
(2)from
(3)join on
(4) where
(5)group by
(6)having
(7)distribute by/cluster by
(8) sort by
(9) order by
(10) limit
(11) union(去重不排序)/union all(不去重不排序)
(1)from
(2)on
(3)join
(4)where
(5)group by
(6)having
(7)select
(8)distinct
(9)distribute by /cluster by
(10)sort by
(11) order by
(12) limit
(13) union /union all
with table1 as (select …)
select * from table1;
SELECT * FROM (SELECT * FROM employee) a;
内连接:INNER JOIN
外连接:OUTER JOIN
RIGHT JOIN, LEFT JOIN, JOIN
交叉连接:CROSS JOIN
隐式连接:select … from table1, table2 on table1.xxx=table2.xxx;
用来合并多个select的查询结果,需要保证select中字段须一致,
每个select语句返回的列的数量和名字必须一样
UNION ALL:合并后保留重复项
UNION:合并后删除重复项
例:
select key from
(select key from table1) sub
union all
select key from table2
insert into employee select * from ctas_employee;
from ctas_employee
insert overwrite table employee select *
insert overwrite table employee_internal select *;
--(同时插入两张表)
from ctas_patitioned
insert overwrite table employee PARTITION (year, month)
select *,'2018','09';
--通过指定列插入(insert into可以省略table关键字)
insert into employee(name) select 'John' from test limit 1;
insert into employee(name) value('Judy'),('John');
from ctas_employee
insert overwrite local directory '/tmp/out1' select *
from ctas_employee
insert overwrite directory '/tmp/out1' select *
EXPORT TABLE employee TO '/tmp/output3';
EXPORT TABLE employee_partitioned partition (year=2014, month=11) TO '/tmp/output5';
IMPORT TABLE employee FROM '/tmp/output3';
IMPORT TABLE employee_partitioned partition (year=2014, month=11) FROM '/tmp/output5';
select * from offers order by
case when offerid = 1 then 1 else 0
end;
SELECT department_id , name, employee_id, evaluation_score
FROM employee_hr
DISTRIBUTE BY department_id SORT BY evaluation_score DESC;
不支持ASC|DESC
排序列必须出现在SELECT column列表中
为了充分利用所有的Reducer来执行全局排序,可以先使用CLUSTER BY,然后使用ORDER BY
SELECT name, employee_id FROM employee_hr CLUSTER BY name;
select category, max(offervalue) from offers group by category;
select if(category > 4000, 'GOOD', 'BAD') as newcat,max(offervalue) from offers group by category if(category > 4000, 'GOOD', 'BAD');
select sex_age.age from employee group by sex_age.age having count(*) <= 1;
--同情况下使用子查询
select a.age from ( select count(*) as cnt, sex_age.age
from employee group by sex_age.age ) a where a.cnt <= 1;
max, min, count, sum, avg
含义:分组,但是又不合并
语法:Function (arg1,…, arg n) OVER ([PARTITION BY <…>] [ORDER BY <…>] [<window_clause>])
解释:
PARTITION BY类似于GROUP BY
可同时使用多个窗口函数
函数:
(1)ROW_NUMBER() 1,2,3
(2)RANK() 1,1,3
(3)DENSE_RANK() 1,1,2
可以用row_number()进行去重:
select * from
(
select row_number over(partition by user_id) rn from table1
) as table2
where table2.rn =1;
--等价于
select distinct user_id from table1;
CH1 (1).Hive不支持join的非等值连接,不支持or 分别举例如下及实现解决办法。 不支持不等值连接 错误:select * from a inner join b on a.id<>b.id 替代方法:select * from a inner join b on a.id=b.id and a.id is null; 不支持or 错误:select * from a inner join b on a.id=b.id or a.name=b.name 替代方法:select * from a inner join b on a.id=b.id union all select * from a inner join b on a.name=b.name 两个sql union all的字段名必须一样或者列别名要一样。 (2).分号字符:不能智能识别concat(‘;’,key),只会将‘;’当做SQL结束符号。 •分号是SQL语句结束标记,在HiveQL中也是,但是在HiveQL中,对分号的识别没有那么智慧,例如: •select concat(key,concat(';',key)) from dual; •但HiveQL在解析语句时提示: FAILED: Parse Error: line 0:-1 mismatched input '<EOF>' expecting ) in function specification •解决的办法是,使用分号的八进制的ASCII码进行转义,那么上述语句应写成: •select concat(key,concat('\073',key)) from dual; (3).不支持INSERT INTO 表 Values(), UPDATE, DELETE等操作.这样的话,就不要很复杂的锁机制来读写数据。 INSERT INTO syntax is only available starting in version 0.8。INSERT INTO就是在表或分区中追加数据。 (4).HiveQL中String类型的字段若是空(empty)字符串, 即长度为0, 那么对它进行IS NULL的判断结果是False,使用left join可以进行筛选行。 (5).不支持 ‘< dt <’这种格式的范围查找,可以用dt in(”,”)或者between替代。 (6).Hive不支持将数据插入现有的表或分区中,仅支持覆盖重写整个表,示例如下: INSERT OVERWRITE TABLE t1 SELECT * FROM t2; (7).group by的字段,必须是select后面的字段,select后面的字段不能比group by的字段多. 如果select后面有聚合函数,则该select语句中必须有group by语句 而且group by后面不能使用别名 (8).hive的0.13版之前select , where 及 having 之后不能跟子查询语句(一般使用left join、right join 或者inner join替代) (9).先join(及inner join) 然后left join或right join (10).hive不支持group_concat方法,可用 concat_ws('|', collect_set(str)) 实现 (11).not in 和 <> 不起作用,可用left join tmp on tableName.id = tmp.id where tmp.id is null 替代实现 ... ... CH2 (1).不支持非等值连接,一般使用left join、right join 或者inner join替代。 •SQL中对两表内联可以写成: select * from dual a,dual b where a.key = b.key; •Hive中应为: select * from dual a join dual b on a.key = b.key; 而不是传统的格式: SELECT t1.a1 as c1, t2.b1 as c2 FROM t1, t2 WHERE t1.a2 = t2.b2 (2).分号字符:不能智能识别concat(‘;’,key),只会将‘;’当做SQL结束符号。 •分号是SQL语句结束标记,在HiveQL中也是,但是在HiveQL中,对分号的识别没有那么智慧,例如: •select concat(key,concat(';',key)) from dual; •但HiveQL在解析语句时提示: FAILED: Parse Error: line 0:-1 mismatched input '<EOF>' expecting ) in function specification •解决的办法是,使用分号的八进制的ASCII码进行转义,那么上述语句应写成: •select concat(key,concat('\073',key)) from dual; (3).不支持INSERT INTO 表 Values(), UPDATE, DELETE等操作.这样的话,就不要很复杂的锁机制来读写数据。 INSERT INTO syntax is only available starting in version 0.8。INSERT INTO就是在表或分区中追加数据。 (4).HiveQL中String类型的字段若是空(empty)字符串, 即长度为0, 那么对它进行IS NULL的判断结果是False,使用left join可以进行筛选行。 (5).不支持 ‘< dt <’这种格式的范围查找,可以用dt in(”,”)或者between替代。 (6).Hive不支持将数据插入现有的表或分区中,仅支持覆盖重写整个表,示例如下: INSERT OVERWRITE TABLE t1 SELECT * FROM t2; (7).group by的字段,必须是select后面的字段,select后面的字段不能比group by的字段多. 如果select后面有聚合函数,则该select语句中必须有group by语句; 而且group by后面不能使用别名; 有聚合函数存在就必须有group by. (8).select , where 及 having 之后不能跟子查询语句(一般使用left join、right join 或者inner join替代) (9).先join(及inner join) 然后left join或right join (10).hive不支持group_concat方法,可用 concat_ws('|', collect_set(str)) 实现 (11).not in 和 <> 不起作用,可用left join tmp on tableName.id = tmp.id where tmp.id is null 替代实现 (12).hive 中‘不等于’不管是用! 或者<>符号实现,都会将空值即null过滤掉,此时要用 where (white_level<>'3' or white_level is null) 或者 where (white_level!='3' or white_level is null ) 来保留null 的情况。 (13).union all 后面的表不加括号,不然执行报错; hive也不支持顶层的union all,使用子查询来解决; union all 之前不能有DISTRIBUTE BY | SORT BY| ORDER BY | LIMIT 等查询条件 CH3 1.case when ... then ... else ... end 2.length(string) 3.cast(string as bigint) 4.rand() 返回一个0到1范围内的随机数 5.ceiling(double) 向上取整 6.substr(string A, int start, int len) 7.collect_set(col)函数只接受基本数据类型,它的主要作用是将某字段的值进行去重汇总,产生array类型字段 8.concat()函数 1、功能:将多个字符串连接成一个字符串。 2、语法:concat(str1, str2,...) 返回结果为连接参数产生的字符串,如果有任何一个参数为null,则返回值为null。 9.concat_ws()函数 1、功能:和concat()一样,将多个字符串连接成一个字符串,但是可以一次性指定分隔符~(concat_ws就是concat with separator) 2、语法:concat_ws(separator, str1, str2, ...) 说明:第一个参数指定分隔符。需要注意的是分隔符不能为null,如果为null,则返回结果为null。 10.nvl(expr1, expr2):空值转换函数 nvl(x,y) Returns y if x is null else return x 11.if(boolean testCondition, T valueTrue, T valueFalse) 12.row_number()over()分组排序功能,over()里头的分组以及排序的执行晚于 where group by order by 的执行。 13.获取年、月、日、小时、分钟、秒、当年第几周 select year('2018-02-27 10:00:00') as year ,month('2018-02-27 10:00:00') as month ,day('2018-02-27 10:00:00') as day ,hour('2018-02-27 10:00:00') as hour ,minute('2018-02-27 10:00:00') as minute ,second('2018-02-27 10:00:00') as second ,weekofyear('2018-02-27 10:00:00') as weekofyear 获取当前时间: 1).current_timestamp 2).unix_timestamp() 3).from_unixtime(unix_timestamp()) 4).CURRENT_DATE
not only SQL,非关系型数据库
指不遵循传统RDBMS模型的数据库
数据是非关系的,且不使用SQL作为主要查询语言
解决数据库的可伸缩性和可用性问题
不针对原子性或一致性问题
对比 | NoSQL | 关系型数据库 |
---|---|---|
常用数据库 | HBase、MongoDB、Redis | Oracle、DB2、MySQL |
存储格式 | 文档、键值对、图结构 | 表格式,行和列 |
存储规范 | 鼓励冗余 | 规范性,避免重复 |
存储扩展 | 横向扩展,分布式 | 纵向扩展(横向扩展有限) |
查询方式 | 结构化查询语言SQL | 非结构化查询 |
事务 | 不支持事务一致性 | 支持事务 |
性能 | 读写性能高 | 读写性能差 |
成本 | 简单易部署,开源,成本低 | 成本高 |
HBase是一个领先的NoSQL数据库
是一个面向列存储的NoSQL数据库
是一个分布式Hash Map,底层数据是Key-Value格式
基于Google Big Table论文
使用HDFS作为存储并利用其可靠性
HBase特点
数据访问速度快,响应时间约2-20毫秒
支持随机读写,每个节点20k~100k+ ops/s
可扩展性,可扩展到20,000+节点
高并发
Hbase应用场景:
1.增量数据-时间序列数据
高容量,高速写入
HBase之上有OpenTSDB模块,可以满足时序类场景
2.信息交换-消息传递
高容量,高速读写
通信、消息同步的应用构建在HBase之上,比如email,FaceBook等
3.内容服务-Web后端应用程序
高容量,高速读写
头条类、新闻类的的新闻、网页、图片存储在HBase中
启动Zookeeper、Hadoop、start-hbase.sh
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2PignwMO-1595805237772)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\1595668080293.png)]
多个字符串连接成一个字符串。
2、语法:concat(str1, str2,…)
返回结果为连接参数产生的字符串,如果有任何一个参数为null,则返回值为null。
9.concat_ws()函数
1、功能:和concat()一样,将多个字符串连接成一个字符串,但是可以一次性指定分隔符~(concat_ws就是concat with separator)
2、语法:concat_ws(separator, str1, str2, ...)
说明:第一个参数指定分隔符。需要注意的是分隔符不能为null,如果为null,则返回结果为null。
10.nvl(expr1, expr2):空值转换函数 nvl(x,y) Returns y if x is null else return x
11.if(boolean testCondition, T valueTrue, T valueFalse)
12.row_number()over()分组排序功能,over()里头的分组以及排序的执行晚于 where group by order by 的执行。
13.获取年、月、日、小时、分钟、秒、当年第几周
select
year(‘2018-02-27 10:00:00’) as year
,month(‘2018-02-27 10:00:00’) as month
,day(‘2018-02-27 10:00:00’) as day
,hour(‘2018-02-27 10:00:00’) as hour
,minute(‘2018-02-27 10:00:00’) as minute
,second(‘2018-02-27 10:00:00’) as second
,weekofyear(‘2018-02-27 10:00:00’) as weekofyear
获取当前时间:
1).current_timestamp
2).unix_timestamp()
3).from_unixtime(unix_timestamp())
4).CURRENT_DATE
# Hbase ## 一、Hbase简介 ### 1、NoSQL not only SQL,非关系型数据库 指不遵循传统RDBMS模型的数据库 数据是非关系的,且不使用SQL作为主要查询语言 解决数据库的可伸缩性和可用性问题 不针对原子性或一致性问题 | **对比** | **NoSQL** | **关系型数据库** | | ---------- | ------------------------ | ------------------------ | | 常用数据库 | HBase、MongoDB、Redis | Oracle、DB2、MySQL | | 存储格式 | 文档、键值对、图结构 | 表格式,行和列 | | 存储规范 | 鼓励冗余 | 规范性,避免重复 | | 存储扩展 | 横向扩展,分布式 | 纵向扩展(横向扩展有限) | | 查询方式 | 结构化查询语言SQL | 非结构化查询 | | 事务 | 不支持事务一致性 | 支持事务 | | 性能 | 读写性能高 | 读写性能差 | | 成本 | 简单易部署,开源,成本低 | 成本高 | ### 2、Hbase HBase是一个领先的NoSQL数据库 是一个面向列存储的NoSQL数据库 是一个分布式Hash Map,底层数据是Key-Value格式 基于Google Big Table论文 使用HDFS作为存储并利用其可靠性 HBase特点 数据访问速度快,响应时间约2-20毫秒 支持随机读写,每个节点20k~100k+ ops/s 可扩展性,可扩展到20,000+节点 高并发 Hbase应用场景: 1.增量数据-时间序列数据 高容量,高速写入 HBase之上有OpenTSDB模块,可以满足时序类场景 2.信息交换-消息传递 高容量,高速读写 通信、消息同步的应用构建在HBase之上,比如email,FaceBook等 3.内容服务-Web后端应用程序 高容量,高速读写 头条类、新闻类的的新闻、网页、图片存储在HBase中 ## 二、启动Hbase 启动Zookeeper、Hadoop、start-hbase.sh ## 三、Hbase物理架构 [外链图片转存中...(img-2PignwMO-1595805237772)]
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。