当前位置:   article > 正文

hdfs、MapReduce、yarn、hive、sqoop、HBASE、Redis、kafka、ES_es和yarn

es和yarn

目录

大数据开发复习课程

1、Hadoop

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-c4rW1xun-1630247234617)(assert/1582111848898.png)]

1.1、介绍Hadoop

  • 广义上来说,Hadoop通常是指一个更广泛的概念——Hadoop生态圈。
  • 狭义上说,Hadoop指Apache这款开源框架,它的核心组件有:
    • HDFS(分布式文件系统):解决海量数据存储
    • YARN(作业调度和集群资源管理的框架):解决资源任务调度
    • MAPREDUCE(分布式运算编程框架):解决海量数据计算

1.2、Hadoop特性优点

  • 扩容能力(Scalable):Hadoop是在可用的计算机集群间分配数据并完成计算任务的,这些集群可用方便的扩展到数以千计的节点中。
  • 成本低(Economical):Hadoop通过普通廉价的机器组成服务器集群来分发以及处理数据,以至于成本很低。
  • 高效率(Efficient):通过并发数据,Hadoop可以在节点之间动态并行的移动数据,使得速度非常快。
  • 可靠性(Rellable):能自动维护数据的多份复制,并且在任务失败后能自动地重新部署(redeploy)计算任务。所以Hadoop的按位存储和处理数据的能力值得人们信赖。

1.3、hadoop集群中hadoop都需要启动哪些进程,他们的作用分别是什么?

  • namenode =>HDFS的守护进程,负责维护整个文件系统,存储着整个文件系统的元数据信息,image+edit log
  • datanode =>是具体文件系统的工作节点,当我们需要某个数据,namenode告诉我们去哪里找,就直接和那个DataNode对应的服务器的后台进程进行通信,由DataNode进行数据的检索,然后进行具体的读/写操作
  • secondarynamenode =>一个守护进程,相当于一个namenode的元数据的备份机制,定期的更新,和namenode进行通信,将namenode上的image和edits进行合并,可以作为namenode的备份使用
  • resourcemanager =>是yarn平台的守护进程,负责所有资源的分配与调度,client的请求由此负责,监控nodemanager
  • nodemanager => 是单个节点的资源管理,执行来自resourcemanager的具体任务和命令
  • DFSZKFailoverController高可用时它负责监控NN的状态,并及时的把状态信息写入ZK。它通过一个独立线程周期性的调用NN上的一个特定接口来获取NN的健康状态。FC也有选择谁作为Active NN的权利,因为最多只有两个节点,目前选择策略还比较简单(先到先得,轮换)。
  • 7)JournalNode 高可用情况下存放namenode的editlog文件

1.4、Hadoop主要的配置文件

  • hadoop-env.sh

    • 文件中设置的是Hadoop运行时需要的环境变量。JAVA_HOME是必须设置的,即使我们当前的系统中设置了JAVA_HOME,它也是不认识的,因为Hadoop即使是在本机上执行,它也是把当前的执行环境当成远程服务器。
  • core-site.xml

    • 设置Hadoop的文件系统地址

      <property>
      		<name>fs.defaultFS</name>
      		<value>hdfs://node-1:9000</value>
      </property>
      
      • 1
      • 2
      • 3
      • 4
  • hdfs-site.xml

    • 指定HDFS副本的数量

    • secondary namenode 所在主机的ip和端口

    <property>
    		<name>dfs.replication</name>
    		<value>2</value>
        </property>
    
        <property>
     		 <name>dfs.namenode.secondary.http-address</name>
      		 <value>node-2:50090</value>
        </property>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

  • mapred-site.xml

    • 指定mr运行时框架,这里指定在yarn上,默认是local

      <property>
      		<name>mapreduce.framework.name</name>
      		<value>yarn</value>
      </property>
      
      • 1
      • 2
      • 3
      • 4

  • yarn-site.xml

    • 指定YARN的主角色(ResourceManager)的地址

      <property>
      		<name>yarn.resourcemanager.hostname</name>
      		<value>node-1</value>
      </property>
      
      • 1
      • 2
      • 3
      • 4

1.5、Hadoop集群重要命令

  • 初始化

    • hadoop namenode –format
  • 启动dfs

    • start-dfs.sh
  • 启动yarn

    • start-yarn.sh
  • 启动任务历史服务器

    • mr-jobhistory-daemon.sh start historyserver
  • 一键启动

    • start-all.sh
  • 启动成功后:

    • NameNode http://nn_host:port/ 默认50070.
      • ResourceManagerhttp://rm_host:port/ 默认 8088

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bfdg6iP4-1630247234618)(assert/1582117867258.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aN7yEjmz-1630247234620)(assert/1582117880300.png)]

选项名称使用格式含义
-ls-ls <路径>查看指定路径的当前目录结构
-lsr-lsr <路径>递归查看指定路径的目录结构
-du-du <路径>统计目录下个文件大小
-dus-dus <路径>汇总统计目录下文件(夹)大小
-count-count [-q] <路径>统计文件(夹)数量
-mv-mv <源路径> <目的路径>移动
-cp-cp <源路径> <目的路径>复制
-rm-rm [-skipTrash] <路径>删除文件/空白文件夹
-rmr-rmr [-skipTrash] <路径>递归删除
-put-put <多个linux上的文件> <hdfs路径>上传文件
-copyFromLocal-copyFromLocal <多个linux上的文件> <hdfs路径>从本地复制
-moveFromLocal-moveFromLocal <多个linux上的文件> <hdfs路径>从本地移动
-getmerge-getmerge <源路径> <linux路径>合并到本地
-cat-cat <hdfs路径>查看文件内容
-text-text <hdfs路径>查看文件内容
-copyToLocal-copyToLocal [-ignoreCrc] [-crc] [hdfs源路径] [linux目的路径]从本地复制
-moveToLocal-moveToLocal [-crc] <hdfs源路径> <linux目的路径>从本地移动
-mkdir-mkdir <hdfs路径>创建空白文件夹
-setrep-setrep [-R] [-w] <副本数> <路径>修改副本数量
-touchz-touchz <文件路径>创建空白文件
-stat-stat [format] <路径>显示文件统计信息
-tail-tail [-f] <文件>查看文件尾部信息
-chmod-chmod [-R] <权限模式> [路径]修改权限
-chown-chown [-R] [属主][:[属组]] 路径修改属主
-chgrp-chgrp [-R] 属组名称 路径修改属组
-help-help [命令选项]帮助

1.6、HDFS的垃圾桶机制

  • 修改core-site.xml

      <property>
            <name>fs.trash.interval</name>
            <value>1440</value>
       </property>
    
    • 1
    • 2
    • 3
    • 4
  • 这个时间以分钟为单位,例如1440=24h=1天。HDFS的垃圾回收的默认配置属性为 0,也就是说,如果你不小心误删除了某样东西,那么这个操作是不可恢复的。

1.7、HDFS写数据流程

HDFS dfs -put a.txt /

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SdtjV7LF-1630247234621)(assert/1582122846737.png)]

详细步骤

  • 1)客户端通过Distributed FileSystem模块向namenode请求上传文件,namenode检查目标文件是否已存在,父目录是否存在。
  • 2)namenode返回是否可以上传。
  • 3)客户端请求第一个 block上传到哪几个datanode服务器上。
  • 4)namenode返回3个datanode节点,分别为dn1、dn2、dn3。
  • 5)客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个通信管道建立完成。
  • 6)dn1、dn2、dn3逐级应答客户端。
  • 7)客户端开始往dn1上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位(大小为64k),dn1收到一个packet就会传给dn2,dn2传给dn3;dn1每传一个packet会放入一个应答队列等待应答。
  • 8)当一个block传输完成之后,客户端再次请求namenode上传第二个block的服务器。

1.8、Hadoop读数据流程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7mghPVRC-1630247234622)(assert/1582123099100.png)]

详细步骤

  • 1)客户端通过Distributed FileSystem向namenode请求下载文件,namenode通过查询元数据,找到文件块所在的datanode地址。
  • 2)挑选一台datanode(就近原则,然后随机)服务器,请求读取数据。
  • 3)datanode开始传输数据给客户端(从磁盘里面读取数据输入流,以packet为单位来做校验,大小为64k)。
  • 4)客户端以packet为单位接收,先在本地缓存,然后写入目标文件。作者:李小李的路

1.9、SecondaryNameNode的作用

​ NameNode职责是管理元数据信息,DataNode的职责是负责数据具体存储,那么SecondaryNameNode的作用是什么?

答:它的职责是合并NameNode的edit logs到fsimage文件

​ 每达到触发条件 [达到一个小时,或者事物数达到100万],会由secondary namenode将namenode上积累的所有edits和一个最新的fsimage下载到本地,并加载到内存进行merge(这个过程称为checkpoint),如下图所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aohzwa7I-1630247234623)(assert/1582387636055.png)]

1.10、HDFS的扩容、缩容(面试)

1.动态扩容

​ 随着公司业务的增长,数据量越来越大,原有的datanode节点的容量已经不能满足存储数据的需求,需要在原有集群基础上动态添加新的数据节点。也就是俗称的动态扩容

​ 有时候旧的服务器需要进行退役更换,暂停服务,可能就需要在当下的集群中停止某些机器上hadoop的服务,俗称动态缩容

1.1. 基础准备

在基础准备部分,主要是设置hadoop运行的系统环境

修改新机器系统hostname(通过/etc/sysconfig/network进行修改)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ii8LT3vd-1630247234623)(assert/1582387683421.png)]

修改hosts文件,将集群所有节点hosts配置进去(集群所有节点保持hosts文件统一)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yQVQqfKq-1630247234624)(assert/1582387736001.png)]

设置NameNode到DataNode的免密码登录(ssh-copy-id命令实现)

修改主节点slaves文件,添加新增节点的ip信息(集群重启时配合一键启动脚本使用)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4oRZVnM0-1630247234624)(assert/1582387749216.png)]

在新的机器上上传解压一个新的hadoop安装包,从主节点机器上将hadoop的所有配置文件,scp到新的节点上。

1.2. 添加datanode
  • 在namenode所在的机器的

/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop目录下创建dfs.hosts文件

cd /export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop

vim dfs.hosts

添加如下主机名称(包含新服役的节点)

node-1

node-2

node-3

node-4

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 在namenode机器的hdfs-site.xml配置文件中增加dfs.hosts属性

cd /export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop

vim hdfs-site.xml

<property>
  <name>dfs.hosts</name>
  <value>/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop/dfs.hosts</value>
</property>
  • 1
  • 2
  • 3
  • 4

​ dfs.hosts属性的意义:命名一个文件,其中包含允许连接到namenode的主机列表。必须指定文件的完整路径名。如果该值为空,则允许所有主机。相当于一个白名单,也可以不配置。

在新的机器上单独启动datanode: hadoop-daemon.sh start datanode

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AmHu6swW-1630247234625)(assert/1582387814523.png)]

刷新页面就可以看到新的节点加入进来了

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WtQF7jI8-1630247234625)(assert/1582387823813.png)]

1.3.datanode负载均衡服务

新加入的节点,没有数据块的存储,使得集群整体来看负载还不均衡。因此最后还需要对hdfs负载设置均衡,因为默认的数据传输带宽比较低,可以设置为64M,即hdfs dfsadmin -setBalancerBandwidth 67108864即可

默认balancer的threshold为10%,即各个节点与集群总的存储使用率相差不超过10%,我们可将其设置为5%。然后启动Balancer,

sbin/start-balancer.sh -threshold 5,等待集群自均衡完成即可。

1.4.添加nodemanager

在新的机器上单独启动nodemanager:

yarn-daemon.sh start nodemanager

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LmvPnUGc-1630247234626)(assert/1582388108165.png)]

在ResourceManager,通过yarn node -list查看集群情况

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cLINFVyU-1630247234626)(assert/1582388119076.png)]

2.动态缩容
2.1.添加退役节点

在namenode所在服务器的hadoop配置目录etc/hadoop下创建dfs.hosts.exclude文件,并添加需要退役的主机名称。

注意:该文件当中一定要写真正的主机名或者ip地址都行,不能写node-4

node04.hadoop.com

在namenode机器的hdfs-site.xml配置文件中增加dfs.hosts.exclude属性

cd /export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop

vim hdfs-site.xml

<property> 
        <name>dfs.hosts.exclude</name>
        <value>/export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop/dfs.hosts.exclude</value>
</property>
  • 1
  • 2
  • 3
  • 4

dfs.hosts.exclude属性的意义:命名一个文件,其中包含不允许连接到namenode的主机列表。必须指定文件的完整路径名。如果值为空,则不排除任何主机。

2.2.刷新集群

在namenode所在的机器执行以下命令,刷新namenode,刷新resourceManager。

hdfs dfsadmin -refreshNodes

yarn rmadmin –refreshNodes

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SyT9g8BN-1630247234627)(assert/1582388522658.png)]

等待退役节点状态为decommissioned(所有块已经复制完成),停止该节点及节点资源管理器。注意:如果副本数是3,服役的节点小于等于3,是不能退役成功的,需要修改副本数后才能退役。

node-4执行以下命令,停止该节点进程

cd /export/servers/hadoop-2.6.0-cdh5.14.0

sbin/hadoop-daemon.sh stop datanode

sbin/yarn-daemon.sh stop nodemanager

namenode所在节点执行以下命令刷新namenode和resourceManager

hdfs dfsadmin –refreshNodes

yarn rmadmin –refreshNodes

namenode所在节点执行以下命令进行均衡负载

cd /export/servers/hadoop-2.6.0-cdh5.14.0/

sbin/start-balancer.sh

1.11、HDFS安全模式

​ 安全模式是HDFS所处的一种特殊状态,在这种状态下,文件系统只接受读数据请求,而不接受删除、修改等变更请求,是一种保护机制,用于保证集群中的数据块的安全性。

​ 在NameNode主节点启动时,HDFS首先进入安全模式,集群会开始检查数据块的完整性。DataNode在启动的时候会向namenode汇报可用的block信息,当整个系统达到安全标准时,HDFS自动离开安全模式。

  • 手动进入安全模式

    hdfs dfsadmin -safemode enter
    
    • 1
  • 手动离开安全模式

    hdfs dfsadmin -safemode leave
    
    • 1

1.12、机架感知

​ hadoop自身是没有机架感知能力的,必须通过人为的设定来达到这个目的。一种是通过配置一个脚本来进行映射;另一种是通过实现DNSToSwitchMapping接口的resolve()方法来完成网络位置的映射。

  • 1、写一个脚本,然后放到hadoop的core-site.xml配置文件中,用namenode和jobtracker进行调用。
#!/usr/bin/python
#-*-coding:UTF-8 -*-
import sys

rack = {"hadoop-node-31":"rack1",
                                "hadoop-node-32":"rack1",
                                "hadoop-node-33":"rack1",
                                "hadoop-node-34":"rack1",
                                "hadoop-node-49":"rack2",
                                "hadoop-node-50":"rack2",
                                "hadoop-node-51":"rack2",
                                "hadoop-node-52":"rack2",
                                "hadoop-node-53":"rack2",
                                "hadoop-node-54":"rack2",
                                "192.168.1.31":"rack1",
                                "192.168.1.32":"rack1",
                                "192.168.1.33":"rack1",
                                "192.168.1.34":"rack1",
                                "192.168.1.49":"rack2",
                                "192.168.1.50":"rack2",
                                "192.168.1.51":"rack2",
                                "192.168.1.52":"rack2",
                                "192.168.1.53":"rack2",
                                "192.168.1.54":"rack2",
                                }

if __name__=="__main__":
        print "/" + rack.get(sys.argv[1],"rack0")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 2、将脚本赋予可执行权限chmod +x RackAware.py,并放到bin/目录下。

  • 3、然后打开conf/core-site.html

        <property>
            <name>topology.script.file.name</name>
            <value>/opt/modules/hadoop/hadoop-1.0.3/bin/RackAware.py</value>
    <!--机架感知脚本路径-->
        </property>
        <property>
            <name>topology.script.number.args</name>
            <value>20</value>
    <!--机架服务器数量,由于我写了20个,所以这里写20-->
        </property>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
  • 4、重启Hadoop集群

  • namenode日志

    2012-06-08 14:42:19,174 INFO org.apache.hadoop.hdfs.StateChange: BLOCK* NameSystem.registerDatanode: node registration from 192.168.1.49:50010 storage DS-1155827498-192.168.1.49-50010-1338289368956
    2012-06-08 14:42:19,204 INFO org.apache.hadoop.net.NetworkTopology: Adding a new node: /rack2/192.168.1.49:50010
    2012-06-08 14:42:19,205 INFO org.apache.hadoop.hdfs.StateChange: BLOCK* NameSystem.registerDatanode: node registration from 192.168.1.53:50010 storage DS-1773813988-192.168.1.53-50010-1338289405131
    2012-06-08 14:42:19,226 INFO org.apache.hadoop.net.NetworkTopology: Adding a new node: /rack2/192.168.1.53:50010
    2012-06-08 14:42:19,226 INFO org.apache.hadoop.hdfs.StateChange: BLOCK* NameSystem.registerDatanode: node registration from 192.168.1.34:50010 storage DS-2024494948-127.0.0.1-50010-1338289438983
    2012-06-08 14:42:19,242 INFO org.apache.hadoop.net.NetworkTopology: Adding a new node: /rack1/192.168.1.34:50010
    2012-06-08 14:42:19,242 INFO org.apache.hadoop.hdfs.StateChange: BLOCK* NameSystem.registerDatanode: node registration from 192.168.1.54:50010 storage DS-767528606-192.168.1.54-50010-1338289412267
      2012-06-08 14:42:49,492 INFO org.apache.hadoop.hdfs.StateChange: STATE* Network topology has 2 racks and 10 datanodes
      2012-06-08 14:42:49,492 INFO org.apache.hadoop.hdfs.StateChange: STATE* UnderReplicatedBlocks has 0 blocks
      2012-06-08 14:42:49,642 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: ReplicateQueue QueueProcessingStatistics: First cycle completed 0 blocks in 0 msec
      2012-06-08 14:42:49,642 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem: ReplicateQueue QueueProcessingStatistics: Queue flush completed 0 blocks in 0 msec processing time, 0 msec clock time, 1 cycles
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

2、MapReduce

2.1、介绍MapReduce

​ MapReduce的思想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)。

​ Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。

​ Reduce负责“合”,即对map阶段的结果进行全局汇总。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YmxyDt0t-1630247234628)(assert/1582125207249.png)]

​ 图:MapReduce思想模型

2.2、会写Wordcount

  • 定义一个mapper类
//首先要定义四个泛型的类型
//keyin:  LongWritable    valuein: Text
//keyout: Text            valueout:IntWritable

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	//map方法的生命周期:  框架每传一行数据就被调用一次
	//key :  这一行的起始点在文件中的偏移量
	//value: 这一行的内容
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		//拿到一行数据转换为string
		String line = value.toString();
		//将这一行切分出各个单词
		String[] words = line.split(" ");
		//遍历数组,输出<单词,1>
		for(String word:words){
			context.write(new Text(word), new IntWritable(1));
		}
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 定义一个reducer类
//生命周期:框架每传递进来一个kv 组,reduce方法被调用一次  
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		//定义一个计数器
		int count = 0;
		//遍历这一组kv的所有v,累加到count中
		for(IntWritable value:values){
			count += value.get();
		}
		context.write(key, new IntWritable(count));
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 定义一个主类,用来描述job并提交job
public class WordCountRunner {
	//把业务逻辑相关的信息(哪个是mapper,哪个是reducer,要处理的数据在哪里,输出的结果放哪里……)描述成一个job对象
	//把这个描述好的job提交给集群去运行
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job wcjob = Job.getInstance(conf);
		//指定我这个job所在的jar包
//		wcjob.setJar("/home/hadoop/wordcount.jar");
		wcjob.setJarByClass(WordCountRunner.class);
		
		wcjob.setMapperClass(WordCountMapper.class);
		wcjob.setReducerClass(WordCountReducer.class);
		//设置我们的业务逻辑Mapper类的输出key和value的数据类型
		wcjob.setMapOutputKeyClass(Text.class);
		wcjob.setMapOutputValueClass(IntWritable.class);
		//设置我们的业务逻辑Reducer类的输出key和value的数据类型
		wcjob.setOutputKeyClass(Text.class);
		wcjob.setOutputValueClass(IntWritable.class);
		
		//指定要处理的数据所在的位置
		FileInputFormat.setInputPaths(wcjob, "hdfs://hdp-server01:9000/wordcount/data/big.txt");
		//指定处理完成之后的结果所保存的位置
		FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://hdp-server01:9000/wordcount/output/"));
		
		//向yarn集群提交这个job
		boolean res = wcjob.waitForCompletion(true);
		System.exit(res?0:1);
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

2.3、Combiner

​ 每一个map都可能会产生大量的本地输出,Combiner的作用就是对map端的输出先做一次合并,以减少在map和reduce节点之间的数据传输量,以提高网络IO性能。

​ 例如:对于hadoop自带的wordcount的例子,value就是一个叠加的数字,
​ 所以map一结束就可以进行reduce的value叠加,而不必要等到所有的map结束再去进行reduce的value叠加。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qFhFc7JF-1630247234628)(assert/1582126908990.png)]

  • 具体使用
自定义Combiner:

public static class MyCombiner extends  Reducer<Text, LongWritable, Text, LongWritable> {
        protected void reduce(
                Text key, Iterable<LongWritable> values,Context context)throws IOException, InterruptedException {

            long count = 0L;
            for (LongWritable value : values) {
                count += value.get();
            }
            context.write(key, new LongWritable(count));
        };
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 在主类中添加
Combiner设置
    // 设置Map规约Combiner
    job.setCombinerClass(MyCombiner.class);

执行后看到map的输出和combine的输入统计是一致的,而combine的输出与reduce的输入统计是一样的。
  • 1
  • 2
  • 3
  • 4
  • 5

2.4、partitioner

​ 在进行MapReduce计算时,有时候需要把最终的输出数据分到不同的文件中,比如按照省份划分的话,需要把同一省份的数据放到一个文件中;按照性别划分的话,需要把同一性别的数据放到一个文件中。负责实现划分数据的类称作Partitioner。

  • HashPartitioner源码如下
package org.apache.hadoop.mapreduce.lib.partition;

import org.apache.hadoop.mapreduce.Partitioner;

/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    //默认使用key的hash值与上int的最大值,避免出现数据溢出 的情况
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • key、value分别指的是Mapper任务的输出,numReduceTasks指的是设置的Reducer任务数量,默认值是1。那么任何整数与1相除的余数肯定是0。也就是说getPartition(…)方法的返回值总是0。也就是Mapper任务的输出总是送给一个Reducer任务,最终只能输出到一个文件中。

具体实现

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class FivePartitioner extends Partitioner<IntWritable, IntWritable>{

    /**
     * 我们的需求:按照能否被5除尽去分区
     * 
     * 1、如果除以5的余数是0,  放在0号分区
     * 2、如果除以5的余数不是0,  放在1分区
     */
    @Override
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        
        int intValue = key.get();
        
        if(intValue % 5 == 0){
            return 0;
        }else{
           return 1;
        }    
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 在主函数里加入如下两行代码即可:
job.setPartitionerClass(FivePartitioner.class);
job.setNumReduceTasks(2);//设置为2
  • 1
  • 2

2.5、MapReduce的执行流程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UTLzAOYz-1630247234629)(assert/1582129217021.png)]

  • 详细流程

  • Map阶段

    • l 第一阶段是把输入目录下文件按照一定的标准逐个进行逻辑切片,形成切片规划。默认情况下,Split size = Block size。每一个切片由一个MapTask处理。(getSplits)

      l 第二阶段是对切片中的数据按照一定的规则解析成<key,value>对。默认规则是把每一行文本内容解析成键值对。key是每一行的起始位置(单位是字节),value是本行的文本内容。(TextInputFormat)

      l 第三阶段是调用Mapper类中的map方法。上阶段中每解析出来的一个<k,v>,调用一次map方法。每次调用map方法会输出零个或多个键值对。

      l 第四阶段是按照一定的规则对第三阶段输出的键值对进行分区。默认是只有一个区。分区的数量就是Reducer任务运行的数量。默认只有一个Reducer任务。

      l 第五阶段是对每个分区中的键值对进行排序。首先,按照键进行排序,对于键相同的键值对,按照值进行排序。比如三个键值对<2,2>、<1,3>、<2,1>,键和值分别是整数。那么排序后的结果是<1,3>、<2,1>、<2,2>。如果有第六阶段,那么进入第六阶段;如果没有,直接输出到文件中。

      l 第六阶段是对数据进行局部聚合处理,也就是combiner处理。键相等的键值对会调用一次reduce方法。经过这一阶段,数据量会减少。本阶段默认是没有的。

  • reduce阶段

    • l 第一阶段是Reducer任务会主动从Mapper任务复制其输出的键值对。Mapper任务可能会有很多,因此Reducer会复制多个Mapper的输出。

      l 第二阶段是把复制到Reducer本地数据,全部进行合并,即把分散的数据合并成一个大的数据。再对合并后的数据排序。

      l 第三阶段是对排序后的键值对调用reduce方法。键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对。最后把这些输出的键值对写入到HDFS文件中。

2.6、MapReduce的shuffle阶段

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fP17mV2Q-1630247234629)(assert/1582129765528.png)]

  • shuffle被称作MapReduce的心脏,是MapReduce的核心。

  • 由上图看出,每个数据切片由一个Mapper进程处理,也就是说mappper只是处理文件的一部分。

  • 每一个Mapper进程都有一个环形的内存缓冲区,用来存储Map的输出数据,这个内存缓冲区的默认大小是100MB,当数据达到阙值0.8,也就是80MB的时候,一个后台的程序就会把数据溢写到磁盘中。在将数据溢写到磁盘的过程中要经过复杂的过程,首先要将数据进行分区排序(按照分区号如0,1,2),分区完以后为了避免Map输出数据的内存溢出,可以将Map的输出数据分为各个小文件再进行分区,这样map的输出数据就会被分为了具有多个小文件的分区已排序过的数据。然后将各个小文件分区数据进行合并成为一个大的文件(将各个小文件中分区号相同的进行合并)。

  • 这个时候Reducer启动了三个分别为0,1,2。0号Reducer会取得0号分区 的数据;1号Reducer会取得1号分区的数据;2号Reducer会取得2号分区的数据。

2.7、MapReduce优化

2.7.1、资源相关参数

//以下参数是在用户自己的MapReduce应用程序中配置就可以生效

(1) mapreduce.map.memory.mb: 一个Map Task可使用的内存上限(单位:MB),默认为1024。如果Map Task实际使用的资源量超过该值,则会被强制杀死。

(2) mapreduce.reduce.memory.mb: 一个Reduce Task可使用的资源上限(单位:MB),默认为1024。如果Reduce Task实际使用的资源量超过该值,则会被强制杀死。

(3) mapreduce.map.cpu.vcores: 每个Maptask可用的最多cpu core数目, 默认值: 1

(4) mapreduce.reduce.cpu.vcores: 每个Reducetask可用最多cpu core数目默认值: 1

(5) mapreduce.map.java.opts: Map Task的JVM参数,你可以在此配置默认的java heap

size等参数, 例如:“-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc”

(@taskid@会被Hadoop框架自动换为相应的taskid), 默认值: “”

(6) mapreduce.reduce.java.opts: Reduce Task的JVM参数,你可以在此配置默认的java

heap size等参数, 例如:“-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc”, 默认值: “”

//应该在yarn启动之前就配置在服务器的配置文件中才能生效

(1) yarn.scheduler.minimum-allocation-mb RM中每个容器请求的最小配置,以MB为单位,默认1024。

(2) yarn.scheduler.maximum-allocation-mb RM中每个容器请求的最大分配,以MB为单位,默认8192。

(3) yarn.scheduler.minimum-allocation-vcores 1

(4)yarn.scheduler.maximum-allocation-vcores 32

(5) yarn.nodemanager.resource.memory-mb 表示该节点上YARN可使用的物理内存总量,默认是8192(MB),注意,如果你的节点内存资源不够8GB,则需要调减小这个值,而YARN不会智能的探测节点的物理内存总量。

//shuffle性能优化的关键参数,应在yarn启动之前就配置好

(1) mapreduce.task.io.sort.mb 100 shuffle的环形缓冲区大小,默认100m

(2) mapreduce.map.sort.spill.percent 0.8 环形缓冲区溢出的阈值,默认80%

2.7.2、容错相关参数

(1) mapreduce.map.maxattempts: 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。

(2) mapreduce.reduce.maxattempts: 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。

(3) mapreduce.map.failures.maxpercent: 当失败的Map Task失败比例超过该值,整个作业则失败,默认值为0. 如果你的应用程序允许丢弃部分输入数据,则该该值设为一个大于0的值,比如5,表示如果有低于5%的Map Task失败(如果一个Map Task重试次数超过mapreduce.map.maxattempts,则认为这个Map Task失败,其对应的输入数据将不会产生任何结果),整个作业扔认为成功。

(4) mapreduce.reduce.failures.maxpercent: 当失败的Reduce Task失败比例超过该值为,整个作业则失败,默认值为0.

(5) mapreduce.task.timeout:如果一个task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该task处于block状态,可能是临时卡住,也许永远会卡住。为了防止因为用户程序永远block不退出,则强制设置了一个超时时间(单位毫秒),默认是600000,值为0将禁用超时。

2.7.3、效率跟稳定性参数

(1) mapreduce.map.speculative: 是否为Map Task打开推测执行机制,默认为true, 如果为true,则可以并行执行一些Map任务的多个实例。

(2) mapreduce.reduce.speculative: 是否为Reduce Task打开推测执行机制,默认为true

(3)mapreduce.input.fileinputformat.split.minsize: FileInputFormat做切片时最小切片大小,默认1。

(5)mapreduce.input.fileinputformat.split.maxsize: FileInputFormat做切片时最大切片大小

2.8、mapreduce程序在yarn上的执行流程

Hadoop jar xxx.jar

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0Gqc88PB-1630247234630)(assert/1582125526987.png)]

详细流程

  • 一:客户端向集群提交一个任务,该任务首先到ResourceManager中的ApplicationManager;
  • 二:ApplicationManager收到任务之后,会在集群中找一个NodeManager,并在该NodeManager所在DataNode上启动一个AppMaster进程,该进程用于进行任务的划分和任务的监控;
  • 三:AppMaster启动起来之后,会向ResourceManager中的ApplicationManager注册其信息(目的是与之通信);
  • 四:AppMaster向ResourceManager下的ResourceScheduler申请计算任务所需的资源;
  • 五:AppMaster申请到资源之后,会与所有的NodeManager通信要求它们启动计算任务所需的任务(Map和Reduce);
  • 六:各个NodeManager启动对应的容器用来执行Map和Reduce任务;
  • 七:各个任务会向AppMaster汇报自己的执行进度和执行状况,以便让AppMaster随时掌握各个任务的运行状态,在某个任务出了问题之后重启执行该任务;
  • 八:在任务执行完之后,AppMaster向ApplicationManager汇报,以便让ApplicationManager注销并关闭自己,使得资源得以回收;

2.9、执行MapReduce常见的问题

  1. client对集群中HDFS的操作没有权限
在集群配置文件hdfs-site.xml
property>
     <name>dfs.permissions</name>
    <value>false</value>
</property>
然后重启
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. mapreduce的输出路径已存在,必须先删除掉那个路径

  2. 提交集群运行,运行失败

job.setJar("/home/hadoop/wordcount.jar");
  • 1
  1. 日志打不出来,报警告信息

    log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).  
    log4j:WARN Please initialize the log4j system properly.  
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.  
    
    • 1
    • 2
    • 3

    需要在项目的src下面新建file名为log4j.properties的文件

3、yarn

3.1、介绍yarn

​ 通用资源管理系统和调度平台,可为上层应用提供统一的资源管理和调度。可以把yarn理解为相当于一个分布式的操作系统平台,而mapreduce等运算程序则相当于运行于操作系统之上的应用程序,Yarn为这些程序提供运算所需的资源(内存、cpu)。

3.2、yarn的基本架构

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6ASK413u-1630247234630)(assert/1582169222018.png)]

​ YARN是一个资源管理、任务调度的框架,主要包含三大模块:ResourceManager(RM)、NodeManager(NM)、ApplicationMaster(AM)。

ResourceManager负责所有资源的监控、分配和管理;

ApplicationMaster负责每一个具体应用程序的调度和协调

NodeManager负责每一个节点的维护。

​ 对于所有的applications,RM拥有绝对的控制权和对资源的分配权。而每个AM则会和RM协商资源,同时和NodeManager通信来执行和监控task。

3.3、yarn三大组件

3.3.1.ResourceManager
  • ResourceManager负责整个集群的资源管理和分配,是一个全局的资源管理系统。
  • NodeManager以心跳的方式向ResourceManager汇报资源使用情况(目前主要是CPU和内存的使用情况)。RM只接受NM的资源回报信息,对于具体的资源处理则交给NM自己处理。
  • YARN Scheduler根据application的请求为其分配资源,不负责application job的监控、追踪、运行状态反馈、启动等工作。
3.3.2. NodeManager
  • NodeManager是每个节点上的资源和任务管理器,它是管理这台机器的代理,负责该节点程序的运行,以及该节点资源的管理和监控。YARN集群每个节点都运行一个NodeManager。
  • NodeManager定时向ResourceManager汇报本节点资源(CPU、内存)的使用情况和Container的运行状态。当ResourceManager宕机时NodeManager自动连接RM备用节点。
  • NodeManager接收并处理来自ApplicationMaster的Container启动、停止等各种请求。
3.3.3. ApplicationMaster
  • 用户提交的每个应用程序均包含一个ApplicationMaster,它可以运行在ResourceManager以外的机器上。
  • 负责与RM调度器协商以获取资源(用Container表示)。
  • 将得到的任务进一步分配给内部的任务(资源的二次分配)。
  • 与NM通信以启动/停止任务。
  • 监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务。
  • 当前YARN自带了两个ApplicationMaster实现,一个是用于演示AM编写方法的实例程序DistributedShell,它可以申请一定数目的Container以并行运行一个Shell命令或者Shell脚本;另一个是运行MapReduce应用程序的AM—MRAppMaster。

注:RM只负责监控AM,并在AM运行失败时候启动它。RM不负责AM内部任务的容错,任务的容错由AM完成。

3.4、Yarn 调度器Scheduler

​ 在Yarn中,负责给应用分配资源的就是Scheduler。在Yarn中有三种调度器可以选择:FIFO Scheduler ,Capacity Scheduler,Fair Scheduler。

3.4.1.FIFO Scheduler

FIFO Scheduler把应用按提交的顺序排成一个队列,这是一个先进先出队列,在进行资源分配的时候,先给队列中最头上的应用进行分配资源,待最头上的应用需求满足后再给下一个分配,以此类推。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nqfj7d01-1630247234631)(assert/1582169909285.png)]

​ FIFO Scheduler是最简单也是最容易理解的调度器,也不需要任何配置,但它并不适用于共享集群。大的应用可能会占用所有集群资源,这就导致其它应用被阻塞。在共享集群中,更适合采用Capacity Scheduler或Fair Scheduler,这两个调度器都允许大任务和小任务在提交的同时获得一定的系统资源。

3.4.2. Capacity Scheduler

​ Capacity 调度器允许多个组织共享整个集群,每个组织可以获得集群的一部分计算能力。通过为每个组织分配专门的队列,然后再为每个队列分配一定的集群资源,这样整个集群就可以通过设置多个队列的方式给多个组织提供服务了。除此之外,队列内部又可以垂直划分,这样一个组织内部的多个成员就可以共享这个队列资源了,在一个队列内部,资源的调度是采用的是先进先出(FIFO)策略。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OYVjEV21-1630247234632)(assert/1582169930354.png)]

​ 容量调度器 Capacity Scheduler 最初是由 Yahoo 最初开发设计使得 Hadoop 应用能够被多用户使用,且最大化整个集群资源的吞吐量,现被 IBM BigInsights 和 Hortonworks HDP 所采用。

​ Capacity Scheduler 被设计为允许应用程序在一个可预见的和简单的方式共享集群资源,即"作业队列"。Capacity Scheduler 是根据租户的需要和要求把现有的资源分配给运行的应用程序。Capacity Scheduler 同时允许应用程序访问还没有被使用的资源,以确保队列之间共享其它队列被允许的使用资源。管理员可以控制每个队列的容量,Capacity Scheduler 负责把作业提交到队列中。

3.4.3.Fair Scheduler

​ 在Fair调度器中,我们不需要预先占用一定的系统资源,Fair调度器会为所有运行的job动态的调整系统资源。如下图所示,当第一个大job提交时,只有这一个job在运行,此时它获得了所有集群资源;当第二个小任务提交后,Fair调度器会分配一半资源给这个小任务,让这两个任务公平的共享集群资源。

需要注意的是,在下图Fair调度器中,从第二个任务提交到获得资源会有一定的延迟,因为它需要等待第一个任务释放占用的Container。小任务执行完成之后也会释放自己占用的资源,大任务又获得了全部的系统资源。最终效果就是Fair调度器即得到了高的资源利用率又能保证小任务及时完成。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DzlGDzTK-1630247234632)(assert/1582169959564.png)]

​ 公平调度器 Fair Scheduler 最初是由 Facebook 开发设计使得 Hadoop 应用能够被多用户公平地共享整个集群资源,现被 Cloudera CDH 所采用。

Fair Scheduler 不需要保留集群的资源,因为它会动态在所有正在运行的作业之间平衡资源。

4、hive

4.1、hive的介绍

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-839dNodk-1630247234633)(assert/1582170599356.png)]

​ Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供类SQL查询功能。

​ 本质是将SQL转换为MapReduce程序。

​ 主要用途:用来做离线数据分析,比直接用MapReduce开发效率更高。

4.2、hive的架构

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HKauMknj-1630247234633)(assert/1582170850441.png)]

用户接口:包括 CLI、JDBC/ODBC、WebGUI。其中,CLI(command line interface)为shell命令行;JDBC/ODBC是Hive的JAVA实现,与传统数据库JDBC类似;WebGUI是通过浏览器访问Hive。

元数据存储:通常是存储在关系数据库如 mysql/derby中。Hive 将元数据存储在数据库中。Hive 中的元数据包括表的名字,表的列和分区及其属性,表的属性(是否为外部表等),表的数据所在目录等。

解释器、编译器、优化器、执行器:完成 HQL 查询语句从词法分析、语法分析、编译、优化以及查询计划的生成。生成的查询计划存储在 HDFS 中,并在随后有 MapReduce 调用执行。

4.3、Hive 数据模型

Hive中所有的数据都存储在HDFS中,没有专门的数据存储格式

在创建表时指定数据中的分隔符,Hive 就可以映射成功,解析数据。

Hive中包含以下数据模型

**db:**在hdfs中表现为hive.metastore.warehouse.dir目录下一个文件夹

**table:**在hdfs中表现所属db目录下一个文件夹

**external table:**数据存放位置可以在HDFS任意指定路径

**partition:**在hdfs中表现为table目录下的子目录

**bucket:**在hdfs中表现为同一个表目录下根据hash散列之后的多个文件

4.4、常用操作

4.4.1、数据库相关

Hive配置单元包含一个名为 default 默认的数据库.

  • —创建数据库

create database [if not exists] ;

  • –显示所有数据库

show databases;

  • –删除数据库

drop database if exists [restrict|cascade];

​ 默认情况下,hive不允许删除含有表的数据库,要先将数据库中的表清空才能drop,否则会报错
–加入cascade关键字,可以强制删除一个数据库

hive> drop database if exists users cascade;
  • 1
  • –切换数据库

use ;

4.4.2、内部表外部表
建内部表
create table 
student(Sno int,Sname string,Sex string,Sage int,Sdept string) 
row format delimited fields terminated by ',';
建外部表
create external table 
student_ext(Sno int,Sname string,Sex string,Sage int,Sdept string) 
row format delimited fields terminated by ',' location '/stu';
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
内、外部表加载数据:
load data local inpath '/root/hivedata/students.txt' overwrite into table student;

load data inpath '/stu' into table student_ext;
  • 1
  • 2
  • 3
  • 4
4.4.3、创建分区表
  • 分区建表分为2种,一种是单分区,也就是说在表文件夹目录下只有一级文件夹目录。另外一种是多分区,表文件夹下出现多文件夹嵌套模式。
  • 单分区建表语句
create table day_table (id int, content string) partitioned by (dt string);
单分区表,按天分区,在表结构中存在id,content,dt三列。
  • 1
  • 2
  • 双分区建表语句
create table day_hour_table (id int, content string) partitioned by (dt string, hour string);
双分区表,按天和小时分区,在表结构中新增加了dt和hour两列。
  • 1
  • 2
 导入数据
 load data local inpath '/root/hivedata/dat_table.txt' into table day_table partition(dt='2017-07-07');
 
 load data local inpath '/root/hivedata/dat_table.txt' into table day_hour_table partition(dt='2017-07-07', hour='08');
 
 基于分区的查询:

 SELECT day_table.* FROM day_table WHERE day_table.dt = '2017-07-07';

 查看分区
 show partitions day_hour_table;  

总的说来partition就是辅助查询,缩小查询范围,加快数据的检索速度和对数据按照一定的规格和条件进行管理。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 指定分隔符

    —指定分隔符创建分区表

    create table day_table (id int, content string) partitioned by (dt string) row format delimited fields terminated by ',';
    
    • 1

    —复杂类型的数据表指定分隔符

    数据如下

    zhangsan	beijing,shanghai,tianjin,hangzhou
    wangwu	shanghai,chengdu,wuhan,haerbin
    
    • 1
    • 2

    建表语句

    create table
    complex_array(name string,work_locations array<string>) 
    row format delimited fields terminated by '\t' 
    collection items terminated by ',';
    
    • 1
    • 2
    • 3
    • 4
4.4.4、增删分区
  • 增加分区
alter table t_partition add partition (dt='2008-08-08') location 'hdfs://node-21:9000/t_parti/';

执行添加分区  /t_parti文件夹下的数据不会被移动。并且没有分区目录dt=2008-08-08 
  • 1
  • 2
  • 3
  • 删除分区
alter table t_partition drop partition (dt='2008-08-08');

执行删除分区时/t_parti下的数据会被删除并且连同/t_parti文件夹也会被删除

注意区别于load data时候添加分区:会移动数据 会创建分区目录
  • 1
  • 2
  • 3
  • 4
  • 5
4.4.5、hive中的join
准备数据
1,a
2,b
3,c
4,d
7,y
8,u

2,bb
3,cc
7,yy
9,pp



建表:
create table a(id int,name string)
row format delimited fields terminated by ',';

create table b(id int,name string)
row format delimited fields terminated by ',';

导入数据:
load data local inpath '/root/hivedata/a.txt' into table a;
load data local inpath '/root/hivedata/b.txt' into table b;


实验:
** inner join
select * from a inner join b on a.id=b.id;

+-------+---------+-------+---------+--+
| a.id  | a.name  | b.id  | b.name  |
+-------+---------+-------+---------+--+
| 2     | b       | 2     | bb      |
| 3     | c       | 3     | cc      |
| 7     | y       | 7     | yy      |
+-------+---------+-------+---------+--+





**left join   
select * from a left join b on a.id=b.id;
+-------+---------+-------+---------+--+
| a.id  | a.name  | b.id  | b.name  |
+-------+---------+-------+---------+--+
| 1     | a       | NULL  | NULL    |
| 2     | b       | 2     | bb      |
| 3     | c       | 3     | cc      |
| 4     | d       | NULL  | NULL    |
| 7     | y       | 7     | yy      |
| 8     | u       | NULL  | NULL    |
+-------+---------+-------+---------+--+





**right join
select * from a right join b on a.id=b.id;

select * from b right join a on b.id=a.id;
+-------+---------+-------+---------+--+
| a.id  | a.name  | b.id  | b.name  |
+-------+---------+-------+---------+--+
| 2     | b       | 2     | bb      |
| 3     | c       | 3     | cc      |
| 7     | y       | 7     | yy      |
| NULL  | NULL    | 9     | pp      |
+-------+---------+-------+---------+--+




**
select * from a full outer join b on a.id=b.id;
+-------+---------+-------+---------+--+
| a.id  | a.name  | b.id  | b.name  |
+-------+---------+-------+---------+--+
| 1     | a       | NULL  | NULL    |
| 2     | b       | 2     | bb      |
| 3     | c       | 3     | cc      |
| 4     | d       | NULL  | NULL    |
| 7     | y       | 7     | yy      |
| 8     | u       | NULL  | NULL    |
| NULL  | NULL    | 9     | pp      |
+-------+---------+-------+---------+--+


**hive中的特别join
select * from a left semi join b on a.id = b.id;

select a.* from a inner join b on a.id=b.id;
+-------+---------
| a.id  | a.name  
+-------+---------
| 2     | b       
| 3     | c       
| 7     | y       
+-------+---------
相当于
select a.id,a.name from a where a.id in (select b.id from b); 在hive中效率极低

select a.id,a.name from a join b on (a.id = b.id);

select * from a inner join b on a.id=b.id;


cross join##慎用)
返回两个表的笛卡尔积结果,不需要指定关联键。
select a.*,b.* from a cross join b;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
4.4.6、json解析
1、先加载rating.json文件到hive的一个原始表 rat_json
样例:{"movie":"1193","rate":"5","timeStamp":"978300760","uid":"1"}

create table rat_json(line string) row format delimited;
load data local inpath '/root/hivedata/rating.json' into table rat_json;

2、需要解析json数据成四个字段,插入一张新的表 t_rating
drop table if exists t_rating;
create table t_rating(movieid string,rate int,timestring string,uid string)
row format delimited fields terminated by '\t';

3、json表数据解析到rating表中
insert overwrite table t_rating
select 
get_json_object(line,'$.movie') as moive,
get_json_object(line,'$.rate') as rate,
get_json_object(line,'$.timeStamp') as timestring, get_json_object(line,'$.uid') as uid 
from rat_json limit 10;

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

4.5、常用函数

4.5.1、数值函数
  • 指定精度取整函数 : round

    语法: round(double a, int d)

    返回值: DOUBLE

    说明: 返回指定精度d的double类型

    举例:

    hive> select round(3.1415926,4) from dual;
    
    3.1416
    
    • 1
    • 2
    • 3
  • 向下取整函数 : floor

    语法: floor(double a)

    返回值: BIGINT

    说明: 返回等于或者小于该double变量的最大的整数

    举例:

    hive> select floor(3.1415926) from dual;
    3
    
    hive> select floor(25) from dual;
    25
    
    • 1
    • 2
    • 3
    • 4
    • 5

  • 向上取整函数 : ceil

    语法: ceil(double a)

    返回值: BIGINT

    说明: 返回等于或者大于该double变量的最小的整数

    举例:

    hive> select ceil(3.1415926) from dual;
    4
    
    hive> select ceil(46) from dual;
    46
    
    • 1
    • 2
    • 3
    • 4
    • 5

  • 取随机数函数 : rand

    语法: rand(),rand(int seed)

    返回值: double

    说明: 返回一个0到1范围内的随机数。如果指定种子seed,则会等到一个稳定的随机数序列

    举例:

    hive> select rand() from dual;
    
    0.5577432776034763
    
    • 1
    • 2
    • 3

  • 绝对值函数 : abs

    语法: abs(double a) abs(int a)

    返回值: double int

    说明: 返回数值a的绝对值

    举例:

    hive> select abs(-3.9) from dual;
    3.9
    
    hive> select abs(10.9) from dual;
    10.9
    
    • 1
    • 2
    • 3
    • 4
    • 5

4.5.2、日期函数
  • to_date(string timestamp):返回时间字符串中的日期部分,
    • 如to_date(‘1970-01-01 00:00:00’)=‘1970-01-01’
  • current_date:返回当前日期
  • year(date):返回日期date的年,类型为int
    • 如year(‘2019-01-01’)=2019
  • month(date):返回日期date的月,类型为int,
    • 如month(‘2019-01-01’)=1
  • day(date): 返回日期date的天,类型为int,
    • 如day(‘2019-01-01’)=1
  • weekofyear(date1):返回日期date1位于该年第几周。
    • 如weekofyear(‘2019-03-06’)=10
  • datediff(date1,date2):返回日期date1与date2相差的天数
    • 如datediff(‘2019-03-06’,‘2019-03-05’)=1
  • date_add(date1,int1):返回日期date1加上int1的日期
    • 如date_add(‘2019-03-06’,1)=‘2019-03-07’
  • date_sub(date1,int1):返回日期date1减去int1的日期
    • 如date_sub(‘2019-03-06’,1)=‘2019-03-05’
  • months_between(date1,date2):返回date1与date2相差月份
    • 如months_between(‘2019-03-06’,‘2019-01-01’)=2
  • add_months(date1,int1):返回date1加上int1个月的日期,int1可为负数
    • 如add_months(‘2019-02-11’,-1)=‘2019-01-11’
  • last_day(date1):返回date1所在月份最后一天
    • 如last_day(‘2019-02-01’)=‘2019-02-28’
  • next_day(date1,day1):返回日期date1的下个星期day1的日期。day1为星期X的英文前两字母
    • 如next_day(‘2019-03-06’,‘MO’) 返回’2019-03-11’
  • **trunc(date1,string1)
    声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/402659
推荐阅读
相关标签