赞
踩
YARN被设计用以解决以往架构的需求和缺陷的资源管理和调度软件。
Apache Hadoop YARN (Yet Another Resource Negotiator,另一种资源协调者)是一种新的 Hadoop 资源管理器,它是一个通用资源管理系统和调度平台,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。
资源管理系统:集群的硬件资源,和程序运行相关,比如内存,CPU等
调度平台:多个程序同时申请计算资源如何分配,调度的规则(算法)。
通用∶不仅仅支持MapReduce程序,理论上支持各种计算程序。YARN不关心你干什么,只关心你要资源,在有的情况下给你,用完之后还我。
可以把Hadoop YARN理解为相当于一个分布式的操作系统平台,而MapReduce等计算程序则相当于运行于操作系统之上的应用程序,YARN为这些程序提供运算所需的资源(内存、CPU等)。
主从架构
也是采用 master(Resource Manager)- slave (Node Manager)架构,Resource Manager 整个集群只有一个,一个可靠的节点。
1、 每个节点上可以负责该节点上的资源管理以及任务调度,Node Manager 会定时向Resource Manager汇报本节点上 的资源使用情况和任务运行状态,
2、 Resource Manager会通过心跳应答的机制向Node Manager下达命令或者分发新的任务,
3、 Yarn 将某一资源分配给该应用程序后,应用程序会启动一个Application Master,
4、 Application Master为应用程序负责向Resource Manager申请资源,申请资源之后,再和申请到的节点进行通信,运行内部任务。
Resource Manager
RM是一个全局的资源管理器,负责整个系统的资源管理和分配。它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager,ASM)。
Schedule 资源调度器是一个可插拔的组件,用户可根据自己需要设计资新的源调度器,YARN提供多个可直接使用的资源调度器。资源调度器将系统中的资源分配给正在运行的程序,不负责监控或跟踪应用的执行状态,不负责重启失败的任务。
Applications Manager 应用程序管理器负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重新启动它等。
Node Manager
NM是每个节点上的资源和任务管理器,一方面,它会定时地向RM汇报本节点上的资源使用情况和各个Container的运行状态;另一方面,它接收并处理来自AM的Container启动/停止等各种请求。
ApplicationMaster
用户提交的每个应用程序均包含一个AM,主要功能包括:
1.与RM调度器协商以获取资源(用Container表示);
2.将得到的任务进一步分配给内部的任务
3.与NM通信以启动/停止任务;
4.监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务。
注:RM只负责监控AM,在AM运行失败时候启动它,RM并不负责AM内部任务的容错,这由AM来完成。
Container
Container是YARN中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,当AM向RM申请资源时,RM为AM返回的资源便是用Container表示。YARN会为每个任务分配一个Container,且该任务只能使用该Container中描述的资源。yarn的container容器是yarn虚拟出来的一个东西,属于虚拟化的,它是由memory+vcore组成,是专门用来运行任务的
etc/hadoop/目录下 yarn-site.xml文件
- cd /opt/apps/hadoop-3.1.1/etc/hadoop/
- vi yarn-site.xml
-
- <!-- resource,manager主节点所在机器 -->
- <property>
- <name>yarn.resourcemanager.hostname</name>
- <value>linux01</value>
- </property>
-
- <!-- 为mr程序提供shuffle服务 -->
- <property>
- <name>yarn.nodemanager.aux-services</name>
- <value>mapreduce_shuffle</value>
- </property>
-
- <!-- 一台NodeManager的总可用内存资源 -->
- <property>
- <name>yarn.nodemanager.resource.memory-mb</name>
- <value>4096</value>
- </property>
- <!-- 一台NodeManager的总可用(逻辑)cpu核数 -->
- <property>
- <name>yarn.nodemanager.resource.cpu-vcores</name>
- <value>4</value>
- </property>
-
- <!-- 是否检查容器的虚拟内存使用超标情况
- vmem为true 指的是默认检查虚拟内存,容器使用的虚拟内存不能超过我们设置的虚拟内存大小
- -->
- <property>
- <name>yarn.nodemanager.vmem-check-enabled</name>
- <value>false</value>
- </property>
-
- <!-- 容器的虚拟内存使用上限:与物理内存的比率 -->
- <property>
- <name>yarn.nodemanager.vmem-pmem-ratio</name>
- <value>2.1</value>
- </property>
-
-
- 将 yarn-site.xml 同步给其他Linux
- scp yarn-site.xml linux02:$PWD
- scp yarn-site.xml linux03:$PWD
配置一键启停
- cd /opt/apps/hadoop-3.1.1/sbin
- vi start-yarn.sh
- vi stop-yarn.sh
-
- YARN_RESOURCEMANAGER_USER=root
- HADOOP_SECURE_DN_USER=yarn
- YARN_NODEMANAGER_USER=root
-
- start-yarn.sh 一键启动
-
- 启动后可以访问 http://linux01:8088 查看页面
-
-
- 解决linux连接不上 可能网卡出现问题
- systemctl stop NetworkManager
- systemctl diable NetworkManager
- systemctl restart network
-
配置mapred-site.xml文件 添加到resources目录下
- <?xml version="1.0"?>
- <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
- <configuration>
- <property>
- <name>yarn.app.mapreduce.am.env</name>
- <value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.1</value>
- </property>
- <property>
- <name>mapreduce.map.env</name>
- <value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.1</value>
- </property>
- <property>
- <name>mapreduce.reduce.env</name>
- <value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.1</value>
- </property>
-
- </configuration>
-
- day05.com.doit.demo06;
修改提交任务的代码 maven打jar包的命令为 package
- public class Test02 {
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
- System.setProperty("HADOOP_USER_NAME", "root");
- Configuration conf = new Configuration();
- //操作HDFS数据
- conf.set("fs.defaultFS", "hdfs://linux01:8020");
- //设置运行模式
- conf.set("mapreduce.framework.name", "yarn");
- //设置ResourceManager位置
- conf.set("yarn.resourcemanager.hostname", "linux01");
- // 设置MapReduce程序运行在windows上的跨平台参数
- conf.set("mapreduce.app-submission.cross-platform","true");
-
- Job job = Job.getInstance(conf, "WordCount");
-
- //设置jar包路径
- job.setJar("D:\\IdeaProjects\\hadoop\\target\\test_yarn.jar");
-
- job.setMapperClass(WordCountMapper.class);
- job.setReducerClass(WordCountReduce.class);
-
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(IntWritable.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
-
- //设置路径为HDFS路径
- FileInputFormat.setInputPaths(job,new Path("/wc/input/word.txt"));
- FileOutputFormat.setOutputPath(job,new Path("/wc/out4"));
-
- job.waitForCompletion(true);
- }
- }
- public class Test02 {
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
- Configuration conf = new Configuration();
-
- //设置运行模式
- conf.set("mapreduce.framework.name", "yarn");
- //设置ResourceManager位置
- conf.set("yarn.resourcemanager.hostname", "linux01");
- // 设置MapReduce程序运行在windows上的跨平台参数
- conf.set("mapreduce.app-submission.cross-platform","true");
-
- Job job = Job.getInstance(conf, "WordCount");
-
- //设置jar包路径
- //job.setJar("D:\\IdeaProjects\\hadoop\\target\\test_yarn.jar");
-
- job.setJarByClass(Test02.class);
-
- job.setMapperClass(WordCountMapper.class);
- job.setReducerClass(WordCountReduce.class);
-
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(IntWritable.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
-
- //设置路径为HDFS路径
- FileInputFormat.setInputPaths(job,new Path("/wc/input/word.txt"));
- FileOutputFormat.setOutputPath(job,new Path("/wc/out5"));
-
- job.waitForCompletion(true);
- }
- }
- 打成jar包后
- linux上使用
- 需要查看 mapred-site.xml 如果没有配置 需要配置一下
-
- hadoop jar jar包名 运行的类
- hadoop jar test_yarn.jar day03.com.doit.demo02.Test02
Map端join是指数据达到map处理函数之前进行合并的,效率要远远高于Reduce端join,因为Reduce端join是把所有的数据都经过Shuffle,非常消耗资源。
- order.txt
- order011 u001
- order012 u001
- order033 u005
- order034 u002
- order055 u003
- order066 u004
- order077 u010
-
- user.txt
- u001,hangge,18,male,angelababy
- u002,huihui,58,female,ruhua
- u003,guanyu,16,male,chunge
- u004,laoduan,38,male,angelababy
- u005,nana,24,femal,huangbo
- u006,xingge,18,male,laoduan
-
-
- 最终结果
- u001,hangge,18,male,angelababy,order012
- u001,hangge,18,male,angelababy,order011
- u002,huihui,58,female,ruhua,order034
- u003,guanyu,16,male,chunge,order055
- u004,laoduan,38,male,angelababy,order066
- u005,nana,24,femal,huangbo,order033
- null,order077
一个用户可能会产生多个订单,可能user.txt中的用户非常少,但是订单数据又非常非常多,这时我们可以考虑使用Map端join.一个小文件,一个大文件时,可以使用Map端join,说的简单一些,就是不走reduce,通过Map直接得出结果.
原理:将小文件上传到分布式缓存,保证每个map都可以访问完整的小文件的数据,然后与大文件切分后的数据进行连接,得出最终结果.
- package hadoop06.com.doit.demo;
-
- import hadoop03.com.doit.demo02.WordCountMapper;
- import hadoop03.com.doit.demo02.WordCountReducer;
- import hadoop05.com.doit.demo05.Test;
- import org.apache.commons.lang.ObjectUtils;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
- import java.io.BufferedReader;
- import java.io.FileReader;
- import java.io.IOException;
- import java.net.URI;
- import java.net.URISyntaxException;
- import java.util.HashMap;
- import java.util.Map;
-
- public class MapJoinDemo {
-
- public static class JoinMapper extends Mapper<LongWritable,Text,Text, NullWritable>{
-
- //定义集合用来存储user.txt的数据 键是uid 值是这一行记录
- private Map<String,String> userMap = new HashMap<>();
-
- private Text k2 = new Text();
- @Override
- protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
- //读取本地user.txt文件 由于user.txt添加到了分布式缓存中,会将这个文件 缓存到执行maptask的计算机上
- //由于这个文件和class文件放在一起 可以直接读取
- BufferedReader br = new BufferedReader(new FileReader("user.txt"));
- String line = null;
- while((line = br.readLine())!=null){
- //System.out.println(line);
- String uid = line.split(",")[0];
- //将uid 和 user的一行记录放入到map中
- userMap.put(uid,line);
- }
- }
-
- @Override
- protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
- //得到order的一条记录
- String line = value.toString();
- //获取order的 uid
- String uid = line.split("\\s+")[1];// u001
- //获取map中 当前uid的 用户信息
- String userInfo = userMap.get(uid);
-
- //拼接字符串写出
- k2.set(userInfo+","+line.split("\\s+")[0]);
-
- context.write(k2, NullWritable.get());
- }
- }
-
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
-
- System.setProperty("HADOOP_USER_NAME", "root");
- Configuration con = new Configuration();
-
- //配置到yarn上执行
- con.set("mapreduce.framework.name", "yarn");
- //配置操作HDFS数据
- con.set("fs.defaultFS", "hdfs://linux01:8020");
- //配置resourceManager位置
- con.set("yarn.resourcemanager.hostname", "linux01");
- //配置mr程序运行在windows上的跨平台参数
- con.set("mapreduce.app-submission.cross-platform","true");
-
- Job job = Job.getInstance(con,"wordcount");
-
- //分布式缓存user.txt文件
- job.addCacheFile(new URI("hdfs://linux01:8020/user.txt"));
-
- //设置jar包的路径
- job.setJar("D:\\IdeaProjects\\test_hadoop\\target\\test_hadoop-1.0-SNAPSHOT.jar");
-
- //设置Mapper
- job.setMapperClass(JoinMapper.class);
-
- //设置最后结果的输出类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(NullWritable.class);
-
- //设置读取HDFS上的文件 的路径
- //设置读取文件的位置 可以是文件 也可以是文件夹
- FileInputFormat.setInputPaths(job,new Path("/join/order.txt"));
- //设置输出文件的位置 指定一个文件夹 文件夹不已存在 会报错
- FileOutputFormat.setOutputPath(job,new Path("/join/out"));
-
- //提交任务 并等待任务结束
- job.waitForCompletion(true);
-
- }
-
- }
-
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。