1、海量日志数据,提取出某日访问百度次数最多的那个IP。
解决方案:首先是将这一天,并且是访问百度的日志中的IP取出来,逐个写入到一个大文件中。注意到IP是32位的,最多有个2^32个IP。同样可以采用映射的方法,比如模1000,把整个大文件映射为1000个小文件,再找出每个小文中出现频率最大的IP(可以采用hash_map进行频率统计,然后再找出频率最大的几个)及相应的频率。然后再在这1000个最大的IP中,找出那个频率最大的IP,即为所求。
2、搜索引擎会通过日志文件把用户每次检索使用的所有检索串都记录下来,每个查询串的长度为1-255字节。
假设目前有一千万个记录(这些查询串的重复度比较高,虽然总数是1千万,但如果除去重复后,不超过3百万个。一个查询串的重复度越高,说明查询它的用户越多,也就是越热门。),请你统计最热门的10个查询串,要求使用的内存不能超过1G。
解决方案:第一步、先对这批海量数据预处理,在O(N)的时间内用Hash表完成排序;然后,第二步、借助堆这个数据结构,找出Top K,时间复杂度为N‘logK。 即,借助堆结构,我们可以在log量级的时间内查找和调整/移动。因此,维护一个K(该题目中是10)大小的小根堆,然后遍历300万的Query,分别和根元素进行对比所以,我们最终的时间复杂度是:O(N) + N'*O(logK),(N为1000万,N’为300万)。
或者:采用trie树,关键字域存该查询串出现的次数,没有出现为0。最后用10个元素的最小推来对出现频率进行排序。
3、有一个1G大小的一个文件,里面每一行是一个词,词的大小不超过16字节,内存限制大小是1M。返回频数最高的100个词。
解决方案:顺序读文件中,对于每个词x,取hash(x)%5000,然后按照该值存到5000个小文件(记为x0,x1,...x4999)中。这样每个文件大概是200k左右。
如果其中的有的文件超过了1M大小,还可以按照类似的方法继续往下分,直到分解得到的小文件的大小都不超过1M。 对每个小文件,统计每个文件中出现的词以及相应的频率(可以采用trie树/hash_map等),并取出出现频率最大的100个词(可以用含100个结点的最小堆),并把100个词及相应的频率存入文件,这样又得到了5000个文件。下一步就是把这5000个文件进行归并(类似与归并排序)的过程了。
4、有10个文件,每个文件1G,每个文件的每一行存放的都是用户的query,每个文件的query都可能重复。要求你按照query的频度排序。
解决方案: 方案1: 顺序读取10个文件,按照hash(query)%10的结果将query写入到另外10个文件(记为)中。这样新生成的文件每个的大小大约也1G(假设hash函数是随机的)。 找一台内存在2G左右的机器,依次对用hash_map(query, query_count)来统计每个query出现的次数。利用快速/堆/归并排序按照出现次数进行排序。将排序好的query和对应的query_cout输出到文件中。这样得到了10个排好序的文件(记为)。
对这10个文件进行归并排序(内排序与外排序相结合)。
方案2: 一般query的总量是有限的,只是重复的次数比较多而已,可能对于所有的query,一次性就可以加入到内存了。这样,我们就可以采用trie树/hash_map等直接来统计每个query出现的次数,然后按出现次数做快速/堆/归并排序就可以了。
方案3: 与方案1类似,但在做完hash,分成多个文件后,可以交给多个文件来处理,采用分布式的架构来处理(比如MapReduce),最后再进行合并。
5、 给定a、b两个文件,各存放50亿个url,每个url各占64字节,内存限制是4G,让你找出a、b文件共同的url?
解决方案:方案1:可以估计每个文件安的大小为5G×64=320G,远远大于内存限制的4G。所以不可能将其完全加载到内存中处理。考虑采取分而治之的方法。
通读文件a,对每个url求取hash(url)%1000,然后根据所取得的值将url分别存储到1000个小文件(记为a0,a1,...,a999)中。这样每个小文件的大约为300M。
通读文件b,采取和a相同的方式将url分别存储到1000小文件(记为b0,b1,...,b999)。这样处理后,所有可能相同的url都在对应的小文件(a0vsb0,a1vsb1,...,a999vsb999)中,不对应的小文件不可能有相同的url。然后我们只要求出1000对小文件中相同的url即可。
求每对小文件中相同的url时,可以把其中一个小文件的url存储到hash_set中。然后遍历另一个小文件的每个url,看其是否在刚才构建的hash_set中,如果是,那么就是共同的url,存到文件里面就可以了。
方案2:如果允许有一定的错误率,可以使用Bloom filter,4G内存大概可以表示340亿bit。将其中一个文件中的url使用Bloom filter映射为这340亿bit,然后挨个读取另外一个文件的url,检查是否与Bloom filter,如果是,那么该url应该是共同的url(注意会有一定的错误率)。
6、在2.5亿个整数中找出不重复的整数,注,内存不足以容纳这2.5亿个整数。
解决方案:方案1:采用2-Bitmap(每个数分配2bit,00表示不存在,01表示出现一次,10表示多次,11无意义)进行,共需内存内存,还可以接受。然后扫描这2.5亿个整数,查看Bitmap中相对应位,如果是00变01,01变10,10保持不变。所描完事后,查看bitmap,把对应位是01的整数输出即可。
方案2:也可采用与第1题类似的方法,进行划分小文件的方法。然后在小文件中找出不重复的整数,并排序。然后再进行归并,注意去除重复的元素。
7、腾讯面试题:给40亿个不重复的unsigned int的整数,没排过序的,然后再给一个数,如何快速判断这个数是否在那40亿个数当中?
解决方案:申请512M的内存,一个bit位代表一个unsigned int值。读入40亿个数,设置相应的bit位,读入要查询的数,查看相应bit位是否为1,为1表示存在,为0表示不存在。
dizengrong: 方案2:因为2^32为40亿多,所以给定一个数可能在,也可能不在其中;这里我们把40亿个数中的每一个用32位的二进制来表示假设这40亿个数开始放在一个文件中。
然后将这40亿个数分成两类: 1.最高位为0 2.最高位为1 并将这两类分别写入到两个文件中,其中一个文件中数的个数<=20亿,而另一个>=20亿(这相当于折半了);与要查找的数的最高位比较并接着进入相应的文件再查找
再然后把这个文件为又分成两类: 1.次最高位为0 2.次最高位为1
并将这两类分别写入到两个文件中,其中一个文件中数的个数<=10亿,而另一个>=10亿(这相当于折半了); 与要查找的数的次最高位比较并接着进入相应的文件再查找。 ....... 以此类推,就可以找到了,而且时间复杂度为O(logn),方案2完。
附:这里,再简单介绍下,位图方法: 使用位图法判断整形数组是否存在重复 判断集合中存在重复是常见编程任务之一,当集合中数据量比较大时我们通常希望少进行几次扫描,这时双重循环法就不可取了。
位图法比较适合于这种情况,它的做法是按照集合中最大元素max创建一个长度为max+1的新数组,然后再次扫描原数组,遇到几就给新数组的第几位置上1,如遇到5就给新数组的第六个元素置1,这样下次再遇到5想置位时发现新数组的第六个元素已经是1了,这说明这次的数据肯定和以前的数据存在着重复。这种给新数组初始化时置零其后置一的做法类似于位图的处理方法故称位图法。它的运算次数最坏的情况为2N。如果已知数组的最大值即能事先给新数组定长的话效率还能提高一倍。
8、怎么在海量数据中找出重复次数最多的一个?
解决方案:先做hash,然后求模映射为小文件,求出每个小文件中重复次数最多的一个,并记录重复次数。然后找出上一步求出的数据中重复次数最多的一个就是所求(具体参考前面的题)。
9、上千万或上亿数据(有重复),统计其中出现次数最多的钱N个数据。
解决方案:上千万或上亿的数据,现在的机器的内存应该能存下。所以考虑采用hash_map/搜索二叉树/红黑树等来进行统计次数。然后就是取出前N个出现次数最多的数据了,可以用第2题提到的堆机制完成。
10、一个文本文件,大约有一万行,每行一个词,要求统计出其中最频繁出现的前10个词,请给出思想,给出时间复杂度分析。
解决方案:方案1:这题是考虑时间效率。用trie树统计每个词出现的次数,时间复杂度是O(n*le)(le表示单词的平准长度)。然后是找出出现最频繁的前10个词,可以用堆来实现,前面的题中已经讲到了,时间复杂度是O(n*lg10)。所以总的时间复杂度,是O(n*le)与O(n*lg10)中较大的哪一个。
附、100w个数中找出最大的100个数。
方案1:在前面的题中,我们已经提到了,用一个含100个元素的最小堆完成。复杂度为O(100w*lg100)。
方案2:采用快速排序的思想,每次分割之后只考虑比轴大的一部分,知道比轴大的一部分在比100多的时候,采用传统排序算法排序,取前100个。复杂度为O(100w*100)。
方案3:采用局部淘汰法。选取前100个元素,并排序,记为序列L。然后一次扫描剩余的元素x,与排好序的100个元素中最小的元素比,如果比这个最小的要大,那么把这个最小的元素删除,并把x利用插入排序的思想,插入到序列L中。依次循环,知道扫描了所有的元素。复杂度为O(100w*100)。
****************************************************************
大数据的本质:从数据中挖掘价值
云计算的本质:共享服务
【某公司笔试面试题】
1\使用mr,spark ,spark sql编写word count程序
【Spark 版本】
val conf=new SparkConf().setAppName("wd").setMaster("local[1]")
val sc=new SparkContext(conf,2)
//加载
val lines=sc.textFile("tructField("name",DataTypes.StringType,true)")
val paris=lines.flatMap(line=>line.split("^A"))
val words=paris.map((_,1))
val result=words.reduceByKey(_+_).sortBy(x=>x._1,false)
//打印
result.foreach(
wds=>{
println("单词:"+wds._1+" 个数:"+wds._2)
}
)
sc.stop()
【spark sql版本】
val conf=new SparkConf().setAppName("sqlWd").setMaster("local[1]")
val sc=new SparkContext(conf)
val sqlContext=new SQLContext(sc)
//加载
val lines=sqlContext.textFile("E:\idea15\createRecommeder\data\words.txt")
val words=lines.flatMap(x=>x.split(" ")).map(y=>Row(y))
val structType=StructType(Array(StructField("name",DataTypes.StringType,true)))
val df=sqlContext.createDataFrame(rows,structType)
df.registerTempTable("t_word_count")
sqlContext.udf.register("num_word",(name:String)=>1)
sqlContext.sql("select name,num_word(name) from t_word_count").groupBy(df.col("name")).count().show()
sc.stop()
2\hive的使用,内外部表的区别,分区作用,UDF和Hive优化
(1)hive使用:仓库、工具
(2)hive内外部表:内部表数据永久删除,外部表数据删除后、其他人依然可以访问
(3)分区作用:防止数据倾斜
(4)UDF函数:用户自定义的函数(主要解决格式,计算问题),需要继承UDF类
java代码实现
class TestUDFHive extends UDF {
public String evalute(String str){
try{
return "hello"+str
}catch(Exception e){
return str+"error"
}
}
}
(5)Hive优化:看做mapreduce处理
a\排序优化:sort by 效率高于 order by
b\分区:使用静态分区 (statu_date="20160516",location="beijin"),每个分区对应hdfs上的一个目录
c\减少job和task数量:使用表链接操作
d\解决groupby数据倾斜问题:设置hive.groupby.skewindata=true ,那么hive会自动负载均衡
e\小文件合并成大文件:表连接操作
f\使用UDF或UDAF函数:http://www.cnblogs.com/ggjucheng/archive/2013/02/01/2888819.html
3\Hbase的rk设计,Hbase优化
a\rowkey:hbase三维存储中的关键(rowkey:行键 ,columnKey(family+quilaty):列键 ,timestamp:时间戳)
\rowkey字典排序、越短越好
\使用id+时间:9527+20160517 \使用hash散列:dsakjkdfuwdsf+9527+20160518
\应用中,rowkey 一般10~100bytes,8字节的整数倍,有利于提高操作系统性能
b\Hbase优化
\分区:RegionSplit()方法 \NUMREGIONS=9
\column不超过3个
\硬盘配置,便于regionServer管理和数据备份及恢复
\分配合适的内存给regionserver
其他:
hbase查询
(1)get
(2)scan
使用startRow和endRow限制
4\Linux常用操作
a\awk:
awk -F:`BEGIN{print "name ip "}{print $1 $7} END{print "结束"}` /etc/passwd
last | head -5 |awk `BEGIN{print "name ip"}{print $1 $3}END{print "结束了"}`
b\sed
5\java线程2种方式实现、设计模式、链表操作、排序
(1)2种线程实现
a\Thread类继承
TestCL th=new TestCL()//类继承Thread
th.start()
b\实现Runnable接口
Thread th=new Thread(new Runnable(){
public void run(){
//实现
}
})
th.start()
(2)设计模式,分为4类
a\创建模式:如工厂模式、单例模式
b\结构模式:代理模式
c\行为模式:观察者模式
d\线程池模式
6\【最熟悉的一个项目简介、架构图、使用的技术、你负责哪块】
7\cdh集群监控
(1)数据库监控 (2)主机监控 (3)服务监控 (4)活动监控
8\计算机网络工作原理
将分散的机器通过数据通信原理连接起来,实现共享!
9\hadoop生态系统
hdfs\mapreduce\hive\hbase\zookeeper\flume
hdfs原理及各个模块的功能 mapreduce原理 mapreduce优化 数据倾斜
11系统维护:hadoop升级datanode节点
12\【讲解项目要点:数据量、多少人、分工、运行时间、项目使用机器、算法、技术】
13\【学会向对方提问】
14\jvm运行机制及内存原理
运行:
I加载.class文件
II管理并且分配内存
III垃圾回收
内存原理:
IJVM装载环境和配置
II装载JVM.dll 并初始化JVM.dll
IV 处理class类
15\hdfs、yarn参数调优
mapreduce.job.jvm.num.tasks
默认为1,设置为 -1,重用jvm
16\Hbase、Hive、impala、zookeeper、Storm、spark原理和使用方法、使用其架构图讲解
【某公司笔试题】
1、如何为一个hadoop任务设置mappers的数量
答案:
使用job.setNumMapTask(int n)手动分割,这是不靠谱的
官方文档:“Note: This is only a hint to the framework”说明这个方法只是提示作用,不起决定性作用
实际上要用公式计算:
Max(min.split,min(max.split,block))就设置分片的最大最下值 computeSplitSize()设置
参考:http://blog.csdn.net/strongerbit/article/details/7440111
2、有可能使hadoop任务输出到多个目录中么?如果可以,怎么做?
答案:在1.X版本后使用MultipleOutputs.java类实现
源码:
MultipleOutputs.addNamedOutput(conf, "text2", TextOutputFormat.class, Long.class, String.class);
MultipleOutputs.addNamedOutput(conf, "text3", TextOutputFormat.class, Long.class, String.class);
参考:http://my.oschina.net/leejun2005/blog/94706
发音:Multiple['m?lt?pl]--》许多的
3、如何为一个hadoop任务设置要创建的reducer的数量
答案:job.setNumReduceTask(int n)
或者调整hdfs-site.xml中的mapred.tasktracker.reduce.tasks.maximum默认参数值
4、在hadoop中定义的主要公用InputFormats中,哪一个是默认值:
(A)TextInputFormat
(B)KeyValueInputFormat
(C)SequenceFileInputFormat
答案:A
5、两个类TextInputFormat和KeyValueTextInputFormat的区别?
答案:
?FileInputFormat的子类:
TextInputFormat(默认类型,键是LongWritable类型,值为Text类型,key为当前行在文件中的偏移量,value为当前行本身);
?KeyValueTextInputFormat(适合文件自带key,value的情况,只要指定分隔符即可,比较实用,默认是\t分割);
源码:
String sepStr =job.get("mapreduce.input.keyvaluelinerecordreader.key.value.separator","\t");
注意:在自定义输入格式时,继承FileInputFormat父类
参考:http://www.cnblogs.com/vichao/archive/2013/06/06/3118100.html
6、在一个运行的hadoop任务中,什么是InputSpilt?
答案:InputSplit是MapReduce对文件进行处理和运算的输入单位,只是一个逻辑概念,每个InputSplit并没有对文件实际的切割,只是记录了要处理的数据的位置(包括文件的path和hosts)和长度(由start和length决定),默认情况下与block一样大。
拓展:需要在定义InputSplit后,展开讲解mapreduce的原理
7、Hadoop框架中,文件拆分是怎么被调用的?
答案:JobTracker, 创建一个InputFormat的 实例,调用它的getSplits()方法,把输入目录的文件拆分成FileSplist作 为Mapper task 的输入,生成Mapper task加入Queue。
源码中体现了拆分的数量
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);//minSplitSize默认是1
8、分别举例什么情况下使用combiner,什么情况下不会使用?
答案:Combiner适用于对记录汇总的场景(如求和),但是,求平均数的场景就不能使用Combiner了
9、Hadoop中job和Tasks之间的区别是什么?
答案:
job是工作的入口,负责控制、追踪、管理任务,也是一个进程
包含map task和reduce task
Tasks是map和reduce里面的步骤,主要用于完成任务,也是线程
10、Hadoop中通过拆分任务到多个节点运行来实现并行计算,但是某些节点运行较慢会拖慢整个任务的运行,hadoop采用何种机制应对这种情况?
答案:结果查看监控日志,得知产生这种现象的原因是数据倾斜问题
解决:
(1)调整拆分mapper的数量(partition数量)
(2)增加jvm
(3)适当地将reduce的数量变大
11、流API中的什么特性带来可以使map reduce任务可以以不同语言(如perl\ruby\awk等)实现的灵活性?
答案:用可执行文件作为Mapper和Reducer,接受的都是标准输入,输出的都是标准输出
参考:http://www.web520.cn/archives/9220
12、参考下面的M/R系统的场景:
--HDFS块大小为64MB
--输入类型为FileInputFormat
--有3个文件的大小分别是:64k 65MB 127MB
Hadoop框架会把这些文件拆分为多少块?
答案:
64k------->一个block
65MB---->两个文件:64MB是一个block,1MB是一个block
127MB--->两个文件:64MB是一个block,63MB是一个block
13、Hadoop中的RecordReader的作用是什么?
答案:属于split和mapper之间的一个过程
将inputsplit输出的行为一个转换记录,成为key-value的记录形式提供给mapper
14、Map阶段结束后,Hadoop框架会处理:Partitioning ,shuffle 和sort,在这个阶段都会发生了什么?
答案:
MR一共有四个阶段,split map shuff reduce 在执行完map之后,可以对map的输出结果进行分区,
分区:这块分片确定到哪个reduce去计算(汇总)
排序:在每个分区中进行排序,默认是按照字典顺序。
Group:在排序之后进行分组
15、如果没有定义partitioner,那么数据在被送达reducer前是如何被分区的?
答案:
Partitioner是在map函数执行context.write()时被调用。
用户可以通过实现自定义的?Partitioner来控制哪个key被分配给哪个?Reducer。
查看源码知道:
如果没有定义partitioner,那么会走默认的分区Hashpartitioner
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
参考:http://blog.csdn.net/gamer_gyt/article/details/47339755
16、什么是Combiner?
答案:这是一个hadoop优化性能的步骤,它发生在map与reduce之间
目的:解决了数据倾斜的问题,减轻网络压力,实际上时减少了maper的输出
源码信息如下:
public void reduce(Text key, Iterator<LongWritable> values,
OutputCollector<Text, LongWritable> output, Reporter reporter)
throws IOException {
LongWritable maxValue = null;
while (values.hasNext()) {
LongWritable value = values.next();
if (maxValue == null) {
maxValue = value;
} else if (value.compareTo(maxValue) > 0) {
maxValue = value;
}
}
output.collect(key, maxValue);
}
在collect实现类中,有这样一段方法
public synchronized void collect(K key, V value)
throws IOException {
outCounter.increment(1);
writer.append(key, value);
if ((outCounter.getValue() % progressBar) == 0) {
progressable.progress();
}
}
下面是说明输出数量达到10000时,开始合并为一个maper
public static final long DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS = 10000;
Mapreduce原理详解:
http://my.oschina.NET/itblog/blog/275294
***************************************************
公司A:
-
1.讲讲你做的过的项目。 项目里有哪些 难点重点注意点呢?
-
2.讲讲多线程吧, 要是你,你怎么实现一个线程池呢?
-
3.讲一下Mapreduce或者hdfs的原理和机制。map读取数据分片。
-
4.shuffle 是什么? 怎么调优?
-
6.理论基础怎么样,比如数据结构,里面的快速排序,或者,树? 讲一讲你了解的树的知识?
-
7.数学怎么样呢?
-
8.讲一下数据库,SQl ,左外连接, 原理,实现?
- 9.还了解过数据的什么知识? 数据库引擎?
- 10.Hadoop的机架怎么配置的?
- 11.Hbase的设计有什么心得?
- 12.Hbase的操作是用的什么API还是什么工具?
-
13.对调度怎么理解.? 用什么工具吗?
-
14.用kettle 这种工具还是 自己写程序? 你们公司是怎么做的?
- 15.你们数据中心开发周期是多长?
- 16.你们hbase里面是存一些什么数据。
二面。三个人。
-
1.讲讲你做的项目。
-
2.平时 对多线程 这方面是怎么处理呢? 异步 是怎么思考呢? 遇到的一些锁啊, 是怎么做的呢? 比如两个人同时操作一样东西。怎么做的呢?一些并发操作设计到一些变量怎么做的呢?
- 3.你们用的最多是 http协议吧? 有没有特殊的头呢? 讲讲 你对tcp/ip的理解?
- 4.有没有用过Zookeeper呢? Zookeeper的适用场景是什么? HA 状态维护 分布式锁 全局配置文件管理 操作Zookeeper是用的什么?
Spark方面:
-
5.spark开发分两个方面?哪两个方面呢?
- 6.比如 一个读取hdfs上的文件,然后count有多少行的操作,你可以说说过程吗。那这个count是在内存中,还是磁盘中计算的呢?磁盘中。
- 7.spark和Mapreduce快? 为什么快呢? 快在哪里呢? 1.内存迭代。2.RDD设计。 3,算子的设计。
- 8.spark sql又为什么比hive快呢?
- 10.RDD的数据结构是怎么样的? Partition数组。 dependence
- 11.hadoop的生态呢。说说你的认识。 hdfs底层存储 hbase 数据库 hive数据仓库 Zookeeper分布式锁 spark大数据分析
公司B:
- 1.Spark工作的一个流程。
- 提交任务。QQ图片20161019131411.png
- 用户提交一个任务。 入口是从sc开始的。 sc会去创建一个taskScheduler。根据不同的提交模式, 会根据相应的taskchedulerImpl进行任务调度。 同时会去创建Scheduler和DAGScheduler。DAGScheduler 会根据RDD的宽依赖或者窄依赖,进行阶段的划分。划分好后放入taskset中,交给taskscheduler 。 appclient会到master上注册。首先会去判断数据本地化,尽量选最好的本地化模式去执行。 打散 Executor选择相应的Executor去执行。ExecutorRunner会去创建CoarseGrainerExecutorBackend进程。 通过线程池的方式去执行任务。 反向: Executor向 SchedulerBackend反向注册 Spark On Yarn模式下。 driver负责计算调度。appmaster 负责资源的申请。
-
2.Hbase的PUT的一个过程。
-
3.RDD算子里操作一个外部map比如往里面put数据。然后算子外再遍历map。有什么问题吗。
-
4.shuffle的过程。调优。
-
5.5个partition里面分布有12345678910.用算子求最大值或者和。不能用广播变量和累加器。或者sortbykey.
- 6.大表和小表join.
- 7.知道spark怎么读hbase吗?spark on hbase.。华为的。
- 8.做过hbase的二级索引吗?
- 9.sort shuffle的优点?
- 10.stage怎么划分的? 宽依赖窄依赖是什么?
公司W:
- 1.讲讲你做过的项目(一个整体思路)
- 2.问问大概情况。公司里集群规模。hbase数据量。数据规模。
- 3.然后挑选数据工厂开始详细问。问hbase.。加闲聊。
- 4.问二次排序是什么。topn是什么。二次排序要继承什么接口?
- 5.计算的数据怎么来的。
- 6.kakfadirect是什么,。为什么要用这个,有什么优点?。和其他的有什么区别。
- http://blog.csdn.net/erfucun/article/details/52275369
-
- /**
- * Create an input stream that directly pulls messages from Kafka Brokers
- * without using any receiver. This stream can guarantee that each message
- * from Kafka is included in transformations exactly once (see points below). * * Points to note: * - No receivers: This stream does not use any receiver. It directly queries Kafka * - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked * by the stream itself. For interoperability with Kafka monitoring tools that depend on * Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. * You can access the offsets used in each batch from the generated RDDs (see * [[org.apache.spark.streaming.kafka.HasOffsetRanges]]). * - Failure Recovery: To recover from driver failures, you have to enable checkpointing * in the [[StreamingContext]]. The information on consumed offset can be * recovered from the checkpoint. See the programming guide for details (constraints, etc.). * - End-to-end semantics: This stream ensures that every records is effectively received and * transformed exactly once, but gives no guarantees on whether the transformed data are * outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure * that the output operation is idempotent, or use transactions to output records atomically. * See the programming guide for more details. * * @param ssc StreamingContext object * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" * to be set with Kafka broker(s) (NOT zookeeper servers) specified in * host1:port1,host2:port2 form. * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive) * starting point of the stream * @param messageHandler Function for translating each message and metadata into the desired type */
- 7.问了shuffle过程。
- 8.怎么调优的,jvm怎么调优的?
- 9.jvm结构?堆里面几个区?
- 10.数据清洗怎么做的?
- 11.怎么用spark做数据清洗
- 12.跟我聊了spark的应用,商场里广告投放,以及黄牛检测
- 13.spark读取 数据,是几个Partition呢? hdfs几个block 就有几个 Partition?
- 14.spark on yarn的两种模式? client 模式? 和cluster模式?
- 15.jdbc?mysql的驱动包名字叫什么?
- 16.region多大会分区?
公司Q
- 1.说说Mapreduce?一整个过程的理解。讲一下。
- 2.hbase存数据用什么rowkey?加时间戳的话,会不会出现时间戳重复的问题,怎么做的呢?
- 3.Spring的两大模块? AOP,IOC在你们项目中分别是怎么用的呢?
- 4.你们集群的规模, 数据量?
公司M
- 1.画图,画Spark的工作模式,部署分布架构图
-
2.画图,画图讲解spark工作流程。以及在集群上和各个角色的对应关系。
- 3.java自带有哪几种线程池。
- 4.数据是怎么收集的。 kafka收集数据的原理?
- 5.画图,讲讲shuffle的过程。那你怎么在编程的时候注意避免这些性能问题。
- 6.讲讲列式存储的 parquet文件底层格式。
- 7.dataset和dataframe?
- 8.通过什么方式学习spark的?
- 9.有哪些数据倾斜,怎么解决?
- 10.宽依赖窄依赖?
- 11.yarn的原理?
- 12.BlockManager怎么管理硬盘和内存的。
- 13.哪些算子操作涉及到shuffle
- 14.看过源码? 你熟悉哪几个部分的源码?
- 15.集群上 nodemanager和ResourceManager的数量关系?
- 16.spark怎么整合hive? 大概这样。 spark on hive 。 hive还是hive 执行引擎是spark。
其他人的:
- 1.Spark如何处理结构化数据,Spark如何处理非结构话数据?
- 2.Spark性能优化主要有哪些手段?
- 3.简要描述Spark分布式集群搭建的步骤
- 4.对于Spark你觉得他对于现有大数据的现状的优势和劣势在哪里?
- 5.对于算法是否进行过自主的研究设计?
- 6.简要描述你了解的一些数据挖掘算法与内容 基本我有印象的就这几个问题,聊了2个多小时,脑子都差点被问干了
*******************************************************************
1.给定a、b两个文件,各存放50亿个url,每个url各占64字节,内存限制是4G,让你找出a、b文件共同的url?
假如每个url大小为10bytes,那么可以估计每个文件的大小为50G×64=320G,远远大于内存限制的4G,所以不可能将其完全加载到内存中处理,可以采用分治的思想来解决。
Step1:遍历文件a,对每个url求取hash(url)%1000,然后根据所取得的值将url分别存储到1000个小文件(记为a0,a1,...,a999,每个小文件约300M);
Step2:遍历文件b,采取和a相同的方式将url分别存储到1000个小文件(记为b0,b1,...,b999);
巧妙之处:这样处理后,所有可能相同的url都被保存在对应的小文件(a0vsb0,a1vsb1,...,a999vsb999)中,不对应的小文件不可能有相同的url。然后我们只要求出这个1000对小文件中相同的url即可。
Step3:求每对小文件ai和bi中相同的url时,可以把ai的url存储到hash_set/hash_map中。然后遍历bi的每个url,看其是否在刚才构建的hash_set中,如果是,那么就是共同的url,存到文件里面就可以了。
草图如下(左边分解A,右边分解B,中间求解相同url):
2.有一个1G大小的一个文件,里面每一行是一个词,词的大小不超过16字节,内存限制大小是1M,要求返回频数最高的100个词。
Step1:顺序读文件中,对于每个词x,取hash(x)%5000,然后按照该值存到5000个小文件(记为f0,f1,...,f4999)中,这样每个文件大概是200k左右,如果其中的有的文件超过了1M大小,还可以按照类似的方法继续往下分,直到分解得到的小文件的大小都不超过1M;
Step2:对每个小文件,统计每个文件中出现的词以及相应的频率(可以采用trie树/hash_map等),并取出出现频率最大的100个词(可以用含100个结点的最小堆),并把100词及相应的频率存入文件,这样又得到了5000个文件;
Step3:把这5000个文件进行归并(类似与归并排序);
草图如下(分割大问题,求解小问题,归并):
3.现有海量日志数据保存在一个超级大的文件中,该文件无法直接读入内存,要求从中提取某天出访问百度次数最多的那个IP。
Step1:从这一天的日志数据中把访问百度的IP取出来,逐个写入到一个大文件中;
Step2:注意到IP是32位的,最多有2^32个IP。同样可以采用映射的方法,比如模1000,把整个大文件映射为1000个小文件;
Step3:找出每个小文中出现频率最大的IP(可以采用hash_map进行频率统计,然后再找出频率最大的几个)及相应的频率;
Step4:在这1000个最大的IP中,找出那个频率最大的IP,即为所求。
草图如下:
4.LVS和HAProxy相比,它的缺点是什么?
之前,的确是用LVS进行过MySQL集群的负载均衡,对HAProxy也有过了解,但是将这两者放在眼前进行比较,还真没试着了解过。面试中出现了这么一题,面试官给予的答案是LVS的配置相当繁琐,后来查找了相关资料,对这两种负载均衡方案有了更进一步的了解。LVS的负载均衡性能之强悍已经达到硬件负载均衡的F5的百分之60了,而HAproxy的负载均衡和Nginx负载均衡,均为硬件负载均衡的百分之十左右。由此可见,配置复杂,相应的效果也是显而易见的。在查找资料的过程中,试着将LVS的10种调度算法了解了一下,看似数量挺多的10种算法其实在不同的算法之间,有些只是有着一些细微的差别。在这10种调度算法中,静态调度算法有四种,动态调度算法有6种。
静态调度算法:
①RR轮询调度算法
这种调度算法不考虑服务器的状态,所以是无状态的,同时也不考虑每个服务器的性能,比如我有1-N台服务器,来N个请求了,第一个请求给第一台,第二个请求给第二台,,,第N个请求给第N台服务器,就酱紫。
②加权轮询
这种调度算法是考虑到服务器的性能的,你可以根据不同服务器的性能,加上权重进行分配相应的请求。
③基于目的地址的hash散列
这种调度算法和基于源地址的hash散列异曲同工,都是为了维持一个session,基于目的地址的hash散列,将记住同一请求的目的地址,将这类请求发往同一台目的服务器。简而言之,就是发往这个目的地址的请求都发往同一台服务器。而基于源地址的hash散列,就是来自同一源地址的请求都发往同一台服务器。
④基于源地址的hash散列
上述已讲,不再赘述。
动态调度
①最少连接调度算法
这种调度算法会记录响应请求的服务器上所建立的连接数,每接收到一个请求会相应的将该服务器的所建立连接数加1,同时将新来的请求分配到当前连接数最少的那台机器上。
②加权最少连接调度算法
这种调度算法在最少连接调度算法的基础上考虑到服务器的性能。当然,做这样子的考虑是有其合理性存在的,如果是同一规格的服务器,那么建立的连接数越多,必然越增加其负载,那么仅仅根据最少连接数的调度算法,必然可以实现合理的负载均衡。但如果,服务器的性能不一样呢?比如我有一台服务器,最多只能处理10个连接,现在建立了3个,还有一台服务器最多能处理1000条连接,现在建立了5个,如果单纯地按照上述的最少连接调度算法,妥妥的前者嘛,但前者已经建立了百分之三十的连接了,而后者连百分之一的连接还没有建立,试问,这合理吗?显然不合理。所以加上权重,才算合理。相应的公式也相当简单:active*256/weight。
③最短期望调度算法
这种算法,是避免出现上述加权最少连接调度算法中的一种特殊情况,导致即使加上权重,调度器也无差别对待了,举个栗子:
假设有三台服务器ABC,其当前所建立的连接数相应地为1,2,3,而权重也是1,2,3。那么如果按照加权最少连接调度算法的话,算出来是这样子的:
A:1256/1=256
B:2256/2=256
C:3256/3=256
我们会发现,即便加上权重,A、B、C,经过计算还是一样的,这样子调度器会无差别的在A、B、C中任选一台,将请求发过去。
而最短期望将active256/weight的算法改进为(active+1)256/weight
那么还是之前的例子:
A:(1+1)256/1=2/1256=2256
B:(2+1)256/2=3/2256=1.5256
C:(3+1)256、3=4/3256≈1.3256
显然C
④永不排队算法
将请求发给当前连接数为0的服务器上。
⑤基于局部的最少连接调度算法
这种调度算法应用于Cache系统,维持一个请求到一台服务器的映射,其实我们仔细想想哈,之前做的一系列最少连接相关的调度算法。考虑到的是服务器的状态与性能,但是一次请求并不是单向的,就像有一个从未合作过的大牛,他很闲,你让他去解决一个之前碰到过的一个问题,未必有找一个之前已经跟你合作过哪怕现在不怎么闲的臭皮匠效果好哦~,所以基于局部的最少连接调度算法,维持的这种映射的作用是,如果来了一个请求,相对应的映射的那台服务器,没有超载,ok交给老伙伴完事吧,俺放心,如果那台服务器不存在,或者是超载的状态且有其他服务器工作在一半的负载状态,则按最少连接调度算法在集群其余的服务器中找一台将请求分配给它。
⑥基于复制的局部最少连接调度算法
这种调度算法同样应用于cache系统,但它维持的不是到一台服务器的映射而是到一组服务器的映射,当有新的请求到来,根据最小连接原则,从该映射的服务器组中选择一台服务器,如果它没有超载则交给它去处理这个请求,如果发现它超载,则从服务器组外的集群中,按最少连接原则拉一台机器加入服务器组,并且在服务器组有一段时间未修改后,将最忙的那台服务器从服务器组中剔除。
5.Sqoop用起来感觉怎样?
说实话,Sqoop在导入数据的速度上确实十分感人,通过进一步了解,发现Sqoop1和Sqoop2在架构上还是有明显不同的,无论是从数据类型上还是从安全权限,密码暴露方面,Sqoop2都有了明显的改进,同时同一些其他的异构数据同步工具比较,如淘宝的DataX或者Kettle相比,Sqoop无论是从导入数据的效率上还是从支持插件的丰富程度上,Sqoop还是相当不错滴!!
6.ZooKeeper的角色以及相应的Zookepper工作原理?
果然,人的记忆力是有衰减曲线的,当面试官抛出这个问题后,前者角色,我只答出了两种(leader和follower),后者原理压根就模糊至忘记了。所以恶补了一下,涉及到Zookeeper的角色大概有如下四种:leader、learner(follower)、observer、client。其中leader主要用来决策和调度,follower和observer的区别仅仅在于后者没有写的职能,但都有将client请求提交给leader的职能,而observer的出现是为了应对当投票压力过大这种情形的,client就是用来发起请求的。而Zookeeper所用的分布式一致性算法包括leader的选举其实和-原始部落的获得神器为酋长,或者得玉玺者为皇帝类似,谁id最小,谁为leader,会根据你所配置的相应的文件在相应的节点机下生成id,然后相应的节点会通过getchildren()这个函数获取之前设置的节点下生成的id,谁最小,谁是leader。并且如果万一这个leader挂掉了或者堕落了,则由次小的顶上。而且在配置相应的zookeeper文件的时候回有类似于如下字样的信息:Server.x=AAAA:BBBB:CCCC。其中的x即为你的节点号哈,AAAA对应你所部属zookeeper所在的ip地址,BBBB为接收client请求的端口,CCCC为重新选举leader端口。
7.HBase的Insert与Update的区别?
这个题目是就着最近的一次项目问的,当时实现的与hbase交互的三个方法分别为insert、delete、update。由于那个项目是对接的一个项目,对接的小伙伴和我协商了下,不将update合并为insert,如果合并的话,按那个项目本身,其实通过insert执行overwrite相当于间接地Update,本质上,或者说在展现上是没什么区别的包括所调用的put。但那仅仅是就着那个项目的程序而言,如果基于HBaseshell层面。将同一rowkey的数据插入HBase,其实虽然展现一条,但是相应的timestamp是不一样的,而且最大的版本数可以通过配置文件进行相应地设置。
8.请简述大数据的结果展现方式。
1)报表形式
基于数据挖掘得出的数据报表,包括数据表格、矩阵、图形和自定义格式的报表等,使用方便、设计灵活。
2)图形化展现
提供曲线、饼图、堆积图、仪表盘、鱼骨分析图等图形形式宏观展现模型数据的分布情况,从而便于进行决策。
3)KPI展现
提供表格式绩效一览表并可自定义绩效查看方式,如数据表格或走势图,企业管理者可根据可度量的目标快速评估进度。
4)查询展现
按数据查询条件和查询内容,以数据表格来汇总查询结果,提供明细查询功能,并可在查询的数据表格基础上进行上钻、下钻、旋转等操作。
9.例举身边的大数据。
i.QQ,微博等社交软件产生的数据
ii.天猫,京东等电子商务产生的数据
iii.互联网上的各种数据
10.简述大数据的数据管理方式。
答:对于图像、视频、URL、地理位置等类型多样的数据,难以用传统的结构化方式描述,因此需要使用由多维表组成的面向列存储的数据管理系统来组织和管理数据。也就是说,将数据按行排序,按列存储,将相同字段的数据作为一个列族来聚合存储。不同的列族对应数据的不同属性,这些属性可以根据需求动态增加,通过这样的分布式实时列式数据库对数据统一进行结构化存储和管理,避免了传统数据存储方式下的关联查询。
11.什么是大数据?
答:大数据是指无法在容许的时间内用常规软件工具对其内容进行抓取、管理和处理的数据。
12.海量日志数据,提取出某日访问百度次数最多的那个IP。
首先是这一天,并且是访问百度的日志中的IP取出来,逐个写入到一个大文件中。注意到IP是32位的,最多有个2^32个IP。同样可以采用映射的方法,比如模1000,把整个大文件映射为1000个小文件,再找出每个小文中出现频率最大的IP(可以采用hash_map进行频率统计,然后再找出频率最大的几个)及相应的频率。然后再在这1000个最大的IP中,找出那个频率最大的IP,即为所求。
或者如下阐述(雪域之鹰):
算法思想:分而治之+Hash
1)IP地址最多有2^32=4G种取值情况,所以不能完全加载到内存中处理;
2)可以考虑采用“分而治之”的思想,按照IP地址的Hash(IP)%1024值,把海量IP日志分别存储到1024个小文件中。这样,每个小文件最多包含4MB个IP地址;
3)对于每一个小文件,可以构建一个IP为key,出现次数为value的Hashmap,同时记录当前出现次数最多的那个IP地址;
4)可以得到1024个小文件中的出现次数最多的IP,再依据常规的排序算法得到总体上出现次数最多的IP;
13.搜索引擎会通过日志文件把用户每次检索使用的所有检索串都记录下来,每个查询串的长度为1-255字节。
假设目前有一千万个记录(这些查询串的重复度比较高,虽然总数是1千万,但如果除去重复后,不超过3百万个。一个查询串的重复度越高,说明查询它的用户越多,也就是越热门。),请你统计最热门的10个查询串,要求使用的内存不能超过1G。
典型的TopK算法,还是在这篇文章里头有所阐述,详情请参见:十一、从头到尾彻底解析Hash表算法。
文中,给出的最终算法是:
第一步、先对这批海量数据预处理,在O(N)的时间内用Hash表完成统计(之前写成了排序,特此订正。July、2011.04.27);
第二步、借助堆这个数据结构,找出TopK,时间复杂度为N‘logK。
即,借助堆结构,我们可以在log量级的时间内查找和调整/移动。因此,维护一个K(该题目中是10)大小的小根堆,然后遍历300万的Query,分别和根元素进行对比所以,我们最终的时间复杂度是:O(N)+N’*O(logK),(N为1000万,N’为300万)。ok,更多,详情,请参考原文。
或者:采用trie树,关键字域存该查询串出现的次数,没有出现为0。最后用10个元素的最小推来对出现频率进行排序。
14.有一个1G大小的一个文件,里面每一行是一个词,词的大小不超过16字节,内存限制大小是1M。返回频数最高的100个词。
方案:顺序读文件中,对于每个词x,取hash(x)%5000,然后按照该值存到5000个小文件(记为x0,x1,…x4999)中。这样每个文件大概是200k左右。
如果其中的有的文件超过了1M大小,还可以按照类似的方法继续往下分,直到分解得到的小文件的大小都不超过1M。
对每个小文件,统计每个文件中出现的词以及相应的频率(可以采用trie树/hash_map等),并取出出现频率最大的100个词(可以用含100个结点的最小堆),并把100个词及相应的频率存入文件,这样又得到了5000个文件。下一步就是把这5000个文件进行归并(类似与归并排序)的过程了。
15.有10个文件,每个文件1G,每个文件的每一行存放的都是用户的query,每个文件的query都可能重复。要求你按照query的频度排序。
还是典型的TOPK算法,解决方案如下:
方案1:
顺序读取10个文件,按照hash(query)%10的结果将query写入到另外10个文件(记为)中。这样新生成的文件每个的大小大约也1G(假设hash函数是随机的)。
找一台内存在2G左右的机器,依次对用hash_map(query,query_count)来统计每个query出现的次数。利用快速/堆/归并排序按照出现次数进行排序。将排序好的query和对应的query_cout输出到文件中。这样得到了10个排好序的文件(记为)。
对这10个文件进行归并排序(内排序与外排序相结合)。
方案2:
一般query的总量是有限的,只是重复的次数比较多而已,可能对于所有的query,一次性就可以加入到内存了。这样,我们就可以采用trie树/hash_map等直接来统计每个query出现的次数,然后按出现次数做快速/堆/归并排序就可以了。
方案3:
与方案1类似,但在做完hash,分成多个文件后,可以交给多个文件来处理,采用分布式的架构来处理(比如MapReduce),最后再进行合并。
16.给定a、b两个文件,各存放50亿个url,每个url各占64字节,内存限制是4G,让你找出a、b文件共同的url?
方案1:可以估计每个文件安的大小为5G×64=320G,远远大于内存限制的4G。所以不可能将其完全加载到内存中处理。考虑采取分而治之的方法。
遍历文件a,对每个url求取hash(url)%1000,然后根据所取得的值将url分别存储到1000个小文件(记为a0,a1,…,a999)中。这样每个小文件的大约为300M。
遍历文件b,采取和a相同的方式将url分别存储到1000小文件(记为b0,b1,…,b999)。这样处理后,所有可能相同的url都在对应的小文件(a0vsb0,a1vsb1,…,a999vsb999)中,不对应的小文件不可能有相同的url。然后我们只要求出1000对小文件中相同的url即可。
求每对小文件中相同的url时,可以把其中一个小文件的url存储到hash_set中。然后遍历另一个小文件的每个url,看其是否在刚才构建的hash_set中,如果是,那么就是共同的url,存到文件里面就可以了。
方案2:如果允许有一定的错误率,可以使用Bloomfilter,4G内存大概可以表示340亿bit。将其中一个文件中的url使用Bloomfilter映射为这340亿bit,然后挨个读取另外一个文件的url,检查是否与Bloomfilter,如果是,那么该url应该是共同的url(注意会有一定的错误率)。
Bloomfilter日后会在本BLOG内详细阐述。
17.在2.5亿个整数中找出不重复的整数,注,内存不足以容纳这2.5亿个整数。
方案1:采用2-Bitmap(每个数分配2bit,00表示不存在,01表示出现一次,10表示多次,11无意义)进行,共需内存2^32*2bit=1GB内存,还可以接受。然后扫描这2.5亿个整数,查看Bitmap中相对应位,如果是00变01,01变10,10保持不变。所描完事后,查看bitmap,把对应位是01的整数输出即可。
方案2:也可采用与第1题类似的方法,进行划分小文件的方法。然后在小文件中找出不重复的整数,并排序。然后再进行归并,注意去除重复的元素。
18.腾讯面试题:给40亿个不重复的unsignedint的整数,没排过序的,然后再给一个数,如何快速判断这个数是否在那40亿个数当中?
与上第6题类似,我的第一反应时快速排序+二分查找。以下是其它更好的方法:
方案1:oo,申请512M的内存,一个bit位代表一个unsignedint值。读入40亿个数,设置相应的bit位,读入要查询的数,查看相应bit位是否为1,为1表示存在,为0表示不存在。
dizengrong:
方案2:这个问题在《编程珠玑》里有很好的描述,大家可以参考下面的思路,探讨一下:
又因为2^32为40亿多,所以给定一个数可能在,也可能不在其中;
这里我们把40亿个数中的每一个用32位的二进制来表示
假设这40亿个数开始放在一个文件中。
然后将这40亿个数分成两类:
1.最高位为0
2.最高位为1
并将这两类分别写入到两个文件中,其中一个文件中数的个数<=20亿,而另一个>=20亿(这相当于折半了);
与要查找的数的最高位比较并接着进入相应的文件再查找
再然后把这个文件为又分成两类:
1.次最高位为0
2.次最高位为1
并将这两类分别写入到两个文件中,其中一个文件中数的个数<=10亿,而另一个>=10亿(这相当于折半了);
与要查找的数的次最高位比较并接着进入相应的文件再查找。
…….
以此类推,就可以找到了,而且时间复杂度为O(logn),方案2完。
附:这里,再简单介绍下,位图方法:
使用位图法判断整形数组是否存在重复
判断集合中存在重复是常见编程任务之一,当集合中数据量比较大时我们通常希望少进行几次扫描,这时双重循环法就不可取了。
位图法比较适合于这种情况,它的做法是按照集合中最大元素max创建一个长度为max+1的新数组,然后再次扫描原数组,遇到几就给新数组的第几位置上1,如遇到5就给新数组的第六个元素置1,这样下次再遇到5想置位时发现新数组的第六个元素已经是1了,这说明这次的数据肯定和以前的数据存在着重复。这种给新数组初始化时置零其后置一的做法类似于位图的处理方法故称位图法。它的运算次数最坏的情况为2N。如果已知数组的最大值即能事先给新数组定长的话效率还能提高一倍。
欢迎,有更好的思路,或方法,共同交流。
19.怎么在海量数据中找出重复次数最多的一个?
方案1:先做hash,然后求模映射为小文件,求出每个小文件中重复次数最多的一个,并记录重复次数。然后找出上一步求出的数据中重复次数最多的一个就是所求(具体参考前面的题)。
20.上千万或上亿数据(有重复),统计其中出现次数最多的钱N个数据。
方案1:上千万或上亿的数据,现在的机器的内存应该能存下。所以考虑采用hash_map/搜索二叉树/红黑树等来进行统计次数。然后就是取出前N个出现次数最多的数据了,可以用第2题提到的堆机制完成。
21.一个文本文件,大约有一万行,每行一个词,要求统计出其中最频繁出现的前10个词,请给出思想,给出时间复杂度分析。
方案1:这题是考虑时间效率。用trie树统计每个词出现的次数,时间复杂度是O(n*le)(le表示单词的平准长度)。然后是找出出现最频繁的前10个词,可以用堆来实现,前面的题中已经讲到了,时间复杂度是O(n*lg10)。所以总的时间复杂度,是O(n*le)与O(n*lg10)中较大的哪一个。
附、100w个数中找出最大的100个数。
方案1:在前面的题中,我们已经提到了,用一个含100个元素的最小堆完成。复杂度为O(100w*lg100)。
方案2:采用快速排序的思想,每次分割之后只考虑比轴大的一部分,知道比轴大的一部分在比100多的时候,采用传统排序算法排序,取前100个。复杂度为O(100w*100)。
方案3:采用局部淘汰法。选取前100个元素,并排序,记为序列L。然后一次扫描剩余的元素x,与排好序的100个元素中最小的元素比,如果比这个最小的要大,那么把这个最小的元素删除,并把x利用插入排序的思想,插入到序列L中。依次循环,知道扫描了所有的元素。复杂度为O(100w*100)。
第二部分、十个海量数据处理方法大总结
ok,看了上面这么多的面试题,是否有点头晕。是的,需要一个总结。接下来,本文将简单总结下一些处理海量数据问题的常见方法,而日后,本BLOG内会具体阐述这些方法。
一、Bloomfilter
适用范围:可以用来实现数据字典,进行数据的判重,或者集合求交集
基本原理及要点:
对于原理来说很简单,位数组+k个独立hash函数。将hash函数对应的值的位数组置1,查找时如果发现所有hash函数对应位都是1说明存在,很明显这个过程并不保证查找的结果是100%正确的。同时也不支持删除一个已经插入的关键字,因为该关键字对应的位会牵动到其他的关键字。所以一个简单的改进就是countingBloomfilter,用一个counter数组代替位数组,就可以支持删除了。
还有一个比较重要的问题,如何根据输入元素个数n,确定位数组m的大小及hash函数个数。当hash函数个数k=(ln2)*(m/n)时错误率最小。在错误率不大于E的情况下,m至少要等于n*lg(1/E)才能表示任意n个元素的集合。但m还应该更大些,因为还要保证bit数组里至少一半为0,则m应该>=nlg(1/E)*lge大概就是nlg(1/E)1.44倍(lg表示以2为底的对数)。
举个例子我们假设错误率为0.01,则此时m应大概是n的13倍。这样k大概是8个。
注意这里m与n的单位不同,m是bit为单位,而n则是以元素个数为单位(准确的说是不同元素的个数)。通常单个元素的长度都是有很多bit的。所以使用bloomfilter内存上通常都是节省的。
扩展:
Bloomfilter将集合中的元素映射到位数组中,用k(k为哈希函数个数)个映射位是否全1表示元素在不在这个集合中。Countingbloomfilter(CBF)将位数组中的每一位扩展为一个counter,从而支持了元素的删除操作。SpectralBloomFilter(SBF)将其与集合元素的出现次数关联。SBF采用counter中的最小值来近似表示元素的出现频率。
问题实例:给你A,B两个文件,各存放50亿条URL,每条URL占用64字节,内存限制是4G,让你找出A,B文件共同的URL。如果是三个乃至n个文件呢?
根据这个问题我们来计算下内存的占用,4G=2^32大概是40亿*8大概是340亿,n=50亿,如果按出错率0.01算需要的大概是650亿个bit。现在可用的是340亿,相差并不多,这样可能会使出错率上升些。另外如果这些urlip是一一对应的,就可以转换成ip,则大大简单了。
二、Hashing
适用范围:快速查找,删除的基本数据结构,通常需要总数据量可以放入内存
基本原理及要点:
hash函数选择,针对字符串,整数,排列,具体相应的hash方法。
碰撞处理,一种是openhashing,也称为拉链法;另一种就是closedhashing,也称开地址法,openedaddressing。
扩展:
d-lefthashing中的d是多个的意思,我们先简化这个问题,看一看2-lefthashing。2-lefthashing指的是将一个哈希表分成长度相等的两半,分别叫做T1和T2,给T1和T2分别配备一个哈希函数,h1和h2。在存储一个新的key时,同时用两个哈希函数进行计算,得出两个地址h1[key]和h2[key]。这时需要检查T1中的h1[key]位置和T2中的h2[key]位置,哪一个位置已经存储的(有碰撞的)key比较多,然后将新key存储在负载少的位置。如果两边一样多,比如两个位置都为空或者都存储了一个key,就把新key存储在左边的T1子表中,2-left也由此而来。在查找一个key时,必须进行两次hash,同时查找两个位置。
问题实例:
1).海量日志数据,提取出某日访问百度次数最多的那个IP。
IP的数目还是有限的,最多2^32个,所以可以考虑使用hash将ip直接存入内存,然后进行统计。
三、bit-map
适用范围:可进行数据的快速查找,判重,删除,一般来说数据范围是int的10倍以下
基本原理及要点:使用bit数组来表示某些元素是否存在,比如8位电话号码
扩展:bloomfilter可以看做是对bit-map的扩展
问题实例:
1)已知某个文件内包含一些电话号码,每个号码为8位数字,统计不同号码的个数。
8位最多99999999,大概需要99m个bit,大概10几m字节的内存即可。
2)2.5亿个整数中找出不重复的整数的个数,内存空间不足以容纳这2.5亿个整数。
将bit-map扩展一下,用2bit表示一个数即可,0表示未出现,1表示出现一次,2表示出现2次及以上。或者我们不用2bit来进行表示,我们用两个bit-map即可模拟实现这个2bit-map。
四、堆
适用范围:海量数据前n大,并且n比较小,堆可以放入内存
基本原理及要点:最大堆求前n小,最小堆求前n大。方法,比如求前n小,我们比较当前元素与最大堆里的最大元素,如果它小于最大元素,则应该替换那个最大元素。这样最后得到的n个元素就是最小的n个。适合大数据量,求前n小,n的大小比较小的情况,这样可以扫描一遍即可得到所有的前n元素,效率很高。
扩展:双堆,一个最大堆与一个最小堆结合,可以用来维护中位数。
问题实例:
1)100w个数中找最大的前100个数。
用一个100个元素大小的最小堆即可。
五、双层桶划分—-其实本质上就是【分而治之】的思想,重在“分”的技巧上!
适用范围:第k大,中位数,不重复或重复的数字
基本原理及要点:因为元素范围很大,不能利用直接寻址表,所以通过多次划分,逐步确定范围,然后最后在一个可以接受的范围内进行。可以通过多次缩小,双层只是一个例子。
扩展:
问题实例:
1).2.5亿个整数中找出不重复的整数的个数,内存空间不足以容纳这2.5亿个整数。
有点像鸽巢原理,整数个数为2^32,也就是,我们可以将这2^32个数,划分为2^8个区域(比如用单个文件代表一个区域),然后将数据分离到不同的区域,然后不同的区域在利用bitmap就可以直接解决了。也就是说只要有足够的磁盘空间,就可以很方便的解决。
2).5亿个int找它们的中位数。
这个例子比上面那个更明显。首先我们将int划分为2^16个区域,然后读取数据统计落到各个区域里的数的个数,之后我们根据统计结果就可以判断中位数落到那个区域,同时知道这个区域中的第几大数刚好是中位数。然后第二次扫描我们只统计落在这个区域中的那些数就可以了。
实际上,如果不是int是int64,我们可以经过3次这样的划分即可降低到可以接受的程度。即可以先将int64分成2^24个区域,然后确定区域的第几大数,在将该区域分成2^20个子区域,然后确定是子区域的第几大数,然后子区域里的数的个数只有2^20,就可以直接利用directaddrtable进行统计了。
六、数据库索引
适用范围:大数据量的增删改查
基本原理及要点:利用数据的设计实现方法,对海量数据的增删改查进行处理。
七、倒排索引(Invertedindex)
适用范围:搜索引擎,关键字查询
基本原理及要点:为何叫倒排索引?一种索引方法,被用来存储在全文搜索下某个单词在一个文档或者一组文档中的存储位置的映射。
以英文为例,下面是要被索引的文本:
T0=“itiswhatitis”
T1=“whatisit”
T2=“itisabanana”
我们就能得到下面的反向文件索引:
“a”:{2}
“banana”:{2}
“is”:{0,1,2}
“it”:{0,1,2}
“what”:{0,1}
检索的条件”what”,”is”和”it”将对应集合的交集。
正向索引开发出来用来存储每个文档的单词的列表。正向索引的查询往往满足每个文档有序频繁的全文查询和每个单词在校验文档中的验证这样的查询。在正向索引中,文档占据了中心的位置,每个文档指向了一个它所包含的索引项的序列。也就是说文档指向了它包含的那些单词,而反向索引则是单词指向了包含它的文档,很容易看到这个反向的关系。
扩展:
问题实例:文档检索系统,查询那些文件包含了某单词,比如常见的学术论文的关键字搜索。
八、外排序
适用范围:大数据的排序,去重
基本原理及要点:外排序的归并方法,置换选择败者树原理,最优归并树
扩展:
问题实例:
1).有一个1G大小的一个文件,里面每一行是一个词,词的大小不超过16个字节,内存限制大小是1M。返回频数最高的100个词。
这个数据具有很明显的特点,词的大小为16个字节,但是内存只有1m做hash有些不够,所以可以用来排序。内存可以当输入缓冲区使用。
九、trie树
适用范围:数据量大,重复多,但是数据种类小可以放入内存
基本原理及要点:实现方式,节点孩子的表示方式
扩展:压缩实现。
问题实例:
1).有10个文件,每个文件1G,每个文件的每一行都存放的是用户的query,每个文件的query都可能重复。要你按照query的频度排序。
2).1000万字符串,其中有些是相同的(重复),需要把重复的全部去掉,保留没有重复的字符串。请问怎么设计和实现?
3).寻找热门查询:查询串的重复度比较高,虽然总数是1千万,但如果除去重复后,不超过3百万个,每个不超过255字节。
十、分布式处理mapreduce
适用范围:数据量大,但是数据种类小可以放入内存
基本原理及要点:将数据交给不同的机器去处理,数据划分,结果归约。
扩展:
问题实例:
1).ThecanonicalexampleapplicationofMapReduceisaprocesstocounttheappearancesof
eachdifferentwordinasetofdocuments:
2).海量数据分布在100台电脑中,想个办法高效统计出这批数据的TOP10。
3).一共有N个机器,每个机器上有N个数。每个机器最多存O(N)个数并对它们操作。如何找到N^2个数的中数(median)?
经典问题分析
上千万or亿数据(有重复),统计其中出现次数最多的前N个数据,分两种情况:可一次读入内存,不可一次读入。
可用思路:trie树+堆,数据库索引,划分子集分别统计,hash,分布式计算,近似统计,外排序
所谓的是否能一次读入内存,实际上应该指去除重复后的数据量。如果去重后数据可以放入内存,我们可以为数据建立字典,比如通过map,hashmap,trie,然后直接进行统计即可。当然在更新每条数据的出现次数的时候,我们可以利用一个堆来维护出现次数最多的前N个数据,当然这样导致维护次数增加,不如完全统计后在求前N大效率高。
如果数据无法放入内存。一方面我们可以考虑上面的字典方法能否被改进以适应这种情形,可以做的改变就是将字典存放到硬盘上,而不是内存,这可以参考数据库的存储方法。
当然还有更好的方法,就是可以采用分布式计算,基本上就是map-reduce过程,首先可以根据数据值或者把数据hash(md5)后的值,将数据按照范围划分到不同的机子,最好可以让数据划分后可以一次读入内存,这样不同的机子负责处理各种的数值范围,实际上就是map。得到结果后,各个机子只需拿出各自的出现次数最多的前N个数据,然后汇总,选出所有的数据中出现次数最多的前N个数据,这实际上就是reduce过程。
实际上可能想直接将数据均分到不同的机子上进行处理,这样是无法得到正确的解的。因为一个数据可能被均分到不同的机子上,而另一个则可能完全聚集到一个机子上,同时还可能存在具有相同数目的数据。比如我们要找出现次数最多的前100个,我们将1000万的数据分布到10台机器上,找到每台出现次数最多的前100个,归并之后这样不能保证找到真正的第100个,因为比如出现次数最多的第100个可能有1万个,但是它被分到了10台机子,这样在每台上只有1千个,假设这些机子排名在1000个之前的那些都是单独分布在一台机子上的,比如有1001个,这样本来具有1万个的这个就会被淘汰,即使我们让每台机子选出出现次数最多的1000个再归并,仍然会出错,因为可能存在大量个数为1001个的发生聚集。因此不能将数据随便均分到不同机子上,而是要根据hash后的值将它们映射到不同的机子上处理,让不同的机器处理一个数值范围。
而外排序的方法会消耗大量的IO,效率不会很高。而上面的分布式方法,也可以用于单机版本,也就是将总的数据根据值的范围,划分成多个不同的子文件,然后逐个处理。处理完毕之后再对这些单词的及其出现频率进行一个归并。实际上就可以利用一个外排序的归并过程。
另外还可以考虑近似计算,也就是我们可以通过结合自然语言属性,只将那些真正实际中出现最多的那些词作为一个字典,使得这个规模可以放入内存。
【某公司笔试面试题】
1使用mr,spark,sparksql编写wordcount程序
【Spark版本】
valconf=newSparkConf().setAppName("wd").setMaster("local[1]")
valsc=newSparkContext(conf,2)
//加载
vallines=sc.textFile("tructField("name",DataTypes.StringType,true)")
valparis=lines.flatMap(line=>line.split("^A"))
valwords=paris.map((_,1))
valresult=words.reduceByKey(_+_).sortBy(x=>x._1,false)
//打印
result.foreach(
wds=>{
println("单词:"+wds._1+"个数:"+wds._2)
}
)
sc.stop()
【sparksql版本】
valconf=newSparkConf().setAppName("sqlWd").setMaster("local[1]")
valsc=newSparkContext(conf)
valsqlContext=newSQLContext(sc)
//加载
vallines=sqlContext.textFile("E:idea15createRecommederdatawords.txt")
valwords=lines.flatMap(x=>x.split("")).map(y=>Row(y))
valstructType=StructType(Array(StructField("name",DataTypes.StringType,true)))
valdf=sqlContext.createDataFrame(rows,structType)
df.registerTempTable("t_word_count")
sqlContext.udf.register("num_word",(name:String)=>1)
sqlContext.sql("selectname,num_word(name)fromt_word_count").groupBy(df.col("name")).count().show()
sc.stop()
2hive的使用,内外部表的区别,分区作用,UDF和Hive优化
(1)hive使用:仓库、工具
(2)hive内外部表:内部表数据永久删除,外部表数据删除后、其他人依然可以访问
(3)分区作用:防止数据倾斜
(4)UDF函数:用户自定义的函数(主要解决格式,计算问题),需要继承UDF类
java代码实现
classTestUDFHiveextendsUDF{
publicStringevalute(Stringstr){
try{
return"hello"+str
}catch(Exceptione){
returnstr+"error"
}
}
}
(5)Hive优化:看做mapreduce处理
a排序优化:sortby效率高于orderby
b分区:使用静态分区(statu_date="20160516",location="beijin"),每个分区对应hdfs上的一个目录
c减少job和task数量:使用表链接操作
d解决groupby数据倾斜问题:设置hive.groupby.skewindata=true,那么hive会自动负载均衡
e小文件合并成大文件:表连接操作
f使用UDF或UDAF函数:hive中UDTF编写和使用(转) - ggjucheng - 博客园
3Hbase的rk设计,Hbase优化
aowkey:hbase三维存储中的关键(rowkey:行键,columnKey(family+quilaty):列键,timestamp:时间戳)
owkey字典排序、越短越好
使用id+时间:9527+20160517使用hash散列:dsakjkdfuwdsf+9527+20160518
应用中,rowkey一般10~100bytes,8字节的整数倍,有利于提高操作系统性能
bHbase优化
分区:RegionSplit()方法NUMREGIONS=9
column不超过3个
硬盘配置,便于regionServer管理和数据备份及恢复
分配合适的内存给regionserver
其他:
hbase查询
(1)get
(2)scan
使用startRow和endRow限制
4Linux常用操作
aawk:
awk-F:`BEGIN{print"nameip"}{print$1$7}END{print"结束"}`/etc/passwd
last|head-5|awk`BEGIN{print"nameip"}{print$1$3}END{print"结束了"}`
bsed
5java线程2种方式实现、设计模式、链表操作、排序
(1)2种线程实现
aThread类继承
TestCLth=newTestCL()//类继承Thread
th.start()
b实现Runnable接口
Threadth=newThread(newRunnable(){
publicvoidrun(){
//实现
}
})
th.start()
(2)设计模式,分为4类
a创建模式:如工厂模式、单例模式
b结构模式:代理模式
c行为模式:观察者模式
d线程池模式
6【最熟悉的一个项目简介、架构图、使用的技术、你负责哪块】
7cdh集群监控
(1)数据库监控(2)主机监控(3)服务监控(4)活动监控
8计算机网络工作原理
将分散的机器通过数据通信原理连接起来,实现共享!
9hadoop生态系统
hdfsmapreducehivehbasezookeeperlume
hdfs原理及各个模块的功能mapreduce原理mapreduce优化数据倾斜
11系统维护:hadoop升级datanode节点
12【讲解项目要点:数据量、多少人、分工、运行时间、项目使用机器、算法、技术】
13【学会向对方提问】
14jvm运行机制及内存原理
运行:
I加载.class文件
II管理并且分配内存
III垃圾回收
内存原理:
IJVM装载环境和配置
II装载JVM.dll并初始化JVM.dll
IV处理class类
15hdfs、yarn参数调优
mapreduce.job.jvm.num.tasks
默认为1,设置为-1,重用jvm
16Hbase、Hive、impala、zookeeper、Storm、spark原理和使用方法、使用其架构图讲解
【某公司笔试题】
1、如何为一个hadoop任务设置mappers的数量
答案:
使用job.setNumMapTask(intn)手动分割,这是不靠谱的
官方文档:“Note:Thisisonlyahinttotheframework”说明这个方法只是提示作用,不起决定性作用
实际上要用公式计算:
Max(min.split,min(max.split,block))就设置分片的最大最下值computeSplitSize()设置
参考:深度分析如何在Hadoop中控制Map的数量 - 张贵宾的技术专栏 - 博客频道 - CSDN.NET
2、有可能使hadoop任务输出到多个目录中么?如果可以,怎么做?
答案:在1.X版本后使用MultipleOutputs.java类实现
源码:
MultipleOutputs.addNamedOutput(conf,"text2",TextOutputFormat.class,Long.class,String.class);
MultipleOutputs.addNamedOutput(conf,"text3",TextOutputFormat.class,Long.class,String.class);
参考:MapReduce中的自定义多目录/文件名输出HDFS - leejun2005的个人页面 - 开源中国社区
发音:Multiple['m?lt?pl]--》许多的
3、如何为一个hadoop任务设置要创建的reducer的数量
答案:job.setNumReduceTask(intn)
或者调整hdfs-site.xml中的mapred.tasktracker.reduce.tasks.maximum默认参数值
4、在hadoop中定义的主要公用InputFormats中,哪一个是默认值:
(A)TextInputFormat
(B)KeyValueInputFormat
(C)SequenceFileInputFormat
答案:A
5、两个类TextInputFormat和KeyValueTextInputFormat的区别?
答案:
?FileInputFormat的子类:
TextInputFormat(默认类型,键是LongWritable类型,值为Text类型,key为当前行在文件中的偏移量,value为当前行本身);
?KeyValueTextInputFormat(适合文件自带key,value的情况,只要指定分隔符即可,比较实用,默认是分割);
源码:
StringsepStr=job.get("mapreduce.input.keyvaluelinerecordreader.key.value.separator","");
注意:在自定义输入格式时,继承FileInputFormat父类
6、在一个运行的hadoop任务中,什么是InputSpilt?
答案:InputSplit是MapReduce对文件进行处理和运算的输入单位,只是一个逻辑概念,每个InputSplit并没有对文件实际的切割,只是记录了要处理的数据的位置(包括文件的path和hosts)和长度(由start和length决定),默认情况下与block一样大。
拓展:需要在定义InputSplit后,展开讲解mapreduce的原理
7、Hadoop框架中,文件拆分是怎么被调用的?
答案:JobTracker,创建一个InputFormat的实例,调用它的getSplits()方法,把输入目录的文件拆分成FileSplist作为Mappertask的输入,生成Mappertask加入Queue。
源码中体现了拆分的数量
longgoalSize=totalSize/(numSplits==0?1:numSplits);
longminSize=Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE,1),minSplitSize);//minSplitSize默认是1
8、分别举例什么情况下使用combiner,什么情况下不会使用?
答案:Combiner适用于对记录汇总的场景(如求和),但是,求平均数的场景就不能使用Combiner了
9、Hadoop中job和Tasks之间的区别是什么?
答案:
job是工作的入口,负责控制、追踪、管理任务,也是一个进程
包含maptask和reducetask
Tasks是map和reduce里面的步骤,主要用于完成任务,也是线程
10、Hadoop中通过拆分任务到多个节点运行来实现并行计算,但是某些节点运行较慢会拖慢整个任务的运行,hadoop采用何种机制应对这种情况?
答案:结果查看监控日志,得知产生这种现象的原因是数据倾斜问题
解决:
(1)调整拆分mapper的数量(partition数量)
(2)增加jvm
(3)适当地将reduce的数量变大
11、流API中的什么特性带来可以使mapreduce任务可以以不同语言(如perlubyawk等)实现的灵活性?
答案:用可执行文件作为Mapper和Reducer,接受的都是标准输入,输出的都是标准输出
12、参考下面的M/R系统的场景:
--HDFS块大小为64MB
--输入类型为FileInputFormat
--有3个文件的大小分别是:64k65MB127MB
Hadoop框架会把这些文件拆分为多少块?
答案:
64k------->一个block
65MB---->两个文件:64MB是一个block,1MB是一个block
127MB--->两个文件:64MB是一个block,63MB是一个block
13、Hadoop中的RecordReader的作用是什么?
答案:属于split和mapper之间的一个过程
将inputsplit输出的行为一个转换记录,成为key-value的记录形式提供给mapper
14、Map阶段结束后,Hadoop框架会处理:Partitioning,shuffle和sort,在这个阶段都会发生了什么?
答案:
MR一共有四个阶段,splitmapshuffreduce在执行完map之后,可以对map的输出结果进行分区,
分区:这块分片确定到哪个reduce去计算(汇总)
排序:在每个分区中进行排序,默认是按照字典顺序。
Group:在排序之后进行分组
15、如果没有定义partitioner,那么数据在被送达reducer前是如何被分区的?
答案:
Partitioner是在map函数执行context.write()时被调用。
用户可以通过实现自定义的?Partitioner来控制哪个key被分配给哪个?Reducer。
查看源码知道:
如果没有定义partitioner,那么会走默认的分区Hashpartitioner
publicclassHashPartitionerextendsPartitioner{
/**Use{@linkObject#hashCode()}topartition.*/
publicintgetPartition(Kkey,Vvalue,intnumReduceTasks){
return(key.hashCode()&Integer.MAX_VALUE)%numReduceTasks;
}
}
16、什么是Combiner?
答案:这是一个hadoop优化性能的步骤,它发生在map与reduce之间
目的:解决了数据倾斜的问题,减轻网络压力,实际上时减少了maper的输出
源码信息如下:
publicvoidreduce(Textkey,Iteratorvalues,
OutputCollectoroutput,Reporterreporter)
throwsIOException{
LongWritablemaxValue=null;
while(values.hasNext()){
LongWritablevalue=values.next();
if(maxValue==null){
maxValue=value;
}elseif(value.compareTo(maxValue)>0){
maxValue=value;
}
}
output.collect(key,maxValue);
}
在collect实现类中,有这样一段方法
publicsynchronizedvoidcollect(Kkey,Vvalue)
throwsIOException{
outCounter.increment(1);
writer.append(key,value);
if((outCounter.getValue()%progressBar)==0){
progressable.progress();
}
}
**********************************************
Spark面试经典系列之数据倾斜
Spark面试经典系列之数据倾斜:数据倾斜之痛
1、Spark性能真正的杀手
2、数据倾斜之痛
数据倾斜两大直接致命性的后果:
1、OOM,一般OOM都是由于数据倾斜所致
2、速度变慢
数据倾斜基本形态特征:个别Task处理大量数据
数据倾斜的定位:
1、Web UI,可以清晰看见哪些Task运行的数据量大小
2、Log,Log的一个好处是可以清晰的告诉是哪一行出现问题OOM,同时可以清晰的看到在具体哪个Stage出现了数据倾斜(数据倾斜一般会在Shuffle过程中产生的),从而定位具体Shuffle的代码。也有可能发现绝大多数Task非常快,但是个别Task非常慢。
3、代码走读,重点看join、groupByKey、reduceByKey等关键代码
4、对数据特征分布进行分析;
解决原理和方法总论
1、Spark数据倾斜解决的原理总论
2、Spark数据倾斜解决方法总论
使数据膨胀,tachyon,复用RDD
Map端Reduce及问题思考
1、Spark数据倾斜解决之Map端Reduce
2、Map端Reduce的问题思考
给小的一段broadcast,然后在大的一端使用mapPartition。
如果数据量太大,有可能引起OOM
采样分而治之解决方案
1、采样算法解决数据倾斜的思想
2、采样算法在Spark数据倾斜中的具体操作
某个或某几个Key的Value非常大,从而导致数据倾斜
RDD1和RDD2进行join操作,其中我们采用采样的方式发现RDD1中有严重的数据倾斜的Key
第一步:采用Spark RDD中提供的采样接口,我们可以很方便的对全体(例如100亿条数据)进行采样,然后基于采样的数据,我们可以计算出哪个(哪些)Key的Value个数最多;
第二步:把全体数据分成两部分,即把原来RDD1变成RDD11和RDD12,其中RDD11代表导致数据倾斜的Key,RDD12中包含的是不会产生数据倾斜的Key;
第三步:把RDD11和RDD2进行join操作,且把RDD12和RDD2进行join操作,然后把分别join操作后的结果进行Union操作,从而得出和RDD1与RDD2直接进行join操作相同的结果
Spark自己的机制保证的不会产生数据倾斜。
上述流程中:
第一种情况:如果RDD11中的数据量不是很多,可以采用map端的join操作,避免了shuffle和数据倾斜。
第二种情况:如果RDD11中的数据量特别多,此时之所以能够缓解数据倾斜是因为采用了Spark Core天然的并行机制对RDD11中的同样一个Key的数据进行了拆分。从而达到让原本倾斜的Key分散到不同的Task的目的,就缓解了数据倾斜。
思考:在上述过程中如果把倾斜的Key加上随机数,会怎么样?
增加随机数,并行Task数量可能增加,具体是如何操作的?
RDD11中倾斜的Key加上1000以内的随机数,然后和RDD2进行join操作?不行!此时一定需要把RDD11中的Key在RDD2中的相同的Key进行1000以内的随机数,然后再进行join操作,这样做的好处:让倾斜的Key更加不倾斜,在实际生产环境下,会极大的解决在两个进行join的RDD数量都很大且其中一个RDD有一个或者两三个明显倾斜的Key的情况下的数据倾斜问题。
对于两个RDD数据量都很大且倾斜的Key特别多如何解决?
1、数据量都很大且倾斜的Key多的情况
2、此种情况下具体操作步骤
两个RDD数据都特别多且倾斜的Key成千上万个,该如何解决数据倾斜的问题?
初步的想法:在倾斜的Key上面加上随机数
该想法的原因:shuffle的时候把key的数据分到不同的task里去
但是现在的倾斜的key非常多,成千上万,所以如果说采样找出倾斜的key的话并不是一个非常好的想法
扩容?
首先,什么是扩容?就是把该RDD中的每一条数据变成5条、10条、20条等,例如RDD中原来是10亿条数据,扩容后可能变成1000亿条数据;
其次,如何做到扩容?flatMap中对要进行扩容的每一条数据都通过0~N-1个不同的前缀变成N条数据(例如变成)
问题:N的值可以随便取吗?需要考虑当前程序能够使用的Core的数量
答案:N的数值一般不能取的太大,通常小于50,否则会对磁盘、内存、网络都会形成极大负担,例如会造成OOM
N这个数值取成10和1000除了OOM等不同以外,是否还有其他影响呢?其实N的数值的大小还会对数据倾斜的解决程度构成直接的影响!N越大,越不容易倾斜,但是也会占用更多的内存、磁盘、网络以及(不必要的)消耗更多的CPU时间
模拟代码:
RDD1 join RDD2
rdd22 = RDD2.flatMap{
for(0 to 9) {
0_item
}
}
rdd11 = RDD1.map{
Random(10)
random_item
}
result = rdd11.join(rdd22)
result.map{
item_1.split 去掉前缀
}
并行度的深度使用
1、并行度的初级使用
2、并行度的高级使用
用并行度解决数据倾斜的基本应用:例如reduceByKey
改变并行度之所以能够改善数据倾斜的原因在于,如果某个Task有100个Key且数据量特别大,就极有可能导致OOM或者任务运行特别缓慢,此时如果把并行度变大,则可以分解该Task的数据量,例如把原本该Task的100个Key分解给10个Task,这个就可以减少每个Task的数据量,从而有可能解决OOM和任务慢的问题。
对于reduceByKey而言,你可以传入并行度的参数,也可以自定义Partitioner
增加Executor:改变计算资源,从仅仅数据倾斜的角度来看并不能够直接去解决数据倾斜的问题,但是也有好处,好处是可以同时并发运行更多的Task,结果是可能加快了运行速度。
用并行度解决数据倾斜的高级使用:例如reduceByKey
假设说有倾斜的Key,我们给所有的Key加上一个随机数,然后进行reduceByKey操作;此时同一个Key会有不同的随机数前缀,在进行reduceByKey操作的时候原来的一个非常大的倾斜的Key就分而治之变成若干个更小的Key,不过此时结果和原来不一样,怎么破?进行map操作,目的是把随机数前缀去掉,然后再次进行reduceByKey操作。(当然,如果你很无聊,可以再次做随机数前缀),这样我们就可以把原本倾斜的Key通过分而治之方案分散开来,最后又进行了全局聚合,在这里的本质还是通过改变并行度去解决数据倾斜的问题。
解决方案的“银弹”是什么?
1、数据倾斜解决方案总结
2、方案之外的方案
3、数据倾斜解决方案的“银弹”?
逃离Spark技术本身之外如何解决数据倾斜的问题?
之所以会有这样的想法,是因为从结果上来看,数据倾斜的产生来自于数据和数据的处理技术,前面几节课和大家分享都是数据的处理技术层面如何解决数据倾斜,因此,我们现在需要回到数据的层面去解决数据倾斜的问题。
数据本身就是Key-Value的存在方式,所谓的数据倾斜就是说某(几)个Key的Values特别多,所以如果要解决数据倾斜,实质上是解决单一的Key的Values的个数特别多的情况。新的数据倾斜解决方案由此诞生了。
1、把一个大的Key-Values数据分解成为Key-subKey-Values的方式
2、预先和其他的表进行join,将数据倾斜提前到上游的Hive ETL
3、可以把大的Key-Values中的Values组拼成为一个字符串,从而形成只有一个元素的Key-Value。
4、加一个中间适配层,当数据进来的时候进行Key的统计和动态排名,基于该排名动态调整Key分布
假如10万个Key都发生了数据倾斜,如何解决呢?此时一般就是加内存和Core
******************************************************
1、将现有逻辑在Spark上面实现。
2、数据倾斜怎么处理?
数据倾斜有很多解决方案,本例子简要介绍一种实现方式,假设表A 和表B连接,表A 数据倾斜,只有一个key倾斜,首先对A进行采样,统计出最倾斜的key,将A 表分隔为A1 只有倾斜 key, A2 不包含倾斜key, 然后分别与 表B 连接。
最后将结果合并, union
3 、各完成一个awk和sed的例子,最简单的应用即可,并说明。
cat /etc/passwd |awk -F ':' '{print $1}' 读取文件,指定文本分隔符,然后打印第一个域
cat test.sh | sed '2,50d' 读取文件的每一行,然后对每一行进行处理后打印在控制台, 有很多模式,很多方法。还可以直接修改文件
4、简要描述你知道的数据挖掘算法和使用场景
(一)基于分类模型的案例
( 1)垃圾邮件的判别 通常会采用朴素贝叶斯的方法进行判别
(2)医学上的肿瘤判断 通过分类模型识别
(二)基于预测模型的案例
(1)红酒品质的判断 分类回归树模型进行预测和判断红酒的品质
( 2)搜索引擎的搜索量和股价波动
(三)基于关联分析的案例:沃尔玛的啤酒尿布
(四)基于聚类分析的案例:零售客户细分
(五)基于异常值分析的案例:支付中的交易欺诈侦测
(六)基于协同过滤的案例:电商猜你喜欢和推荐引擎
(七)基于社会网络分析的案例:电信中的种子客户
(八)基于文本分析的案例
(1)字符识别:扫描王APP
(2)文学著作与统计:红楼梦归属
5、列举你知道的常用的Hadoop管理和监控的命令、比如hdfs dfs -mkdir /usr
-ls -cat -text -cp -put -chmod -chown
-du -get -copyFromLocal -copyToLocal
-mv -rm - tail -chgrp
6、评述hadoop运行原理
站在高处,大的方面讲解
1、有hdfs 负责数据存放 是Hadoop的分布式文件存储系统
- 将大文件分解为多个Block,每个Block保存多个副本。提供容错机制,副本丢失或者宕机时自动恢复。
- 默认每个Block保存3个副本,64M为1个Block。
- 将Block按照key-value映射到内存当中。
2、有mapreduce负责计算,Map(映射)和Reduce(归约)
7、讲述mapreduce的原理
ApplicationMaster 是一个详细的框架库,它结合从 ResourceManager 获得的资源和 NodeManager 协同工作来运行和监控任务。
ResourceManager 支持分层级的应用队列,这些队列享有集群一定比例的资源。从某种意义上讲它就是一个纯粹的调度器,
ApplicationMaster 的职责有:向调度器索要适当的资源容器,运行任务,跟踪应用程序的状态和监控它们的进程,处理任务的失败原因。
输入分片(input split)
map阶段:
combiner阶段:
三个代:年轻代(Young Generation)、年老代(Old Generation)和持久代(Permanent Generation)。
9、找出公共好友:
http://www.cnblogs.com/lucius/p/3483494.html
http://www.aboutyun.com/thread-18826-1-1.html
原理:A 有好友 B ,C,D F 有好友 D E F
其实A 的好友也是他好友的好友
其实F 的好友也是他的好友的好友
那么D 的共同好友就是 A F
10、combiner作用
Combiner最基本是实现本地key的聚合,对map输出的key排序、value进行迭代。
Combiner还有本地reduce功能(其本质上就是一个reduce):
使用Combiner先完成在map端的本地聚合,可以减少网络传输数据量,提高性能。
平均数的归约算法不能多次调用。
11、在mr环节中,那些环节需要优化,如何优化,请详细说明。
1、 setNumReduceTasks 适当的设置reduce的数量,如果数据量比较大,那么可以增加reduce的数量
2、适当的时候使用 combine 函数,减少网络传输数据量
3、压缩map和reduce的输出数据
4、使用SequenceFile二进制文件。
5、通过application 的ui页面观察job的运行参数
6、太多小文件,造成map任务过多的问题,应该可以先合并小文件,或者有一个特定的map作为处理小文件的输入
7、map端效率低原因分析
- 源文件的大小远小于HDFS的块的大小。这意味着任务的开启和停止要耗费更多的时间,就没有足够的时间来读取并处理输入数据。
- 源文件无法分块。这导致需要通过网络IO从其他节点读取文件块。
- 一个节点的本地磁盘或磁盘控制器运行在降级模式中,读取写入性能都很差。这会影响某个节点,而不是全部节点。
- 源文件不来自于HDFS。则可能是Hadoop节点和数据源之间的延迟导致了性能低下。
- Map任务从其他数据节点读取数据。可以从JobTracker的map任务细节信息和任务运行尝试中找到输入块的位置。如果输入块的位置不是任务执行的节点,那就不是本地数据了。
技术33 Reduce实例不足或过多
技术34 诊断reduce段的数据倾斜的问题
技术35 确定reduce任务是否存在整体吞吐量过低
技术36 缓慢的洗牌(shuffle)和排序
技术37 作业竞争和调度器限制
1.查找硬件的失效
CPU竞争
3 内存交换
4 磁盘健康
网络
技术46 规避使用reduce
技术48 使用combine
技术50 收集倾斜数据
减小Reduce端数据倾斜的性能损失
抽样和范围分区
方法2:自定义分区
方法3:Combine
方法4:Map端连接和半连接
数据大小倾斜的自定义策略
1.正则表达式(少用)
2.字符串令牌化(TOKENIZATION)Apache commons中的StringUtils类效率要更好
3.对象重用
4字符串连接
5对象的内存资源消耗
6.4.6 优化数据序列化
压缩
- 确保精确的度量,理解如何获得MapReduce和系统的性能指标
- 使用性能指标来减少潜在的性能问题
- 通过检查MapRecue/HDFS配置,优化MapReduce洗牌/排序阶段,优化用户JAVA代码,来修复常见的性能问题
技术24 使用Avro存储多个小文件
最简单的方案就是将HDFS中的小文件打包到一个大的文件容器中。这个技术中将本地磁盘中所有的目标文件存储到HDFS中的一个单独的Avro文件。然后在MapReduce中处理Avro文件和其中的小文件。
压缩依赖
CombineFileInputFormat
基于压缩的高效存储
-------------------------------
Hadoop(1)MapReduce 性能调优:性能测量(Measuring)
http://www.aboutyun.com/thread-15514-1-1.html
Hadoop(2)MapReduce 性能调优:理解性能瓶颈,诊断map性能瓶颈
http://www.aboutyun.com/thread-15517-1-1.html
Hadoop(3)MapReduce 性能调优:诊断reduce性能瓶颈
http://www.aboutyun.com/thread-15522-1-1.html
Hadoop(4)MapReduce 性能调优:诊断一般性能瓶颈
http://www.aboutyun.com/thread-15660-1-1.html
Hadoop(5)MapReduce 性能调优:诊断硬件性能瓶颈
http://www.aboutyun.com/thread-15534-1-1.html
Hadoop(6)MapReduce 性能调优:优化洗牌(shuffle)和排序阶段
http://www.aboutyun.com/thread-15545-1-1.html
Hadoop(7)MapReduce 性能调优:减小数据倾斜的性能损失
http://www.aboutyun.com/thread-15544-1-1.html
Hadoop(8)MapReduce 性能调优:优化MapReduce的用户Java代码
http://www.aboutyun.com/thread-15583-1-1.html
Hadoop(9)MapReduce 性能调优:优化数据序列化
http://www.aboutyun.com/thread-15658-1-1.html
Hadoop(10)MapReduce 文件处理:小文件
http://www.aboutyun.com/thread-15592-1-1.html
Hadoop(11)MapReduce 文件处理:基于压缩的高效存储(一)
http://www.aboutyun.com/thread-15626-1-1.html
Hadoop(12)MapReduce 文件处理:基于压缩的高效存储(二)
http://www.aboutyun.com/thread-15629-1-1.html
****************************************************************
使用案例 机器学习
- 1.商品特征未读降维 :SVD 、PCA
- 2.商品挂错页面检查:TF-IDF、SVM、Logistic - -Regression
- 3.相关推荐算法模型训练:Loginistic Regression、kmeans、SVM
- 4.商品爆品预测 :Loginistic Regression
- 5.关联性分析:FPGrowth
- 6.开发了基于Mllib的机器学习平台
经验分享
用户常见错误
1.
问题:Collect 大量数据到Driver端,导致driver oom;算法开发的时候没有注意
解决:driver不能堆积大量数据,尽量不要在driver保存数据
2.
问题:维表数据没用cache内存或者repartition数目太多
解决:将维表数据cache到内存,分区数目不能太多
3.
问题:未对Spark的持久化级别进行选择,需要根据实际的业务需求进行选择
解决:统计RDD的数据量,大数据量将Memory_AND_DISK作为首选
4.
问题:读写DB没有设置合理的分区数目,并发量太高,影响业务
解决:统计DB的表分区结构,监控DB服务load,压测到位
5.
问题 :Spark使用Hbase scan性能不稳定
解决:Get性能相对稳定,尽量使用Get
6.
问题:History server 重启需要回放180G日志,需要4个小时,新完成的app在History server无法立即看到
解决: 改为多线程会放 SPARK-13988
7.
问题 经常回出现class not found ,但是class文件再包里面存在
解决办法 打印classloadder分析,建议不要轻易修改源码classloader
8.
PCA算法只能支持小于14W feature特性
解决办法 使用SVD进行降维
9.
问题 FPGrowth不支持 KryoSerializer
解决办法 1.6.2 之前使用Java Serializer
10.
Spark在使用JDBC接口建立DataFrame时,需通过执行SQL来获取该JDBC数据源的Schema,导致创建大量的DataFrame的时候非常耗时
解决办法:Schema相同的table可以不用重复获取schema
地址:https://github.com/ouyangshourui/SparkJDBCSchema/wiki
4000个DataFrame的初始化时间从原先的25分钟缩短为10分钟以内
Spark平台权限
1.4.0 Standalone cluster模式不支持多用户
相关组件读写权限问题无法解决,比如读取Hive、hbase、HDFS数据的权限问题
解决办法:修改SparkContext sparkuser 和system username
代码地址:https://github.com/ouyangshourui/ouyangshourui/StandalongClusterAuthorization/wiki
Spark Sql hive元数据密码加密,javax.jdo.option.ConnectionPassword暴露给用户比较危险
解决办法:修改HiveContext.Scala文件中的metadataHive变量,选择自定义的解密算法解密
代码地址: https://github.com/ouyangshourui/HivePasswordEncryptionDecryption/wiki
Spark1.5.2 Sql放大了Hive读权限,任何用户都可以读取别的用户的Hive表数据
临时解决办法,生成HiveTableScan operator时调用driver已有的Hive Client 权限接口检查当前用户的读权限
https://github.com/ouyangshourui/HiveReadpermission/wiki
升级遇到的问题
1.4.0 Standalone 升级到1.5.2 on Yarn
用户代码使用system.exit(-1) RM web UI却显示正常,建议直接hrow exception
-
自定义封装MySQL PostgreSql JDBC 没有考虑driver JDBC Dialect的实现导致数据无法返回。
-
每个Executor都与Hive简历connection去获取HiveConf 没有broadcast HiveConf(SPARK -10679)
多版本Spark Dynamic Persource Allocation无法共存,DRA需重启Yarn NodeManger ,耦合性太强(没有解决)
TODO
Spark Streaming 全面落地,吸收Apache Bean思想
Spark Sql替代大部分Hive任务
SPark现有任务优化加速
完善机器学习平台 覆盖大部分电商和金融领域机器学习算法库
全面拥抱Spark2.0 参与社区
************************************************************************
Big Data 面试题总结
JAVA相关
1-1)List 与set 的区别?
老掉牙的问题了,还在这里老生常谈:List特点:元素有放入顺序,元素可重复 ,Set特点:元素无放入顺序,元素不可重复。
1-2)数据库的三大范式?
原子性、一致性、唯一性
1-3)java 的io类的图解
1-4)对象与引用对象的区别
对象就是好没有初始化的对象,引用对象即使对这个对象进行了初始化,这个初始化可以使自己的直接new的也可以是直接其他的赋值的,那么背new或者背其他赋值的我们叫做是引用对象,最大的区别于
1-5)谈谈你对反射机制的理解及其用途?
反射有三种获取的方式,分别是:forName / getClass / 直接使用class方式 使用反射可以获取类的实例
1-6)列出至少五种设计模式
设计方式有工厂法,懒加载,观察者模式,静态工厂,迭代器模式,外观模式、、、、
1-7)RPC 原理?
Rpc分为同步调用和一部调用,异步与同步的区别在于是否等待服务器端的返回值。Rpc的组件有RpcServer,RpcClick,RpcProxy,RpcConnection,RpcChannel,RpcProtocol,RpcInvoker等组件,
1-8)ArrayList、Vector、LinkedList 的区别及其优缺点?HashMap、HashTable 的区别及优缺点?
ArrayList 和 Vector 是采用数组方式存储数据的,是根据索引来访问元素的,都可以
根据需要自动扩展内部数据长度,以便增加和插入元素,都允许直接序号索引元素,但
是插入数据要涉及到数组元素移动等内存操作,所以索引数据快插入数据慢,他们最大
的区别就是 synchronized 同步的使用。
LinkedList 使用双向链表实现存储,按序号索引数据需要进行向前或向后遍历,但
是插入数据时只需要记录本项的前后项即可,所以插入数度较快!
如果只是查找特定位置的元素或只在集合的末端增加、移除元素,那么使用 Vector
或 ArrayList 都可以。如果是对其它指定位置的插入、删除操作,最好选择 LinkedList
HashMap、HashTable 的区别及其优缺点:
HashTable 中的方法是同步的 HashMap 的方法在缺省情况下是非同步的 因此在多线程环境下需要做额外的同步机制。
HashTable 不允许有 null 值 key 和 value 都不允许,而 HashMap 允许有 null 值 key和 value 都允许 因此 HashMap 使用 containKey()来判断是否存在某个键。
HashTable 使用 Enumeration ,而 HashMap 使用 iterator。
Hashtable 是 Dictionary 的子类,HashMap 是 Map 接口的一个实现类。
1-9)使用 StringBuffer 而不是 String
当需要对字符串进行操作时,使用 StringBuffer 而不是 String,String 是 read-only 的,如果对它进行修改,会产生临时对象,而 StringBuffer 是可修改的,不会产生临时对象。
1-10)集合的扩充
ArrayList list = new ArrayList(90000); list扩充多少次??
public ArrayList() {
this(10);
}
默认的扩充是10由此计算
1-11)java的拆包与封包的问题
System.out.println("5" + 2);
52
1-12)Java中Class.forName和ClassLoader.loadClass的区别
Class.forName("xx.xx")等同于Class.forName("xx.xx",true,CALLClass.class.getClassLoader()),第二个参数(bool)表示装载类的时候是否初始化该类,即调用类的静态块的语句及初始化静态成员变量。
ClassLoader loader = Thread.currentThread.getContextClassLoader(); //也可以用(ClassLoader.getSystemClassLoader())
Class cls = loader.loadClass("xx.xx"); //这句话没有执行初始化
forName可以控制是否初始化类,而loadClass加载时是没有初始化的。
1-13)hashMap与hashTable的区别
HashMap Hashtable
父类 AbstractMap Dictiionary
是否同步 否 是
k,v可否null 是 否
Hashtable和HashMap采用的hash/rehash算法都大概一样,所以性能不会有很大的差异。
1-14)怎样实现数组的反转
ArrayList arrayList = new ArrayList();
arrayList.add("A");
arrayList.add("B");
对数组进行反转
Collections.reverse(arrayList);
1-15)请使用JAVA实现二分查找
一般的面试者都是些向看看你的思路,所以一般答题时只需要把思路写出来即可。
具体的实现如下:
二分查找就是折半查找,要想折半就必须把原来的数据进行排序,才能方便的查找:
实现代码如下:
public static int binarySearch(int[] srcArray, int des){
int low = 0;
int high = srcArray.length-1;
while(low <= high) {
int middle = (low + high)/2;
if(des == srcArray[middle]) {
return middle;
}else if(des <srcArray[middle]) {
high = middle - 1;
}else {
low = middle + 1;
}
}
return -1;
}
1-16)java 中有两个线程怎样等待一个线程执行完毕
可以使用join关键字
1-17)hashmap hashtable currentHashMap的使用区别
hashmap hashtable 的醉的的区别在于hashtable 是线程安全的,而hashmap 不是线程安全的,currentHashMap也是线程安全的。
ConcurrentHashMap是使用了锁分段技术技术来保证线程安全的。所分段的技术是:讲数据分成一段一段的储存,给每一段的数据添加一把锁,当线程访问一个数据时,其他的数据可以被访问。
1-18)简单描述一下java的gc机制,常用的JAVA调优的方法,OOM如何产生的,如何处理OOM问题???
1、程序在运行时会产生很多的对象的信息,当这些对象的信息没有用时,则会被gc回收
2、调优的方式主要是调节年轻代与老年代的内存的大小
3、OOM是OutOfMemory的缩写(搞得跟多高大上似的)就是线程创建的多了,没有及时的回收过来所产生的,代码如下:
public class JavaVMStackOOM {
private void dontStop() {
while (true) {
}
}
public void stackLeakByThread() {
while (true) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
dontStop();
}
});
thread.start();
}
}
public static void main(String[] args) {
JavaVMStackOOM oom = new JavaVMStackOOM();
oom.stackLeakByThread();
}
4、既然知道以上的现象,在写代码时应该注意,不要过多的创建线程的数目。
Linux 相关
2-1)关闭不必要的服务
A、使用ntsysv命令查看开启与关闭的服务
B、停止打印服务
[root@hadoop1 /]# /etc/init.d/cups stop
[root@hadoop1 /]# chkconfig cups off
2-2)关闭IP6
[root@hadoop1 /]# vim /etc/modprobe.conf
在下面添加一下配置:
alias net-pf-10 off
alias ipv6 off
2-3)调整文件的最大的打开数
查看当前的文件的数量:[root@hadoop1 /]#ulimit -a
修改配置:
[root@hadoop1 /]# vi /etc/security/limits.conf 在文件最后加上:
* soft nofile 65535
* hard nofile 65535
* soft nproc 65535
* hard nproc 65535
2-4)修改 linux 内核参数
[root@hadoop1 /]# vi /etc/sysctl.conf
在文本的最后追加一下内容:
net.core.somaxconn = 32768
表示物理内存使用到 90%(100-10=90)的时候才使用 swap 交换区
2-5)关闭 noatime
在最后追加一下内容
/dev/sda2 /data ext3 noatime,nodiratime 0 0
2-6)请用shell命令把某一个文件下的所有的文件分发到其他的机器上
Scp -r /user/local hadoop2:/user/local
2-7)echo 1+1 && echo "1+1" 会输出什么
[root@hadoop1 ~]# echo 1+1 && echo "1+1"
1+1
1+1
[root@hadoop1 ~]# echo 1+1 && echo "1+1" && echo "1+" 1
1+1
1+1
1+ 1
2-8)在当前的额目录下找出包含祖母a并且文件的额大小大于55K的文件
[root@hadoop1 test]# find .| grep -ri "a"
a.text:a
后半句没有写出来,有时间在搞
2-9)linux用什么命令查看cpu,硬盘,内存的信息?
Top 命令
Hadoop相关
3-1)简单概述hdfs原理,以及各个模块的职责
1、客户端向 nameNode 发送要上传文件的请求
2、nameNode 返回给用户是否能上传数据的状态
3、加入用户端需要上传一个 1024M 的文件,客户端会通过 Rpc 请求 NameNode,并返回需要上传给那些DataNode(分配机器的距离以及空间的大小等),namonode会选择就近原则分配机器。
4、客户端请求建立 block 传输管道 chnnel 上传数据
5、在上传是 datanode 会与其他的机器建立连接并把数据块传送到其他的机器上
6、dataNode 向 namenode 汇报自己的储存情况以及自己的信息
7、档第一个快上传完后再去执行其他的复制的传送
3-2)mr的工作原理
1、当执行mr程序是,会执行一个Job
2、客户端的jobClick会请求namenode的jobTracker要执行任务
3、jobClick会去HDFS端复制作业的资源文件
4、客户端的jobClick会向namenode提交作业,让namenode做准备
5、Namenode的jobTracker会去初始化创建的对象
6、Namenode会获取hdfs的划分的分区
7、Namenode去检查TaskTracker的心跳信息,查看存活的机器
8、当执行的datenode执行任务时Datenode会去HDFS获取作业的资源的文件
9、TaskTracker会去执行代码,并登陆JVM的执行渠道
10、JVM或执行MapTask或者ReduceTask
11、执行终结
3-3)怎样判断文件时候存在
这是Linux上的知识,只需要在IF[ -f ] 括号中加上-f参数即可判断文件是否存在
3-4)fsimage和edit的区别?
大家都知道namenode与secondary namenode 的关系,当他们要进行数据同步时叫做checkpoint时就用到了fsimage与edit,fsimage是保存最新的元数据的信息,当fsimage数据到一定的大小事会去生成一个新的文件来保存元数据的信息,这个新的文件就是edit,edit会回滚最新的数据。
3-5)hdfs中的block默认保存几份?
不管是hadoop1.x 还是hadoop2.x 都是默认的保存三份,可以通过参数dfs.replication就行修改,副本的数目要根据机器的个数来确定。
3-6)列举几个配置文件优化?
Core-site.xml 文件的优化
fs.trash.interval
默认值: 0
说明: 这个是开启hdfs文件删除自动转移到垃圾箱的选项,值为垃圾箱文件清除时间。一般开启这个会比较好,以防错误删除重要文件。单位是分钟。
dfs.namenode.handler.count
默认值:10
说明:Hadoop系统里启动的任务线程数,这里改为40,同样可以尝试该值大小对效率的影响变化进行最合适的值的设定。
mapreduce.tasktracker.http.threads
默认值:40
说明:map和reduce是通过http进行数据传输的,这个是设置传输的并行线程数。
3-7) 谈谈数据倾斜,如何发生的,并给出优化方案
数据的倾斜主要是两个的数据相差的数量不在一个级别上,在只想任务时就造成了数据的倾斜,可以通过分区的方法减少reduce数据倾斜性能的方法,例如;抽样和范围的分区、自定义分区、数据大小倾斜的自定义侧咯
3-8)简单概括安装hadoop的步骤
1.创建 hadoop 帐户。
2.setup.改 IP。
3.安装 Java,并修改/etc/profile 文件,配置 java 的环境变量。
4.修改 Host 文件域名。
5.安装 SSH,配置无密钥通信。
6.解压 hadoop。
7.配置 conf 文件下 hadoop-env.sh、core-site.sh、mapre-site.sh、hdfs-site.sh。
8.配置 hadoop 的环境变量。
9.Hadoop namenode -format
10.Start-all.sh
3-9)简单概述hadoop中的角色的分配以及功能
Namenode:负责管理元数据的信息
SecondName:做namenode冷备份,对于namenode的机器当掉后能快速切换到制定的Secondname上
DateNode:主要做储存数据的。
JobTracker:管理任务,并把任务分配到taskTasker
TaskTracker:执行任务的
3-10)怎样快速的杀死一个job
1、执行hadoop job -list 拿到job-id
2、Hadoop job kill hadoop-id
3-11)新增一个节点时怎样快速的启动
Hadoop-daemon.sh start datanode
3-12)你认为用java , streaming , pipe 方式开发map/reduce,各有什么优点
开发mapReduce只用过java与Hive,不过使用java开发mapreduce显得笨拙,效率也慢,基于java慢的原因于是hive,这样就方便了查询与设计
3-13)简单概述hadoop的join的方法
Hadoop 常用的jion有reduce side join , map side join , SemiJoin 不过reduce side join 与 map side join 比较常用,不过都是比较耗时的。
3-14)简单概述hadoop的combinet与partition的区别
combine和partition都是函数,中间的步骤应该只有shuffle! combine分为map端和reduce端,作用是把同一个key的键值对合并在一起,可以自定义的,partition是分割map每个节点的结果,按照key分别映射给不同的reduce,也是可以自定义的。这里其实可以理解归类。
3-15 ) hdfs 的数据压缩算法
Hadoop 的压缩算法有很多,其中比较常用的就是gzip算法与bzip2算法,都可以可通过CompressionCodec来实现
3-16)hadoop的调度
Hadoop 的调度有三种其中fifo的调度hadoop的默认的,这种方式是按照作业的优先级的高低与到达时间的先后执行的,还有公平调度器:名字见起意就是分配用户的公平获取共享集群呗!容量调度器:让程序都能货到执行的能力,在队列中获得资源。
3-17)reduce 后输出的数据量有多大?
输出的数据量还不是取决于map端给他的数据量,没有数据reduce也没法运算啊!!
3-18) datanode 在什么情况下不会备份?
Hadoop保存的三个副本如果不算备份的话,那就是在正常运行的情况下不会备份,也是就是在设置副本为1的时候不会备份,说白了就是单台机器呗!!还有datanode 在强制关闭或者非正常断电不会备份。
3-19)combine 出现在那个过程?
Hadoop的map过程,根据意思就知道结合的意思吗,剩下的你们就懂了。想想wordcound
3-20) hdfs 的体系结构?
HDFS有 namenode、secondraynamenode、datanode 组成。
namenode 负责管理 datanode 和记录元数据
secondraynamenode 负责合并日志
datanode 负责存储数据
3-21) hadoop flush 的过程?
Flush 就是把数据落到磁盘,把数据保存起来呗!
3-22) 什么是队列
队列的实现是链表,消费的顺序是先进先出。
3-23)三个 datanode,当有一个 datanode 出现错误会怎样?
第一不会给储存带来影响,因为有其他的副本保存着,不过建议尽快修复,第二会影响运算的效率,机器少了,reduce在保存数据时选择就少了,一个数据的块就大了所以就会慢。
3-24)mapReduce 的执行过程
首先map端会Text 接受到来自的数据,text可以把数据进行操作,最后通过context把key与value写入到下一步进行计算,一般的reduce接受的value是个集合可以运算,最后再通过context把数据持久化出来。
3-25)Cloudera 提供哪几种安装 CDH 的方法
· Cloudera manager
· Tarball
· Yum
· Rpm
3-26)选择题与判断题
http://blog.csdn.NET/jiangheng0535/article/details/16800415
3-27)hadoop的combinet与partition效果图
3-28)hadoop 的机架感知(或者说是扩普)
看图说话
数据块会优先储存在离namenode进的机器或者说成离namenode机架近的机器上,正好是验证了那句话不走网络就不走网络,不用磁盘就不用磁盘。
3-29)文件大小默认为 64M,改为 128M 有啥影响?
这样减少了namenode的处理能力,数据的元数据保存在namenode上,如果在网络不好的情况下会增到datanode的储存速度。可以根据自己的网络来设置大小。
3-30)datanode 首次加入 cluster 的时候,如果 log 报告不兼容文件版本,那需要namenode 执行格式化操作,这样处理的原因是?
这样处理是不合理的,因为那么 namenode 格式化操作,是对文件系统进行格式
化,namenode 格式化时清空 dfs/name 下空两个目录下的所有文件,之后,会在目
录 dfs.name.dir 下创建文件。
文本不兼容,有可能时 namenode 与 datanode 的 数据里的 namespaceID、
clusterID 不一致,找到两个 ID 位置,修改为一样即可解决。
3-31)什么 hadoop streaming?
提示:指的是用其它语言处理
3-32)MapReduce 中排序发生在哪几个阶段?这些排序是否可以避免?为什么?
一个 MapReduce 作业由 Map 阶段和 Reduce 阶段两部分组成,这两阶段会对数
据排序,从这个意义上说,MapReduce 框架本质就是一个 Distributed Sort。在 Map
阶段,在 Map 阶段,Map Task 会在本地磁盘输出一个按照 key 排序(采用的是快速
排序)的文件(中间可能产生多个文件,但最终会合并成一个),在 Reduce 阶段,每
个 Reduce Task 会对收到的数据排序,这样,数据便按照 Key 分成了若干组,之后以
组为单位交给 reduce()处理。很多人的误解在 Map 阶段,如果不使用 Combiner
便不会排序,这是错误的,不管你用不用 Combiner,Map Task 均会对产生的数据排
序(如果没有 Reduce Task,则不会排序,实际上 Map 阶段的排序就是为了减轻 Reduce
端排序负载)。由于这些排序是 MapReduce 自动完成的,用户无法控制,因此,在
hadoop 1.x 中无法避免,也不可以关闭,但 hadoop2.x 是可以关闭的。
3-33)hadoop的shuffer的概念
Shuffer是一个过程,实在map端到reduce在调reduce数据之前都叫shuffer,主要是分区与排序,也就是内部的缓存分分区以及分发(是reduce来拉数据的)和传输
3-34)hadoop的优化
1、优化的思路可以从配置文件和系统以及代码的设计思路来优化
2、配置文件的优化:调节适当的参数,在调参数时要进行测试
3、代码的优化:combiner的个数尽量与reduce的个数相同,数据的类型保持一致,可以减少拆包与封包的进度
4、系统的优化:可以设置linux系统打开最大的文件数预计网络的带宽MTU的配置
5、为 job 添加一个 Combiner,可以大大的减少shuffer阶段的maoTask拷贝过来给远程的 reduce task的数据量,一般而言combiner与reduce相同。
6、在开发中尽量使用stringBuffer而不是string,string的模式是read-only的,如果对它进行修改,会产生临时的对象,二stringBuffer是可修改的,不会产生临时对象。
7、修改一下配置:
一下是修改 mapred-site.xml 文件
修改最大槽位数
槽位数是在各个 tasktracker 上的 mapred-site.xml 上设置的,默认都是 2
<property>
<name>mapred.tasktracker.map.tasks.maximum</name>
task 的最大数
<value>2</value>
</property>
<property>
<name>mapred.tasktracker.reduce.tasks.maximum</name>
ducetask 的最大数
<value>2</value>
</property>
调整心跳间隔
集群规模小于 300 时,心跳间隔为 300 毫秒
mapreduce.jobtracker.heartbeat.interval.min 心跳时间
北京市昌平区建材城西路金燕龙办公楼一层 电话:400-618-9090
mapred.heartbeats.in.second 集群每增加多少节点,时间增加下面的值
mapreduce.jobtracker.heartbeat.scaling.factor 集群每增加上面的个数,心跳增多少
启动带外心跳
mapreduce.tasktracker.outofband.heartbeat 默认是 false
配置多块磁盘
mapreduce.local.dir
配置 RPC hander 数目
mapred.job.tracker.handler.count 默认是 10,可以改成 50,根据机器的能力
配置 HTTP 线程数目
tasktracker.http.threads 默认是 40,可以改成 100 根据机器的能力
选择合适的压缩方式
以 snappy 为例:
<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>
<property>
<name>mapred.map.output.compression.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
3-35)3 个 datanode 中有一个 个datanode 出现错误会怎样?
这个 datanode 的数据会在其他的 datanode 上重新做备份。
3-36)怎样决定mapreduce的中的map以及reduce的数量
在mapreduce中map是有块的大小来决定的,reduce的数量可以按照用户的业务来配置。
3-37)两个文件合并的问题
给定a、b两个文件,各存放50亿个url,每个url各占用64字节,内存限制是4G,如何找出a、b文件共同的url?
主要的思想是把文件分开进行计算,在对每个文件进行对比,得出相同的URL,因为以上说是含有相同的URL所以不用考虑数据倾斜的问题。详细的解题思路为:
可以估计每个文件的大小为5G*64=300G,远大于4G。所以不可能将其完全加载到内存中处理。考虑采取分而治之的方法。
遍历文件a,对每个url求取hash(url)%1000,然后根据所得值将url分别存储到1000个小文件(设为a0,a1,...a999)当中。这样每个小文件的大小约为300M。遍历文件b,采取和a相同的方法将url分别存储到1000个小文件(b0,b1....b999)中。这样处理后,所有可能相同的url都在对应的小文件(a0 vs b0, a1 vs b1....a999 vs b999)当中,不对应的小文件(比如a0 vs b99)不可能有相同的url。然后我们只要求出1000对小文件中相同的url即可。
比如对于a0 vs b0,我们可以遍历a0,将其中的url存储到hash_map当中。然后遍历b0,如果url在hash_map中,则说明此url在a和b中同时存在,保存到文件中即可。
如果分成的小文件不均匀,导致有些小文件太大(比如大于2G),可以考虑将这些太大的小文件再按类似的方法分成小小文件即可
3-38)怎样决定一个job的map和reduce的数量
map的数量通常是由hadoop集群的DFS块大小确定的,也就是输入文件的总块数,reduce端是复制map端的数据,相对于map端的任务,reduce节点资源是相对于比较缺少的,同时运行的速度会变慢,争取的任务的个数应该是0.95过着1.75。
3-39)hadoop的sequencefile的格式,并说明下什么是JAVA的序列化,如何实现JAVA的序列化
1、hadoop的序列化(sequencefile)是一二进制的形式来保存的
2、Java的序列化是讲对象的内容进行流化
3、实现序列化需要实现Serializable接口便可以了
3-40)简单概述一下hadoop1与hadoop2的区别
Hadoop2与hadoop1最大的区别在于HDFS的架构与mapreduce的很大的区别,而且速度上有很大的提升,hadoop2最主要的两个变化是:namenode可以集群的部署了,hadoop2中的mapreduce中的jobTracker中的资源调度器与生命周期管理拆分成两个独立的组件,并命名为YARN
3-41)YARN的新特性
YARN是hadoop2.x之后才出的,主要是hadoop的HA(也就是集群),磁盘的容错,资源调度器
3-42)hadoop join的原理
实现两个表的join首先在map端需要把表标示一下,把其中的一个表打标签,到reduce端再进行笛卡尔积的运算,就是reduce进行的实际的链接操作。
3-43)hadoop的二次排序
Hadoop默认的是HashPartitioner排序,当map端一个文件非常大另外一个文件非常小时就会产生资源的分配不均匀,既可以使用setPartitionerClass来设置分区,即形成了二次分区。
3-44)hadoop的mapreduce的排序发生在几个阶段?
发生在两个阶段即使map与reduce阶段
3-45)请描述mapreduce中shuffer阶段的工作流程,如何优化shuffer阶段的?
Mapreduce的shuffer是出在map task到reduce task的这段过程中,首先会进入到copy过程,会通过http方式请求map task所在的task Tracker获取map task 的输出的文件,因此当map task结束,这些文件就会落到磁盘中,merge实在map端的动作,只是在map拷贝过来的数值,会放到内存缓冲区中,给shuffer使用,reduce阶段,不断的merge后最终会把文件放到磁盘中。
3-46)mapreduce的combiner的作用是什么,什么时候不易使用??
Mapreduce中的Combiner就是为了避免map任务和reduce任务之间的数据传输而设置的,Hadoop允许用户针对map task的输出指定一个合并函数。即为了减少传输到Reduce中的数据量。它主要是为了削减Mapper的输出从而减少网络带宽和Reducer之上的负载。
在数据量较少时不宜使用。
3-47)
Zookeeper 相关
4-1)写出你对zookeeper的理解
随着大数据的快速发展,多机器的协调工作,避免主要机器单点故障的问题,于是就引入管理机器的一个软件,他就是zookeeper来协助机器正常的运行。
Zookeeper有两个角色分别是leader与follower ,其中leader是主节点,其他的是副节点,在安装配置上一定要注意配置奇数个的机器上,便于zookeeper快速切换选举其他的机器。
在其他的软件执行任务时在zookeeper注册时会在zookeeper下生成相对应的目录,以便zookeeper去管理机器。
4-2)zookeeper 的搭建过程
主要是配置文件zoo.cfg 配置dataDir 的路径一句dataLogDir 的路径以及myid的配置以及server的配置,心跳端口与选举端口
Hive 相关
5-1)hive是怎样保存元数据的
保存元数据的方式有:内存数据库rerdy,本地MySQL数据库,远程mysql数据库,但是本地的mysql数据用的比较多,因为本地读写速度都比较快
5-2)外部表与内部表的区别
先来说下Hive中内部表与外部表的区别:
Hive 创建内部表时,会将数据移动到数据仓库指向的路径;若创建外部表,仅记录数据所在的路径,不对数据的位置做任何改变。在删除表的时候,内部表的元数据和数据会被一起删除,而外部表只删除元数据,不删除数据。这样外部表相对来说更加安全些,数据组织也更加灵活,方便共享源数据。
5-3)对于 hive,你写过哪些 UDF 函数,作用是什么
UDF: user defined function 的缩写,编写hive udf的两种方式extends UDF 重写evaluate第二种extends GenericUDF重写initialize、getDisplayString、evaluate方法
5-4)Hive 的 sort by 和 order by 的区别
order by 会对输入做全局排序,因此只有一个reducer(多个reducer无法保证全局有序)只有一个reducer,会导致当输入规模较大时,需要较长的计算时间。
sort by不是全局排序,其在数据进入reducer前完成排序.
因此,如果用sort by进行排序,并且设置mapred.reduce.tasks>1, 则sort by只保证每个reducer的输出有序,不保证全局有序。
5-5)hive保存元数据的方式以及各有什么特点?
1、Hive有内存数据库derby数据库,特点是保存数据小,不稳定
2、mysql数据库,储存方式可以自己设定,持久化好,一般企业开发都用mysql做支撑
5-6)在开发中问什么建议使用外部表?
1、外部表不会加载到hive中只会有一个引用加入到元数据中
2、在删除时不会删除表,只会删除元数据,所以不必担心数据的
5-7)hive partition 分区
分区表,动态分区
5-8)insert into 和 override write 区别?
insert into:将某一张表中的数据写到另一张表中
override write:覆盖之前的内容。
Hbase 相关
6-1)Hbase 的 rowkey 怎么创建比较好?列族怎么创建比较好?
Rowkey是一个二进制码流,Rowkey的长度被很多开发者建议说设计在10~100个字节,不过建议是越短越好,不要超过16个字节。在查找时有索引会加快速度。
Rowkey散列原则 、 Rowkey唯一原则 、 针对事务数据Rowkey设计 、 针对统计数据的Rowkey设计 、 针对通用数据的Rowkey设计、 支持多条件查询的RowKey设计。
总结设计列族:
1、一般不建议设计多个列族
2、数据块的缓存的设计
3、激进缓存设计
4、布隆过滤器的设计(可以提高随机读取的速度)
5、生产日期的设计
6、列族压缩
7、单元时间版本
6-2)Hbase 的实现原理
Hbase 的实现原理是rpc Protocol
6-3) hbase 过滤器实现原则
感觉这个问题有问题,过滤器多的是啦,说的是哪一个不知道!!!!
hbase的过滤器有:RowFilter、PrefixFilter、KeyOnlyFilter、RandomRowFilter、InclusiveStopFilter、FirstKeyOnlyFilter、ColumnPrefixFilter、ValueFilter、ColumnCountGetFilter、SingleColumnValueFilter、SingleColumnValueExcludeFilter、WhileMatchFilter、FilterList
你看这么多过滤波器呢,谁知道你问的那个啊!!
比较常用的过滤器有:RowFilter 一看就知道是行的过滤器,来过滤行的信息。PrefixFilter前缀的过滤器,就是把前缀作为参数来查找数据呗!剩下的不解释了看过滤器的直接意思就OK了很简单。
6-4)描述 HBase, zookeeper 搭建过程
Zookeeper 的问题楼上爬爬有步骤,hbase 主要的配置文件有hbase.env.sh 主要配置的是JDK的路径以及是否使用外部的ZK,hbase-site.xml 主要配置的是与HDFS的链接的路径以及zk的信息,修改regionservers的链接其他机器的配置。
6-5)hive 如何调优?
在优化时要注意数据的问题,尽量减少数据倾斜的问题,减少job的数量,同事对小的文件进行成大的文件,如果优化的设计那就更好了,因为hive的运算就是mapReduce所以调节mapreduce的参数也会使性能提高,如调节task的数目。
6-6)hive的权限的设置
Hive的权限需要在hive-site.xml文件中设置才会起作用,配置默认的是false,需要把hive.security.authorization.enabled设置为true,并对不同的用户设置不同的权限,例如select ,drop等的操作。
6-7 ) hbase 写数据的原理
1. 首先,Client通过访问ZK来请求目标数据的地址。
2. ZK中保存了-ROOT-表的地址,所以ZK通过访问-ROOT-表来请求数据地址。
3. 同样,-ROOT-表中保存的是.META.的信息,通过访问.META.表来获取具体的RS。
4. .META.表查询到具体RS信息后返回具体RS地址给Client。
5. Client端获取到目标地址后,然后直接向该地址发送数据请求。
6-8)hbase宕机了如何处理?
HBase的RegionServer宕机超过一定时间后,HMaster会将其所管理的region重新分布到其他活动的RegionServer上,由于数据和日志都持久在HDFS中,
该操作不会导致数据丢失。所以数据的一致性和安全性是有保障的。
但是重新分配的region需要根据日志恢复原RegionServer中的内存MemoryStore表,这会导致宕机的region在这段时间内无法对外提供服务。
而一旦重分布,宕机的节点重新启动后就相当于一个新的RegionServer加入集群,为了平衡,需要再次将某些region分布到该server。
因此,Region Server的内存表memstore如何在节点间做到更高的可用,是HBase的一个较大的挑战。
6-9)Hbase 中的 metastore 用来做什么的?
Hbase的metastore是用来保存数据的,其中保存数据的方式有有三种第一种于第二种是本地储存,第二种是远程储存这一种企业用的比较多
6-10)hbase客户端在客户端怎样优化?
Hbase使用JAVA来运算的,索引Java的优化也适用于hbase,在使用过滤器事记得开启bloomfilter可以是性能提高3-4倍,设置HBASE_HEAPSIZE设置大一些
6-11)hbase是怎样预分区的?
如何去进行预分区,可以采用下面三步:
1.取样,先随机生成一定数量的rowkey,将取样数据按升序排序放到一个集合里
2.根据预分区的region个数,对整个集合平均分割,即是相关的splitKeys.
3.HBaseAdmin.createTable(HTableDescriptor tableDescriptor,byte[][] splitkeys)可以指定预分区的splitKey,即是指定region间的rowkey临界值
6-12)怎样将 mysql 的数据导入到 hbase 中?
不能使用 sqoop,速度太慢了,提示如下:
A、一种可以加快批量写入速度的方法是通过预先创建一些空的 regions,这样当
数据写入 HBase 时,会按照 region 分区情况,在集群内做数据的负载均衡。
B、hbase 里面有这样一个 hfileoutputformat 类,他的实现可以将数据转换成 hfile
格式,通过 new 一个这个类,进行相关配置,这样会在 hdfs 下面产生一个文件,这个
时候利用 hbase 提供的 jruby 的 loadtable.rb 脚本就可以进行批量导入。
6-13)谈谈 HBase 集群安装注意事项?
需要注意的地方是 ZooKeeper 的配置。这与 hbase-env.sh 文件相关,文件中
HBASE_MANAGES_ZK 环境变量用来设置是使用 hbase 默认自带的 Zookeeper 还
是使用独立的 ZooKeeper。HBASE_MANAGES_ZK=false 时使用独立的,为 true 时
使用默认自带的。
某个节点的 HRegionServer 启动失败,这是由于这 3 个节点的系统时间不一致相
差超过集群的检查时间 30s。
6-14)简述 HBase 的瓶颈
Hbase主要的瓶颈就是传输问题,在操作时大部分的操作都是需要对磁盘操作的
6-15)Redis, 传统数据库,hbase,hive 每个之间的区别
Redis 是基于内存的数据库,注重实用内存的计算,hbase是列式数据库,无法创建主键,地从是基于HDFS的,每一行可以保存很多的列,hive是数据的仓库,是为了减轻mapreduce而设计的,不是数据库是用来与红薯做交互的。
6-16)Hbase 的特性,以及你怎么去设计 rowkey 和 columnFamily ,怎么去建一个 table
因为hbase是列式数据库,列非表schema的一部分,所以只需要考虑rowkey和columnFamily 即可,rowkey有为的相关性,最好数据库添加一个前缀,文件越小,查询速度越快,再设计列是有一个列簇,但是列簇不宜过多。
6-17)Hhase与hive的区别
Apache HBase是一种Key/Value系统,它运行在HDFS之上。和Hive不一样,Hbase的能够在它的数据库上实时运行,而不是运行MapReduce任务。Hive被分区为表格,表格又被进一步分割为列簇。列簇必须使用schema定义,列簇将某一类型列集合起来(列不要求schema定义)。例如,“message”列簇可能包含:“to”, ”from” “date”,“subject”, 和”body”. 每一个 key/value对在Hbase中被定义为一个cell,每一个key由row-key,列簇、列和时间戳。在Hbase中,行是key/value映射的集合,这个映射通过row-key来唯一标识。Hbase利用Hadoop的基础设施,可以利用通用的设备进行水平的扩展。
6-18)描述hbase的scan和get功能以及实现的异同
HBase的查询实现只提供两种方式: 1、按指定RowKey获取唯一一条记录,get方法(org.apache.hadoop.hbase.client.Get) 2、按指定的条件获取一批记录,scan方法(org.apache.hadoop.hbase.client.Scan) 实现条件查询功能使用的就是scan方式
6-19)HBase scan setBatch和setCaching的区别
can可以通过setCaching与setBatch方法提高速度(以空间换时间),
setCaching设置的值为每次rpc的请求记录数,默认是1;cache大可以优化性能,但是太大了会花费很长的时间进行一次传输。
setBatch设置每次取的column size;有些row特别大,所以需要分开传给client,就是一次传一个row的几个column。
6-20)hbase 中cell的结构
cell中的数据是没有类型的,全部是字节码形式存贮。
6-21)hbase 中region太多和region太大带来的冲突
Hbase的region会自动split,当region太时,regio太大时分布会不均衡,同时对于大批量的代入数据建议如下:
1、还是必须让业务方对rowkey进行预分片,对业务数据rowkey进行md5或者其他的hash策略,让数据尽量随机分布而不是顺序写入。
2、随时观察region的大小,是否出现大region的情况。
Flume相关
7-1)flume 不采集 Nginx 日志,通过 Logger4j 采集日志,优缺点是什么?
在nginx采集日志时无法获取session的信息,然而logger4j则可以获取session的信息,logger4j的方式比较稳定,不会宕机。缺点:不够灵活,logger4j的方式和项目结合过滤紧密,二flume的方式就比较灵活,便于插拔式比较好,不会影响项目的性能。
7-2)flume 和 kafka 采集日志区别,采集日志时中间停了,怎么记录之前的日志。
Flume 采集日志是通过流的方式直接将日志收集到存储层,而 kafka 试讲日志缓存在 kafka
集群,待后期可以采集到存储层。Flume 采集中间停了,可以采用文件的方式记录之前的日志,而 kafka 是采用offset(偏移量) 的方式记录之前的日志。
Kafka 相关
8-1)kafka 中怎样储存数据,哟及结构的,data.....目录下有多少个分区,每个分区的存储格式是什么样的?
1、topic 是按照“主题名-分区”存储的
2、分区个数由配置文件决定
3、每个分区下最重要的两个文件是 0000000000.log 和 000000.index,0000000.log
以默认 1G 大小回滚。
Spark 相关
9-1)mr 和 spark 区别,怎么理解 spark-rdd
Mr 是文件方式的分布式计算框架,是将中间结果和最终结果记录在文件中,map 和 reduce的数据分发也是在文件中。
Spark 是内存迭代式的计算框架,计算的中间结果可以缓存内存,也可以缓存硬盘,但是不是每一步计算都需要缓存的。
Spark-rdd 是一个数据的分区记录集合,是利用内存来计算的,spark之所以快是因为有内存的模式
9-2)简单描述spark的wordCount的执行过程
Scala> sc.textFile("/usr/local/words.txt")
res0: org.apache.spark.rdd.RDD[String] = /usr/local/words.txt MapPartitionsRDD[1] at textFile at <console>:22
scala> sc.textFile("/usr/local/words.txt").flatMap(_.split(" "))
res2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at flatMap at <console>:22
scala> sc.textFile("/usr/local/words.txt").flatMap(_.split(" ")).map((_,1))
res3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at map at <console>:22
scala> sc.textFile("/usr/local/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
res5: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[17] at reduceByKey at <console>:22
scala> sc.textFile("/usr/local/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res6: Array[(String, Int)] = Array((dageda,1), (xiaoli,1), (hellow,4), (xisdsd,1), (xiaozhang,1))
9-3)按照需求使用spark编写一下程序
A、当前文件a.text的格式为,请统计每个单词出现的个数
A,b,c,d
B,b,f,e
A,a,c,f
sc.textFile(“/user/local/a.text”).flatMap(_.split(“,”)).map((_,1)).ReduceByKey(_+_).collect()
或:
package cn.bigdata
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
object Demo {
/*
a,b,c,d
b,b,f,e
a,a,c,f
c,c,a,d
* 计算第四列每个元素出现的个数
*/
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("demo").setMaster("local")
val sc: SparkContext = new SparkContext(conf)
val data: RDD[String] = sc.textFile("f://demo.txt")
//数据切分
val fourthData: RDD[(String, Int)] = data.map { x =>
val arr: Array[String] = x.split(",")
val fourth: String = arr(3)
(fourth, 1)
}
val result: RDD[(String, Int)] = fourthData.reduceByKey(_ + _);
println(result.collect().toBuffer)
}
}
B、HDFS中有两个文件a.text与b.text,文件的格式为(ip,username),如:a.text,b.text
a.text
127.0.0.1 xiaozhang
127.0.0.1 xiaoli
127.0.0.2 wangwu
127.0.0.3 lisi
B.text
127.0.0.4 lixiaolu
127.0.0.5 lisi
每个文件至少有1000万行,请用程序完成一下工作,
1)每个文件的个子的IP
2)出现在b.text而没有出现在a.text的IP
3)每个user出现的次数以及每个user对应的IP的个数
代码如下:
1)各个文件的ip数
package cn.bigdata
import java.util.concurrent.Executors
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.LocatedFileStatus
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.RemoteIterator
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
//各个文件的ip数
object Demo2 {
val cachedThreadPool = Executors.newCachedThreadPool()
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("demo2").setMaster("local")
val sc: SparkContext = new SparkContext(conf)
val hdpConf: Configuration = new Configuration
val fs: FileSystem = FileSystem.get(hdpConf)
val listFiles: RemoteIterator[LocatedFileStatus] = fs.listFiles(new Path("f://txt/2/"), true)
while (listFiles.hasNext) {
val fileStatus = listFiles.next
val pathName = fileStatus.getPath.getName
cachedThreadPool.execute(new Runnable() {
override def run(): Unit = {
println("=======================" + pathName)
analyseData(pathName, sc)
}
})
}
}
def analyseData(pathName: String, sc: SparkContext): Unit = {
val data: RDD[String] = sc.textFile("f://txt/2/" + pathName)
val dataArr: RDD[Array[String]] = data.map(_.split(" "))
val ipAndOne: RDD[(String, Int)] = dataArr.map(x => {
val ip = x(0)
(ip, 1)
})
val counts: RDD[(String, Int)] = ipAndOne.reduceByKey(_ + _)
val sortedSort: RDD[(String, Int)] = counts.sortBy(_._2, false)
sortedSort.saveAsTextFile("f://txt/3/" + pathName)
}
}
2)出现在b.txt而没有出现在a.txt的ip
package cn.bigdata
import java.util.concurrent.Executors
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
/*
* 出现在b.txt而没有出现在a.txt的ip
*/
object Demo3 {
val cachedThreadPool = Executors.newCachedThreadPool()
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Demo3").setMaster("local")
val sc = new SparkContext(conf)
val data_a = sc.textFile("f://txt/2/a.txt")
val data_b = sc.textFile("f://txt/2/b.txt")
val splitArr_a = data_a.map(_.split(" "))
val ip_a: RDD[String] = splitArr_a.map(x => x(0))
val splitArr_b = data_b.map(_.split(" "))
val ip_b: RDD[String] = splitArr_b.map(x => x(0))
val subRdd: RDD[String] = ip_b.subtract(ip_a)
subRdd.saveAsTextFile("f://txt/4/")
}
}
3)
package cn.bigdata
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.collection.mutable.Set
/*
* 每个user出现的次数以及每个user对应的ip数
*/
object Demo4 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Demo4").setMaster("local")
val sc = new SparkContext(conf)
val data: RDD[String] = sc.textFile("f://txt/5/")
val lines = data.map(_.split(" "))
val userIpOne = lines.map(x => {
val ip = x(0)
val user = x(1)
(user, (ip, 1))
})
val userListIpCount: RDD[(String, (Set[String], Int))] = userIpOne.combineByKey(
x => (Set(x._1), x._2),
(a: (Set[String], Int), b: (String, Int)) => {
(a._1 + b._1, a._2 + b._2)
},
(m: (Set[String], Int), n: (Set[String], Int)) => {
(m._1 ++ n._1, m._2 + n._2)
})
val result: RDD[String] = userListIpCount.map(x => {
x._1 + ":userCount:" + x._2._2 + ",ipCount:" + x._2._1.size
})
println(result.collect().toBuffer)
}
}
Sqoop 相关
10-1)sqoop在导入到MySql数据库是怎样保证数据重复,如果重复了该怎么办??
在导入时在语句的后面加上一下命令作为节点:
--incremental append \
--check-column id \
--last-value 1208
Redis 相关
10-1)redis保存磁盘的时间
# Note: you can disable saving at all commenting all the "save" lines.
#
# It is also possible to remove all the previously configured save
# points by adding a save directive with a single empty string argument
# like in the following example:
#
# save ""
save 900 1
save 300 10
save 60 10000
环境配置
1)你们的集群规模?
这个得看个人在公司的规模,下面介绍一下我们公司的一些配置:
联想System x3750 服务器,价格3.5万,内存容量32G,产品类型机架式,硬盘接口SSD,CPU频率2.6GH,CPU数量2颗,三级缓存15MB,cpu核心6核,cpu线程数12线程,最大内存支持1.5T,网络是千兆网卡,可插拔时硬盘接口12个卡槽,配置1T的容量
详细:http://detail.zol.com.cn/server/index1101243.shtml
名字 软件 运行管理
Hadoop1 JDK,hadoop namenode
Hadoop2 JDK,hadoop namenode
Hadoop3 JDK,hadoop secondaryNamenode
Hadoop4 JDK,hadoop secondaryNamenode
Hadoop5 JDK,hadoop datanode
Hadoop6 JDK,hadoop datanode
Hadoop7 JDK,hadoop datanode
Hadoop8 JDK,hadoop datanode
Hadoop9 JDK,hadoop datanode
Hadoop10 JDK,zookeeper,tomcat,mvn,kafka leader
Hadoop11 JDK,zookeeper,tomcat,mvn,kafka follower
Hadoop12 JDK,zookeeper,tomcat,mvn,kafka follower
Hadoop13 JDK,hive,mysql,svn,logstarh hive,mysql,svn
Hadoop14 JDK,hbase,mysql备份 datanode
Hadoop15 JDK,nginx,Log日志手机 datanode
数据就是每天访问的Log日志不是很大,有的时候大有的时候小的可怜
2)你在项目中遇到了哪些难题,是怎么解决的?
1、在执行任务时发现副本的个数不对,经过一番的查找发现是超时的原因,修改了配置文件hdfs-site.xml:中修改了超时时间。
2、由于当时在分配各个目录空间大小时,没有很好的分配导致有的目录的空间浪费,于是整体商量后把储存的空间调大了一些。
设计题
1-1)采集nginx产生的日志,日志的格式为user ip time url htmlId 每天产生的文件的数据量上亿条,请设计方案把数据保存到HDFS上,并提供一下实时查询的功能(响应时间小于3s)
A、某个用户某天访问某个URL的次数
B、某个URL某天被访问的总次数
实时思路是:使用Logstash + Kafka + Spark-streaming + redis + 报表展示平台
离线的思路是:Logstash + Kafka + Elasticsearch + Spark-streaming + 关系型数据库
A、B、数据在进入到Spark-streaming 中进行过滤,把符合要求的数据保存到Redis中
*****************************************************************************
面试回来之后把其中比较重要的问题记了下来写了个总结:
(答案在后面)
1、简答说一下hadoop的map-reduce编程模型
2、hadoop的TextInputFormat作用是什么,如何自定义实现
3、hadoop和spark的都是并行计算,那么他们有什么相同和区别
4、为什么要用flume导入hdfs,hdfs的构架是怎样的
5、map-reduce程序运行的时候会有什么比较常见的问题
6、简单说一下hadoop和spark的shuffle过程
以下是自己的理解,如果有不对的地方希望各位大侠可以帮我指出来~:
1、简答说一下hadoop的map-reduce编程模型
首先map task会从本地文件系统读取数据,转换成key-value形式的键值对集合
使用的是hadoop内置的数据类型,比如longwritable、text等
将键值对集合输入mapper进行业务处理过程,将其转换成需要的key-value在输出
之后会进行一个partition分区操作,默认使用的是hashpartitioner,可以通过重写hashpartitioner的getpartition方法来自定义分区规则
之后会对key进行进行sort排序,grouping分组操作将相同key的value合并分组输出,在这里可以使用自定义的数据类型,重写WritableComparator的Comparator方法来自定义排序规则,重写RawComparator的compara方法来自定义分组规则
之后进行一个combiner归约操作,其实就是一个本地段的reduce预处理,以减小后面shufle和reducer的工作量
reduce task会通过网络将各个数据收集进行reduce处理,最后将数据保存或者显示,结束整个job
2、hadoop的TextInputFormat作用是什么,如何自定义实现
InputFormat会在map操作之前对数据进行两方面的预处理
1是getSplits,返回的是InputSplit数组,对数据进行split分片,每片交给map操作一次
2是getRecordReader,返回的是RecordReader对象,对每个split分片进行转换为key-value键值对格式传递给map
常用的InputFormat是TextInputFormat,使用的是LineRecordReader对每个分片进行键值对的转换,以行偏移量作为键,行内容作为值
自定义类继承InputFormat接口,重写createRecordReader和isSplitable方法
在createRecordReader中可以自定义分隔符
3、hadoop和spark的都是并行计算,那么他们有什么相同和区别
两者都是用mr模型来进行并行计算,hadoop的一个作业称为job,job里面分为map task和reduce task,每个task都是在自己的进程中运行的,当task结束时,进程也会结束
spark用户提交的任务成为application,一个application对应一个sparkcontext,app中存在多个job,每触发一次action操作就会产生一个job
这些job可以并行或串行执行,每个job中有多个stage,stage是shuffle过程中DAGSchaduler通过RDD之间的依赖关系划分job而来的,每个stage里面有多个task,组成taskset有TaskSchaduler分发到各个executor中执行,executor的生命周期是和app一样的,即使没有job运行也是存在的,所以task可以快速启动读取内存进行计算
hadoop的job只有map和reduce操作,表达能力比较欠缺而且在mr过程中会重复的读写hdfs,造成大量的io操作,多个job需要自己管理关系
spark的迭代计算都是在内存中进行的,API中提供了大量的RDD操作如join,groupby等,而且通过DAG图可以实现良好的容错
4、为什么要用flume导入hdfs,hdfs的构架是怎样的
flume可以实时的导入数据到hdfs中,当hdfs上的文件达到一个指定大小的时候会形成一个文件,或者超过指定时间的话也形成一个文件
文件都是存储在datanode上面的,namenode记录着datanode的元数据信息,而namenode的元数据信息是存在内存中的,所以当文件切片很小或者很多的时候会卡死
5、map-reduce程序运行的时候会有什么比较常见的问题
比如说作业中大部分都完成了,但是总有几个reduce一直在运行
这是因为这几个reduce中的处理的数据要远远大于其他的reduce,可能是因为对键值对任务划分的不均匀造成的数据倾斜
解决的方法可以在分区的时候重新定义分区规则对于value数据很多的key可以进行拆分、均匀打散等处理,或者是在map端的combiner中进行数据预处理的操作
6、简单说一下hadoop和spark的shuffle过程
hadoop:map端保存分片数据,通过网络收集到reduce端
spark:spark的shuffle是在DAGSchedular划分Stage的时候产生的,TaskSchedule要分发Stage到各个worker的executor
减少shuffle可以提高性能
部分答案不是十分准确欢迎补充:-)
——-补充更新———
1、Hive中存放是什么?
表。
存的是和hdfs的映射关系,hive是逻辑上的数据仓库,实际操作的都是hdfs上的文件,HQL就是用sql语法来写的mr程序。
2、Hive与关系型数据库的关系?
没有关系,hive是数据仓库,不能和数据库一样进行实时的CURD操作。
是一次写入多次读取的操作,可以看成是ETL工具。
3、Flume工作机制是什么?
核心概念是agent,里面包括source、chanel和sink三个组件。
source运行在日志收集节点进行日志采集,之后临时存储在chanel中,sink负责将chanel中的数据发送到目的地。
只有成功发送之后chanel中的数据才会被删除。
首先书写flume配置文件,定义agent、source、chanel和sink然后将其组装,执行flume-ng命令。
4、Sqoop工作原理是什么?
hadoop生态圈上的数据传输工具。
可以将关系型数据库的数据导入非结构化的hdfs、hive或者bbase中,也可以将hdfs中的数据导出到关系型数据库或者文本文件中。
使用的是mr程序来执行任务,使用jdbc和关系型数据库进行交互。
import原理:通过指定的分隔符进行数据切分,将分片传入各个map中,在map任务中在每行数据进行写入处理没有reduce。
export原理:根据要操作的表名生成一个java类,并读取其元数据信息和分隔符对非结构化的数据进行匹配,多个map作业同时执行写入关系型数据库
5、Hbase行健列族的概念,物理模型,表的设计原则?
行健:是hbase表自带的,每个行健对应一条数据。
列族:是创建表时指定的,为列的集合,每个列族作为一个文件单独存储,存储的数据都是字节数组,其中的数据可以有很多,通过时间戳来区分。
物理模型:整个hbase表会拆分为多个region,每个region记录着行健的起始点保存在不同的节点上,查询时就是对各个节点的并行查询,当region很大时使用.META表存储各个region的起始点,-ROOT又可以存储.META的起始点。
rowkey的设计原则:各个列簇数据平衡,长度原则、相邻原则,创建表的时候设置表放入regionserver缓存中,避免自动增长和时间,使用字节数组代替string,最大长度64kb,最好16字节以内,按天分表,两个字节散列,四个字节存储时分毫秒。
列族的设计原则:尽可能少(按照列族进行存储,按照region进行读取,不必要的io操作),经常和不经常使用的两类数据放入不同列族中,列族名字尽可能短。
6、Spark Streaming和Storm有何区别?
一个实时毫秒一个准实时亚秒,不过storm的吞吐率比较低。
7、mllib支持的算法?
大体分为四大类,分类、聚类、回归、协同过滤。
8、简答说一下hadoop的map-reduce编程模型?
首先map task会从本地文件系统读取数据,转换成key-value形式的键值对集合。
将键值对集合输入mapper进行业务处理过程,将其转换成需要的key-value在输出。
之后会进行一个partition分区操作,默认使用的是hashpartitioner,可以通过重写hashpartitioner的getpartition方法来自定义分区规则。
之后会对key进行进行sort排序,grouping分组操作将相同key的value合并分组输出。
在这里可以使用自定义的数据类型,重写WritableComparator的Comparator方法来自定义排序规则,重写RawComparator的compara方法来自定义分组规则。
之后进行一个combiner归约操作,其实就是一个本地段的reduce预处理,以减小后面shufle和reducer的工作量。
reduce task会通过网络将各个数据收集进行reduce处理,最后将数据保存或者显示,结束整个job。
9、Hadoop平台集群配置、环境变量设置?
zookeeper:修改zoo.cfg文件,配置dataDir,和各个zk节点的server地址端口,tickTime心跳时间默认是2000ms,其他超时的时间都是以这个为基础的整数倍,之后再dataDir对应目录下写入myid文件和zoo.cfg中的server相对应。
hadoop:修改
hadoop-env.sh配置java环境变量
core-site.xml配置zk地址,临时目录等
hdfs-site.xml配置nn信息,rpc和http通信地址,nn自动切换、zk连接超时时间等
yarn-site.xml配置resourcemanager地址
mapred-site.xml配置使用yarn
slaves配置节点信息
格式化nn和zk。
hbase:修改
hbase-env.sh配置java环境变量和是否使用自带的zk
hbase-site.xml配置hdfs上数据存放路径,zk地址和通讯超时时间、master节点
regionservers配置各个region节点
zoo.cfg拷贝到conf目录下
spark:
安装Scala
修改spark-env.sh配置环境变量和master和worker节点配置信息
环境变量的设置:直接在/etc/profile中配置安装的路径即可,或者在当前用户的宿主目录下,配置在.bashrc文件中,该文件不用source重新打开shell窗口即可,配置在.bash_profile的话只对当前用户有效。
10、Hadoop性能调优?
调优可以通过系统配置、程序编写和作业调度算法来进行。
hdfs的block.size可以调到128/256(网络很好的情况下,默认为64)
调优的大头:mapred.map.tasks、mapred.reduce.tasks设置mr任务数(默认都是1)
mapred.tasktracker.map.tasks.maximum每台机器上的最大map任务数
mapred.tasktracker.reduce.tasks.maximum每台机器上的最大reduce任务数
mapred.reduce.slowstart.completed.maps配置reduce任务在map任务完成到百分之几的时候开始进入
这个几个参数要看实际节点的情况进行配置,reduce任务是在33%的时候完成copy,要在这之前完成map任务,(map可以提前完成)
mapred.compress.map.output,mapred.output.compress配置压缩项,消耗cpu提升网络和磁盘io
合理利用combiner
注意重用writable对象
11、Hadoop高并发?
首先肯定要保证集群的高可靠性,在高并发的情况下不会挂掉,支撑不住可以通过横向扩展。
datanode挂掉了使用hadoop脚本重新启动。
12、hadoop的TextInputFormat作用是什么,如何自定义实现?
InputFormat会在map操作之前对数据进行两方面的预处理。
1是getSplits,返回的是InputSplit数组,对数据进行split分片,每片交给map操作一次 。
2是getRecordReader,返回的是RecordReader对象,对每个split分片进行转换为key-value键值对格式传递给map。
常用的InputFormat是TextInputFormat,使用的是LineRecordReader对每个分片进行键值对的转换,以行偏移量作为键,行内容作为值。
自定义类继承InputFormat接口,重写createRecordReader和isSplitable方法 。
在createRecordReader中可以自定义分隔符。
13、hadoop和spark的都是并行计算,那么他们有什么相同和区别?
两者都是用mr模型来进行并行计算,hadoop的一个作业称为job,job里面分为map task和reduce task,每个task都是在自己的进程中运行的,当task结束时,进程也会结束。
spark用户提交的任务成为application,一个application对应一个sparkcontext,app中存在多个job,每触发一次action操作就会产生一个job。
这些job可以并行或串行执行,每个job中有多个stage,stage是shuffle过程中DAGSchaduler通过RDD之间的依赖关系划分job而来的,每个stage里面有多个task,组成taskset有TaskSchaduler分发到各个executor中执行,executor的生命周期是和app一样的,即使没有job运行也是存在的,所以task可以快速启动读取内存进行计算。
hadoop的job只有map和reduce操作,表达能力比较欠缺而且在mr过程中会重复的读写hdfs,造成大量的io操作,多个job需要自己管理关系。
spark的迭代计算都是在内存中进行的,API中提供了大量的RDD操作如join,groupby等,而且通过DAG图可以实现良好的容错。
14、为什么要用flume导入hdfs,hdfs的构架是怎样的?
flume可以实时的导入数据到hdfs中,当hdfs上的文件达到一个指定大小的时候会形成一个文件,或者超过指定时间的话也形成一个文件。
文件都是存储在datanode上面的,namenode记录着datanode的元数据信息,而namenode的元数据信息是存在内存中的,所以当文件切片很小或者很多的时候会卡死。
15、map-reduce程序运行的时候会有什么比较常见的问题?
比如说作业中大部分都完成了,但是总有几个reduce一直在运行。
这是因为这几个reduce中的处理的数据要远远大于其他的reduce,可能是因为对键值对任务划分的不均匀造成的数据倾斜。
解决的方法可以在分区的时候重新定义分区规则对于value数据很多的key可以进行拆分、均匀打散等处理,或者是在map端的combiner中进行数据预处理的操作。
16、简单说一下hadoop和spark的shuffle过程?
hadoop:map端保存分片数据,通过网络收集到reduce端。
spark:spark的shuffle是在DAGSchedular划分Stage的时候产生的,TaskSchedule要分发Stage到各个worker的executor。
减少shuffle可以提高性能。
17、RDD机制?
rdd分布式弹性数据集,简单的理解成一种数据结构,是spark框架上的通用货币。
所有算子都是基于rdd来执行的,不同的场景会有不同的rdd实现类,但是都可以进行互相转换。
rdd执行过程中会形成dag图,然后形成lineage保证容错性等。
从物理的角度来看rdd存储的是block和node之间的映射。
18、spark有哪些组件?
(1)master:管理集群和节点,不参与计算。
(2)worker:计算节点,进程本身不参与计算,和master汇报。
(3)Driver:运行程序的main方法,创建spark context对象。
(4)spark context:控制整个application的生命周期,包括dagsheduler和task scheduler等组件。
(5)client:用户提交程序的入口。
19、spark工作机制?
用户在client端提交作业后,会由Driver运行main方法并创建spark context上下文。
执行add算子,形成dag图输入dagscheduler,按照add之间的依赖关系划分stage输入task scheduler。
task scheduler会将stage划分为task set分发到各个节点的executor中执行。
20、spark的优化怎么做?
通过spark-env文件、程序中sparkconf和set property设置。
(1)计算量大,形成的lineage过大应该给已经缓存了的rdd添加checkpoint,以减少容错带来的开销。
(2)小分区合并,过小的分区造成过多的切换任务开销,使用repartition。
21、kafka工作原理?
producer向broker发送事件,consumer从broker消费事件。
事件由topic区分开,每个consumer都会属于一个group。
相同group中的consumer不能重复消费事件,而同一事件将会发送给每个不同group的consumer。
22、ALS算法原理?
答:对于user-product-rating数据,als会建立一个稀疏的评分矩阵,其目的就是通过一定的规则填满这个稀疏矩阵。
als会对稀疏矩阵进行分解,分为用户-特征值,产品-特征值,一个用户对一个产品的评分可以由这两个矩阵相乘得到。
通过固定一个未知的特征值,计算另外一个特征值,然后交替反复进行最小二乘法,直至差平方和最小,即可得想要的矩阵。
23、kmeans算法原理?
随机初始化中心点范围,计算各个类别的平均值得到新的中心点。
重新计算各个点到中心值的距离划分,再次计算平均值得到新的中心点,直至各个类别数据平均值无变化。
24、canopy算法原理?
根据两个阈值来划分数据,以随机的一个数据点作为canopy中心。
计算其他数据点到其的距离,划入t1、t2中,划入t2的从数据集中删除,划入t1的其他数据点继续计算,直至数据集中无数据。
25、朴素贝叶斯分类算法原理?
对于待分类的数据和分类项,根据待分类数据的各个特征属性,出现在各个分类项中的概率判断该数据是属于哪个类别的。
26、关联规则挖掘算法apriori原理?
一个频繁项集的子集也是频繁项集,针对数据得出每个产品的支持数列表,过滤支持数小于预设值的项,对剩下的项进行全排列,重新计算支持数,再次过滤,重复至全排列结束,可得到频繁项和对应的支持数。
作者:@小黑
以下是自己的理解,如果有不对的地方希望各位大侠可以帮我指出来~:
1、简答说一下hadoop的map-reduce编程模型
首先map task会从本地文件系统读取数据,转换成key-value形式的键值对集合
使用的是hadoop内置的数据类型,比如longwritable、text等
将键值对集合输入mapper进行业务处理过程,将其转换成需要的key-value在输出
之后会进行一个partition分区操作,默认使用的是hashpartitioner,可以通过重写hashpartitioner的getpartition方法来自定义分区规则
之后会对key进行进行sort排序,grouping分组操作将相同key的value合并分组输出,在这里可以使用自定义的数据类型,重写WritableComparator的Comparator方法来自定义排序规则,重写RawComparator的compara方法来自定义分组规则
之后进行一个combiner归约操作,其实就是一个本地段的reduce预处理,以减小后面shufle和reducer的工作量
reduce task会通过网络将各个数据收集进行reduce处理,最后将数据保存或者显示,结束整个job
2、hadoop的TextInputFormat作用是什么,如何自定义实现
InputFormat会在map操作之前对数据进行两方面的预处理
1是getSplits,返回的是InputSplit数组,对数据进行split分片,每片交给map操作一次
2是getRecordReader,返回的是RecordReader对象,对每个split分片进行转换为key-value键值对格式传递给map
常用的InputFormat是TextInputFormat,使用的是LineRecordReader对每个分片进行键值对的转换,以行偏移量作为键,行内容作为值
自定义类继承InputFormat接口,重写createRecordReader和isSplitable方法
在createRecordReader中可以自定义分隔符
3、hadoop和spark的都是并行计算,那么他们有什么相同和区别
两者都是用mr模型来进行并行计算,hadoop的一个作业称为job,job里面分为map task和reduce task,每个task都是在自己的进程中运行的,当task结束时,进程也会结束
spark用户提交的任务成为application,一个application对应一个sparkcontext,app中存在多个job,每触发一次action操作就会产生一个job
这些job可以并行或串行执行,每个job中有多个stage,stage是shuffle过程中DAGSchaduler通过RDD之间的依赖关系划分job而来的,每个stage里面有多个task,组成taskset有TaskSchaduler分发到各个executor中执行,executor的生命周期是和app一样的,即使没有job运行也是存在的,所以task可以快速启动读取内存进行计算
hadoop的job只有map和reduce操作,表达能力比较欠缺而且在mr过程中会重复的读写hdfs,造成大量的io操作,多个job需要自己管理关系
spark的迭代计算都是在内存中进行的,API中提供了大量的RDD操作如join,groupby等,而且通过DAG图可以实现良好的容错
4、为什么要用flume导入hdfs,hdfs的构架是怎样的
flume可以实时的导入数据到hdfs中,当hdfs上的文件达到一个指定大小的时候会形成一个文件,或者超过指定时间的话也形成一个文件
文件都是存储在datanode上面的,namenode记录着datanode的元数据信息,而namenode的元数据信息是存在内存中的,所以当文件切片很小或者很多的时候会卡死
5、map-reduce程序运行的时候会有什么比较常见的问题
比如说作业中大部分都完成了,但是总有几个reduce一直在运行
这是因为这几个reduce中的处理的数据要远远大于其他的reduce,可能是因为对键值对任务划分的不均匀造成的数据倾斜
解决的方法可以在分区的时候重新定义分区规则对于value数据很多的key可以进行拆分、均匀打散等处理,或者是在map端的combiner中进行数据预处理的操作
6、简单说一下hadoop和spark的shuffle过程
hadoop:map端保存分片数据,通过网络收集到reduce端
spark:spark的shuffle是在DAGSchedular划分Stage的时候产生的,TaskSchedule要分发Stage到各个worker的executor
减少shuffle可以提高性能
部分答案不是十分准确欢迎补充:-)
******************************************************************
单项选择题
1. 下面哪个程序负责 HDFS 数据存储。
a)NameNode
b)Jobtracker
c)Datanode
d)secondaryNameNode
e)tasktracker
2. HDfS 中的 block 默认保存几份?
a)3 份
b)2 份
c)1 份
d)不确定
3. 下列哪个程序通常与 NameNode 在一个节点启动?
a)SecondaryNameNode
b)DataNode
c)TaskTracker
d)Jobtracker
4. Hadoop 作者
a)Martin Fowler
b)Kent Beck
c)Doug cutting
5. HDFS 默认 Block Size
a)32MB
b)64MB
c)128MB
6. 下列哪项通常是集群的最主要瓶颈
a)CPU
b)网络
c)磁盘
d)内存
7. 关于 SecondaryNameNode 哪项是正确的?
a)它是 NameNode 的热备
b)它对内存没有要求
c)它的目的是帮助 NameNode 合并编辑日志,减少 NameNode 启动时间
d)SecondaryNameNode 应与 NameNode 部署到一个节点
多选题:
8. 下列哪项可以作为集群的管理工具
a)Puppet
b)Pdsh
c)Cloudera Manager
d)d)Zookeeper
9. 配置机架感知的下面哪项正确
a)如果一个机架出问题,不会影响数据读写
b)写入数据的时候会写到不同机架的 DataNode 中
c)MapReduce 会根据机架获取离自己比较近的网络数据
10. Client 端上传文件的时候下列哪项正确
a)数据经过 NameNode 传递给 DataNode
b)Client 端将文件切分为 Block,依次上传
c)Client 只上传数据到一台 DataNode,然后由 NameNode 负责 Block 复制工作
11. 下列哪个是 Hadoop 运行的模式
a)单机版
b)伪分布式
c)分布式
12. Cloudera 提供哪几种安装 CDH 的方法
a)Cloudera manager
b)Tar ball
c)Yum d)Rpm
判断题:
13. Ganglia 不仅可以进行监控,也可以进行告警。( )
14. Block Size 是不可以修改的。( )
15. Nagios 不可以监控 Hadoop 集群,因为它不提供 Hadoop 支持。( )
16. 如果 NameNode 意外终止,SecondaryNameNode 会接替它使集群继续工作。( )
17. Cloudera CDH 是需要付费使用的。( )
18. Hadoop 是 Java 开发的,所以 MapReduce 只支持 Java 语言编写。( )
19. Hadoop 支持数据的随机读写。( )
20. NameNode 负责管理 metadata,client 端每次读写请求,它都会从磁盘中读取或则会写入 metadata 信息并反馈 client 端。( )
21. NameNode 本地磁盘保存了 Block 的位置信息。( )
22. DataNode 通过长连接与 NameNode 保持通信。( )
23. Hadoop 自身具有严格的权限管理和安全措施保障集群正常运行。( )
24. Slave 节点要存储数据,所以它的磁盘越大越好。( )
25. hadoop dfsadmin –report 命令用于检测 HDFS 损坏块。( )
26. Hadoop 默认调度器策略为 FIFO( )
27. 集群内每个节点都应该配 RAID,这样避免单磁盘损坏,影响整个节点运行。( )
28. 因为 HDFS 有多个副本,所以 NameNode 是不存在单点问题的。( )
29. 每个 map 槽就是一个线程。( )
30. Mapreduce 的 input split 就是一个 block。( )
31. NameNode 的 Web UI 端口是 50030,它通过 jetty 启动的 Web 服务。( )
32. Hadoop 环境变量中的 HADOOP_HEAPSIZE 用于设置所有 Hadoop 守护线程的内存。它默认是 200 GB。( )
33. DataNode 首次加入 cluster 的时候,如果 log 中报告不兼容文件版本,那需要 NameNode执行“Hadoop namenode -format”操作格式化磁盘。( )
别走开,答案在后面哦!
1. 下面哪个程序负责 HDFS 数据存储。答案C datanode
a)NameNode
b)Jobtracker
c)Datanode
d)secondaryNameNode
e)tasktracker
2. HDfS 中的 block 默认保存几份? 答案A默认3分
a)3 份
b)2 份
c)1 份
d)不确定
3. 下列哪个程序通常与 NameNode 在一个节点启动?答案D
a)SecondaryNameNode
b)DataNode
c)TaskTracker
d)Jobtracker
此题分析:
hadoop的集群是基于master/slave模式,namenode和jobtracker属于master,datanode和tasktracker属于slave,master只有一个,而slave有多个SecondaryNameNode内存需求和NameNode在一个数量级上,所以通常secondary NameNode(运行在单独的物理机器上)和NameNode运行在不同的机器上。
JobTracker和TaskTracker
JobTracker 对应于 NameNode
TaskTracker 对应于 DataNode
DataNode 和NameNode 是针对数据存放来而言的
JobTracker和TaskTracker是对于MapReduce执行而言的
mapreduce中几个主要概念,mapreduce整体上可以分为这么几条执行线索:obclient,JobTracker与TaskTracker。
1、JobClient会在用户端通过JobClient类将应用已经配置参数打包成jar文件存储到hdfs,并把路径提交到Jobtracker,然后由JobTracker创建每一个Task(即MapTask和ReduceTask)并将它们分发到各个TaskTracker服务中去执行。
2、JobTracker是一个master服务,软件启动之后JobTracker接收Job,负责调度Job的每一个子任务task运行于TaskTracker上,并监控它们,如果发现有失败的task就重新运行它。一般情况应该把JobTracker部署在单独的机器上。
3、TaskTracker是运行在多个节点上的slaver服务。TaskTracker主动与JobTracker通信,接收作业,并负责直接执行每一个任务。TaskTracker都需要运行在HDFS的DataNode上。
4. Hadoop 作者 答案C Doug cutting
a)Martin Fowler
b)Kent Beck
c)Doug cutting
5. HDFS 默认 Block Size 答案:B
a)32MB
b)64MB
c)128MB
(因为版本更换较快,这里答案只供参考)
6. 下列哪项通常是集群的最主要瓶颈:答案:C磁盘
a)CPU
b)网络
c)磁盘IO
d)内存
该题解析:
首先集群的目的是为了节省成本,用廉价的pc机,取代小型机及大型机。小型机和大型机有什么特点?
1.cpu处理能力强
2.内存够大
所以集群的瓶颈不可能是a和d
3.网络是一种稀缺资源,但是并不是瓶颈。
4.由于大数据面临海量数据,读写数据都需要io,然后还要冗余数据,hadoop一般备3份数据,所以IO就会打折扣。
7. 关于 SecondaryNameNode 哪项是正确的?答案C
a)它是 NameNode 的热备
b)它对内存没有要求
c)它的目的是帮助 NameNode 合并编辑日志,减少 NameNode 启动时间
d)SecondaryNameNode 应与 NameNode 部署到一个节点。
多选题:
8. 下列哪项可以作为集群的管理?答案:ABD
a)Puppet
b)Pdsh
c)Cloudera Manager
d)Zookeeper
9. 配置机架感知的下面哪项正确:答案ABC
a)如果一个机架出问题,不会影响数据读写
b)写入数据的时候会写到不同机架的 DataNode 中
c)MapReduce 会根据机架获取离自己比较近的网络数据
10. Client 端上传文件的时候下列哪项正确?答案B
a)数据经过 NameNode 传递给 DataNode
b)Client 端将文件切分为 Block,依次上传
c)Client 只上传数据到一台 DataNode,然后由 NameNode 负责 Block 复制工作
该题分析:
Client向NameNode发起文件写入的请求。
NameNode根据文件大小和文件块配置情况,返回给Client它所管理部分DataNode的信息。
Client将文件划分为多个Block,根据DataNode的地址信息,按顺序写入到每一个DataNode块中。
11. 下列哪个是 Hadoop 运行的模式:答案ABC
a)单机版
b)伪分布式
c)分布式
12. Cloudera 提供哪几种安装 CDH 的方法?答案:ABCD
a)Cloudera manager
b)Tarball
c)Yum
d)Rpm
判断题:
13. Ganglia 不仅可以进行监控,也可以进行告警。(正确)
分析:此题的目的是考Ganglia的了解。严格意义上来讲是正确。ganglia作为一款最常用的Linux环境中的监控软件,它擅长的的是从节点中按照用户的需求以较低的代价采集数据。但是ganglia在预警以及发生事件后通知用户上并不擅长。最新的ganglia已经有了部分这方面的功能。但是更擅长做警告的还有Nagios。Nagios,就是一款精于预警、通知的软件。通过将Ganglia和Nagios组合起来,把Ganglia采集的数据作为Nagios的数据源,然后利用Nagios来发送预警通知,可以完美的实现一整套监控管理的系统。
14. Block Size 是不可以修改的。(错误)
分析:它是可以被修改的Hadoop的基础配置文件是hadoop-default.xml,默认建立一个Job的时候会建立Job的Config,Config首先读入hadoop-default.xml的配置,然后再读入hadoop-site.xml的配置(这个文件初始的时候配置为空),hadoop-site.xml中主要配置需要覆盖的hadoop-default.xml的系统级配置。
15. Nagios 不可以监控 Hadoop 集群,因为它不提供 Hadoop 支持。(错误)
分析:Nagios是集群监控工具,而且是云计算三大利器之一
16. 如果 NameNode 意外终止,SecondaryNameNode 会接替它使集群继续工作。(错误)
分析:SecondaryNameNode是帮助恢复,而不是替代,如何恢复,可以查看.
17. Cloudera CDH 是需要付费使用的。(错误)
分析:第一套付费产品是Cloudera Enterpris,Cloudera Enterprise在美国加州举行的 Hadoop 大会 (Hadoop Summit) 上公开,以若干私有管理、监控、运作工具加强 Hadoop 的功能。收费采取合约订购方式,价格随用的 Hadoop 叢集大小变动。
18. Hadoop 是 Java 开发的,所以 MapReduce 只支持 Java 语言编写。(错误)
分析:rhadoop是用R语言开发的,MapReduce是一个框架,可以理解是一种思想,可以使用其他语言开发。
19. Hadoop 支持数据的随机读写。(错)
分析:lucene是支持随机读写的,而hdfs只支持随机读。但是HBase可以来补救。HBase提供随机读写,来解决Hadoop不能处理的问题。HBase自底层设计开始即聚焦于各种可伸缩性问题:表可以很“高”,有数十亿个数据行;也可以很“宽”,有数百万个列;水平分区并在上千个普通商用机节点上自动复制。表的模式是物理存储的直接反映,使系统有可能提高高效的数据结构的序列化、存储和检索。
20. NameNode 负责管理 metadata,client 端每次读写请求,它都会从磁盘中读取或则会写入 metadata 信息并反馈 client 端。(错误)
此题分析:
NameNode 不需要从磁盘读取 metadata,所有数据都在内存中,硬盘上的只是序列化的结果,只有每次 namenode 启动的时候才会读取。
1)文件写入
Client向NameNode发起文件写入的请求。
NameNode根据文件大小和文件块配置情况,返回给Client它所管理部分DataNode的信息。
Client将文件划分为多个Block,根据DataNode的地址信息,按顺序写入到每一个DataNode块中。
2)文件读取
Client向NameNode发起文件读取的请求。
21. NameNode 本地磁盘保存了 Block 的位置信息。(个人认为正确,欢迎提出其它意见)
分析:DataNode是文件存储的基本单元,它将Block存储在本地文件系统中,保存了Block的Meta-data,同时周期性地将所有存在的Block信息发送给NameNode。NameNode返回文件存储的DataNode的信息。
Client读取文件信息。
22. DataNode 通过长连接与 NameNode 保持通信。( )
这个有分歧:具体正在找这方面的有利资料。下面提供资料可参考。
首先明确一下概念:
(1).长连接
Client方与Server方先建立通讯连接,连接建立后不断开,然后再进行报文发送和接收。这种方式下由于通讯连接一直存在,此种方式常用于点对点通讯。
(2).短连接
Client方与Server每进行一次报文收发交易时才进行通讯连接,交易完毕后立即断开连接。此种方式常用于一点对多点通讯,比如多个Client连接一个Server.
23. Hadoop 自身具有严格的权限管理和安全措施保障集群正常运行。(错误)
分析:hadoop只能阻止好人犯错,但是不能阻止坏人干坏事
24. Slave 节点要存储数据,所以它的磁盘越大越好。(错误)
分析:一旦Slave节点宕机,数据恢复是一个难题
25. hadoop dfsadmin –report 命令用于检测 HDFS 损坏块。(错误)
26. Hadoop 默认调度器策略为 FIFO(正确)
27. 集群内每个节点都应该配 RAID,这样避免单磁盘损坏,影响整个节点运行。(错误)
分析:首先明白什么是RAID,可以参考百科磁盘阵列。这句话错误的地方在于太绝对,具体情况具体分析。题目不是重点,知识才是最重要的。因为hadoop本身就具有冗余能力,所以如果不是很严格不需要都配备RAID。具体参考第二题。
28. 因为 HDFS 有多个副本,所以 NameNode 是不存在单点问题的。(错误)
29. 每个 map 槽就是一个线程。(错误)
分析:首先我们知道什么是map 槽,map 槽->map slotmap slot 只是一个逻辑值 ( org.apache.hadoop.mapred.TaskTracker.TaskLauncher.numFreeSlots ),而不是对应着一个线程或者进程
30. Mapreduce 的 input split 就是一个 block。(错误)
31. NameNode 的 Web UI 端口是 50030,它通过 jetty 启动的 Web 服务。(错误)
32. Hadoop 环境变量中的 HADOOP_HEAPSIZE 用于设置所有 Hadoop 守护线程的内存。它默认是 200 GB。(错误)
分析:hadoop为各个守护进程(namenode,secondarynamenode,jobtracker,datanode,tasktracker)统一分配的内存在hadoop-env.sh中设置,参数为HADOOP_HEAPSIZE,默认为1000M。
33. DataNode 首次加入 cluster 的时候,如果 log 中报告不兼容文件版本,那需要 NameNode执行“Hadoop namenode -format”操作格式化磁盘。(错误)
分析:
首先明白介绍,什么ClusterID
ClusterID。添加了一个新的标识符ClusterID用于标识集群中所有的节点。当格式化一个Namenode,需要提供这个标识符或者自动生成。这个ID可以被用来格式化加入集群的其他Namenode。
二次整理
有的同学问题的重点不是上面分析内容:内容如下:
这个报错是说明 DataNode 所装的Hadoop版本和其它节点不一致,应该检查DataNode的Hadoop版本
*******************************************************
1、你会Java语言吗?熟悉到什么程度?
2、你最喜欢的编程语言是什么?为什么?
3、处理过的最大的数据量?你是如何处理他们的?处理的结果如何。
2、在处理大数据过程中,如何保证得到期望值?
3、如何让一个网络爬虫速度更快、抽取更好的信息以及更好总结数据从而得到一干净的数据库?
4、点击流数据应该是实时处理?为什么?哪部分应该实时处理?
6、如何把非结构化的数据转换成结构化的数据?这是否真的有必要做这样的转换?把数据存成平面文本文件是否比存成关系数据库更好?
7、如何判别mapreduce过程有好的负载均衡?什么是负载均衡?
8、Spark和Hive的区别,以及Spark和Hive的数据倾斜调优问题?
9、Hive和Hbase的区别?
10、MapReduce的思想,以及MapReduce调优问题?
11、你所了解的开源网站?
12、有两个集群,每个集群有3个节点,使用hive分析相同的数据,sql语句完全一样,一个集群的分析结果比另外一个慢的多,给出造成这种现象的可能原因?
13、Hbase的优化?
14、集群的版本,以及集群的瓶颈问题?
15、CRM项目,怎么跟Spark结合?
16、如何创建一个关键字分类?
17、海量日志数据,提取出某日访问百度次数最多的那个IP?
18、Hadoop和Spark处理数据时,出现内存溢出的处理方法?
19、有一个1G大小的一个文件,里面每一是一个词,词的大小不超过16字节,内存大小限制大小1M,返回频率最高的50个词。
20、你是如何处理缺少数据的?你是推荐使用什么样的处理技术,或者说你是用什么样的技术处理呢?
如果不会Java,面试第一题就答不会,这样难道不尴尬吗?正如加里·金(崇拜/崇拜)说:“这是一场革命,庞大的数据资源使得各个领域开始了量化进程,无论学术界、商界还是政府,所有领域都将开始这种进程。”大数据时代,没有Java基础,如何开启你的大数据时代?
**********************************************************************
1.简要描述如何安装配置一个apache开源版hadoop,描述即可,列出步骤更好
2.请列出正常工作的hadoop集群中hadoop都需要启动哪些进程,他们的作用分别是什么?
3.启动hadoop报如下错误,该如何解决?
error org.apache.hadoop.hdfs.server.namenode.NameNode
org.apache.hadoop.hdfs.server.common.inconsistentFSStateExceptio
n Directory /tmp/hadoop-root/dfs/name is in an inconsistent
state storage direction does not exist or is not accessible?
4.请写出以下执行命令
1)杀死一个job?
2)删除hdfs上的/tmp/aaa目录
3加入一个新的存储节点和删除一个计算节点需要刷新集群状态命令?
5.请列出你所知道的hadoop调度器,并简要说明其工作方法?
6.请列出在你以前工作中所使用过的开发mapreduce的语言?
7.当前日志采样格式为
- a,b,c,d
- b,b,f,e
- a,a,c,f
请用你最熟悉的语言编写一个mapreduce,并计算第四列每个元素出现的个数
8.你认为用Java,Streaming,pipe方式开发mapreduce,各有哪些优缺点?
9.hive有哪些方式保存元数据,各有哪些特点?
10.请简述hadoop怎么样实现二级排序?
11.简述hadoop实现join的几种方法?
12.请用Java实现非递归二分查找?
13.请简述mapreduce中,combiner,partition作用?
14.某个目录下有两个文件a.txt和b.txt,文件格式为(ip,username),
列如:
a.txt
127.0.0.1 zhangsan
127.0.0.1 wangxiaoer
127.0.0.2 lisi
127.0.0.3 wangwu
b.txt
127.0.0.4 lixiaolu
127.0.0.1 lisi
每个文件至少100万行,请使用Linux命令完成如下工作:
1)每个文件各自的ip数
2)出现在b.txt而没有出现在a.txt的ip
3)每个user出现的次数以及每个user对应的ip数
**************************************************************************
1.Hadoop集群可以运行的3个模式?
单机(本地)模式
伪分布式模式
全分布式模式
2. 单机(本地)模式中的注意点?
在单机模式(standalone)中不会存在守护进程,所有东西都运行在一个JVM上。这里同样没有DFS,使用的是本地文件系统。单机模式适用于开发过程中运行MapReduce程序,这也是最少使用的一个模式。
3. 伪分布模式中的注意点?
伪分布式(Pseudo)适用于开发和测试环境,在这个模式中,所有守护进程都在同一台机器上运行。
4. VM是否可以称为Pseudo?
不是,两个事物,同时Pseudo只针对Hadoop。
5. 全分布模式又有什么注意点?
全分布模式通常被用于生产环境,这里我们使用N台主机组成一个Hadoop集群,Hadoop守护进程运行在每台主机之上。这里会存在Namenode运行的主机,Datanode运行的主机,以及task tracker运行的主机。在分布式环境下,主节点和从节点会分开。
6. Hadoop是否遵循UNIX模式?
是的,在UNIX用例下,Hadoop还拥有“conf”目录。
7. Hadoop安装在什么目录下?
Cloudera和Apache使用相同的目录结构,Hadoop被安装在cdusrlibhadoop-0.20。
8. Namenode、Job tracker和task tracker的端口号是?
Namenode,70;Job tracker,30;Task tracker,60。
9. Hadoop的核心配置是什么?
Hadoop的核心配置通过两个xml文件来完成:1,hadoop-default.xml;2,hadoop-site.xml。这些文件都使用xml格式,因此每个xml中都有一些属性,包括名称和值,但是当下这些文件都已不复存在。
10. 那当下又该如何配置?
Hadoop现在拥有3个配置文件:1,core-site.xml;2,hdfs-site.xml;3,mapred-site.xml。这些文件都保存在conf子目录下。
11. RAM的溢出因子是?
溢出因子(Spill factor)是临时文件中储存文件的大小,也就是Hadoop-temp目录。
12. fs.mapr.working.dir只是单一的目录?
fs.mapr.working.dir只是一个目录。
13. hdfs-site.xml的3个主要属性?
dfs.name.dir决定的是元数据存储的路径以及DFS的存储方式(磁盘或是远端)
dfs.data.dir决定的是数据存储的路径
fs.checkpoint.dir用于第二Namenode
14. 如何退出输入模式?
退出输入的方式有:1,按ESC;2,键入q(如果你没有输入任何当下)或者键入wq(如果你已经输入当下),并且按下Enter。
15. 当你输入hadoopfsck 造成“connection refused java exception’”时,系统究竟发生了什么?
这意味着Namenode没有运行在你的VM之上。
16. 我们使用Ubuntu及Cloudera,那么我们该去哪里下载Hadoop,或者是默认就与Ubuntu一起安装?
这个属于Hadoop的默认配置,你必须从Cloudera或者Edureka的dropbox下载,然后在你的系统上运行。当然,你也可以自己配置,但是你需要一个Linux box,Ubuntu或者是Red Hat。在Cloudera网站或者是Edureka的Dropbox中有安装步骤。
17. “jps”命令的用处?
这个命令可以检查Namenode、Datanode、Task Tracker、 Job Tracker是否正常工作。
18. 如何重启Namenode?
点击stop-all.sh,再点击start-all.sh。
键入sudo hdfs(Enter),su-hdfs (Enter),etcinit.dha(Enter),及etcinit.dhadoop-0.20-namenode start(Enter)。
19. Fsck的全名?
全名是:File System Check。
20. 如何检查Namenode是否正常运行?
如果要检查Namenode是否正常工作,使用命令etcinit.dhadoop-0.20-namenode status或者就是简单的jps。
21. mapred.job.tracker命令的作用?
可以让你知道哪个节点是Job Tracker。
22. etc init.d命令的作用是?
etc init.d说明了守护进程(服务)的位置或状态,其实是LINUX特性,和Hadoop关系不大。
23. 如何在浏览器中查找Namenode?
如果你确实需要在浏览器中查找Namenode,你不再需要localhost8021,Namenode的端口号是50070。
24. 如何从SU转到Cloudera?
从SU转到Cloudera只需要键入exit。
25. 启动和关闭命令会用到哪些文件?
Slaves及Masters。
26. Slaves由什么组成?
Slaves由主机的列表组成,每台1行,用于说明数据节点。
27. Masters由什么组成?
Masters同样是主机的列表组成,每台一行,用于说明第二Namenode服务器。
28. hadoop-env.sh是用于做什么的?
hadoop-env.sh提供了Hadoop中. JAVA_HOME的运行环境。
29. Master文件是否提供了多个入口?
是的你可以拥有多个Master文件接口。
30. Hadoop-env.sh文件当下的位置?
hadoop-env.sh现在位于conf。
31. 在Hadoop_PID_DIR中,PID代表了什么?
PID代表了“Process ID”。
32. varhadooppids用于做什么?
varhadooppids用来存储PID。
33. hadoop-metrics.properties文件的作用是?
hadoop-metrics.properties被用做“Reporting”,控制Hadoop报告,初始状态是“not to report”。
34. Hadoop需求什么样的网络?
Hadoop核心使用Shell(SSH)来驱动从节点上的服务器进程,并在主节点和从节点之间使用password-less SSH连接。
35. 全分布式环境下为什么需求password-less SSH?
这主要因为集群中通信过于频繁,Job Tracker需要尽可能快的给Task Tracker发布任务。
36. 这会导致安全问题吗?
完全不用担心。Hadoop集群是完全隔离的,通常情况下无法从互联网进行操作。与众不同的配置,因此我们完全不需要在意这种级别的安全漏洞,比如说通过互联网侵入等等。Hadoop为机器之间的连接提供了一个相对安全的方式。
37. SSH工作的端口号是?
SSH工作的端口号是NO.22,当然可以通过它来配置,22是默认的端口号。
38. SSH中的注意点还包括?
SSH只是个安全的shell通信,可以把它当做NO.22上的一种协议,只需要配置一个密码就可以安全的访问。
39. 为什么SSH本地主机需要密码?
在SSH中使用密码主要是增加安全性,在某些情况下也根本不会设置密码通信。
40. 如果在SSH中添加key,是否还需要设置密码?
是的,即使在SSH中添加了key,还是需要设置密码。
41. 假如Namenode中没有数据会怎么样?
没有数据的Namenode就不能称之为Namenode,通常情况下,Namenode肯定会有数据。
42. 当Job Tracker宕掉时,Namenode会发生什么?
当Job Tracker失败时,集群仍然可以正常工作,只要Namenode没问题。
43. 是客户端还是Namenode决定输入的分片?
这并不是客户端决定的,在配置文件中以及决定分片细则。
44. 是否可以自行搭建Hadoop集群?
是的,只要对Hadoop环境足够熟悉,你完全可以这么做。
45. 是否可以在Windows上运行Hadoop?
你最好不要这么做,Red Hat Linux或者是Ubuntu才是Hadoop的最佳操作系统。在Hadoop安装中,Windows通常不会被使用,因为会出现各种各样的问题。因此,Windows绝对不是Hadoop的推荐系统。
*************************************************************
问题导读
1、当前集群的可用资源不能满足应用程序的需求,怎么解决?
2、内存里堆的东西太多了,有什么好办法吗?
1、WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster uito ensure that workers are registered and have sufficient memory
当前的集群的可用资源不能满足应用程序所请求的资源。
资源分2类: cores 和 ram
Core代表对执行可用的executor slots
Ram代表每个Worker上被需要的空闲内存来运行你的Application。
解决方法:
应用不要请求多余空闲可用资源的
关闭掉已经执行结束的Application
2、Application isn’t using all of the Cores: How to set the Cores used by a Spark App
设置每个App所能获得的core
解决方法:
spark-env.sh里设置spark.deploy.defaultCores
或
spark.cores.max
3、Spark Executor OOM: How to set Memory Parameters on Spark
OOM是内存里堆的东西太多了
1、增加job的并行度,即增加job的partition数量,把大数据集切分成更小的数据,可以减少一次性load到内存中的数据量。InputFomart, getSplit来确定。
2、spark.storage.memoryFraction
管理executor中RDD和运行任务时的内存比例,如果shuffle比较小,只需要一点点shuffle memory,那么就调大这个比例。默认是0.6。不能比老年代还要大。大了就是浪费。
3、spark.executor.memory如果还是不行,那么就要加Executor的内存了,改完executor内存后,这个需要重启。
4、Shark Server/ Long Running Application Metadata Cleanup
Spark程序的元数据是会往内存中无限存储的。spark.cleaner.ttl来防止OOM,主要出现在Spark Steaming和Shark Server里。
export SPARK_JAVA_OPTS +="-Dspark.kryoserializer.buffer.mb=10 -Dspark.cleaner.ttl=43200"
5、Class Not Found: Classpath Issues
问题1、缺少jar,不在classpath里。
问题2、jar包冲突,同一个jar不同版本。
解决1:
将所有依赖jar都打入到一个fatJar包里,然后手动设置依赖到指定每台机器的DIR。
val conf = new SparkConf().setAppName(appName).setJars(Seq(System.getProperty("user.dir") + "/target/scala-2.10/sparktest.jar"))
解决2:
把所需要的依赖jar包都放到default classpath里,分发到各个worker node上。
关于性能优化:
第一个是sort-based shuffle。这个功能大大的减少了超大规模作业在shuffle方面的内存占用量,使得我们可以用更多的内存去排序。
第二个是新的基于Netty的网络模块取代了原有的NIO网络模块。这个新的模块提高了网络传输的性能,并且脱离JVM的GC自己管理内存,降低了GC频率。
第三个是一个独立于Spark executor的external shuffle service。这样子executor在GC的时候其他节点还可以通过这个service来抓取shuffle数据,所以网络传输本身不受到GC的影响。
过去一些的参赛系统软件方面的处理都没有能力达到硬件的瓶颈,甚至对硬件的利用率还不到10%。而这次我们的参赛系统在map期间用满了3GB/s的硬盘带宽,达到了这些虚拟机上八块SSD的瓶颈,在reduce期间网络利用率到了1.1GB/s,接近物理极限。
**********************************************************
1. Spark 的四大组件下面哪个不是 (D )
A.Spark Streaming B Mlib
C Graphx D Spark R
2.下面哪个端口不是 spark 自带服务的端口 (C )
A.8080 B.4040 C.8090 D.18080
3.spark 1.4 版本的最大变化 (B )
A spark sql Release 版本 B 引入 Spark R
C DataFrame D支持动态资源分配
4. Spark Job 默认的调度模式 (A )
A FIFO B FAIR
C 无 D 运行时指定
5.哪个不是本地模式运行的个条件 ( D)
A spark.localExecution.enabled=true B 显式指定本地运行 C finalStage 无父 Stage D partition默认值
6.下面哪个不是 RDD 的特点 (C )
A. 可分区 B 可序列化 C 可修改 D 可持久化
7. 关于广播变量,下面哪个是错误的 (D )
A 任何函数调用 B 是只读的 C 存储在各个节点 D 存储在磁盘或 HDFS
8. 关于累加器,下面哪个是错误的 (D )
A 支持加法 B 支持数值类型
C 可并行 D 不支持自定义类型
9.Spark 支持的分布式部署方式中哪个是错误的 (D )
A standalone B spark on mesos
C spark on YARN D Spark on local
10.Stage 的 Task 的数量由什么决定 (A )
A Partition B Job C Stage D TaskScheduler
11.下面哪个操作是窄依赖 (B )
A join B filter
C group D sort
12.下面哪个操作肯定是宽依赖 (C )
A map B flatMap
C reduceByKey D sample
13.spark 的 master 和 worker 通过什么方式进行通信的? (D )
A http B nio C netty D Akka
14 默认的存储级别 (A )
A MEMORY_ONLY B MEMORY_ONLY_SER
C MEMORY_AND_DISK D MEMORY_AND_DISK_SER
15 spark.deploy.recoveryMode 不支持那种 (D )
A.ZooKeeper B. FileSystem
D NONE D Hadoop
16.下列哪个不是 RDD 的缓存方法 (C )
A persist() B Cache()
C Memory()
17.Task 运行在下来哪里个选项中 Executor 上的工作单元 (C )
A Driver program B. spark master
C.worker node D Cluster manager
18.hive 的元数据存储在 derby 和 MySQL 中有什么区别 (B )
A.没区别 B.多会话 C.支持网络环境 D数据库的区别
19.DataFrame 和 RDD 最大的区别 (B )
A.科学统计支持 B.多了 schema
C.存储方式不一样 D.外部数据源支持
20.Master 的 ElectedLeader 事件后做了哪些操作 (D )
A. 通知 driver B.通知 worker
C.注册 application D.直接 ALIVE
**************************************************************
Volatile的特征:
A、禁止指令重排(有例外)
B、可见性
Volatile的内存语义:
当写一个volatile变量时,JMM会把线程对应的本地内存中的共享变量值刷新到主内存。
当读一个volatile变量时,JMM会把线程对应的本地内存置为无效,线程接下来将从主内存中读取共享变量。
Volatile的重排序
1、当第二个操作为volatile写操做时,不管第一个操作是什么(普通读写或者volatile读写),都不能进行重排序。这个规则确保volatile写之前的所有操作都不会被重排序到volatile之后;
2、当第一个操作为volatile读操作时,不管第二个操作是什么,都不能进行重排序。这个规则确保volatile读之后的所有操作都不会被重排序到volatile之前;
3、当第一个操作是volatile写操作时,第二个操作是volatile读操作,不能进行重排序。
这个规则和前面两个规则一起构成了:两个volatile变量操作不能够进行重排序;
除以上三种情况以外可以进行重排序。
比如:
1、第一个操作是普通变量读/写,第二个是volatile变量的读;
2、第一个操作是volatile变量的写,第二个是普通变量的读/写;
内存屏障/内存栅栏
内存屏障(Memory Barrier,或有时叫做内存栅栏,Memory Fence)是一种CPU指令,用于控制特定条件下的重排序和内存可见性问题。Java编译器也会根据内存屏障的规则禁止重排序。(也就是让一个CPU处理单元中的内存状态对其它处理单元可见的一项技术。)
内存屏障可以被分为以下几种类型:
LoadLoad屏障:对于这样的语句Load1; LoadLoad; Load2,在Load2及后续读取操作要读取的数据被访问前,保证Load1要读取的数据被读取完毕。
StoreStore屏障:对于这样的语句Store1; StoreStore; Store2,在Store2及后续写入操作执行前,保证Store1的写入操作对其它处理器可见。
LoadStore屏障:对于这样的语句Load1; LoadStore; Store2,在Store2及后续写入操作被刷出前,保证Load1要读取的数据被读取完毕。
StoreLoad屏障:对于这样的语句Store1; StoreLoad; Load2,在Load2及后续所有读取操作执行前,保证Store1的写入对所有处理器可见。它的开销是四种屏障中最大的。
在大多数处理器的实现中,这个屏障是个万能屏障,兼具其它三种内存屏障的功能。
内存屏障阻碍了CPU采用优化技术来降低内存操作延迟,必须考虑因此带来的性能损失。为了达到最佳性能,最好是把要解决的问题模块化,这样处理器可以按单元执行任务,然后在任务单元的边界放上所有需要的内存屏障。采用这个方法可以让处理器不受限的执行一个任务单元。合理的内存屏障组合还有一个好处是:缓冲区在第一次被刷后开销会减少,因为再填充改缓冲区不需要额外工作了。
happens-before原则
Java是如何实现跨平台的?
跨平台是怎样实现的呢?这就要谈及Java虚拟机(Java Virtual Machine,简称 JVM)。
JVM也是一个软件,不同的平台有不同的版本。我们编写的Java源码,编译后会生成一种 .class 文件,称为字节码文件。Java虚拟机就是负责将字节码文件翻译成特定平台下的机器码然后运行。也就是说,只要在不同平台上安装对应的JVM,就可以运行字节码文件,运行我们编写的Java程序。
而这个过程中,我们编写的Java程序没有做任何改变,仅仅是通过JVM这一”中间层“,就能在不同平台上运行,真正实现了”一次编译,到处运行“的目的。
JVM是一个”桥梁“,是一个”中间件“,是实现跨平台的关键,Java代码首先被编译成字节码文件,再由JVM将字节码文件翻译成机器语言,从而达到运行Java程序的目的。
注意:编译的结果不是生成机器码,而是生成字节码,字节码不能直接运行,必须通过JVM翻译成机器码才能运行。不同平台下编译生成的字节码是一样的,但是由JVM翻译成的机器码却不一样。
所以,运行Java程序必须有JVM的支持,因为编译的结果不是机器码,必须要经过JVM的再次翻译才能执行。即使你将Java程序打包成可执行文件(例如 .exe),仍然需要JVM的支持。
注意:跨平台的是Java程序,不是JVM。JVM是用C/C++开发的,是编译后的机器码,不能跨平台,不同平台下需要安装不同版本的JVM。
手机扫二维码登录是怎么实现的?
友情链接:扫码登录是如何实现的?
Java 线程有哪些状态,这些状态之间是如何转化的?
-
新建(new):新创建了一个线程对象。
-
可运行(runnable):线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法。该状态的线程位于可运行线程池中,等待被线程调度选中,获取cpu 的使用权 。
-
运行(running):可运行状态(runnable)的线程获得了cpu 时间片(timeslice) ,执行程序代码。
-
阻塞(block):阻塞状态是指线程因为某种原因放弃了cpu 使用权,也即让出了cpu timeslice,暂时停止运行。直到线程进入可运行(runnable)状态,才有机会再次获得cpu timeslice 转到运行(running)状态。阻塞的情况分三种:
(一). 等待阻塞:运行(running)的线程执行o.wait()方法,JVM会把该线程放入等待队列(waitting queue)中。
(二). 同步阻塞:运行(running)的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则JVM会把该线程放入锁池(lock pool)中。
(三). 其他阻塞:运行(running)的线程执行Thread.sleep(long ms)或t.join()方法,或者发出了I/O请求时,JVM会把该线程置为阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入可运行(runnable)状态。
- 死亡(dead):线程run()、main() 方法执行结束,或者因异常退出了run()方法,则该线程结束生命周期。死亡的线程不可再次复生。
List接口、Set接口和Map接口的区别
Cookie和Session的区别?
友情链接:Cookies 和 Session的区别
Java中的equals和hashCode方法详解
友情链接:Java提高篇——equals()与hashCode()方法详解
Java中CAS算法
友情链接:乐观的并发策略——基于CAS的自旋
TimSort原理
友情链接:TimSort原理
comparable与comparator的区别?
手写单例模式(线程安全)
友情链接:快速理解Java中的五种单例模式
Java线程间的通信方式?
友情链接:Java 多线程(七) 线程间的通信——wait及notify方法
友情链接:Java线程间的通信方式详解
Java8的内存分代改进
友情链接:Java7、Java8的堆内存有啥变化?
对Java内存模型的理解以及其在并发当中的作用?
Arrays和Collections 对于sort的不同实现原理?
1、Arrays.sort()
该算法是一个经过调优的快速排序,此算法在很多数据集上提供N*log(N)的性能,这导致其他快速排序会降低二次型性能。
2、Collections.sort()
该算法是一个经过修改的合并排序算法(其中,如果低子列表中的最高元素效益高子列表中的最低元素,则忽略合并)。此算法可提供保证的N*log(N)的性能,此实现将指定列表转储到一个数组中,然后再对数组进行排序,在重置数组中相应位置处每个元素的列表上进行迭代。这避免了由于试图原地对链接列表进行排序而产生的
Java中object常用方法
1、clone()
2、equals()
3、finalize()
4、getclass()
5、hashcode()
6、notify()
7、notifyAll()
8、toString()
对于Java中多态的理解
所谓多态就是指程序中定义的引用变量所指向的具体类型和通过该引用变量发出的方法调用在编程时并不确定,而是在程序运行期间才确定,即一个引用变量到底会指向哪个类的实例对象,该引用变量发出的方法调用到底是哪个类中实现的方法,必须在由程序运行期间才能决定。因为在程序运行时才确定具体的类,这样,不用修改源程序代码,就可以让引用变量绑定到各种不同的类实现上,从而导致该引用调用的具体方法随之改变,即不修改程序代码就可以改变程序运行时所绑定的具体代码,让程序可以选择多个运行状态,这就是多态性。
多态的定义:指允许不同类的对象对同一消息做出响应。即同一消息可以根据发送对象的不同而采用多种不同的行为方式。(发送消息就是函数调用)
Java实现多态有三个必要条件:继承、重写、父类引用指向子类对象。
继承:在多态中必须存在有继承关系的子类和父类。
重写:子类对父类中某些方法进行重新定义,在调用这些方法时就会调用子类的方法。
父类引用指向子类对象:在多态中需要将子类的引用赋给父类对象,只有这样该引用才能够具备技能调用父类的方法和子类的方法。
实现多态的技术称为:动态绑定(dynamic binding),是指在执行期间判断所引用对象的实际类型,根据其实际的类型调用其相应的方法。
多态的作用:消除类型之间的耦合关系。
Session机制?
友情链接 :Session机制详解
Java序列化与反序列化是什么?为什么需要序列化与反序列化?如何实现Java序列化与反序列化?
友情链接 : Java序列化与反序列化
spring AOP 实现原理?
友情链接 :Spring AOP 实现原理
Servlet 工作原理?
友情链接 :Servlet 工作原理解析
java NIO和IO的区别?
友情链接 :Java NIO和IO的区别
Java中堆内存和栈内存区别?
友情链接 :Java中堆内存和栈内存详解
反射讲一讲,主要是概念,都在哪需要反射机制,反射的性能,如何优化?
反射机制的定义:
是在运行状态中,对于任意的一个类,都能够知道这个类的所有属性和方法,对任意一个对象都能够通过反射机制调用一个类的任意方法,这种动态获取类信息及动态调用类对象方法的功能称为java的反射机制。
反射的作用:
1、动态地创建类的实例,将类绑定到现有的对象中,或从现有的对象中获取类型。
2、应用程序需要在运行时从某个特定的程序集中载入一个特定的类。
如何保证RESTful API安全性 ?
友情链接: 如何设计好的RESTful API之安全性
如何预防MySQL注入?
友情链接:MySQL 及 SQL 注入与防范方法
******************************************************
spark-submit的时候如何引入外部jar包
spark shuffle的具体过程,你知道几种shuffle方式
spark 如何防止内存溢出
cache和pesist的区别
怎么处理数据倾斜
简要描述Spark分布式集群搭建的步骤
spark使用:
1)当前文件a.text的格式为,请统计每个单词出现的个数、计算第四列每个元素出现的个数
A,b,c,d
B,b,f,e
A,a,c,f
2)在(url,user)的键值对中,如
a.text
127.0.0.1 xiaozhang
127.0.0.1 xiaoli
127.0.0.2 wangwu
127.0.0.3 lisi
…..
B.text
127.0.0.4 lixiaolu
127.0.0.5 lisi
127.0.0.3 zhangsan
每个文件至少有1000万行,请用程序完成一下工作,
1)各个文件的ip数
2)出现在b.text而没有出现在a.text的IP
3)每个user出现的次数以及每个user对应的IP的个数
4)对应IP数最多的前K个user