当前位置:   article > 正文

hadoop -- Yarn资源管理_yarn资源管理器

yarn资源管理器

Yarn

YARN被设计用以解决以往架构的需求和缺陷的资源管理调度软件


Apache Hadoop YARN (Yet Another Resource Negotiator,另一种资源协调者)是一种新的 Hadoop 资源管理器,它是一个通用资源管理系统调度平台,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。

资源管理系统:集群的硬件资源,和程序运行相关,比如内存,CPU等

调度平台:多个程序同时申请计算资源如何分配,调度的规则(算法)。

通用∶不仅仅支持MapReduce程序,理论上支持各种计算程序。YARN不关心你干什么,只关心你要资源,在有的情况下给你,用完之后还我。

可以把Hadoop YARN理解为相当于一个分布式的操作系统平台,而MapReduce等计算程序则相当于运行于操作系统之上的应用程序,YARN为这些程序提供运算所需的资源(内存、CPU等)。

Yarn架构体系


主从架构

 

也是采用 master(Resource Manager)- slave (Node Manager)架构,Resource Manager 整个集群只有一个,一个可靠的节点。

1、 每个节点上可以负责该节点上的资源管理以及任务调度,Node Manager 会定时向Resource Manager汇报本节点上 的资源使用情况和任务运行状态,
2、 Resource Manager会通过心跳应答的机制向Node Manager下达命令或者分发新的任务,
3、 Yarn 将某一资源分配给该应用程序后,应用程序会启动一个Application Master,
4、 Application Master为应用程序负责向Resource Manager申请资源,申请资源之后,再和申请到的节点进行通信,运行内部任务。

Resource Manager

RM是一个全局的资源管理器,负责整个系统的资源管理和分配。它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager,ASM)。

Schedule 资源调度器是一个可插拔的组件,用户可根据自己需要设计资新的源调度器,YARN提供多个可直接使用的资源调度器。资源调度器将系统中的资源分配给正在运行的程序,不负责监控或跟踪应用的执行状态,不负责重启失败的任务

Applications Manager 应用程序管理器负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重新启动它等。

Node Manager

NM是每个节点上的资源和任务管理器,一方面,它会定时地向RM汇报本节点上的资源使用情况和各个Container的运行状态;另一方面,它接收并处理来自AM的Container启动/停止等各种请求。

ApplicationMaster

用户提交的每个应用程序均包含一个AM,主要功能包括:

1.与RM调度器协商以获取资源(用Container表示);

2.将得到的任务进一步分配给内部的任务

3.与NM通信以启动/停止任务;

4.监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务。

注:RM只负责监控AM,在AM运行失败时候启动它,RM并不负责AM内部任务的容错,这由AM来完成。

Container

Container是YARN中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,当AM向RM申请资源时,RM为AM返回的资源便是用Container表示。YARN会为每个任务分配一个Container,且该任务只能使用该Container中描述的资源。yarn的container容器是yarn虚拟出来的一个东西,属于虚拟化的,它是由memory+vcore组成,是专门用来运行任务的

安装

etc/hadoop/目录下 yarn-site.xml文件

  1. cd /opt/apps/hadoop-3.1.1/etc/hadoop/
  2. vi yarn-site.xml
  3. <!-- resource,manager主节点所在机器 -->
  4. <property>
  5. <name>yarn.resourcemanager.hostname</name>
  6. <value>linux01</value>
  7. </property>
  8. <!-- 为mr程序提供shuffle服务 -->
  9. <property>
  10. <name>yarn.nodemanager.aux-services</name>
  11. <value>mapreduce_shuffle</value>
  12. </property>
  13. <!-- 一台NodeManager的总可用内存资源 -->
  14. <property>
  15. <name>yarn.nodemanager.resource.memory-mb</name>
  16. <value>4096</value>
  17. </property>
  18. <!-- 一台NodeManager的总可用(逻辑)cpu核数 -->
  19. <property>
  20. <name>yarn.nodemanager.resource.cpu-vcores</name>
  21. <value>4</value>
  22. </property>
  23. <!-- 是否检查容器的虚拟内存使用超标情况
  24. vmem为true 指的是默认检查虚拟内存,容器使用的虚拟内存不能超过我们设置的虚拟内存大小
  25. -->
  26. <property>
  27. <name>yarn.nodemanager.vmem-check-enabled</name>
  28. <value>false</value>
  29. </property>
  30. <!-- 容器的虚拟内存使用上限:与物理内存的比率 -->
  31. <property>
  32. <name>yarn.nodemanager.vmem-pmem-ratio</name>
  33. <value>2.1</value>
  34. </property>
  35. 将 yarn-site.xml 同步给其他Linux
  36. scp yarn-site.xml linux02:$PWD
  37. scp yarn-site.xml linux03:$PWD

配置一键启停

  1. cd /opt/apps/hadoop-3.1.1/sbin
  2. vi start-yarn.sh
  3. vi stop-yarn.sh
  4. YARN_RESOURCEMANAGER_USER=root
  5. HADOOP_SECURE_DN_USER=yarn
  6. YARN_NODEMANAGER_USER=root
  7. start-yarn.sh 一键启动
  8. 启动后可以访问 http://linux01:8088 查看页面
  9. 解决linux连接不上 可能网卡出现问题
  10. systemctl stop NetworkManager
  11. systemctl diable NetworkManager
  12. systemctl restart network

MR程序提交到Yarn

使用idea提交程序

配置mapred-site.xml文件 添加到resources目录下

  1. <?xml version="1.0"?>
  2. <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
  3. <configuration>
  4. <property>
  5. <name>yarn.app.mapreduce.am.env</name>
  6. <value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.1</value>
  7. </property>
  8. <property>
  9. <name>mapreduce.map.env</name>
  10. <value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.1</value>
  11. </property>
  12. <property>
  13. <name>mapreduce.reduce.env</name>
  14. <value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.1</value>
  15. </property>
  16. </configuration>
  17. day05.com.doit.demo06;

修改提交任务的代码 maven打jar包的命令为 package

  1. public class Test02 {
  2. public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  3. System.setProperty("HADOOP_USER_NAME", "root");
  4. Configuration conf = new Configuration();
  5. //操作HDFS数据
  6. conf.set("fs.defaultFS", "hdfs://linux01:8020");
  7. //设置运行模式
  8. conf.set("mapreduce.framework.name", "yarn");
  9. //设置ResourceManager位置
  10. conf.set("yarn.resourcemanager.hostname", "linux01");
  11. // 设置MapReduce程序运行在windows上的跨平台参数
  12. conf.set("mapreduce.app-submission.cross-platform","true");
  13. Job job = Job.getInstance(conf, "WordCount");
  14. //设置jar包路径
  15. job.setJar("D:\\IdeaProjects\\hadoop\\target\\test_yarn.jar");
  16. job.setMapperClass(WordCountMapper.class);
  17. job.setReducerClass(WordCountReduce.class);
  18. job.setMapOutputKeyClass(Text.class);
  19. job.setMapOutputValueClass(IntWritable.class);
  20. job.setOutputKeyClass(Text.class);
  21. job.setOutputValueClass(IntWritable.class);
  22. //设置路径为HDFS路径
  23. FileInputFormat.setInputPaths(job,new Path("/wc/input/word.txt"));
  24. FileOutputFormat.setOutputPath(job,new Path("/wc/out4"));
  25. job.waitForCompletion(true);
  26. }
  27. }

在linux上直接提交jar包

  1. public class Test02 {
  2. public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  3. Configuration conf = new Configuration();
  4. //设置运行模式
  5. conf.set("mapreduce.framework.name", "yarn");
  6. //设置ResourceManager位置
  7. conf.set("yarn.resourcemanager.hostname", "linux01");
  8. // 设置MapReduce程序运行在windows上的跨平台参数
  9. conf.set("mapreduce.app-submission.cross-platform","true");
  10. Job job = Job.getInstance(conf, "WordCount");
  11. //设置jar包路径
  12. //job.setJar("D:\\IdeaProjects\\hadoop\\target\\test_yarn.jar");
  13. job.setJarByClass(Test02.class);
  14. job.setMapperClass(WordCountMapper.class);
  15. job.setReducerClass(WordCountReduce.class);
  16. job.setMapOutputKeyClass(Text.class);
  17. job.setMapOutputValueClass(IntWritable.class);
  18. job.setOutputKeyClass(Text.class);
  19. job.setOutputValueClass(IntWritable.class);
  20. //设置路径为HDFS路径
  21. FileInputFormat.setInputPaths(job,new Path("/wc/input/word.txt"));
  22. FileOutputFormat.setOutputPath(job,new Path("/wc/out5"));
  23. job.waitForCompletion(true);
  24. }
  25. }
  26. 打成jar包后
  27. linux上使用
  28. 需要查看 mapred-site.xml 如果没有配置 需要配置一下
  29. hadoop jar jar包名 运行的类
  30. hadoop jar test_yarn.jar day03.com.doit.demo02.Test02

Map Join

Map端join是指数据达到map处理函数之前进行合并的,效率要远远高于Reduce端join,因为Reduce端join是把所有的数据都经过Shuffle,非常消耗资源。

  1. order.txt
  2. order011 u001
  3. order012 u001
  4. order033 u005
  5. order034 u002
  6. order055 u003
  7. order066 u004
  8. order077 u010
  9. user.txt
  10. u001,hangge,18,male,angelababy
  11. u002,huihui,58,female,ruhua
  12. u003,guanyu,16,male,chunge
  13. u004,laoduan,38,male,angelababy
  14. u005,nana,24,femal,huangbo
  15. u006,xingge,18,male,laoduan
  16. 最终结果
  17. u001,hangge,18,male,angelababy,order012
  18. u001,hangge,18,male,angelababy,order011
  19. u002,huihui,58,female,ruhua,order034
  20. u003,guanyu,16,male,chunge,order055
  21. u004,laoduan,38,male,angelababy,order066
  22. u005,nana,24,femal,huangbo,order033
  23. null,order077

一个用户可能会产生多个订单,可能user.txt中的用户非常少,但是订单数据又非常非常多,这时我们可以考虑使用Map端join.一个小文件,一个大文件时,可以使用Map端join,说的简单一些,就是不走reduce,通过Map直接得出结果.

原理:将小文件上传到分布式缓存,保证每个map都可以访问完整的小文件的数据,然后与大文件切分后的数据进行连接,得出最终结果.

  1. package hadoop06.com.doit.demo;
  2. import hadoop03.com.doit.demo02.WordCountMapper;
  3. import hadoop03.com.doit.demo02.WordCountReducer;
  4. import hadoop05.com.doit.demo05.Test;
  5. import org.apache.commons.lang.ObjectUtils;
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.fs.Path;
  8. import org.apache.hadoop.io.IntWritable;
  9. import org.apache.hadoop.io.LongWritable;
  10. import org.apache.hadoop.io.NullWritable;
  11. import org.apache.hadoop.io.Text;
  12. import org.apache.hadoop.mapreduce.Job;
  13. import org.apache.hadoop.mapreduce.Mapper;
  14. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  15. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  16. import java.io.BufferedReader;
  17. import java.io.FileReader;
  18. import java.io.IOException;
  19. import java.net.URI;
  20. import java.net.URISyntaxException;
  21. import java.util.HashMap;
  22. import java.util.Map;
  23. public class MapJoinDemo {
  24. public static class JoinMapper extends Mapper<LongWritable,Text,Text, NullWritable>{
  25. //定义集合用来存储user.txt的数据 键是uid 值是这一行记录
  26. private Map<String,String> userMap = new HashMap<>();
  27. private Text k2 = new Text();
  28. @Override
  29. protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
  30. //读取本地user.txt文件 由于user.txt添加到了分布式缓存中,会将这个文件 缓存到执行maptask的计算机上
  31. //由于这个文件和class文件放在一起 可以直接读取
  32. BufferedReader br = new BufferedReader(new FileReader("user.txt"));
  33. String line = null;
  34. while((line = br.readLine())!=null){
  35. //System.out.println(line);
  36. String uid = line.split(",")[0];
  37. //将uid 和 user的一行记录放入到map中
  38. userMap.put(uid,line);
  39. }
  40. }
  41. @Override
  42. protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
  43. //得到order的一条记录
  44. String line = value.toString();
  45. //获取order的 uid
  46. String uid = line.split("\\s+")[1];// u001
  47. //获取map中 当前uid的 用户信息
  48. String userInfo = userMap.get(uid);
  49. //拼接字符串写出
  50. k2.set(userInfo+","+line.split("\\s+")[0]);
  51. context.write(k2, NullWritable.get());
  52. }
  53. }
  54. public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
  55. System.setProperty("HADOOP_USER_NAME", "root");
  56. Configuration con = new Configuration();
  57. //配置到yarn上执行
  58. con.set("mapreduce.framework.name", "yarn");
  59. //配置操作HDFS数据
  60. con.set("fs.defaultFS", "hdfs://linux01:8020");
  61. //配置resourceManager位置
  62. con.set("yarn.resourcemanager.hostname", "linux01");
  63. //配置mr程序运行在windows上的跨平台参数
  64. con.set("mapreduce.app-submission.cross-platform","true");
  65. Job job = Job.getInstance(con,"wordcount");
  66. //分布式缓存user.txt文件
  67. job.addCacheFile(new URI("hdfs://linux01:8020/user.txt"));
  68. //设置jar包的路径
  69. job.setJar("D:\\IdeaProjects\\test_hadoop\\target\\test_hadoop-1.0-SNAPSHOT.jar");
  70. //设置Mapper
  71. job.setMapperClass(JoinMapper.class);
  72. //设置最后结果的输出类型
  73. job.setOutputKeyClass(Text.class);
  74. job.setOutputValueClass(NullWritable.class);
  75. //设置读取HDFS上的文件 的路径
  76. //设置读取文件的位置 可以是文件 也可以是文件夹
  77. FileInputFormat.setInputPaths(job,new Path("/join/order.txt"));
  78. //设置输出文件的位置 指定一个文件夹 文件夹不已存在 会报错
  79. FileOutputFormat.setOutputPath(job,new Path("/join/out"));
  80. //提交任务 并等待任务结束
  81. job.waitForCompletion(true);
  82. }
  83. }

MR程序提交到Yarn上流程

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/593028
推荐阅读
相关标签
  

闽ICP备14008679号