当前位置:   article > 正文

hadoop之YARN简介_hadoop yarn

hadoop yarn

一、Yarn资源调度器

1、Yarn基础架构

Yarn负责为运行程序提供服务器运算资源,相当于一个分布式的操作平台,所有的MR程序的进程都由Yarn来调配和安排资源。
Yarn的构成,分别由ResourceManager,ApplicationMaster,NodeManager 和 Container构成。

  • ResourceMagager (简称RM) 资源管理器,整个集群资源的管理者
    • 处理客户端的请求(Client)
    • 启动和监控ApplicationMaster(一个任务的管理者)
    • 监控NodeManager(一个服务器节点的管理者)
    • 集群资源的调度、分配和回收
  • ApplicationMaster (简称AM)
    • 为应用程序申请资源和分配任务
    • 任务的监控与容错(当发生错误时,重新分发任务,确保任务的准确执行)
    • 每一个MR程序会开启一个对应的ApplicationMaster管理
  • NodeManager (简称NM)
    • 管理单个节点上的资源,包括向RM汇报当前节点的状态,当前文件存储的状态,管理此节点开辟的Container(容器)等
    • 处理来自RM的命令
    • 处理来自ApplicationMaster的命令
  • Container(容器)
    • 是Yarn的资源的抽象概念,封装了包括内存,CPU,磁盘,网络等资源,是运行任务的最小单位。

上面四者的关系是,每个集群都会有一个ResourceManager作为整个集群的管理者,每当有MR程序申请运行资源时,会起一个ApplicationMaster来管理该程序,每个AM会根据MRAppMaster,MapTask的数量和Reduce的数量向RM申请运行资源。每个节点(每台服务器)由一个NodeManager管理,负责管理该服务器的所有资源,每个节点里面根据内存,CPU核数,磁盘等可以有多个Container容器,比如服务器是16核,32G内存,可以分为16个容器,每个容器有1个核和2G内存的资源。比如上面的AM需要资源时,就会跟RM申请,以容器为单位的资源给到AM运行。

2、Yarn作业提交流程

Yarn简单的运行流程如下:
客户端跟ResourceManager的交互,也是作业的提交阶段
(1)MapReduce所在的client(客户端)向整个集群提交MapReduce作业,简称MR
(2)MapReduce所在的client(客户端)向ResourceManager(简称RM)提交申请执行一个ApplicationMaster(简称AM)
(3)RM将该应用程序AM的需要提交资源的资源路径返回给客户端,并返回标识该AM的application_id
(4)客户端该应用程序AM运行所需资源提交到RM返回的路径上,所需的资源包括job.split(Map的切片信息),job.xml(job运行的配置文件),wc.jar(该job的运行包)
(5)客户端向RM提交资源后,再次向RM提交申请运行MRAppMaster,此处AM的具体实例就成了MRAppMaster
因为 ApplicationMaster 就是整个任务,或者整个Job的统称,但提交的是MR任务时,会有MRAppMaster来管理,当提交的是Spark或者Flink的任务时,也会有特定的进程来管理。
ResourceManager跟NodeManager的交互,也是作业的分配阶段
(6)RM将客户端请求运行MRAppMaster的请求,初始化成一个任务Task,Task会在队列里面按照优先级和调度策略等再安排什么时候执行
(7)当轮到该MR执行时,会将该Task任务派发给一个NodeManager(简称NM)执行
(8)该NodeManager先创建执行的环境,就是创建容器Container
(9)该容器Container向RM请求下载资源,拷贝job.split(切片信息),job.xml(job运行的配置文件),wc.jar(该job的运行包)等资源到容器内,并在该容器运行MRAppMaster
(10)MRAppMaster向ResourceManager申请运行MapTask资源(比如需要2个)
(11)ResourceManager将运行MapTask任务分配给另外两个NodeManager,这两个NodeManager分别领取任务并创建容器。
MRAppMaster跟NodeManager的交互,也是作业的执行阶段
(12)MRAppmaster向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据进行处理分区和排序等
(13)一旦有MapTask运行完,MRAppmaster就会再次向RM申请运行ReduceTask资源(比如也是2个),RM分配NodeManager开辟容器,运行ReduceTask
(14)ReduceTask向MapTask获取相应分区的数据,并处理数据
(15)MapTask和ReduceTask运行完毕后,MRAppMaster和容器会清理工作状态,恢复空闲状态,作业的信息会被历史服务器存储,以备后续核查
(16)MRAppMaster程序运行完后,MR会向MRAppmaster申请注销自己
其中进度和状态的更新,YARN中的任务将其进度和状态(包括counter)返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新, 展示给用户。
除了向应用管理器(ApplicationMaster)请求作业进度外, 客户端每5秒都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后, 应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。

3、Yarn调度器和调度算法

在上面的第(6)步中,RM将用户的请求封装成Task,但如果同时有多个客户端提交多个Task,就会涉及到Task的先后运行的调度问题,还有当系统资源不足时,哪个Task才有资源的优先使用权,该用多少资源等这就是调度器和调度算法要解决的问题。
目前的调度器是三种,FIFO(先进先出调度器),Capacity Scheduler(容量调度器)和Fair Scheduler(公平调度器),Hadoop-3.1.3默认的资源调度器是容量调度器,CDH框架默认调度器是公平调度器,一般情况下中小型公司用Capacity调度器,大型公司用Fair调度器。
Yarn的配置文件 yarn-default.xml 中的值 yarn.resourcemanager.scheduler.class 定了默认的是 CapacityScheduler 调度器。

先进先出调度器(FIFO)

FIFO调度器(First In First Out):单队列,根据提交作业的先后顺序,先来先服务。虽然简单易懂,但因为不支持多队列,调度也只能按照先后顺序,不符合实际的情况,所以正式环境很少用到。

容量调度器(Capacity Scheduler)

容量调度器有三个队列,分别用A,B,C表示为queueA,queueB,queueC。
容量调度器的特点:
(1)queueA占用20%资源,queueB占用50%资源,queueC占用30%资源
(2)多队列:每个队列里面都是按照FIFO的调度策略,即先进来的先执行
(3)容量保证:管理员可以为每个队列设置资源最低和最高的使用上限
(4)灵活性:如果一个队列的资源有剩余,可以暂时借给其他队列使用;但当该队列有新任务时,可以将该资源抢占回来,该队列的任务拥有该队列资源的更高权限
(5)多用户:每个队列下都可以用多个用户平均分配资源,且调度器会对同一用户提交的作业所占的资源进行限制,避免因单一用户提交死循环导致整个集群崩溃

容量调度器的分配算法:
(1)队列资源分配
从root开始,使用深度优先算法,优先选择资源占用率最低的队列分配,如果资源占用率一样,则按照队列顺序分配。A,B,C队列都是无资源占用的,所以优先选择A队列
(2)作业资源分配
默认按照作业的优先级和提交时间顺序来分配资源,有设置优先级,按照优先级排序,没有设置优先级,按照提交的顺序排序
(3)容器资源分配
按照容器的优先级分配资源,如果优先级相同或没有设置优先级,则按照数据本地性原则分配:
1)任务和数据在同一节点
2)任务和数据在同一机架
3)任务和数据不在同一节点也不在同一机架

公平调度器(Fair Scheduler)

Fair Scheduler(公平调度器),是由Fackbook开发的多用户调度器,同队列所有任务共享资源,在时间尺度上面获得公平的资源。Fair的调度器特点跟Capacity的特点一样的有:
(1)支持多队列多用户作业,同样的3个队列分别占20%,50%和30%
(2)容量保证,管理员可以为每个队列设置资源最低和最高使用上限
(3)灵活性,一个队列的资源充足时可以临时借给其他队列,而一旦该队列有新的应用程序时,则借给其他队列的资源就会归还给该队列
(4)会对同一用户添加的作业所占资源量进行限制,防止一人干掉整个集群

公平调度器和容量调度器的区别在于:
(1)核心策略不同,容量调度器优先选择资源占用率低的队列,公平调度器优先选择缺额较大的队列,缺额就是某一时刻,作业应获得的资源与实际获取的资源之间的差距就叫缺额(例如队列A已经执行了10个任务,只剩下100M的内容容量,此时来了任务11,需要1G的内存,此时的缺额就是1G-100M=900M)。
(2)每个队列可以单独设置资源分配的方式不同,容量调度器:FIFO、DRF,公平调度器:FIFO、FAIR、DRF等
(3)容量调度器一般是先进先出,就是先提交的作业,或者优先级高的作业优先获得执行,而公平调度器会根据优先级进行资源的分配,保证每个任务都能的到对应的资源,当资源不足时,都能的到相应的优先级的配额资源。
公平调度器是使所有作业都获得公平的资源,所以会优先为缺额大的队列分配资源。Fair一般是按照总资源对所有作业进行平均分配,多退少补;设置优先级的情况,会按照优先级的比例分配,多退少补。再对剩下的资源也同样进行平均分配,直至所有任务都有资源执行或所有资源分配完。

FIFO策略:
公平调度器每个队列资源分配策略如果选择FIFO的话,此时公平调度器相当于上面讲过的容量调度器。
Fair策略:
Fair策略(默认)是一种基于最大最小公平算法实现的资源多路复用方式,默认情况下,每个队列内部采用该方式分配资源。这意味着,如果一个队列中有两个应用程序同时运行,则每个应用程序可得到1/2的资源,如果是三个应用程序同时运行,则每个应用程序可得到1/3的资源。
具体资源分配流程和容量调度器一致:
(1)选择队列
(2)选择作业
(3)选择容器
以上三步,每一步都是按照公平策略来分配资源的。

作业资源分配:
(a)不加权(关注点是Job的个数) :
需求:有一条队列总资源12个,有4个job,对资源的需求分别是:
job1->1,job2->2, job3->6,job4->5
第一次算:12 / 4=3 没有优先级时,每个任务平均分配,每个任务分配3个资源
job1:分3 --> 多 2 个
job2:分3 --> 多 1 个
job3:分3 – 差 3 个
job4:分3 --> 差 2 个
第二次算: 3 /2 = 1.5 (多出3个资源,给Job3和Job4平均分配)
job1:分1
job2:分2
job3:分3个差3个–> 分1.5 --> 最终:4.5
job4 :分3个差2个–> 分1.5 --> 最终:4.5
第n次算: 一直算到没有空闲资源,此时因为资源已经满了,便不再分配,等Job1和Job2运行完,或者有其他空闲资源,会再次执行分配操作,给剩余的Job分配资源。

(b)加权(关注点是Job的权重)
需求:有一条队列总资源16,有4个job
对资源的需求分别是:
job1->4,job2->2,job3->10,job4->4
每个job的权重为:
job1->5,job2->8,job3->1,job4->2
第一次算: 16(5+8+1+2) = 1,按照优先级(权重)划分资源
job1 :分5,实际需要4,多1
job2 :分8,实际需要2,多6
job3 :分1,实际需要10,差9
job4 :分2,实际需要4,差2
第二次算: 7/ (1+2) = 7/3 (约等于2.33)
job1 :分4
job2 :分2
job3 :分1 --> 分2.33,实际需要10,差6.67
job4 :分2 --> 分4.66,实际需要4,多2.66
第三次算:2.66/1=2.66
job1 :分4
job2 :分2
job3 :分1 --> 分2.33–> 分2.66,实际分到6左右,实际需要10,差4
job4 :分4
第n次算: 一直算到没有空闲资源
以上可以看出,虽然job3的优先级很低,但是还是在系统资源许可的情况下,分到了6个的资源,保证了作业的运行,保证了每个作业都能够及时的得到响应。

DRF策略:
DRF(Dominant Resource Fairness)
之前的资源考虑的大多是内存,但很多时候资源有很多种,例如CPU和内存,网络宽带等,我们很难衡量两个应用应该分配的比例,DRF可以根据实际情况选择不同的分配CPU和内存资源的比例,更加的切合实际。比如A任务是需要16核CPU,5G内存,而B任务是需要2核CPU,20G内存,如果之前是按照1核1G内存的比例分配的,这样就会给A任务分配16核CPU,16G内存,给B分配,20核CPU和20G内存,就会导致两个任务都会有资源被浪费,而不是最优的选择。DRF可以动态的调整CPU和内存之间的配比,会更加符合实际。

4、Yarn常用命令

查看所有的application的信息

$ yarn application -list
  • 1

对application的状态进行过滤 (ALL, NEW, NEW_SAVING, SUBMITTED,ACCEPTED,RUNNING,FINISHED,FAILED,KILLED)等

$ yarn application -list -appStates FINISHED
  • 1

干掉运行卡顿的application,需要接

$ yarn application -kill application_1612577921195_0001
  • 1

查看application的日志,需要接

$ yarn logs -applicationId application_1612577921195_0001
  • 1

查看Container容器日志,需要 和

$ yarn logs -applicationId application_1612577921195_0001 - containerId container_1612577921195_0001_01_000001
  • 1

查看尝试运行的任务,需要接

$ yarn applicationattempt -list application_1612577921195_0001
  • 1

打印尝试运行任务的状态,需要接

$ yarn applicationattempi -status appattempt_1612577921195_0001_000001
  • 1

查看容器,列出所有的容器

$ yarn container -list appattempt_1612577921195_0001_000001
  • 1

查看Container的状态,yarn container -status ,只有在运行状态才能查看

$ yarn container -status container_1612577921195_0001_01_000001
  • 1

查看节点情况

$ yarn node -list -all
  • 1

更新队列的配置,无需重启yarn

$ yarn rmadmin -refreshQueues
  • 1

查看队列 ,查看默认队列的情况

$ yarn queue -status default
  • 1

二、Yarn案例实操

1、Yarn生成环境配置

yarn-site.xml 的部分配置如下:

<!-- 配置调度器,默认是容量调度器 -->
<property>
	<description>The class to use as the resource scheduler.</description>
	<name>yarn.resourcemanager.scheduler.class</name>
	<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>

<!-- ResourceManager处理调度器请求的线程数量,默认50;如果提交的任务数大于50,可以增加该值,但是不能超过CPU的总线程数-->
<!-- 例如3个CPU,每个只有4线程,3台 * 4线程 = 12线程(去除其他应用程序实际不能超过8,需要预留部分) -->
<property>
	<description>Number of threads to handle scheduler interface.</description>
	<name>yarn.resourcemanager.scheduler.client.thread-count</name>
	<value>8</value>
</property>

<!-- 是否让yarn自动检测硬件进行配置,默认是false,如果该节点有很多其他应用程序,建议手动配置。-->
<!-- 如果该节点没有其他应用程序,可以采用自动 -->
<property>
	<description>Enable auto-detection of node capabilities such as memory and CPU.
	</description>
	<name>yarn.nodemanager.resource.detect-hardware-capabilities</name>
	<value>false</value>
</property>

<!-- 是否将虚拟核数当作CPU核数,默认是false,采用物理CPU核数 -->
<property>
	<description>Flag to determine if logical processors(such as
	hyperthreads) should be counted as cores. Only applicable on Linux
	when yarn.nodemanager.resource.cpu-vcores is set to -1 and
	yarn.nodemanager.resource.detect-hardware-capabilities is true.
	</description>
	<name>yarn.nodemanager.resource.count-logical-processors-as-cores</name>
	<value>false</value>
</property>

<!-- 虚拟核数和物理核数乘数,默认是1.0 -->
<property>
	<description>Multiplier to determine how to convert phyiscal cores to
	vcores. This value is used if yarn.nodemanager.resource.cpu-vcores
	is set to -1(which implies auto-calculate vcores) and
	yarn.nodemanager.resource.detect-hardware-capabilities is set to true. The	number of vcores will be calculated as	number of CPUs * multiplier.
	</description>
	<name>yarn.nodemanager.resource.pcores-vcores-multiplier</name>
	<value>1.0</value>
</property>

<!-- NodeManager使用内存数,默认8G,修改为4G内存 -->
<property>
	<description>Amount of physical memory, in MB, that can be allocated 
	for containers. If set to -1 and
	yarn.nodemanager.resource.detect-hardware-capabilities is true, it is
	automatically calculated(in case of Windows and Linux).
	In other cases, the default is 8192MB.
	</description>
	<name>yarn.nodemanager.resource.memory-mb</name>
	<value>4096</value>
</property>

<!-- nodemanager的CPU核数,不按照硬件环境自动设定时默认是8个,修改为4个 -->
<property>
	<description>Number of vcores that can be allocated
	for containers. This is used by the RM scheduler when allocating
	resources for containers. This is not used to limit the number of
	CPUs used by YARN containers. If it is set to -1 and
	yarn.nodemanager.resource.detect-hardware-capabilities is true, it is
	automatically determined from the hardware in case of Windows and Linux.
	In other cases, number of vcores is 8 by default.</description>
	<name>yarn.nodemanager.resource.cpu-vcores</name>
	<value>4</value>
</property>

<!-- 容器最小内存,默认1G -->
<property>
	<description>The minimum allocation for every container request at the RM	in MBs. Memory requests lower than this will be set to the value of this	property. Additionally, a node manager that is configured to have less memory	than this value will be shut down by the resource manager.
	</description>
	<name>yarn.scheduler.minimum-allocation-mb</name>
	<value>1024</value>
</property>

<!-- 容器最大内存,默认8G,修改为2G -->
<property>
	<description>The maximum allocation for every container request at the RM	in MBs. Memory requests higher than this will throw an	InvalidResourceRequestException.
	</description>
	<name>yarn.scheduler.maximum-allocation-mb</name>
	<value>2048</value>
</property>

<!-- 容器最小CPU核数,默认1个 -->
<property>
	<description>The minimum allocation for every container request at the RM	in terms of virtual CPU cores. Requests lower than this will be set to the	value of this property. Additionally, a node manager that is configured to	have fewer virtual cores than this value will be shut down by the resource	manager.
	</description>
	<name>yarn.scheduler.minimum-allocation-vcores</name>
	<value>1</value>
</property>

<!-- 容器最大CPU核数,默认4个,修改为2个 -->
<property>
	<description>The maximum allocation for every container request at the RM	in terms of virtual CPU cores. Requests higher than this will throw an
	InvalidResourceRequestException.</description>
	<name>yarn.scheduler.maximum-allocation-vcores</name>
	<value>2</value>
</property>

<!-- 虚拟内存检查,默认打开,修改为关闭 -->
<property>
	<description>Whether virtual memory limits will be enforced for
	containers.</description>
	<name>yarn.nodemanager.vmem-check-enabled</name>
	<value>false</value>
</property>

<!-- 虚拟内存和物理内存设置比例,默认2.1 -->
<property>
	<description>Ratio between virtual memory to physical memory when	setting memory limits for containers. Container allocations are	expressed in terms of physical memory, and virtual memory usage	is allowed to exceed this allocation by this ratio.
	</description>
	<name>yarn.nodemanager.vmem-pmem-ratio</name>
	<value>2.1</value>
</property>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118

如果集群的硬件资源一致,可以分发该配置,如果集群的硬件资源不一致,则仍需单独配置。
配置文件修改后,都需要重启yYarn服务:

sbin/stop-yarn.sh
sbin/start-yarn.sh
  • 1
  • 2

2、Yarn容量调度器配置

Yarn的队列配置都是按照实际的生产需求来配置的,配置队列一是可以防止个别员工写的死循环导致整个集群资源全部耗尽,还能给队列设置优先级,优先保障特殊时期重要任务的资源充足,从而保障业务的正常运行。
假设有如下需求:
需求1:default队列占总内存的40%,最大资源容量占总资源60%,hive队列占总内存的60%,最大资源容量占总资源80%。
需求2:配置队列优先级

配置容量调度器

在capacity-scheduler.xml中配置如下:
(1)修改如下配置

<!-- 指定多队列,增加hive队列 -->
<property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>default,hive</value>
    <description>
      The queues at the this level (root is the root queue).
    </description>
</property>

<!-- 降低default队列资源额定容量为40%,默认100% -->
<property>
    <name>yarn.scheduler.capacity.root.default.capacity</name>
    <value>40</value>
</property>

<!-- 降低default队列资源最大容量为60%,默认100% -->
<property>
    <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
    <value>60</value>
</property>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

(2)为新加队列添加必要属性:

<!-- 指定hive队列的资源额定容量 -->
<property>
    <name>yarn.scheduler.capacity.root.hive.capacity</name>
    <value>60</value>
</property>

<!-- 用户最多可以使用队列多少资源,1表示能全部占用该队列的资源 -->
<property>
    <name>yarn.scheduler.capacity.root.hive.user-limit-factor</name>
    <value>1</value>
</property>

<!-- 指定hive队列的资源最大容量 -->
<property>
    <name>yarn.scheduler.capacity.root.hive.maximum-capacity</name>
    <value>80</value>
</property>

<!-- 启动hive队列 -->
<property>
    <name>yarn.scheduler.capacity.root.hive.state</name>
    <value>RUNNING</value>
</property>

<!-- 哪些用户有权限向队列提交作业,默认*是所有用户都可以提交,指定用户需要指定用户名称 -->
<property>
    <name>yarn.scheduler.capacity.root.hive.acl_submit_applications</name>
    <value>*</value>
</property>

<!-- 哪些用户有权操作队列,管理员权限(查看/杀死),默认是全部用户*,指定用户需要指定用户名称 -->
<property>
    <name>yarn.scheduler.capacity.root.hive.acl_administer_queue</name>
    <value>*</value>
</property>

<!-- 哪些用户有权配置提交任务优先级,默认是全部用户*,指定用户需要指定用户名称 -->
<property>
    <name>yarn.scheduler.capacity.root.hive.acl_application_max_priority</name>
    <value>*</value>
</property>

<!-- 如果application指定了超时时间,则提交到该队列的application能够指定的最大超时时间不能超过该值。 -->
<property>
    <name>yarn.scheduler.capacity.root.hive.maximum-application-lifetime</name>
    <value>-1</value>
</property>

<!-- 如果application没指定超时时间,则用default-application-lifetime作为默认值 -->
<property>
    <name>yarn.scheduler.capacity.root.hive.default-application-lifetime</name>
    <value>-1</value>
</property>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

配置完配置文件之后,需要先分发配置文件,然后再执行以下命令刷新队列:

yarn rmadmin -refreshQueues
  • 1

之后就能在Yarn的Web端上面看到Scheduler下面的Application Queues看到配置的两个队列。

向容量调度器中提交任务

(1)hadoop jar的方式提交队列到Hive

hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount -D mapreduce.job.queuename=hive /input /output
  • 1

(2)打jar包的形式
默认的任务提交都是提交到default队列的。如果希望向其他队列提交任务,需要在Driver中声明:

public class WcDrvier {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        conf.set("mapreduce.job.queuename","hive");

        //1. 获取一个Job实例
        Job job = Job.getInstance(conf);
		... ...
        //6. 提交Job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

在conf.set设置后,才会在提交任务时,提交到hive队列运行。

容量调度器任务优先级

配置优先级的目的是在资源紧张的时候,优先为重要的任务提供运行的资源,Yarn将所有任务的优先级限制为0,若想使用任务的优先级功能,须先开放限制。
(1)修改yarn-site.xml文件,增加以下参数,以下设置为5个优先级,分别对应1-5

<property>
	<name>yarn.cluster.max-application-priority</name>
	<value>5</value>
</property>
  • 1
  • 2
  • 3
  • 4

(2)分发Yarn配置,并重启Yarn进程

xsync yarn-site.xml
sbin/stop-yarn.sh 
sbin/start-yarn.sh 
  • 1
  • 2
  • 3

(3)模拟紧张资源,可以连续提交如下任务,如重复提交3-10次,视资源情况而定

hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar pi 5 2000000
  • 1

(4)再次提交优先级更高的任务,其中 -D mapreduce.job.priority=5 为设置优先级

hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar pi  -D mapreduce.job.priority=5 5 2000000
  • 1

(5)也可以通过命令行的模式,修改正在排队的任务的优先级,让其优先执行

yarn application -appID application_1611133087930_0009 -updatePriority 5
  • 1

3、Yarn公平调度器配置

安装

要使用公平调度器,首先要在 yarn-site.xml 中配置公平调度器:

<property>
  <name>yarn.resourcemanager.scheduler.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
<property>
  • 1
  • 2
  • 3
  • 4

配置

自定义的Fair Scheduler通常涉及涉及更改两个文件。首先,可以通过在现有配置目录中的 yarn-site.xml 文件中添加配置属性来设置调度程序范围的选项。其次,需要创建一个分配文件 fair-scheduler.xml,列出存在哪些队列以及它们各自的权重和容量。分配文件每 10 秒重新加载一次,允许即时更改。
(1)修改yarn-site.xml文件,加入以下参数

<property>
	<name>yarn.scheduler.fair.allocation.file</name>
	<value>/opt/module/hadoop-3.1.3/etc/hadoop/fair-scheduler.xml</value>
	<description>指明公平调度器队列分配的配置文件</description>
</property>

<property>
	<name>yarn.scheduler.fair.preemption</name>
	<value>false</value>
	<description>禁止队列间资源抢占</description>
</property>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

可以放在yarn-site.xml的明细属性:

属性说明
yarn.scheduler.fair.allocation.file分配文件的路径。分配文件是一个 XML 清单,除了某些策略默认值之外,还描述了队列及其属性。此文件必须采用下一节中描述的 XML 格式。如果给出了相对路径,则会在类路径(通常包括 Hadoop conf 目录)中搜索该文件。默认为 fair-scheduler.xml。
yarn.scheduler.fair.user-as-default-queue如果未指定队列名称,是否使用与分配关联的用户名作为默认队列名称。如果设置为“false”或未设置,则所有作业都有一个共享的默认队列,名为“default”。默认为真。如果分配文件中给出了队列放置策略,则忽略此属性。
yarn.scheduler.fair.preemption是否使用抢占。默认为假。
yarn.scheduler.fair.preemption.cluster-utilization-threshold抢占开始后的利用率阈值。利用率计算为所有资源中使用量与容量的最大比率。默认为 0.8f。
yarn.scheduler.fair.sizebasedweight是否根据应用程序的大小将份额分配给各个应用程序,而不是为所有应用程序提供平等的份额而不考虑大小。设置为 true 时,应用程序的权重为 1 的自然对数加上应用程序的总请求内存,再除以 2 的自然对数。默认为 false。
yarn.scheduler.fair.assignmultiple是否允许在一次心跳中分配多个容器。默认为假。
yarn.scheduler.fair.dynamic.max.assign如果assignmultiple为true,是否动态决定一次心跳可以分配的资源量。开启后,节点上大约一半未分配的资源会在一次心跳中分配给容器。默认为真。
yarn.scheduler.fair.max.assign如果 assignmultiple 为真且 dynamic.max.assign 为假,一次心跳中可以分配的最大容器数量。默认为 -1,即不设置限制。
yarn.scheduler.fair.locality.threshold.node对于在特定节点上请求容器的应用程序,自上次容器分配以来在接受另一个节点上的放置之前要等待的调度机会数。表示为 0 和 1 之间的浮点数,作为集群大小的一部分,是要传递的调度机会的数量。默认值 -1.0 表示不放弃任何调度机会。
yarn.scheduler.fair.locality.threshold.rack对于在特定机架上请求容器的应用程序,自上次容器分配以来在接受另一个机架上的放置之前等待的调度机会数。表示为 0 和 1 之间的浮点数,作为集群大小的一部分,是要传递的调度机会的数量。默认值 -1.0 表示不放弃任何调度机会。
yarn.scheduler.fair.allow-undeclared-pools如果是这样,则可以在应用程序提交时创建新队列,无论是因为它们由提交者指定为应用程序的队列,还是因为它们由 user-as-default-queue 属性放置在那里。如果这是错误的,任何时候一个应用程序将被放置在分配文件中未指定的队列中,而是放置在“默认”队列中。默认为真。如果分配文件中给出了队列放置策略,则忽略此属性。
yarn.scheduler.fair.update-interval-ms锁定调度程序并重新计算公平份额、重新计算需求并检查是否有任何东西需要抢占的时间间隔。默认为 500 毫秒。
yarn.resource-types.memory-mb.increment-allocationfairscheduler 以该值的增量授予内存。如果您提交的任务的资源请求不是memory-mb.increment-allocation的倍数,则请求将四舍五入到最接近的增量。默认为 1024 MB。
yarn.resource-types.vcores.increment-allocationfairscheduler 以该值的增量授予 vcores。如果您提交的任务的资源请求不是vcores.increment-allocation的倍数,则请求将四舍五入到最接近的增量。默认为 1。
yarn.resource-types..increment-allocationfairscheduler 以该值的增量授予。如果您提交的任务的资源请求不是.increment-allocation的倍数,则请求将四舍五入到最接近的增量。如果没有为资源指定此属性,则不会应用增量舍入。如果未指定单位,则采用资源的默认单位。
yarn.scheduler.increment-allocation-mb内存的分配增量。不再优选。请改用yarn.resource-types.memory-mb.increment-allocation。默认为 1024 MB。
yarn.scheduler.increment-allocation-vcoresCPU vcores 的分配增量。不再优选。请改用yarn.resource-types.vcores.increment-allocation。默认为 1。

(2)配置fair-scheduler.xml

<?xml version="1.0"?>
<allocations>
  <!-- 单个队列中Application Master占用资源的最大比例,取值0-1 ,企业一般配置0.1 -->
  <queueMaxAMShareDefault>0.5</queueMaxAMShareDefault>
  <!-- 单个队列最大资源的默认值 test atguigu default -->
  <queueMaxResourcesDefault>4096mb,4vcores</queueMaxResourcesDefault>

  <!-- 增加一个队列test -->
  <queue name="test">
	<!-- 队列最小资源 -->
	<minResources>2048mb,2vcores</minResources>
	<!-- 队列最大资源 -->
	<maxResources>4096mb,4vcores</maxResources>
	<!-- 队列中最多同时运行的应用数,默认50,根据线程数配置 -->
	<maxRunningApps>4</maxRunningApps>
	<!-- 队列中Application Master占用资源的最大比例 -->
	<maxAMShare>0.5</maxAMShare>
	<!-- 该队列资源权重,默认值为1.0 -->
	<weight>1.0</weight>
	<!-- 队列内部的资源分配策略 -->
	<schedulingPolicy>fair</schedulingPolicy>
  </queue>
  
  <!-- 增加一个队列atguigu -->
  <queue name="atguigu" type="parent">
	<!-- 队列最小资源 -->
	<minResources>2048mb,2vcores</minResources>
	<!-- 队列最大资源 -->
	<maxResources>4096mb,4vcores</maxResources>
	<!-- 队列中最多同时运行的应用数,默认50,根据线程数配置 -->
	<maxRunningApps>4</maxRunningApps>
	<!-- 队列中Application Master占用资源的最大比例 -->
	<maxAMShare>0.5</maxAMShare>
	<!-- 该队列资源权重,默认值为1.0 -->
	<weight>1.0</weight>
	<!-- 队列内部的资源分配策略 -->
	<schedulingPolicy>fair</schedulingPolicy>
  </queue>

  <!-- 任务队列分配策略,可配置多层规则,从第一个规则开始匹配,直到匹配成功 -->
  <queuePlacementPolicy>
	<!-- 提交任务时指定队列,如未指定提交队列,则继续匹配下一个规则; false表示:如果指定队列不存在,不允许自动创建-->
	<rule name="specified" create="false"/>
	<!-- 提交到root.group.username队列,若root.group不存在,不允许自动创建;若root.group.user不存在,允许自动创建 -->
	<rule name="nestedUserQueue" create="true">
		<rule name="primaryGroup" create="false"/>
	</rule>
	<!-- 最后一个规则必须为reject或者default。Reject表示拒绝创建提交失败,default表示把任务提交到default队列 -->
	<rule name="reject" />
  </queuePlacementPolicy>
</allocations>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

分配文件必须是 XML 格式。关于配置文件里面的详细说明,可以查看官方的配置文档。
公平调度器官方配置文档
配置完后,记住要分发配置并重启Yarn:

xsync yarn-site.xml
xsync fair-scheduler.xml
sbin/stop-yarn.sh
sbin/start-yarn.sh
  • 1
  • 2
  • 3
  • 4

提交任务

(1)提交任务是指定队列,任务就会到指定的root.test队列中

hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar pi -Dmapreduce.job.queuename=root.test 1 1
  • 1

(2)提交任务时不指定队列,按照配置规则,任务会到默认的队列上面

hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar pi 1 1
  • 1

Yarn的Tool接口案例

我们在调用hadoop自带的程序时,程序后面是可以添加参数的,但我们自己编写的程序打包成jar包后上传时,只能固定参数的内容,那怎么也给自己写的程序打成jar包时,可以自由的调用参数呢?
hadoop这边提供了Tool接口,让我们自己写的程序也可以动态修改参数。
(1)创建类WordCount并实现Tool接口:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import java.io.IOException;

public class WordCount implements Tool {

    private Configuration conf;
    @Override
    public int run(String[] args) throws Exception {

        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private Text outK = new Text();
        private IntWritable outV = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(" ");
            for (String word : words) {
                outK.set(word);
                context.write(outK, outV);
            }
        }
    }

    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable outV = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            outV.set(sum);
            context.write(key, outV);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

(2)新建WordCountDriver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.util.Arrays;

public class WordCountDriver {
    private static Tool tool;
    public static void main(String[] args) throws Exception {
        // 1. 创建配置文件
        Configuration conf = new Configuration();
        // 2. 判断是否有tool接口
        switch (args[0]){
            case "wordcount":
                tool = new WordCount();
                break;
            default:
                throw new RuntimeException(" No such tool: "+ args[0] );
        }
        // 3. 用Tool执行程序
        // Arrays.copyOfRange 将老数组的元素放到新数组里面
        int run = ToolRunner.run(conf, tool, Arrays.copyOfRange(args, 1, args.length));
        System.exit(run);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

(3)在HDFS上准备输入文件,假设为/input目录,向集群提交该Jar包

yarn jar YarnDemo.jar com.atguigu.yarn.WordCountDriver wordcount /input /output
  • 1

注意此时提交的3个参数,第一个用于生成特定的Tool,第二个和第三个为输入输出目录。此时如果我们希望加入设置参数,可以在wordcount后面添加参数,例如:

yarn jar YarnDemo.jar com.atguigu.yarn.WordCountDriver wordcount -Dmapreduce.job.queuename=root.test /input /output1
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/852412
推荐阅读
相关标签
  

闽ICP备14008679号