赞
踩
MR是一个分布式运算框架,主要分为Map和reduce两个阶段,map负责把一个任务分解成多个任务,reduce负责把分解后多个任务的处理结果汇总
优缺点
① 易编程,简单的实现一些接口,就可以完成一个分布式程序,可以分布到大量机器上运行.
② 易扩展, 当计算力不够的时候,可以通过简单的增加机器来扩展计算能力
③ 高容错, 当一个机器挂了,可以把上面的计算任务转移到另一个节点上,不至于任务失败
④ 大数据量, 可以实现上千台服务器集群并发工作,提供数据处理能力
缺点;
① 不适合实时
② 不适合流式处理,流式处理数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化
③ 不适合DAG(有向图)计算, 多个程序存在依赖关系,后一个应用程序的输入为前一个的输出,在这种情况下,MapReduce的作业输出都会写入到磁盘,会造成大量的磁盘IO,导致性能低下
MapReduce处理思想
① 第一个阶段的MapTask并发实例,完全并行运行,互不干涉
② 第二个阶段的ReduceTask并发实例互不相关,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出
③ MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑复杂,那只能多个MapReduce程序,串行运行
MapReduce进程
一个完整的MapReduce程序在分布式运行时有三类实例进程
① MrAppMaster: 负责整个程序的过程调度及状态协调.
② Map Task: 负责Map阶段的整个数据处理流程
③ ReduceTask: 负责Reduce阶段的整个数据处理流程
常用数据序列化类型
Java类型 Hadoop Writable类型
boolean BooleanWritable
byte ByteWritable
int IntWritable
float FloatWritable
long LongWritable
double DoubleWritable
String Text
map MapWritable
array ArrayWritable
MapReduce 编程
编写MapReduce程序分为三个部分: Mapper Reducer Driver
①Mapper
用户自定义的Mapper要继承自己的父类,Mapper的输入数据是KV对的形式,KV的类型可以自定义,Mapper中的业务逻辑写在map()方法中,Mapper的输出数据是KV对的形式(可自定义KV类型),map()方法对每个<K,V>
② Reducer阶段
用户自定义的reducer要继承自己的父类,Reducer的输入数据类型对应Mapper的输出数据类型,也是KV,Reducer的业务逻辑写在reduce()方法中,ReduceTask进程对每一组形同K的<K,V>组调用一次reduce()方法
③ Driver阶段
相当于YARN集群的客户端,用于提交我们整个程序到Yarn集群,提交的是封装了MapReduce程序相关运行参数的job对象
序列化
序列化就是吧内存中的对象转换成字节序列(或者其他数据传输协议)以便于持久化到磁盘和网络传输.反序列化就是将收到的字节序列(或者其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象.
Java的序列化是一个重量级的序列化框架(Serializable), 一个对象序列化后,会附带很多额外的信息(各种校验信息,集成体系等),不便于在网络中高效传输,所以,Hadoop自己开发了一套序列化机制(Writable).
优点: ① 紧凑,高效的使用存储空间
② 快速,读写数据的额外开销小
③ 可扩展,可以随着通信协议的升级而升级
④ 互操作,支持多语言的交互
自定义bean对象实现序列化接口(Writable)
① 实现Writable
② 反序列化时,需要反射调用空参构造函数,所以必须有空参构造器
③ 重写序列化方法: write()
④ 重写反序列化方法: readFields()
⑤ 注意反序列化的顺序和序列化的顺序完全一致
⑥ 要想输出结果到文件中,需要重写toString()方法,使用分隔符将字段隔开
⑦ 如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce中的shuffle过程要求对key必须能排序.
MapReduce原理
9.1 InputFormat 数据输入
① 切片和MapTask并行度
数据块: Block是HDFS物理上把数据分成一块一块的
数据切片: 数据切片只是在逻辑上对输入进行切片,并不会在磁盘上将其切分成片进行存储
一个job的Map阶段并行度由客户端在提交job时的切片数决定,每个split切片分配一个Maptask并行实例处理,默认情况下,切片大小=BlockSize,切片时不考虑数据集的整体,而是逐个针对每个文件单独切片
② job提交流程关键点
waitForCompletion()–>job.submit()
建立连接: connect() --> 创建代理: new Cluster(getConfiguration())–>判断模式: initialize(jobTrackAddr,conf)
submitter.submitJobInternal(Job.this,cluster)
创建Stag路径: Path jobStagingArea=JobSubmissionFiles.getStagingDir(cluster,conf)
创建job路径: JobID jobId = submitClient.getNewJobID()
拷贝jar包到集群: copyAndConfigureFiles(job,submitJobDir);rUploader.uploadFiles(job,jobSubmitDir);
计算切片: writeSplits(job,submitJobDir);maps=writeNewSplits(job,jobSubmitDir);input.getSplits(job)
向Stag路径写XML配置文件: writeConf(conf,submitJobFile); conf.writeXml(out);
提交job,返回提交状态: status = submitClient.submitJob(jobId,submitJobDir.toString(),job.getCredentials());
③ input.getSplit(job) 切片关键源码
程序先找到数据存储目录,开始遍历目录下的每个文件,规划切片
先获取文件大小: fs.sizeOf(xxx.txt)
计算切片大小: computeSplitSize(Math.max(minSize,Math.min(maxSize,blocksize)))
默认情况下,切片大小=blocksize
每次切片都判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片
将切片信息写到一个切片规划文件里,inputSplit 只记录了切片的元数据信息,如起始位置,长度,所在节点列表等
提交切片规划文件到yarn上,yarn上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数
④ 切片参数
切片计算公式: Math.max(minSize, Math.min(maxSize,blockSzie))
mapreduce.input.fileinputformat.split.minsize=1 (默认值1)
mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue (默认值)
所以,默认情况下,切片大小=blockSize
maxsize(切片最大值) : 参数如果调的比blockSize小,则会让切片变小,而且等于设置的maxsize值
minsize(切片最小值): 参数如果调的比blockSize大,则会让切片变大,就是设置的minsize值
获取切片信息: String name= inputSplit.getPath().getName();
FileSplit inputSplit = (FileSplit) context.getInputS
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。