赞
踩
优秀是一种习惯
为什么要设计Shuffle?
Map端的Shuffle阶段如何实现?
Reduce端的Shuffle阶段如何实现?
Combiner的功能是什么?如何实现Combiner?
压缩的好处是什么?常见的压缩类型有哪些?MapReduce如何配置压缩?
Shuffle分组的规则是什么?如何自定义分组比较器?
规则:先调用分组比较器,如果有,直接调用比较方法,如果没有,调用K2的比较方法
定义
继承WritableComparator,重写compare
排序:大于、等于、小于
job.setSortCOmparatorClass
分组:等于、不等于
job.setGroupingComparatorClass
引入:Reduce的个数可以自己指定,Map的个数如何决定?
目标:了解TextInputFormat中的分片规则
路径
实施
InputFormat的功能
TextInputFormat读取数据的实现
TextInputFormat分片的规则
getSplits:用于将输入的所有数据划分为多个分片
规则
step1:判断是否分片的条件
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP)
文件大小 / splitSize > 1.1
step2:计算splitSize大小
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
128M 1 Long.MAX_VALUE
minsize = 最小分片数 max(1,mapreduce.input.fileinputformat.split.minsize=1)
maxsize = 最大分片数 mapreduce.input.fileinputformat.split.maxsize
step3:computeSplitSize计算逻辑
Math.max(minSize, Math.min(maxSize, blockSize))
max(1,min(Long.MAX_VALUE,128M))
规则
小结
引入:MapReduce用于实现数据统计分析,类似于SQL,如何实现Join过程?
目标:了解MapReduce中Reduce Join的实现
路径
实施
Reduce Join的原理
Reduce Join的实现
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.IOException; /** * @ClassName MRDriver * @Description TODO 实现ReduceJoin * @Create By Frank */ public class ReduceJoinMr extends Configured implements Tool { /** * 用于将Job的代码封装 * @param args * @return * @throws Exception */ @Override public int run(String[] args) throws Exception { //todo:1-构建一个Job Job job = Job.getInstance(this.getConf(),"model");//构建Job对象,调用父类的getconf获取属性的配置 job.setJarByClass(ReduceJoinMr.class);//指定可以运行的类型 //todo:2-配置这个Job //input // job.setInputFormatClass(TextInputFormat.class);//设置输入的类的类型,默认就是TextInputFormat Path inputPath1 = new Path("datas/join/orders.txt");//读取订单数据 Path inputPath2 = new Path("datas/join/product.txt");//读取商品的数据 //设置的路径可以给目录,也可以给定文件,如果给定目录,会将目录中所有文件作为输入,但是目录中不能包含子目录 TextInputFormat.setInputPaths(job,inputPath1,inputPath2);//为当前job设置输入的路径 //map job.setMapperClass(MRMapper.class);//设置Mapper的类,需要调用对应的map方法 job.setMapOutputKeyClass(Text.class);//设置Mapper输出的key类型 job.setMapOutputValueClass(Text.class);//设置Mapper输出的value类型 //shuffle // job.setPartitionerClass(HashPartitioner.class);//自定义分区 // job.setGroupingComparatorClass(null);//自定义分组的方式 // job.setSortComparatorClass(null);//自定义排序的方式 //reduce job.setReducerClass(MRReducer.class);//设置Reduce的类,需要调用对应的reduce方法 job.setOutputKeyClass(Text.class);//设置Reduce输出的Key类型 job.setOutputValueClass(Text.class);//设置Reduce输出的Value类型 job.setNumReduceTasks(1);//设置ReduceTask的个数,默认为1 //output:输出目录默认不能提前存在 // job.setOutputFormatClass(TextOutputFormat.class);//设置输出的类,默认我诶TextOutputFormat Path outputPath = new Path("datas/output/join/reduceJoin");//用程序的第三个参数作为输出 //解决输出目录提前存在,不能运行的问题,提前将目前删掉 //构建一个HDFS的文件系统 FileSystem hdfs = FileSystem.get(this.getConf()); //判断输出目录是否存在,如果存在就删除 if(hdfs.exists(outputPath)){ hdfs.delete(outputPath,true); } TextOutputFormat.setOutputPath(job,outputPath);//为当前Job设置输出的路径 //todo:3-提交运行Job return job.waitForCompletion(true) ? 0:-1; } /** * 程序的入口,调用run方法 * @param args */ public static void main(String[] args) throws Exception { //构建一个Configuration对象,用于管理这个程序所有配置,工作会定义很多自己的配置 Configuration conf = new Configuration(); //t通过Toolruner的run方法调用当前类的run方法 int status = ToolRunner.run(conf, new ReduceJoinMr(), args); //退出程序 System.exit(status); } /** * @ClassName MRMapper * @Description TODO 这是MapReduce模板的Map类 * 输入的KV类型:由inputformat决定,默认是TextInputFormat * 输出的KV类型:由map方法中谁作为key,谁作为Value决定 */ public static class MRMapper extends Mapper<LongWritable, Text, Text,Text> { Text outputKey= new Text(); Text outputValue = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //两个文件的每一行数据,就是Value // 要判断这条数据来自哪个文件 FileSplit fileSplit = (FileSplit) context.getInputSplit();//获取这条数据对应的分片 String name = fileSplit.getPath().getName();//获取这条数据对应的文件的名称 if("orders.txt".equals(name)){ //如果这是订单的数据:1001,20150710,p0001,2 String[] split1 = value.toString().split(","); //用商品id作为Key this.outputKey.set(split1[2]); //其他信息作为Value this.outputValue.set(split1[0]+"\t"+split1[1]+"\t"+split1[3]); //输出 context.write(this.outputKey,this.outputValue); }else{ //这是商品数据:p0001,直升机,1000,2000 String[] split2 = value.toString().split(","); //用商品id作为key this.outputKey.set(split2[0]); //用商品名称作为Value this.outputValue.set(split2[1]); //输出 context.write(this.outputKey,this.outputValue); } } } /** * @ClassName MRReducer * @Description TODO MapReduce模板的Reducer的类 * 输入的KV类型:由Map的输出决定,保持一致 * 输出的KV类型:由reduce方法中谁作为key,谁作为Value决定 */ public static class MRReducer extends Reducer<Text,Text,Text,Text> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { //传进来的是每个商品id对应的订单信息和商品名称 StringBuilder stringBuilder = new StringBuilder(); for (Text value : values) { //将订单信息和商品的名称进行拼接 stringBuilder.append(value.toString()+"\t"); } //输出 context.write(key,new Text(stringBuilder.toString())); } } }
Reduce Join的特点
小结
引入:Reduce Join必须经过Shuffle,有没有更好的方案?
目标:了解MapReduce中Map Join的实现
路径
实施
在每一个MapTask 的内存中放入一份完整的商品表,前提:商品表的数据比较小
每个MapTask处理一部分订单表,直接在MapTask所在的内存中订单表的每一个小部分与商品表的完整的数据进行join
Map Join的实现
import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.util.HashMap; import java.util.Map; /** * @ClassName MRDriver * @Description TODO 这是MapReduce程序的Driver类的模板 * @Date 2020/5/30 10:34 * @Create By Frank */ public class MapJoinMr extends Configured implements Tool { /** * 用于将Job的代码封装 * @param args * @return * @throws Exception */ @Override public int run(String[] args) throws Exception { //todo:1-构建一个Job Job job = Job.getInstance(this.getConf(),"model");//构建Job对象,调用父类的getconf获取属性的配置 job.setJarByClass(MapJoinMr.class);//指定可以运行的类型 //todo:2-配置这个Job //input // job.setInputFormatClass(TextInputFormat.class);//设置输入的类的类型,默认就是TextInputFormat Path inputPath = new Path("datas/join/orders.txt");//将大的数据进行读取 //将订单作为输入 TextInputFormat.setInputPaths(job,inputPath);//为当前job设置输入的路径 //将商品表放入分布式缓存中 job.addCacheFile(new Path("datas/join/product.txt").toUri()); //map job.setMapperClass(MRMapper.class);//设置Mapper的类,需要调用对应的map方法 job.setMapOutputKeyClass(Text.class);//设置Mapper输出的key类型 job.setMapOutputValueClass(Text.class);//设置Mapper输出的value类型 //shuffle // job.setPartitionerClass(HashPartitioner.class);//自定义分区 // job.setGroupingComparatorClass(null);//自定义分组的方式 // job.setSortComparatorClass(null);//自定义排序的方式 //reduce // job.setReducerClass(MRReducer.class);//设置Reduce的类,需要调用对应的reduce方法 // job.setOutputKeyClass(NullWritable.class);//设置Reduce输出的Key类型 // job.setOutputValueClass(NullWritable.class);//设置Reduce输出的Value类型 job.setNumReduceTasks(0);//设置ReduceTask的个数,默认为1 //output:输出目录默认不能提前存在 // job.setOutputFormatClass(TextOutputFormat.class);//设置输出的类,默认我诶TextOutputFormat Path outputPath = new Path("datas/output/join/mapJoin");//用程序的第三个参数作为输出 //解决输出目录提前存在,不能运行的问题,提前将目前删掉 //构建一个HDFS的文件系统 FileSystem hdfs = FileSystem.get(this.getConf()); //判断输出目录是否存在,如果存在就删除 if(hdfs.exists(outputPath)){ hdfs.delete(outputPath,true); } TextOutputFormat.setOutputPath(job,outputPath);//为当前Job设置输出的路径 //todo:3-提交运行Job return job.waitForCompletion(true) ? 0:-1; } /** * 程序的入口,调用run方法 * @param args */ public static void main(String[] args) throws Exception { //构建一个Configuration对象,用于管理这个程序所有配置,工作会定义很多自己的配置 Configuration conf = new Configuration(); //t通过Toolruner的run方法调用当前类的run方法 int status = ToolRunner.run(conf, new MapJoinMr(), args); //退出程序 System.exit(status); } /** * @ClassName MRMapper * @Description TODO 这是MapReduce模板的Map类 * 输入的KV类型:由inputformat决定,默认是TextInputFormat * 输出的KV类型:由map方法中谁作为key,谁作为Value决定 */ public static class MRMapper extends Mapper<LongWritable, Text, Text,Text> { //获取分布式缓存中的数据,存入Map集合,Key是商品id,Value是商品名称 Map<String,String> map = new HashMap<String, String>(); /** * Map类中有三个方法 * setup:在调用map之前会调用一次,类似于初始化的方法 * map:实现Map处理逻辑的方法 * cleanup:Map结束以后会调用的方法,相当于close方法 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void setup(Context context) throws IOException, InterruptedException { //将缓存中的数据读取出来,封装到Map集合中 URI[] cacheFiles = context.getCacheFiles(); //打开这个缓存的文件 BufferedReader bufferedReader = new BufferedReader(new FileReader(cacheFiles[0].getPath())); //将每一行的内容,封装到Map集合中 String line = null; while(StringUtils.isNotBlank(line = bufferedReader.readLine())){ //分割将商品id和商品名称放入Map集合 String pid = line.split(",")[0]; String pname = line.split(",")[1]; map.put(pid,pname); } } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //只有订单的数据,取出订单中的商品id String pid = value.toString().split(",")[2]; //在Map集合中找到这个商品id对应的商品名称 String name = map.get(pid); //输出订单信息和商品名称 context.write(value,new Text(name)); } } }
Map Join的特点
小结
引入:YARN如何实现分布式的资源管理的?
目标:掌握YARN的分布式集群架构
实施
架构
角色
小结
引入:一个MapReduce程序提交给YARN,YARN是如何实现运行的?
目标:掌握MapReduce程序在YARN中的运行流程
实施
小结
目标:了解YARN中的资源管理配置
实施
| 配置属性 | 值 | 含义 |
| ---------------------------------------- | ---- | --------------------------------------- |
| yarn.nodemanager.resource.memory-mb | 8192 | 每台NodeManger能够使用的最大物理内存数 |
| yarn.nodemanager.resource.cpu-vcores | 8 | 每台NodeManger能够使用的最大物理CPU核数 |
| yarn.scheduler.minimum-allocation-mb | 1024 | 每个Task容器最少申请的内存 |
| yarn.scheduler.maximum-allocation-mb | 8192 | 每个Task容器最多申请的内存 |
| yarn.scheduler.minimum-allocation-vcores | 1 | 每个Task容器最少申请的CPU核数 |
| yarn.scheduler.maximum-allocation-vcores | 8 | 每个Task容器最多申请的CPU核数 |
| yarn.nodemanager.vmem-pmem-ratio | 2.1 | 虚拟内存率【磁盘空间】 |
常见问题
小结
引入:提交给YARN的程序,按照什么顺序运行,怎么分配资源?
目标:了解YARN中常见的任务的调度机制
路径
实施
FIFO:先进先出调度
应用:Hadoop1.x版本系列
原理
特点
Capacity:容量调度
应用:Apache版本的Hadoop默认的调度器
原理
特点
Fair:公平调度
应用:CDH版本的Hadoop默认的调度器
原理
- 特点 - 多队列 - 队列内部公平的分配资源,允许指定权重和优先级来指定资源的优先分配 - 动态资源抢占、权重分配 - 配置 | 属性名称 | 值 | 含义 | | :----------------------------------------------------------- | :----------------------------------------------------------- | :----------------------------------------------------------- | | yarn.resourcemanager.scheduler.class | org.apache.hadoop.<br/>yarn.server.resourcemanager<br/>.scheduler.fair.FairScheduler | 指定调度器类型 | | yarn.scheduler.fair.allocation.file | etc/hadoop/fair-scheduler.xml | 指定公平调度的配置文件,配置文件中定义每个队列以及每个队列的资源 | | yarn.scheduler.fair.user-as-default-queue | false | 没有指定队列名时,是否使用用户名作为队列名称 | | yarn.scheduler.fair.preemption | false | 是否允许资源抢占 | | yarn.scheduler.fair.preemption.cluster-utilization-threshold | 0.8 | 最大抢占比例 | | yarn.scheduler.fair.sizebasedweight | true | 是否启用权重 | | yarn.scheduler.fair.assignmultiple | true | 是否允许分配多个container | | yarn.scheduler.fair.max.assign | 20 | 如果允许,最多分配多少个 | | yarn.scheduler.fair.locality.threshold.node | -1 | 放弃等待优先本地计算的节点比例,-1表示都不放弃 | | yarn.scheduler.fair.locality.threshold.rack | -1 | 放弃等待优先机架计算的节点比例,-1表示都不放弃 | | yarn.scheduler.fair.allow-undeclared-pools | true | 是否允许创建未定义的队列 | | yarn.scheduler.fair.update-interval-ms | 500 | 表示重新计算公平调度的间隔,单位毫秒 |
<?xml version="1.0"?> <allocations> <queue name="sample_queue"> <minResources>10000 mb,0vcores</minResources> <maxResources>90000 mb,0vcores</maxResources> <maxRunningApps>50</maxRunningApps> <maxAMShare>0.1</maxAMShare> <weight>2.0</weight> <schedulingPolicy>fair</schedulingPolicy> <queue name="sample_sub_queue"> <aclSubmitApps>charlie</aclSubmitApps> <minResources>5000 mb,0vcores</minResources> </queue> </queue> <queueMaxAMShareDefault>0.5</queueMaxAMShareDefault> <!-- Queue 'secondary_group_queue' is a parent queue and may have user queues under it --> <queue name="secondary_group_queue" type="parent"> <weight>3.0</weight> </queue> <user name="sample_user"> <maxRunningApps>30</maxRunningApps> </user> <userMaxAppsDefault>5</userMaxAppsDefault> <queuePlacementPolicy> <rule name="specified" /> <rule name="primaryGroup" create="false" /> <rule name="nestedUserQueue"> <rule name="secondaryGroupExistingQueue" create="false" /> </rule> <rule name="default" queue="sample_queue"/> </queuePlacementPolicy> </allocations>
小结
引入:工作中处理的数据都放在哪里?
目标:了解数据仓库的基本功能和场景
路径
实施
OLTP与OLAP
OLTP:联机事务处理
应用:满足业务上的数据存储
例如
特点
模型:数据库
实现:数据库工具
OLAP:联机分析处理
应用:专门用于解决数据处理的数据存储的平台
例如
特点
存储:数据仓库模型
实现:数据仓库工具
数据仓库的设计
数据仓库的流程
小结
引入:数据仓库中的分布式计算如果用MR来实现,业务分析人员不会写Java怎么办?
目标:掌握Hive的功能与应用场景
路径
实施
Hive的诞生
Hive的功能
MapReduce处理的数据来自于HDFS文件
SQL处理的来自于表
功能一:Hive实现了将HDFS文件映射成表的数据
功能二:Hive实现了将SQL转换为MapReduce/Tez/Spark的程序,提交给YARN执行
功能:实现分布式存储【HDFS】和分布式计算【MapReduce/Tez/Spark】
本质:Hive是一种支持SQL的特殊的Hadoop的客户端
不使用Hive:用户必须会Java或者Python,才能操作Hadoop
有了Hive:用户可以通过SQL直接操作Hadoop
Hive的应用场景
Hive官方:hive.apache.org
小结
目标:掌握Hive的架构组成
实施
Hive客户端
Hive服务端
元数据
Hive写一条SQL语句
select count(*) from table;
问题:SQL语句会变成MapReduce,读的文件是谁呢?
元数据:存储整个Hive中所有核心的数据
小结
目标:实现Hive的安装部署
路径
实施
版本下载
区别
解压安装
以第三台机器为例
将hive安装包和MySQL连接驱动包上传到/export/software目录下
上传
cd /export/software/
rz
解压安装Hive
tar -zxvf apache-hive-2.1.0-bin.tar.gz -C /export/server/
cd /export/server/
mv apache-hive-2.1.0-bin hive-2.1.0-bin
bin:客户端和服务端的管理命令存储目录
lib:依赖包
安装MySQL
为什么要用到MySQL?
将Hive的元数据存储在MySQL中
inux阶段已安装过,不用安装,检查配置即可
检查MySQL登录与授权是否正常
检查登录
mysql -uroot -p
检查授权
select user,host from mysql.user;
如果不一样,没有授权,请执行授权语句
grant all privileges on *.* to 'root'@'%' identified by '123456';
delete from mysql.user where host != '%';
flush privileges;
修改配置
切换到配置文件目录
cd /export/server/hive-2.1.0-bin/conf/
修改hive-env.sh
mv hive-env.sh.template hive-env.sh
#修改48行
HADOOP_HOME=/export/server/hadoop-2.7.5
#修改51行
export HIVE_CONF_DIR=/export/server/hive-2.1.0-bin/conf
将提供的hive-site.xml放入conf目录中
修改hive-site.xml
<property> <name>javax.jdo.option.ConnectionUserName</name> <value>root</value> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>123456</value> </property> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql://node3:3306/hivemetadata?createDatabaseIfNotExist=true&useSSL=false</value> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>com.mysql.jdbc.Driver</value> </property> <property> <name>hive.metastore.schema.verification</name> <value>false</value> </property> <property> <name>datanucleus.schema.autoCreateAll</name> <value>true</value> </property> <property> <name>hive.server2.thrift.bind.host</name> <value>node3</value> </property>
将MySQL连接驱动放入Hive的lib目录中
cp /export/software/mysql-connector-java-5.1.38.jar /export/server/hive-2.1.0-bin/lib/
cd /export/server/hive-2.1.0-bin/
ll lib/
配置Hive的环境变量
vim /etc/profile
#HIVE_HOME
export HIVE_HOME=/export/server/hive-2.1.0-bin
export PATH=:$PATH:$HIVE_HOME/bin
source /etc/profile
启动
启动HDFS和YARN
start-dfs.sh
start-yarn.sh
启动Hive
第一次启动Hive之前需要初始化元数据【只有第一次启动需要,以后每次启动都不需要了,类似于格式化】
创建HDFS相关目录
hdfs dfs -mkdir -p /user/hive/warehouse
hdfs dfs -chmod g+w /tmp
hdfs dfs -chmod g+w /user/hive/warehouse
初始化Hive元数据
cd /export/server/hive-2.1.0-bin/
bin/schematool -dbType mysql -initSchema
hive
引入:Hive如何实现WordCount的问题?
目标:实现HiveSQL开发WordCount程序
路径
实施
创建表
create table tb_word(
words string
);
加载HDFS数据
load data inpath '/wordcount.txt' into table tb_word;
SQL分析处理
create table tb_word2 as select explode(split(words," ")) as word from tb_word;
select word,count(*) as numb from tb_word2 group by word order by numb desc;
小结
引入:使用MR实现二手房分析时,需要自定义数据类型解决,使用HiveSQL如何解决?
目标:实现HiveSQL开发二手房统计分析程序
路径
实施
数据
梅园六街坊,2室0厅,47.72,浦东,低区/6层,朝南,500,104777,1992年建
创建表
create table tb_house(
xiaoqu string,
huxing string,
area double,
region string,
floor string,
fangxiang string,
t_price int,
s_price int,
buildinfo string
) row format delimited fields terminated by ',';
加载本地文件数据
load data local inpath '/export/data/secondhouse.csv' into table tb_house;
SQL分析处理
select
region,
count(*) as numb,
round(avg(s_price),2) as avgprice,
max(s_price) as maxprice,
min(s_price) as minprice
from tb_house
group by region;
小结
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。