当前位置:   article > 正文

超全面试汇总——Hadoop(二)_一个文件有上亿url,内存有限

一个文件有上亿url,内存有限

谈谈什么是Hadoop?

  • Hadoop是一个开源软件框架,用于存储大量数据,并发计算/查询节点的集群上的数据。
  • Hadoop包括以下内容:
    • HDFS(Hadoop Distributed File System):Hadoop分布式文件存储系统。
    • MapReduce:分布式计算框架。它以分布式和并行的方式处理大量的数据。
    • YARN(资源定位器):用于管理和调度集群资源的框架。hadoop2.x提出的

@@MapReduce分布式计算

在这里插入图片描述

  • https://blog.csdn.net/lihuazaizheli/article/details/107674269

  • 理解map reduce 编程思想(分而治之)

    • Tasks 分:把复杂的问题分解为若干"简单的任务"
    • Reduce Tasks 合:reduce
  • 大数据量下优势明显,读写HDFS次数多

  • mapreduce流程

    • inputFile通过split被切割为多个split文件, 逻辑切片<偏移-数据>,通过Record按行读取内容给map(自己写的处理逻辑的方法),对其结果key进行分区(默认使用的hashPartitioner),分区的数量就是 Reducer 任务运行的数量,然后写入buffer,每个map task 都有一个内存缓冲区(环形缓冲区),每个分区中对其键值对进行sort ,按照paritition和key排序,排序完后会创建一个溢出文件,然后把这部分数据溢出spill写到本地磁盘。通知master位置归并merge,当一个maptask处理数据很大时,对同一个map任务产生的多个spill文件进行归并生成最终的一个已分区且已排序的大文件
    • Reduce 大致分为 copy、sort、reduce 三个阶段,:多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点上。由 Fetcher 线程去 copy 数据,在此过程中会启动两个 merge 线程,分别为 inMemoryMerger 和 onDiskMerger,分别将内存中的数据 merge 到磁盘和将磁盘中的数据进行 merge
  • maptask:

    • 读数据:读取源数据,获取进行逻辑切片<偏移-数据>,把每一行文本内容解析成键值对 形成key-value数据;<0,hello you> <10,hello me>
    • 逻辑处理:调用map方法读取每行数据进行处理<hello,1> <you,1> <hello,1> <me,1>
    • 分区:对数据进行分区(hash),分区的数量就是 Reducer 任务运行的数量。默认只有一个Reducer 任务。分区号相同的数据会被分发给同一reducetask
    • 排序:对不同分区中的数据进行排序(按照k)、分组。 排序默认按照字典序列,分组指的是相同key的value放到一个集合中。排序后:<hello,1> <hello,1> <me,1> <you,1> 分组后:<hello,{1,1}><me,{1}><you,{1}>
  • reducetask:

    • shuffle多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点上
    • 处理数据:对多个map的输出进行合并、排序,覆盖reduce函数,接收的是分组后的数据,实现自己的业务逻辑,<hello,2> <me,1> <you,1>
    • 对reduce输出的<k,v>写到HDFS中
  • map数由分片决定,若要增加map数,可增大mapred.map.tasks,若减少map数,可增大mapred.min.split.size。减少map个数,在map执行前合并小文件,可set mapred.map.split.size

  • reduce数量由分区数决定,结果文件的数量也由此决定,且记录默认按key升序排列。reduce数量可通过mapred.reduce.tasks设置。

@shuffle流程

在这里插入图片描述

  • 目的:
    • 对Map机器上的数据进行重新分组
    • 让每个Reduce知道它需要的数据分别在每个Map机器的哪里。
  • shuffle阶段分为四个步骤:依次为**:分区,排序,规约(combiner),分组**,其中前三个步骤在map阶段完成,最后一个步骤在reduce阶段完成
  • shuffle 是 Mapreduce 的核心,它分布在 Mapreduce 的 map 阶段和 reduce 阶段。一般把从 Map 产生输出开始到 Reduce 取得数据作为输入之前的过程称作 shuffle
  • shuffle中排序的目的
    • 这样每个Reducer都可以得知自己要处理的数据是哪些,直接拉取和计算对应的数据,避免了大量无用数据的存储和计算
  • map端shuffle
    • 后台线程根据Reducer的个数将输出结果进行分区,每一个分区对应一个Reducer,能够把map任务处理的结果**发给指定reduce执行,负载均衡,**避免数据倾斜。
    • 写入环形内存缓冲区,频繁I/O操作会严重降低效率,每个map任务都会分配一个环形内存缓冲区,用于存储map任务输出的键值对,默认大小100MB
    • 执行溢出写 排序->合并->生成溢出写文件 一旦缓存区内容达到阈值,默认80%,就锁定着80%的内存,Map task的输出结果还可以往剩下的20MB内存中写,互不影响。并在每个分区中对其键值对进行sort**,按照paritition和key排序,排序完后会创建一个溢出文件,然后把这部分数据溢出spill写到本地磁盘。如果客户端自定义了Combiner(相当于map阶段的reduce),则会在分区排序后到溢写出前自动调用combiner,将相同的key的value相加,这样的好处就是减少溢写到磁盘的数据量。这个过程叫合并**)
    • 归并merge,当一个maptask处理数据很大时,对同一个map任务产生的多个spill文件进行归并生成最终的一个已分区且已排序的大文件
    • 合并(Combine)和归并(Merge)的区别:
      两个键值对<“a”,1><“a”,1>,如果合并,会得到<“a”,2>,如果归并,会得到<“a”,<1,1>>
  • reduce端shuffle
    • 复制copy,Reduce 任务通过HTTP向各个Map任务拖取它所需要的数据。在 ReduceTask 远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作
    • 归并merge,Map的输出数据已经是有序的,Merge进行一次合并排序,所谓Reduce端的 sort过程就是这个合并的过程。一般Reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的
  • 优化shuffle:增加combiner,压缩溢写的文件
  • 原则上缓冲区越大,磁盘io的次数越少,执行速度就越快缓冲区的大小可以通过参数调整, mapreduce.task.io.sort.mb 默认100M

shuffle阶段的数据压缩机制了解吗

  • 在shuffle阶段,可以看到数据通过大量的拷贝,从map阶段输出的数据,都要通过网络拷贝,发送到reduce阶段
  • hadoop当中支持的压缩算法:gzip、bzip2、LZO、LZ4、Snappy,这几种压缩算法综合压缩和解压缩的速率,谷歌的Snappy是最优的,一般都选择Snappy压缩。

MapReduce实现基本SQL操作的原理

由于Join/GroupBy/OrderBy均需要在Reduce阶段完成

1. Join的实现原理

 select u.name, o.orderid from order o join user u on o.uid = u.uid;
  • 1
  • sql语句中on后面的字段就是key,在map阶段的输出(value)中为不同表的数据打上tag标记在reduce阶段根据tag判断数据来源。MapReduce的过程如下(这里只是说明最基本的Join的实现,还有其他的实现方式)
  • 在这里插入图片描述

2. Group By的实现原理

 select rank, isonline, count(*) from city group by rank, isonline;
  • 1
  • sql中 group by后面的字段组合(rank 和isonline的组合)作为map的输出key值,利用MapReduce的排序,在reduce阶段保存LastKey区分不同的key。MapReduce的过程如下(当然这里只是说明Reduce端的非Hash聚合过程)
  • 在这里插入图片描述

3. Distinct的实现原理

 select dealid, count(distinct uid) num from order group by dealid;
  • 1
  • 当只有一个distinct字段时,如果不考虑Map阶段的Hash GroupBy,只需要将GroupBy字段和Distinct字段组合为map输出key,利用mapreduce的排序,同时将GroupBy字段作为reduce的key,在reduce阶段保存LastKey即可完成去重

  • 在这里插入图片描述

  • 如果有多个distinct字段呢,如下面的SQL

 select dealid, count(distinct uid), count(distinct date) from order group by dealid;
  • 1
  • 可以对所有的distinct字段编号,每行数据生成n行数据,那么相同字段就会分别排序,这时只需要在reduce阶段记录LastKey即可去重。这种实现方式很好的利用了MapReduce的排序,节省了reduce阶段去重的内存消耗,但是缺点是增加了shuffle的数据量。需要注意的是,在生成reduce value时,除第一个distinct字段所在行需要保留value值,其余distinct数据行value字段均可为空

  • 在这里插入图片描述

一个文件有上亿url,内存很小,找Top10

  • 外排序采用分块的方法(分而治之),首先将数据分块,对块内数据按选择一种高效的内排序策略进行排序。然后采用归并排序的思想对于所有的块进行排序,得到所有数据的一个有序序列。

  • 把磁盘上的1TB数据分割为40块(chunks),每份25GB。(注意,要留一些系统空间!)

  • 顺序将每份25GB数据读入内存,使用quick sort算法排序。

  • 把排序好的数据(也是25GB)存放回磁盘。

  • 循环40次,现在,所有的40个块都已经各自排序了。(剩下的工作就是如何把它们合并排序!)

  • 从40个块中分别读取25G/40=0.625G入内存(40 input buffers)。

  • 执行40路合并,并将合并结果临时存储于2GB 基于内存的输出缓冲区中。当缓冲区写满2GB时,写入硬盘上最终文件,并清空输出缓冲区;当40个输入缓冲区中任何一个处理完毕时,写入该缓冲区所对应的块中的下一个0.625GB,直到全部处理完成。

@SQL转化为MapReduce的过程

  1. Antlr定义SQL的语法规则,完成SQL词法,语法解析,将SQL转化为抽象语法树AST Tree
    • HiveLexerX,HiveParser分别是Antlr对语法文件Hive.g编译后自动生成的词法解析和语法解析类
  2. 遍历AST Tree,抽象出查询的基本组成单元QueryBlock
    • QueryBlock是一条SQL最基本的组成单元,包括三个部分:输入源,计算过程,输出。简单来讲一个QueryBlock就是一个子查询
  3. 遍历QueryBlock,翻译为执行操作树OperatorTree
    • Hive最终生成的MapReduce任务,Map阶段和Reduce阶段均由OperatorTree组成。逻辑操作符,就是在Map阶段或者Reduce阶段完成单一特定的操作。
  4. 逻辑层优化器进行OperatorTree变换,减少mapreduce job,减少shuffle数据量
    • 谓词下推、合并线性的OperatorTree中partition/sort key相同的reduce (from (select key,value from src group bu key, value)s select s.key group by s.key;
    • Map端聚合
  5. 遍历OperatorTree,翻译为MapReduce任务
  6. 物理层优化器进行MapReduce任务的变换,生成最终的执行计划

https://tech.meituan.com/2014/02/12/hive-sql-to-mapreduce.html

什么是数据倾斜

  • Hadoop能够进行对海量数据进行批处理的核心,在于它的分布式思想,也就是多台服务器(节点)组成集群,进行分布式的数据处理
  • 举例:如果有10亿数据,一台电脑可能要10小时,现在集群有10台,可能1小时就够了,但是有可能大量的数据集中到一台或几台上,要5小时,发生了数据倾斜

数据倾斜的表现

  • Mapreduce任务
    • reduce阶段 卡在99.99%不动
    • 各种container报错OOM
    • 读写数据量很大,超过其他正常reduce
  • spark任务
    • 个别task执行很慢
    • 单个执行特别久
    • shuffle出错
    • sparkstreaming做实时算法使,会有executor出现内存溢出,但是其他的使用率很低

@发生数据倾斜的原因

  • shuffle是按照key,来进行values的数据的输出、拉取和聚合的,一旦发生shuffle,所有相同key的值就会拉到一个或几个节点上,个别key对应的数据比较多,就容易发生单个节点处理数据量爆增的情况。
  • key分布不均匀
    • 存在大量相同值的数据
    • 存在大量异常值或者空值
  • 业务数据本身的特性
    • 例如某个分公司或某个城市订单量大幅提升几十倍甚至几百倍,对该城市的订单统计聚合时,容易发生数据倾斜。
  • 某些SQL语句本身就有数据倾斜
    • 两个表中关联字段存在大量空值(去除或者加随机数),或是关联字段的数据不统一(方法:把数字类型转为字符串类型,统一大小写)
    • join 一个key集中的小表 (方法:reduce join 改成 map join)
    • group by维度过小 某值的数量过多 (方法:两阶段聚合,放粗粒度)
    • count distinct 某特殊值过多 (方法:用group by)
  • 数据频率倾斜——某一个区域的数据量要远远大于其他区域。
  • 数据大小倾斜——部分记录的大小远远大于平均值。

@如何解决数据倾斜

@聚合类group by操作,发生数据倾斜

  • map段部分聚合

    • 开启Map端聚合参数设置set hive.map.aggr=true
    • 在Map端进行聚合操作的条目数目set hive.grouby.mapaggr.checkinterval=100000
    • 有数据倾斜的时候进行负载均衡(默认是false)set hive.groupby.skewindata = true
  • 阶段拆分-两阶段聚合 需要聚合的key前加一个随机数的前后缀,这样就均匀了,之后再按照原始的key聚合一次

  • 生成的查询计划有两 个 MapReduce 任务。在第一个 MapReduce 中,map 的输出结果集合会随机分布到 reduce 中, 每个 reduce 做部分聚合操作,并输出结果。相同的 Group By Key 有可 能分发到不同的 reduce 中,从而达到负载均衡的目的;第二个 MapReduce 任务再根据预处 理的数据结果按照 Group By Key 分布到 reduce 中(这个过程可以保证相同的 Group By Key 分布到同一个 reduce 中),最后完成最终的聚合操作。

  • 假设 key = 水果
    select count(substr(a.key,1,2)) as key
    from(
    	select concat(key,'_',cast(round(10*rand())+1 as string)) tmp
    	from table
    	group by tmp
    )a
    group by key
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

@Reduce join 改为Map join

  • 适用于小表和大表 join,将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD 的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。
  • 设置自动选择MapJoin set hive.auto.convert.join = true;默认为true
  • reduce join: 先将所有相同的key,对应的values,汇聚到一个task中,然后再进行join。
  • map reduce:broadcast出去那个小表的数据以后,就会在每个executor的block manager中都驻留一份+map算子来实现与join同样的效果。不会发生shuffe,从根本上杜绝了join操作可能导致的数据倾斜的问题

空值产生的数据倾斜

  • 1.在查询的时候,过滤掉所有为NULL的数据,比如:
    SELECT * FROM log a
    JOIN bmw_users b ON a.user_id IS NOT NULL AND a.user_id = b.user_id
    UNION ALL
    SELECT *FROM log a WHERE a.user_id IS NULL;
    
    2.查询出空值并给其赋上随机数,避免了key值为空(数据倾斜中常用的一种技巧)
    SELECT *FROM log a
    LEFT JOIN bmw_users b ON 
    CASE WHEN a.user_id IS NULL THEN concat(‘dp_hive’, rand()) ELSE a.user_id END = b.user_id;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

少用count(distinct) 采用sum() group by的方式来替换

  • select count(distinct a) from test ;
    select count x.a 
    from (select a from test group by a ) x 
    
    select a, count(distinct b) as c from tbl group by a;
    select a, count(*) as c from (select a, b from tbl group by a, b) group by a;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

特殊值分开处理法

  • 当需要把用户表和日志表关联起来时,再日志表中有很多没注册的用户表,可以分开处理

  • select *from
    (select * from logs where user_id = 0)a
    join
    (select * from users where user_id = 0)b
    on a.user_id = b.user_id
    union all
    select * from logs a join users b on a.user_id <> 0 and a.user_id = b.user_id;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

大表 join 大表

  • 将有大表中倾斜Key对应的数据集单独抽取出来加上随机前缀,另外一个RDD每条数据分别与随机前缀结合形成新的RDD笛卡尔积,相当于将其数据增到到原来的N倍,N即为随机前缀的总个数)然后将二者Join后去掉前缀。然后将不包含倾斜Key的剩余数据进行Join。最后将两次Join的结果集通过union合并,即可得到全部Join结果。
  • RDD扩容

不同数据类型关联产生数据倾斜

  • 一张表 s8_log,每个商品一条记录,要和商品表关联。**s8_log 中有字符串商品 id,也有数字的商品 id。**字符串商品 id 类型是 string 的,但商品中的数字 id 是 bigint 的。

  • 问题的原因是把 s8_log 的商品 id 转成数字 id 做 Hash(数字的 Hash 值为其本身,相同的字符串的 Hash 也不同)来分配 Reducer,所以相同字符串 id 的 s8_log,都到一个 Reducer 上了。

  • -- 把数字类型转换成字符串类型
    SELECT *
    FROM s8_log a
    LEFT JOIN r_auction_auctions b ON a.auction_id = CAST(b.auction_id AS string);
    
    • 1
    • 2
    • 3
    • 4

多表 union all 会优化成一个 job

  • 推广效果表要和商品表关联,效果表中的 auction id 列既有商品 id,也有数字 id,和商品表关联得到商品的信息。
SELECT *
FROM effect a
  JOIN (
      SELECT auction_id AS auction_id
      FROM auctions
      UNION ALL
      SELECT auction_string_id AS auction_id
      FROM auctions
  ) b
  ON a.auction_id = b.auction_id;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • **结论:**这样子比分别过滤数字 id,字符串 id ,然后分别和商品表关联性能要好。这样写的好处:1个 MR 作业,商品表只读取一次,推广效果表只读取一次。把这个 sql 换成 MR 代码的话,map 的时候,把 a 表的记录打上标签 a ,商品表记录每读取一条,打上标签 t,变成两个<key,value> 对,<t,数字id,value>,<t,字符串id,value>。所以商品表的 HDFS(Hadoop Distributed File System) 读只会是一次。

  • 问题:比如推广效果表要和商品表关联,效果表中的 auction_id 列既有 32 为字符串商 品 id,也有数字 id,和商品表关联得到商品的信息。 比分别过滤数字 id,字符串 id 然后分别和商品表关联性能要好。

SELECT * FROM effect a 
JOIN 
(SELECT auction_id AS auction_id FROM auctions 
UNION All 
SELECT auction_string_id AS auction_id FROM auctions) b 
ON a.auction_id=b.auction_id;	
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 场景:有一张user表,为卖家每天收到表,user_id,ds(日期)为key,属性有主营类目,指标有交易金额,交易笔数。每天要取前10天的总收入,总笔数,和最近一天的主营类目。

    • SELECT user_id, substr(MAX(CONCAT(ds, cat)), 9) AS main_cat, SUM(qty), SUM(amt) FROM users
      WHERE ds BETWEEN 20120301 AND 20120329
      GROUP BY user_id
      
      • 1
      • 2
      • 3

优化in/exists语句

  • hive1.2.1也支持in/exists操作,但还是推荐使用hive的一个高效替代方案:left semi join

排序选择

  • cluster by: 对同一字段分桶并排序,不能和sort by连用;

  • distribute by + sort by: 分桶,保证同一字段值只存在一个结果文件当中,结合sort by 保证每个reduceTask结果有序;

  • sort by: 单机排序,单个reduce结果有序

  • order by:全局排序,缺陷是只能使用一个reduce

今天也是爱zz的一天哦!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/545807
推荐阅读
相关标签
  

闽ICP备14008679号