赞
踩
对大量数据集文件处理的过程 b—k---m—G---T—PB—EB—ZB 数据 换算单位 1024
(1)Volume大量 数据量非常大 TB级别以上的
(2)Variety多样 数据格式多样 结构化数据:数据库中的数据 半结构化数据:json mongdb 非结构化数据:图片,视频 ,音频。
(3)、Velocity快速 数据的处理一定要快
(4)、Value价值 在大量数据中分析出自己有价值的数据
(1)hdfs 大数据文件存储问题
HDFS(Hadoop Distributed File System)Hadoop 分布式文件系统:
基于流数据模式访问
就是可以字节序列化的数据,java.io.Serializable接口
分布式文件系统处理的数据必须是流数据,可以写IO操作的数据
它是以128MB的数据块 存储文件(在Hadoop 1.x版本中是以64MB的数据块存储文件的)
其中每一个存储节点上都有一个DataNode进程,由NameNode来进行协调。
(2)mapreduce 编程模型 计算框架
NameNode: 是整个HDFS集群的总入口,存储着HDFS的集群的文件元数据(如:client上传文件的文件名 副本数 块数等相关信息)。
metadata:元文件 (ip 文件名 block(默认128M) 副本 分配到那个datanode)
1、安装centos7、x虚拟机,并且启动
2、输入hostname 查看当前主机名字
3、使用vim /etc/hostname 修改主机名字
4、添加主机名字并且和ip映射 vim /etc/hosts
格式 ip(当前的ip地址) centos(主机名字)
5、重新centos系统 reboot 或者关闭虚拟机 重新连接
6、必须安装jdk 并且配置环境变量
7、hadoop下面的目录及其内部文件结构
bin ----------可执行的二进制脚本文件 etc/hadoop目录 hadoop系统配置文件所在的目录
hadoop-env.sh -----------配置环境
core-site.xml ----------配售hdfs集群的核心配置
hdfs-site.xml ----------用来对hdfs文件系统做配置的
share -----------用来存放hadoop的依赖jar第三方jar目录
lib ----------用来存放hadoop使用核心库文件
8、配置hadoop的环境变量 /etc/profile
在结尾加上export HADOOP_HOME=/usr/hadoop-2.9.2
export PATH=PATHJAVA_HOME/bin:HADOOP_HOME/bin:HADOOP_HOME/sbin
9、配置core-site.xml
vim /usr/hadoop-2.9.2/etc/hadoop/core-site.xml 加入如下配置:
(就是配置namenode的全局入口====给哪个机器配置namenode 相当于配置地址)
<configuration>
<!--配置hdfs文件系统默认名称-->
<property>
<name>fs.defaultFS</name>
<value>hdfs://hadoop(主机名):9000</value>
</property>
</configuration>
10、配置hdfs-site.xml
vim /usr/hadoop-2.9.2/etc/hadoop/hdfs-site.xml 加入如下配置: (配置文件的副本个数
默认是三个)
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
11、配置slaves文件(将namenode和dataname联系起来 启动了namenode namenode就启动了datanode)
vim /usr/hadoop-2.9.2/etc/hadoop/slaves 加入如下配置: hadoop (当前主机名)
12、格式化hdfs
hdfs namenode -format (仅仅是第一次使用需要格式化)
注意:这里的格式化是格式成hadoop可以识别的文件系统,比如我们买了一块硬盘我们需要格式化成windows或者mac,linux系统识别的文件系统,才能使用这个文件系统。
关闭防火墙
systemctl stop firewalld systemctl disable firewalld
1、hadoop对hdfs集群的管理提供了两种脚本
a、hadoop-daemon.sh start namenode/dataname/secondarynamenode 本地启动脚本
作用:对集群中的单个节点进行操作b、start-dfs.sh 集群启动脚本 作用:对进群中所有节点统一操作 命令:start-dfs.sh
可以在任意节点使用,如果在从节点使用,会找到从节点中core-size配置文件中namenode的入口(就是namenode的地址,他在哪个服务器上),然后尝试登录到此namenode服务器上,再通过namenode中的slaves配置文件找到所有的datanode服务器机器名字
通过事先配好的映射找到对应的服务器ip登录他们,从而打开所有的datanode服务器
本地启动脚本,在集群环境中需要去启动每一台服务器,工作量较大。
集群启动脚本,整个集群只需要启动一次。启动过程:随即在一台服务器上输入start-dfs.sh命令,然后会通过core-size.xml配置文件中配置namendoe的地址,去找NameNode的所在的服务器,会登录此服务器启动namenode,然后根据此台服务器中的slaves配置文件(namenode与datanode的关联文件)中的服务器名称,依次映射对应的ip地址找到对应的datanode服务器,登录启动所有datanode服务器。
通过查看日志得知namenode数据datanode数据默认都是存放在/tmp/tmp/hadoop-root/dfs下(这里是临时的文件存储目录),这对于我们来说是不安全的,系统可能会定期的清除目录中的文件,因此为了保证数据的安全完整性,我们需要修改此文件的默认存储位置。
<property>
<name>hadoop.tmp.dir</name>
<value>/root/hadoop-2.9.2/data</value>
</property>
解决启动hsfs集群时的登录验证密码问题,需要使用ssh协议的第二种方式
第一种方式:基于口令的安全验证 第二种方式:基于密匙的安全验证
存在两个服务器A,B。A访问登录B,A需要生成一组秘钥(公钥和私钥),A将公钥配置在B的信任文件中,在A要登录B的时候,A会带着公钥去找B,B拿到公钥后先判断信任文件中是否含有此公钥,如果没有,会拒绝A的登录,如果有会根据此公钥和随机产生的字符串生成新的资询再传给A,A根据私钥将质询破解,得到字符串再返回给B,如果B此次得到的字符串和随机生成的字符串完全相同,则A成功登录B。
- 生成ssh秘钥对 ssh-keygen -t rsa 然后回车几次就可以啦
- 查看秘钥对生成位置 ls /root/.ssh 会发现在home目录中生成了两个文件
id_rsa(私钥) id_rsa.pub(公钥)- 将公钥加入另一台机器的受信列表中 ssh-copy-id hadoop(主机名) cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys (和上面命令一样)
- 再次查看/root/.ssh 目录 多出两个文件其中authorized_keys就是存放公钥列表文件 authorized_keys id_rsa id_rsa.pub known_hosts
- 检测是否配置成功 ssh hadoop 不需要输入密码即可
查看目录:hdfs dfs -ls / 上传文件:hdfs dfs -put aa.txt / 创建文件(迭代创建):hdfs
dfs -mkdir -p /bbb/cccc 查看文件内容:hdfs dfs -cat /aa.txt 追加文件内容:hdfs
dfs -appendToFile bb.txt /aa.txt hdfs中复制文件: hdfs dfs -cp /aa.txt
/datas 下载文件到本地:hdfs dfs -get /aa.txt /root/down.txt
查找文件名的目录:hdfs dfs -find / -name “aa.txt” 将hdfs文件移动到hdfs另一个位置:hdfs
dfs -mv /bb.txt /datas/bb.txt
<properties> <hadoop.version>2.9.2</hadoop.version> </properties> <dependencies> <!--hadoop公共依赖--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> </dependency> <!--hadoop client 依赖--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> </dependency> <!--junit--> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies>
public class TestHDFS { private FileSystem fileSystem; //hdfs客户端对象 @Before public void before() throws IOException { //hadoop文件系统的权限设置为root 包装伪装用户 System.setProperty("HADOOP_USER_NAME","root"); //或者在hdfs-size.xml中配置 修改windows对hdfs的权限 true是默认不支持权限 3 //用来对core-site.xml hdfs-site.xml进行配置 Configuration conf = new Configuration(); //连接hdfs conf.set("fs.defaultFS","hdfs://10.15.0.4:9000"); //设置上传文件的副本集 conf.set("dfs.replication","1"); fileSystem = FileSystem.get(conf); } @After public void close() throws IOException { fileSystem.close(); } }
@Test
public void testUpload() throws IOException {
//流入流 读取流
FileInputStream is=new FileInputStream("计算机中要上传文件的绝对路径")
Path path = new Path("上传到hdfs上面的哪个文件");
FSDataOutputSytream os = fileSystem.create(path);
//参数1:读取流、参数2 打印流、参数3:字节数、参数4:是否关流
IOUtils.copyBytes(is,"os",1024,true);
}
// 1.第一种方式 @Test public void testDownload() throws IOException { Path source = new Path("hdfs上面要下载文件的路径"); Path path = new Path("要下载到本地的位置"); //参数一:是否删除源文件 参数二:hdfs上下载的文件 参数三:下载到本地文件的位置 参数四:windows兼容 fileSystem.copyTolocalFile(flase,source,path,true); } // 2.第二种方式 @Test public void testDownload1() throws IOException { Path path = new Path("在hdfs上面要下载的文件"); FSDataInputStream in = fileSystem.open(path); FileOutputStream fileOutputStream = new FileOutputStream("要下载到本机的绝对路径"); //参数一:读取流 参数二:打印流 参数三:字节数 参数四:是否关流 IOUtils.copyBytes(in,fileOutputStream,1024,true); }
@Test
public void testListDirs() throws IOException {
Path path = new Path("/");
FileStatus[] fileStatuses = fileSystem.listStatus(path);
for (FileStatus fileStatus : fileStatuses) {
System.out.println(fileStatus.isDirectory()+" "+fileStatus.getPath());
}
}
hadoop配置文件解析顺序 从高到低
- javacode中Confriguration 设置会覆盖core-default.xml hdfs-default.xml中内容
- javaclient jar 中的默认 core-default.xml hdfs-default.xml中内容 (默认dataname有三个)
- hadoop安装包自定义配置 hadoop etc hadoop core-size.xml hdfs-size.xml内容会覆盖hadoop中默认内容
- hadoop中默认的core-default.xml hdfs-default.xml中内容(默认dataname有三个)
由于client对hdfs操作过于频繁,操作的文件都是随机操作,所以为了提供hdfs的集群效率,hdfs集群将 namenode中数据(元数据)始终放在内存中。
特点:不安全,内存断电会立即丢失数据。
FsImage:保存NameNode当前这一时刻数据状态 相当于快照
Editslog:日志文件,只记录client到hdfs写操作(二级制文件)
用来记录用户的写操作。每当元数据有更新或者添加元数据时,修改内存中的元数据并追加到Edits中。
secondaryNameNode:用来定期的完成对FsImage和Edits日志的操作。
规定是一个小时,或者客户端进行1000000次操作时进行合并。
答:可以,但是会使edist日志文件越来越大,效率大大降低。
NameNode在关闭的时候会默认进行edits和fsimage的合并,每次启动NameNdoe,他都会滚动一次
产生新的edits1… 新的客户端操作日志会存储在新产生的edits1中
单节点问题,单节点自动故障转移
a、找另一个namenode备份原有的namenode数据
b、如何解决集群中的脑裂问题(一个集群中多个管理者数据不一致这种情况称之为------脑裂)
QJM使用Zookeeper 完成高可用
提示:考察的是JournalNode中的两种功能: 1、同步数据 2、隔离机制
答案:JournalNode有一下两种功能:
1、负责NameNode中edits的同步数据。
2、JournalNode的隔离机制,因为它是唯一个连接着两个NameNode的组件,所以他会第一个发现。为了保证这一时刻只有一个NameNode(advice)活跃,他会使用ssh登录到NameNode节点上,使用kill命令杀死NameNode
四.hadoop第四天
4.1.1、首先搭建ZK集群,安装zk tar -zxvf zookeeper-3.4.12.tar.gz,同步zk2 zk3 scp -r zookeeper-3.4.12 root@zk2:/root/ ,在每一个节点上创建zk数据目录mkdir /root/zkdata,在每一个节点存在zk数据的目录中必须创建一个myid文件,数字节点唯一 zk1:echo “1” >>zkdata/myid,zk2:echo “2” >>zkdata/myid,zk3:echo “3” >>zkdata/myid,在每一个zk节点数据目录中创建一个名字为zoo.cfg zk配置文件 zk1-2-3:touch zkdata/zoo.cfg
修改不同的节点配置zk1-2-3
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/root/zkdata1
clientPort=300
server.1=主机名:3002:3003
server.2=主机名:4002:4003
server.3=主机名:5002:5003
启动zk节点:如果没有配置zk的环境变量需要进入bin才能启动,./zkServer.sh start /root/zkdata/zoo_sampe.cfg ,查看状态:./zkServer.sh status /root/zkdata/zoo_sampe.cfg
设置主机名和ip映射,配置环境变量,配置ssh免密码登录,在namenode上面生成公私秘钥,将公钥传给datanode的服务器,ssh-keygen -t rsa 生成公私秘钥,ssh-copy-id namenode主机名,在各个服务器上面必须安装依赖 yum install psmisc -y 在每一台服务器上面安装hadoop并且压缩,配置hadoop的环境变量(否则命令只能在bin中执行)source /etc/profile 测试 echo $PATH
修改hadoop中的配置文件:hadoop-env.sh
- <!--hdfs主要入口不再是一个具体机器而是一个虚拟的名称ns 后面配置文件会映射所对应所有namenode --> - <property> <name>fs.defaultFS</name> <value>hdfs://ns</value> </property> - <property> <name>hadoop.tmp.dir</name> <value>/root/hadoop-2.9.2/data</value> </property> - <!--这里的hadoop指的是zk集群对应的每一个服务器名称和对应的端口--> - <property> <name>ha.zookeeper.quorum</name> <value>hadoop1:3001,hadoop1:4001,hadoop1:5001</value> </property> - hdfs-site.xml中的配置文件 - <!--指定hdfs的nameservice为ns,这里和core-size.xml中保持一致--> <property> <name>dfs.nameservices</name> <value>ns</value> </property> - <!-- ns下面有两个NameNode,分别是nn1,nn2 --> <property> <name>dfs.ha.namenodes.ns</name> <value>nn1,nn2</value> </property> - <!-- nn1的RPC通信地址 --> <property> <name>dfs.namenode.rpc-address.ns.nn1</name> <value>hadoop2:9000</value> </property> - <!-- nn1的http通信地址 --> <property> <name>dfs.namenode.http-address.ns.nn1</name> <value>hadoop2:50070</value> </property> - <!-- nn2的RPC通信地址 --> <property> <name>dfs.namenode.rpc-address.ns.nn2</name> <value>hadoop3:9000</value> </property> <!-- nn2的http通信地址 --> <property> <name>dfs.namenode.http-address.ns.nn2</name> <value>hadoop3:50070</value> </property> - <!-- 指定NameNode的元数据在JournalNode上的存放位置 这里的hadoop是journalNode所在服务器--> <property> <name>dfs.namenode.shared.edits.dir</name> <value>qjournal://hadoop2:8485;hadoop3:8485;hadoop4:8485/ns</value> </property> - <!-- 指定JournalNode在本地磁盘存放数据的位置 --> <property> <name>dfs.journalnode.edits.dir</name> <value>/root/journal</value> </property> - <!-- 开启NameNode故障时自动切换 --> <property> <name>dfs.ha.automatic-failover.enabled</name> <value>true</value> </property> - <!-- 配置失败自动切换实现方式 --> <property> <name>dfs.client.failover.proxy.provider.ns</name> <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value> </property> - <!-- 配置隔离机制,如果ssh是默认22端口,value直接写sshfence即可 --> <property> <name>dfs.ha.fencing.methods</name> <value>sshfence</value> <value>shell(true)</value> </property> - <!-- 使用隔离机制时需要ssh免登陆 --> <property> <name>dfs.ha.fencing.ssh.private-key-files</name> <value>/root/.ssh/id_rsa</value> </property>
4.2.1核心:计算 hadoop体系下面的一个编程模型,计算框架 主要是用来操作HDFS中存储数据并对HDFS中数据进行计算。
4.2.2:Map:局部计算 Reduce:汇总计算 (两者都是需要我们写的代码)
1、概念:统一资源调度器,任务监控管理器
2、作用:整合hadoop集群中的资源(cpu,内存)进行统一的调度 哪个服务器需求大 就给谁。
任务监控 监控map和reduce的执行情况
Yarn = ResourceManage (master) + NodeManager(slave)
注意:DataNode服务器一定是NodeManager
ResourceManager作用-----------真正的资源管理者,监控者,决定给每个服务器的资源多少!
NodeManager作用1--------- map和Reduce的执行者 他们是程序员写的计算代码 NodeManager在每一个Datanode中执行每一个map(map计算block内容) 就相当于idea的功能 汇总计算等待所有局部计算完成后 在一个大功能服务器中(存在NodeManage的服务器)将所有计算汇总 同样也是NodeManager执行,
作用2-------NodeManager还会向ResourceManager汇报信息,需要多少内存和cpu等资源(其实是ApplicationMaster汇报的)确定此NodeManager还活着。
3.Job作业
一组MapReduce也被称之为一个job作业,代表一种计算 hadoop的集群
1、集群规划:
10.15.0.15 hadoop15 NameNode DataNode NodeManager
10.15.0.16 hadoop16 DataNode NodeManager ResourceManager
10.15.0.17 hadoop17 DataNode NodeManager
2、
a、修改ip地址 修改主机名称 重启机器
b、配置主机名称和对应的ip映射。
c、配置ssh免密登录。NameNode hadoop15: 生成ssh-keygen ssh-copy-id hadoop15 16 17
ResourceManager hadoop16: 生成ssh-keygen ssh-copy-id hadoop15 16 17
d、配置环境变量
3、安装hadoop 配置文件
hadoop-enc.sh core-site.xml hdfs-size.xml maped-site.xml
yarn-site.xml slaves:作用 决定namenode中的datanode是谁
决定resourceManager的nodemanager是谁
4、hdfs集群相关配置省略:
4.1.复制 cp hadoop-2.9.2/etc/hadoop/mapred-site.xml.template hadoop-2.9.2/etc/hadoop/mapred-site.xml
4.2.编辑 vim hadoop-2.9.2/etc/hadoop/mapred-site.xml添加配置
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
4.3 配置yarn-site.xml
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!--`hadoop`为ResourceManager当前机器的主机名-->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>Hadoop</value>
</property>
4.4、启动yarn:只能在有ResourceManager的服务器上面启动: start-yarn.sh
浏览器访问yarn页面: http://10.15.0.16:8088/cluster
5.1.1:job作业的数据来源一定是Hdfs,最总结果保存到hdfs中,在整个五个阶段中 map阶段和reduce阶段是程序员代码手工编码的。
hdfs----->inputFormat------>map------------------>shuffle---------------->reduce------->outputFormat------->hdfs
hdfs----->数据读入 ------->局部计算---->局部计算数据排序和分组---->汇总计算---->结果输出---------------->hdfs
5.1.2:
InputFormat: keyin—>行字母偏移量 Longwritable valuein------->读取的一行 字符串 Text
keyout--->行字母偏移量 Longwritable valueout------->读取的一行 字符串 Text
map : keyin—>行字母偏移量 Longwritable valuein------->读取的一行 字符串 Text
keyout--->每一行得到的数据(或者分割的部分数据)Text valueout---->根据需求设定
shuffle :局部计算进行排序和汇总
reduce :keyin----->根据map传过来的进行判断 value------->根据map传过来的进行判断
keyout--->根据需求设定 valueout---->根据需求设定
5.2 mapreduce程序代码
5.2.1:步骤:
准备数据文件 将数据文件上传到hdfs上面
引入依赖 开发job作业(开发map阶段 开发reduces阶段 开发job阶段)
//word count job作业开发 public class WordCountJob extends Configured implements Tool { public static void main(String[] args) throws Exception { ToolRunner.run(new WordCountJob(),args); } @Override public int run(String[] strings) throws Exception { //创建job作业 Configuration conf = getConf(); Job job = Job.getInstance(conf); job.setJarByClass(WordCountJob.class); //设置Input Format job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job,new Path("/wordcount/data")); //设置map阶段 job.setMapperClass(WordCountMap.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //设置Shuffle 阶段 默认 //设置reduce 阶段 job.setReducerClass(WordCountReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //设置 Output Formate job.setOutputFormatClass(TextOutputFormat.class); //注意:要求结果目录不能存在 FileSystem fileSystem = FileSystem.get(conf); Path res = new Path("/wordcount/res"); if(fileSystem.exists(res)) { fileSystem.delete(res,true); } TextOutputFormat.setOutputPath(job, res); //提交job作业 boolean b = job.waitForCompletion(true); System.out.println("作业执行状态 = " + b); return 0; } //开发Map阶段 public static class WordCountMap extends Mapper<LongWritable, Text,Text, IntWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //value 就是读入的一行数据 String[] keys = value.toString().split(" "); for (String word : keys) { context.write(new Text(word),new IntWritable(1)); } } } //开发Reduce阶段 public static class WordCountReduce extends Reducer<Text,IntWritable,Text,IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum+=value.get(); } context.write(key,new IntWritable(sum)); } } } 注意:在执行mapreduce作业过程中,一定涉及到数据数据的序列化,hadoop对原始基本数据类型进行了二次包装 hadoop中包装类型 java原始数据类型 Text String LongWritable Long IntWritable Integer FloatWritable Float DoubleWritable Double
5.3MapReduce 自动化运行
5.3.1打包时指定main Class信息
5.3.2使用wagon插件实现自动上传至hadoop集群
5.3.3使用wagon上传jar完成后远程执行job作业
<build> <!--扩展maven的插件中加入ssh插件--> <extensions> <extension> <groupId>org.apache.maven.wagon</groupId> <artifactId>wagon-ssh</artifactId> <version>2.8</version> </extension> </extensions> <plugins> <!-- 在打包插件中指定main class 信息 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <configuration> <outputDirectory>${basedir}/target</outputDirectory> <archive> <manifest> <mainClass>com.baizhi.wordcount.WordCountJob</mainClass> </manifest> </archive> </configuration> </plugin> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>wagon-maven-plugin</artifactId> <version>1.0</version> <configuration> <fromFile>target/${project.build.finalName}.jar</fromFile> <url>scp://root:1@10.15.0.5/root</url> <commands> <!-- 通过sh 执行shell脚本文件 --> <command>nohup hadoop-2.9.2/bin/hadoop jar hadoop_wordcount-1.0-SNAPSHOT.jar > /root/mapreduce.out 2>&1 & </command> </commands> <displayCommandOutputs>true</displayCommandOutputs> </configuration> </plugin> </plugins> </build>
导入插件maven helper
wagon配置加入commands命令执行命令操作:clean package wagon:upload-single wagon:sshexec
<property>
<name>mapreduce.jobhistory.address</name>
<value>hadoop5:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>hadoop5:19888</value>
</property>
5.4.2 配置yarn-site.xml 并同步集群配置`
<!--开启日志聚合-->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!--日志保存时间 单位秒 这里是7天-->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
[root@hadoop6 ~]# mr-jobhistory-daemon.sh start historyserver
[root@hadoop6 ~]# mr-jobhistory-daemon.sh stop historyserver
private Logger logger= Logger.getLogger(想打日志的类对象);
logger.info(想打印的信息)
数据类型都间接实现了:Wirtable(实现序列化反序列化功能,实现他的write写的序列化,readFields读的序列化)Comparable(实现排序功能,实现它的compareTo比较方法完成排序功能) 。直接实现WritableComparable接口(此接口继承Writable,Comparable),因此我们自定义类型也需要实现相应的接口
通过查看源码得知自定义的数据类型需要实现类中 wirte、readFiles、compareTo、hashCode 和equals、toString等相关方法。如果存在地址不同的自定义对象地址相同,则需要继承hashCode 和equals方法。
//自定义Writable类型 public class AccessLogWritable implements WritableComparable<AccessLogWritable> { private Integer upload; private Integer down; private Integer total; @Override public int compareTo(AccessLogWritable o) { return this.total-o.getTotal(); } //把对象序列化 @Override public void write(DataOutput out) throws IOException { out.writeInt(upload); out.writeInt(down); out.writeInt(total); } //反序列化 @Override public void readFields(DataInput in) throws IOException { this.upload = in.readInt(); this.down = in.readInt(); this.total = in.readInt(); } @Override public String toString() { return "统计结果{" + "上传流量=" + upload + ", 下载流量=" + down + ", 上传下载总流量=" + total + '}'; } 注意:write的顺序和read的顺序必须严格一致,读的类型和写的类型也必须完全一致
6.2.1、mapreduce 计算、去重、排序、清洗工作
数据清洗:在复杂的数据中抽取我们业务关注的数据的过程称之为数据的清洗。
数据清洗的好处:减小数据的文件体积、加速数据的计算效率。
数据清洗没有reduce阶段 这个设置是job.setNumReduceTasks(0)
6.2.2、reduce个数设置!
默认job作业中只有一个reduce,可以设置多个reduce,通过job.setNumReduceTasks(n)设置有n个。
设置多个reduce提高了mapreduce的运行效率,一旦设置了多个reduce,结果会根据reduce数量放入不同的产生的文件中,为了保障每个reduce处理的数据均衡 mr引入了分区的概念(partition)
默认数量为: 1个 Reduce
可以通过: job.setNumReduceTasks(0); 0 就是没有 数字是几就是几个
//自定义分区 输入数据map端结果 public class ProvincePartitioner extends Partitioner<Text,AccessLogWritable> { //根据业务规则将不同省份结果划分到不同分区 private static HashMap<String,Integer> provincePartitioners = new HashMap<>(); static{ provincePartitioners.put("136",0); provincePartitioners.put("137",1); provincePartitioners.put("138",2); provincePartitioners.put("139",3); } // 返回分区号给那个reduce @Override public int getPartition(Text key, AccessLogWritable accessLogWritable, int numPartitions) { String keyPrefix = key.toString().substring(0, 3); Integer partionId = provincePartitioners.get(keyPrefix); return partionId ==null?4: partionId; } } //设置分区 job.setPartitionerClass(ProvincePartitioner.class); //设置reduce数量 job.setNumReduceTasks(5);
Combiner合并:又称之为map端的reduce,主要是通过对map局部的数据先进行一次reduce,从而来减少map端输出数据频繁发送给Reduce处理时所带来的网络压力问题。通过这种提前对map输出做一次局部reduce,这样既可以减轻网络压力,又能提高效率。在mapreduce编程模型中默认是关闭的。
开启Combiner
//shuffle 无须设置 自动处理
//设置Combiner
job.setCombinerClass(AccessLogCustomerTypeReduce.class);
//设置分区
job.setPartitionerClass(ProvincePartitioner.class);
...............
#1.计算切片 有几个切片就有几个map task #2.环形缓存区 经过map函数的逻辑处理后的数据输出之后,会通过OutPutCollector收集器将数据收集到环形缓存区保存。 环形缓存区的大小默认为100M,当保存的数据达到80%时,就将缓存区的数据溢出到磁盘上保存。 #3.溢出 环形缓存区的数据达到其容量的80%时就会溢出到磁盘上进行保存,在此过程中,程序会对数据进行分区(默认HashPartition)和排序(默认根据key进行快排) 缓存区不断溢出的数据形成多个小文件 #4.合并 溢出的多个小文件各个区合并在一起(0区和0区合并成一个0区),形成大文件 通过归并排序保证区内的数据有序 #5.shuffle 从过程2到过程7之间,即map任务和reduce任务之间的数据流称为shuffle(混洗),而过程5最能体现出混洗这一概念。一般情况下,一个reduce任务的输入数据来自与多个map任务,多个reduce任务的情况下就会出现如过程5所示的, 每个reduce任务从map的输出数据中获取属于自己的那个分区的数据。 #6.合并 运行reducetask的节点通过过程5,将来自多个map任务的属于自己的分区数据下载到本地磁盘工作目录。这多个分区文件通过归并排序合并成大文件,并根据key值分好组(key值相同的,value值会以迭代器的形式组在一起)。 #7.reducetask reducetask从本地工作目录获取已经分好组并且排好序的数据,将数据进行reduce函数中的逻辑处理。 #8.输出 每个reducetask输出一个结果文件。
1.向ResourceManager请求运行一个mapreduce程序。
2.ResourceManager返回hdfs地址,告诉客户端将作业运行相关的资源文件上传到hdfs。
3.客户端提交mr程序运行所需的文件(包括作业的jar包,作业的配置文件,分片信息等)到hdfs上。
4.作业相关信息提交完成后,客户端用过调用ResourcrManager的submitApplication()方法提交作业。
5.ResourceManager将作业传递给调度器,调度器的默认调度策略是先进先出。
6.调度器寻找一台空闲的节点,并在该节点隔离出一个容器(container),容器中分配了cpu,内存等资源,并启动MRAppmaster进程。
7.MRAppmaster根据需要运行多少个map任务,多少个reduce任务向ResourceManager请求资源。
8.ResourceManager分配相应数量的容器,并告知MRAppmaster容器在哪。
9.MRAppmaster启动maptask。
10.maptask从HDFS获取分片数据执行map逻辑。
11.map逻辑执行结束后,MRAppmaster启动reducetask。
12.reducetask从maptask获取属于自己的分区数据执行reduce逻辑。
13.reduce逻辑结束后将结果数据保存到HDFS上。
14.mapreduce作业结束后,MRAppmaster通知ResourceManager结束自己,让ResourceManager回收所有资源。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。