赞
踩
需要对名为“hello.txt”的HDFS文件进行一次map操作,再进行一次reduce操作//也就是说,需要对一份数据执行两次算子操作。//错误的做法:对于同一份数据执行多次算子操作时,创建多个RDD。//这里执行了两次textFile方法,针对同一个HDFS文件,创建了两个RDD出来,//然后分别对每个RDD都执行了一个算子操作。//这种情况下,Spark需要从HDFS上两次加载hello.txt文件的内容,并创建两个单独的RDD;//第二次加载HDFS文件以及创建RDD的性能开销,很明显是白白浪费掉的。val rdd1 = sc.textFile("[hdfs://192.168.0.1:9000/hello.txt](hdfs://192.168.0.1:9000/hello.txt)")rdd1.map(...)val rdd2 = sc.textFile("[hdfs://192.168.0.1:9000/hello.txt](hdfs://192.168.0.1:9000/hello.txt)")rdd2.reduce(...)//正确的用法:对于一份数据执行多次算子操作时,只使用一个RDD。//这种写法很明显比上一种写法要好多了,因为我们对于同一份数据只创建了一个RDD,//然后对这一个RDD执行了多次算子操作。//但是要注意到这里为止优化还没有结束,由于rdd1被执行了两次算子操作,第二次执行reduce操作的时候,//还会再次从源头处重新计算一次rdd1的数据,因此还是会有重复计算的性能开销。//要彻底解决这个问题,必须结合“原则三:对多次使用的RDD进行持久化”,//才能保证一个RDD被多次使用时只被计算一次。val rdd1 = sc.textFile("[hdfs://192.168.0.1:9000/hello.txt](hdfs://192.168.0.1:9000/hello.txt)")rdd1.map(...)rdd1.reduce(...)
原则二:
尽可能复用同一个RDD除了要避免在开发过程中对一份完全相同的数据创建多个RDD之外,在对不同的数据执行算子操作时还要尽可能地复用一个RDD。比如说,有一个RDD的数据格式是key-value类型的,另一个是单value类型的,这两个RDD的value数据是完全一样的。那么此时我们可以只使用key-value类型的那个RDD,因为其中已经包含了另一个的数据。对于类似这种多个RDD的数据有重叠或者包含的情况,我们应该尽量复用一个RDD,这样可以尽可能地减少RDD的数量,从而尽可能减少算子执行的次数。一个简单的例子
// 错误的做法。// 有一个格式的RDD,即rdd1。// 接着由于业务需要,对rdd1执行了一个map操作,创建了一个rdd2,//而rdd2中的数据仅仅是rdd1中的value值而已,也就是说,rdd2是rdd1的子集。JavaPairRDDlong><long , String> rdd1 = ...JavaRDD<string> rdd2 = rdd1.map(...)// 分别对rdd1和rdd2执行了不同的算子操作。rdd1.reduceByKey(...)rdd2.map(...)// 正确的做法。// 上面这个case中,其实rdd1和rdd2的区别无非就是数据格式不同而已,//rdd2的数据完全就是rdd1的子集而已,却创建了两个rdd,并对两个rdd都执行了一次算子操作。// 此时会因为对rdd1执行map算子来创建rdd2,而多执行一次算子操作,进而增加性能开销。// 其实在这种情况下完全可以复用同一个RDD。// 我们可以使用rdd1,既做reduceByKey操作,也做map操作。// 在进行第二个map操作时,只使用每个数据的tuple._2,也就是rdd1中的value值,即可。JavaPairRDD<long , String> rdd1 = ...rdd1.reduceByKey(...)rdd1.map(tuple._2...)// 第二种方式相较于第一种方式而言,很明显减少了一次rdd2的计算开销。// 但是到这里为止,优化还没有结束,对rdd1我们还是执行了两次算子操作,rdd1实际上还是会被计算两次。// 因此还需要配合“原则三:对多次使用的RDD进行持久化”进行使用,//才能保证一个RDD被多次使用时只被计算一次。
原则三:
对多次使用的RDD进行持久化当你在Spark代码中多次对一个RDD做了算子操作后,恭喜,你已经实现Spark作业第一步的优化了,也就是尽可能复用RDD。此时就该在这个基础之上,进行第二步优化了,也就是要保证对一个RDD执行多次算子操作时,这个RDD本身仅仅被计算一次。Spark中对于一个RDD执行多次算子的默认原理是这样的:每次你对一个RDD执行一个算子操作时,都会重新从源头处计算一遍,计算出那个RDD来,然后再对这个RDD执行你的算子操作。这种方式的性能是很差的。因此对于这种情况,我们的建议是:对多次使用的RDD进行持久化。此时Spark就会根据你的持久化策略,将RDD中的数据保存到内存或者磁盘中。以后每次对这个RDD进行算子操作时,都会直接从内存或磁盘中提取持久化的RDD数据,然后执行算子,而不会从源头处重新计算一遍这个RDD,再执行算子操作。对多次使用的RDD进行持久化的代码示例
/ 如果要对一个RDD进行持久化,只要对这个RDD调用cache()和persist()即可。// 正确的做法。// cache()方法表示:使用非序列化的方式将RDD中的数据全部尝试持久化到内存中。// 此时再对rdd1执行两次算子操作时,只有在第一次执行map算子时,才会将这个rdd1从源头处计算一次。// 第二次执行reduce算子时,就会直接从内存中提取数据进行计算,不会重复计算一个rdd。val rdd1 = sc.textFile("[hdfs://192.168.0.1:9000/hello.txt](hdfs://192.168.0.1:9000/hello.txt)").cache()rdd1.map(...)rdd1.reduce(...)// persist()方法表示:手动选择持久化级别,并使用指定的方式进行持久化。// 比如说,StorageLevel.MEMORY_AND_DISK_SER表示,内存充足时优先持久化到内存中,//内存不充足时持久化到磁盘文件中。// 而且其中的_SER后缀表示,使用序列化的方式来保存RDD数据,此时RDD中的每个partition//都会序列化成一个大的字节数组,然后再持久化到内存或磁盘中。// 序列化的方式可以减少持久化的数据对内存/磁盘的占用量,进而避免内存被持久化数据占用过多,//从而发生频繁GC。val rdd1 = sc.textFile("[hdfs://192.168.0.1:9000/hello.txt](hdfs://192.168.0.1:9000/hello.txt)").persist(StorageLevel.MEMORY_AND_DISK_SER)rdd1.map(...)rdd1.reduce(...)
对于persist()方法而言,我们可以根据不同的业务场景选择不同的持久化级别。Spark的持久化级别
如何选择一种最合适的持久化策略1、默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。2、如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上,如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。3、如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。4、通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销,除非是要求作业的高可用性,否则不建议使用。
原则四:尽量避免使用shuffle类算子如果有可能的话,要尽量避免使用shuffle类算子。因为Spark作业运行过程中,最消耗性能的地方就是shuffle过程。shuffle过程,简单来说,就是将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join等操作。比如reduceByKey、join等算子,都会触发shuffle操作。shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的主要原因。因此在我们的开发过程中,能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。Broadcast与map进行join代码示例
// 传统的join操作会导致shuffle操作。// 因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。val rdd3 = rdd1.join(rdd2)// Broadcast+map的join操作,不会导致shuffle操作。// 使用Broadcast将一个数据量较小的RDD作为广播变量。val rdd2Data = rdd2.collect()val rdd2DataBroadcast = sc.broadcast(rdd2Data)// 在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。// 然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,//那么就判定可以进行join。// 此时就可以根据自己需要的方式,将rdd1当前数据与rdd2中可以连接的数据,//拼接在一起(String或Tuple)。val rdd3 = rdd1.map(rdd2DataBroadcast...)// 注意,以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用。// 因为每个Executor的内存中,都会驻留一份rdd2的全量数据。
原则五:
使用map-side预聚合的shuffle操作如果因为业务需要,一定要使用shuffle操作,无法用map类的算子来替代,那么尽量使用可以map-side预聚合的算子。所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。通常来说,在可能的情况下,建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。比如下图,就是典型的例子,分别基于reduceByKey和groupByKey进行单词计数。其中第一张图是groupByKey的原理图,可以看到,没有进行任何本地聚合时,所有数据都会在集群节点之间传输;第二张图是reduceByKey的原理图,可以看到,每个节点本地的相同key数据,都进行了预聚合,然后才传输到其他节点上进行全局聚合。
原则六:使用高性能的算子除了shuffle相关的算子有优化原则之外,其他的算子也都有着相应的优化原则。使用reduceByKey/aggregateByKey替代groupByKey详情见“原则五:使用map-side预聚合的shuffle操作”。
使用mapPartitions替代普通mapmapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重!
使用foreachPartitions替代foreach原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数据。在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比如在foreach函数中,将RDD中所有数据写MySQL,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。实践中发现,对于1万条左右的数据量写MySQL,性能可以提升30%以上。
使用filter之后进行coalesce操作通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。
使用repartitionAndSortWithinPartitions替代repartition与sort类操作repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。
原则七:广播大变量有时在开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提升性能。在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC,都会极大地影响性能。因此对于上述情况,如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。广播大变量的代码示例
// 以下代码在算子函数中,使用了外部的变量。// 此时没有做任何特殊操作,每个task都会有一份list1的副本。val list1 = ...rdd1.map(list1...)// 以下代码将list1封装成了Broadcast类型的广播变量。// 在算子函数中,使用广播变量时,首先会判断当前task所在Executor内存中,是否有变量副本。// 如果有则直接使用;如果没有则从Driver或者其他Executor节点上远程拉取一份放到本地Executor内存中。// 每个Executor内存中,就只会驻留一份广播变量副本。val list1 = ...val list1Broadcast = sc.broadcast(list1)rdd1.map(list1Broadcast...)
原则八:
使用Kryo优化序列化性能在Spark中,主要有三个地方涉及到了序列化:1、在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。2、将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。3、使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。以下是使用Kryo的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等):
// 创建SparkConf对象。val conf = new SparkConf().setMaster(...).setAppName(...)// 设置序列化器为KryoSerializer。conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")// 注册要序列化的自定义类型。conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
原则九:
优化数据结构Java中,有三种类型比较耗费内存:1、对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。2、字符串,每个字符串内部都有一个字符数组以及长度等额外信息。3、集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry。因此Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,从而降低GC频率,提升性能。但是在笔者的编码实践中发现,要做到该原则其实并不容易。因为我们同时要考虑到代码的可维护性,如果一个代码中,完全没有任何对象抽象,全部是字符串拼接的方式,那么对于后续的代码维护和修改,无疑是一场巨大的灾难。同理,如果所有操作都基于数组实现,而不使用HashMap、LinkedList等集合类型,那么对于我们的编码难度以及代码可维护性,也是一个极大的挑战。因此笔者建议,在可能以及合适的情况下,使用占用内存较少的数据结构,但是前提是要保证代码的可维护性。
./bin/spark-submit \--master yarn-cluster \--num-executors 100 \--executor-memory 6G \--executor-cores 4 \--driver-memory 1G \--conf spark.default.parallelism=1000 \--conf spark.storage.memoryFraction=0.5 \--conf spark.shuffle.memoryFraction=0.3 \
val conf = new SparkConf()val sc = new SparkContext(conf)val lines = sc.textFile("[hdfs://...](hdfs://...)")val words = lines.flatMap(_.split(" "))val pairs = words.map((_, 1))val wordCounts = pairs.reduceByKey(_ + _)wordCounts.collect().foreach(println(_))
通过对单词计数程序的分析,希望能够让大家了解最基本的stage划分的原理,以及stage划分后shuffle操作是如何在两个stage的边界处执行的。然后我们就知道如何快速定位出发生数据倾斜的stage对应代码的哪一个部分了。比如我们在Spark Web UI或者本地log中发现,stage1的某几个task执行得特别慢,判定stage1出现了数据倾斜,那么就可以回到代码中定位出stage1主要包括了reduceByKey这个shuffle类算子,此时基本就可以确定是由reduceByKey算子导致的数据倾斜问题。比如某个单词出现了100万次,其他单词才出现10次,那么stage1的某个task就要处理100万数据,整个stage的速度就会被这个task拖慢。2.6、某个task莫名其妙内存溢出的情况 这种情况下去定位出问题的代码就比较容易了。我们建议直接看yarn-client模式下本地log的异常栈,或者是通过YARN查看yarn-cluster模式下的log中的异常栈。一般来说,通过异常栈信息就可以定位到你的代码中哪一行发生了内存溢出。然后在那行代码附近找找,一般也会有shuffle类算子,此时很可能就是这个算子导致了数据倾斜。但是大家要注意的是,不能单纯靠偶然的内存溢出就判定发生了数据倾斜。因为自己编写的代码的bug,以及偶然出现的数据异常,也可能会导致内存溢出。因此还是要按照上面所讲的方法,通过Spark Web UI查看报错的那个stage的各个task的运行时间以及分配的数据量,才能确定是否是由于数据倾斜才导致了这次内存溢出。2.7、查看导致数据倾斜的key的数据分布情况 知道了数据倾斜发生在哪里之后,通常需要分析一下那个执行了shuffle操作并且导致了数据倾斜的RDD/Hive表,查看一下其中key的分布情况。这主要是为之后选择哪一种技术方案提供依据。针对不同的key分布与不同的shuffle算子组合起来的各种情况,可能需要选择不同的技术方案来解决。此时根据你执行操作的情况不同,可以有很多种查看key分布的方式:1、如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下SQL中使用的表的key分布情况。2、如果是对Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看key分布的代码,比如RDD.countByKey()。然后对统计出来的各个key出现的次数,collect/take到客户端打印一下,就可以看到key的分布情况。举例来说,对于上面所说的单词计数程序,如果确定了是stage1的reduceByKey算子导致了数据倾斜,那么就应该看看进行reduceByKey操作的RDD中的key分布情况,在这个例子中指的就是pairs RDD。如下示例,我们可以先对pairs采样10%的样本数据,然后使用countByKey算子统计出每个key出现的次数,最后在客户端遍历和打印样本数据中各个key的出现次数。
val sampledPairs = pairs.sample(false, 0.1)val sampledWordCounts = sampledPairs.countByKey()sampledWordCounts.foreach(println(_))
三、数据倾斜的解决方案
解决方案一:
使用Hive ETL预处理数据 方案适用场景:导致数据倾斜的是Hive表。如果该Hive表中的数据本身很不均匀(比如某个key对应了100万数据,其他key才对应了10条数据),而且业务场景需要频繁使用Spark对Hive表执行某个分析操作,那么比较适合使用这种技术方案。方案实现思路:此时可以评估一下,是否可以通过Hive来进行数据预处理(即通过Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join),然后在Spark作业中针对的数据源就不是原来的Hive表了,而是预处理后的Hive表。此时由于数据已经预先进行过聚合或join操作了,那么在Spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。方案实现原理:这种方案从根源上解决了数据倾斜,因为彻底避免了在Spark中执行shuffle类算子,那么肯定就不会有数据倾斜的问题了。但是这里也要提醒一下大家,这种方式属于治标不治本。因为毕竟数据本身就存在分布不均匀的问题,所以Hive ETL中进行group by或者join等shuffle操作时,还是会出现数据倾斜,导致Hive ETL的速度很慢。我们只是把数据倾斜的发生提前到了Hive ETL中,避免Spark程序发生数据倾斜而已。方案优点:实现起来简单便捷,效果还非常好,完全规避掉了数据倾斜,Spark作业的性能会大幅度提升。方案缺点:治标不治本,Hive ETL中还是会发生数据倾斜。方案实践经验:在一些Java系统与Spark结合使用的项目中,会出现Java代码频繁调用Spark作业的场景,而且对Spark作业的执行性能要求很高,就比较适合使用这种方案。将数据倾斜提前到上游的Hive ETL,每天仅执行一次,只有那一次是比较慢的,而之后每次Java调用Spark作业时,执行速度都会很快,能够提供更好的用户体验。项目实践经验:在美团·点评的交互式用户行为分析系统中使用了这种方案,该系统主要是允许用户通过Java Web系统提交数据分析统计任务,后端通过Java提交Spark作业进行数据分析统计。要求Spark作业速度必须要快,尽量在10分钟以内,否则速度太慢,用户体验会很差。所以我们将有些Spark作业的shuffle操作提前到了Hive ETL中,从而让Spark直接使用预处理的Hive中间表,尽可能地减少Spark的shuffle操作,大幅度提升了性能,将部分作业的性能提升了6倍以上。
解决方案二:
过滤少数导致倾斜的key 方案适用场景:如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜。方案实现思路:如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个key。比如,在Spark SQL中可以使用where子句过滤掉这些key或者在Spark Core中对RDD执行filter算子过滤掉这些key。如果需要每次作业执行时,动态判定哪些key的数据量最多然后再进行过滤,那么可以使用sample算子对RDD进行采样,然后计算出每个key的数量,取数据量最多的key过滤掉即可。方案实现原理:将导致数据倾斜的key给过滤掉之后,这些key就不会参与计算了,自然不可能产生数据倾斜。方案优点:实现简单,而且效果也很好,可以完全规避掉数据倾斜。方案缺点:适用场景不多,大多数情况下,导致倾斜的key还是很多的,并不是只有少数几个。方案实践经验:在项目中我们也采用过这种方案解决数据倾斜。有一次发现某一天Spark作业在运行的时候突然OOM了,追查之后发现,是Hive表中的某一个key在那天数据异常,导致数据量暴增。因此就采取每次执行前先进行采样,计算出样本中数据量最大的几个key之后,直接在程序中将那些key给过滤掉。
解决方案三:
提高shuffle操作的并行度 方案适用场景:如果我们必须要对数据倾斜迎难而上,那么建议优先使用这种方案,因为这是处理数据倾斜最简单的一种方案。方案实现思路:在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于Spark SQL中的shuffle类语句,比如group by、join等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很多场景来说都有点过小。方案实现原理:增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。举例来说,如果原本有5个key,每个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增加了shuffle read task以后,每个task就分配到一个key,即每个task就处理10条数据,那么自然每个task的执行时间都会变短了。具体原理如下图所示。方案优点:实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。方案缺点:只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限。方案实践经验:该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,这个对应着100万数据的key肯定还是会分配到一个task中去处理,因此注定还是会发生数据倾斜的。所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用嘴简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。
解决方案四:
两阶段聚合(局部聚合+全局聚合) 方案适用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案。方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。方案实现原理:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图。方案优点:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark作业的性能提升数倍以上。方案缺点:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案。
// 第一步,给RDD中的每个key都打上一个随机前缀。JavaPairRDD<String, Long> randomPrefixRdd = rdd.mapToPair(new PairFunction<Tuple2<Long,Long>, String, Long>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Long> call(Tuple2<Long, Long> tuple)throws Exception { Random random = new Random(); int prefix = random.nextInt(10);return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);}});// 第二步,对打上随机前缀的key进行局部聚合。JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(new Function2<Long, Long, Long>() {private static final long serialVersionUID = 1L;@Overridepublic Long call(Long v1, Long v2) throws Exception {return v1 + v2;}});// 第三步,去除RDD中每个key的随机前缀。JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(new PairFunction<Tuple2<String,Long>, Long, Long>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Long, Long> call(Tuple2<String, Long> tuple)throws Exception { long originalKey = Long.valueOf(tuple._1.split("_")[1]);return new Tuple2<Long, Long>(originalKey, tuple._2);}});// 第四步,对去除了随机前缀的RDD进行全局聚合。JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(new Function2<Long, Long, Long>() {private static final long serialVersionUID = 1L;@Overridepublic Long call(Long v1, Long v2) throws Exception {return v1 + v2;}});
解决方案五:
将reduce join转为map join方案适用场景:在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中的一个RDD或表的数据量比较小(比如几百M或者一两G),比较适用此方案。方案实现思路:不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。方案实现原理:普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜。具体原理如下图所示。方案优点:对join操作导致的数据倾斜,效果非常好,因为根本就不会发生shuffle,也就根本不会发生数据倾斜。方案缺点:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。毕竟我们需要将小表进行广播,此时会比较消耗内存资源,driver和每个Executor内存中都会驻留一份小RDD的全量数据。如果我们广播出去的RDD数据比较大,比如10G以上,那么就可能发生内存溢出了。因此并不适合两个都是大表的情况。
// 首先将数据量比较小的RDD的数据,collect到Driver中来。List<Tuple2<Long, Row>> rdd1Data = rdd1.collect()// 然后使用Spark的广播功能,将小RDD的数据转换成广播变量,这样每个Executor就只有一份RDD的数据。// 可以尽可能节省内存空间,并且减少网络传输性能开销。final Broadcast<List<Tuple2<Long, Row>>> rdd1DataBroadcast = sc.broadcast(rdd1Data);// 对另外一个RDD执行map类操作,而不再是join类操作。JavaPairRDD<String, Tuple2<String, Row>> joinedRdd = rdd2.mapToPair(new PairFunction<Tuple2<Long,String>, String, Tuple2<String, Row>>() { private static final long serialVersionUID = 1L;@Override public Tuple2<String, Tuple2<String, Row>> call(Tuple2<Long, String> tuple) throws Exception {// 在算子函数中,通过广播变量,获取到本地Executor中的rdd1数据。 List<Tuple2<Long, Row>> rdd1Data = rdd1DataBroadcast.value();// 可以将rdd1的数据转换为一个Map,便于后面进行join操作。 Map<Long, Row> rdd1DataMap = new HashMap<Long, Row>();for(Tuple2<Long, Row> data : rdd1Data) { rdd1DataMap.put(data._1, data._2);}// 获取当前RDD数据的key以及value。String key = tuple._1;String value = tuple._2;// 从rdd1数据Map中,根据key获取到可以join到的数据。Row rdd1Value = rdd1DataMap.get(key);return new Tuple2<String, String>(key, new Tuple2<String, Row>(value, rdd1Value));}});// 这里得提示一下。// 上面的做法,仅仅适用于rdd1中的key没有重复,全部是唯一的场景。// 如果rdd1中有多个相同的key,那么就得用flatMap类的操作,在进行join的时候不能用map,而是得遍历rdd1所有数据进行join。// rdd2中每条数据都可能会返回多条join后的数据。
解决方案六:采样倾斜key并分拆join操作方案适用场景:两个RDD/Hive表进行join的时候,如果数据量都比较大,无法采用“解决方案五”,那么此时可以看一下两个RDD/Hive表中的key分布情况。如果出现数据倾斜,是因为其中某一个RDD/Hive表中的少数几个key的数据量过大,而另一个RDD/Hive表中的所有key都分布比较均匀,那么采用这个解决方案是比较合适的。方案实现思路:1、对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key。2、然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。3、接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个RDD。4、再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了。5、而另外两个普通的RDD就照常join即可。6、最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。方案实现原理:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,可以将少数几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join,此时这几个key对应的数据就不会集中在少数几个task上,而是分散到多个task进行join了。具体原理见下图。方案优点:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,采用该方式可以用最有效的方式打散key进行join。而且只需要针对少数倾斜key对应的数据进行扩容n倍,不需要对全量数据进行扩容。避免了占用过多内存。方案缺点:如果导致倾斜的key特别多的话,比如成千上万个key都导致数据倾斜,那么这种方式也不适合。
// 首先从包含了少数几个导致数据倾斜key的rdd1中,采样10%的样本数据。JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);// 对样本数据RDD统计出每个key的出现次数,并按出现次数降序排序。// 对降序排序后的数据,取出top 1或者top 100的数据,也就是key最多的前n个数据。// 具体取出多少个数据量最多的key,由大家自己决定,我们这里就取1个作为示范。JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(new PairFunction<Tuple2<Long,String>, Long, Long>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Long, Long> call(Tuple2<Long, String> tuple)throws Exception {return new Tuple2<Long, Long>(tuple._1, 1L);} });JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(new Function2<Long, Long, Long>() {private static final long serialVersionUID = 1L;@Overridepublic Long call(Long v1, Long v2) throws Exception {return v1 + v2;}});JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair(new PairFunction<Tuple2<Long,Long>, Long, Long>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)throws Exception {return new Tuple2<Long, Long>(tuple._2, tuple._1);}});final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;// 从rdd1中分拆出导致数据倾斜的key,形成独立的RDD。JavaPairRDD<Long, String> skewedRDD = rdd1.filter(new Function<Tuple2<Long,String>, Boolean>() {private static final long serialVersionUID = 1L;@Overridepublic Boolean call(Tuple2<Long, String> tuple) throws Exception {return tuple._1.equals(skewedUserid);}});// 从rdd1中分拆出不导致数据倾斜的普通key,形成独立的RDD。JavaPairRDD<Long, String> commonRDD = rdd1.filter(new Function<Tuple2<Long,String>, Boolean>() {private static final long serialVersionUID = 1L;@Overridepublic Boolean call(Tuple2<Long, String> tuple) throws Exception {return !tuple._1.equals(skewedUserid);}});// rdd2,就是那个所有key的分布相对较为均匀的rdd。// 这里将rdd2中,前面获取到的key对应的数据,过滤出来,分拆成单独的rdd,并对rdd中的数据使用flatMap算子都扩容100倍。// 对扩容的每条数据,都打上0~100的前缀。JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(new Function<Tuple2<Long,Row>, Boolean>() {private static final long serialVersionUID = 1L;@Overridepublic Boolean call(Tuple2<Long, Row> tuple) throws Exception {return tuple._1.equals(skewedUserid);}}).flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {private static final long serialVersionUID = 1L;@Overridepublic Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple) throws Exception {Random random = new Random();List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();for(int i = 0; i < 100; i++) {list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));}return list;}});// 将rdd1中分拆出来的导致倾斜的key的独立rdd,每条数据都打上100以内的随机前缀。// 然后将这个rdd1中分拆出来的独立rdd,与上面rdd2中分拆出来的独立rdd,进行join。JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(new PairFunction<Tuple2<Long,String>, String, String>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, String> call(Tuple2<Long, String> tuple)throws Exception {Random random = new Random();int prefix = random.nextInt(100);return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);}}).join(skewedUserid2infoRDD).mapToPair(new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Long, Tuple2<String, Row>> call(Tuple2<String, Tuple2<String, Row>> tuple)throws Exception {long key = Long.valueOf(tuple._1.split("_")[1]);return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);}});// 将rdd1中分拆出来的包含普通key的独立rdd,直接与rdd2进行join。JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);// 将倾斜key join后的结果与普通key join后的结果,uinon起来。// 就是最终的join结果。JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);
// 首先将其中一个key分布相对较为均匀的RDD膨胀100倍。JavaPairRDD<String, Row> expandedRDD = rdd1.flatMapToPair(new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {private static final long serialVersionUID = 1L;@Overridepublic Iterable<Tuple2<String, Row>> call(Tuple2<Long, Row> tuple)throws Exception {List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();for(int i = 0; i < 100; i++) {list.add(new Tuple2<String, Row>(0 + "_" + tuple._1, tuple._2));}return list;}});// 其次,将另一个有数据倾斜key的RDD,每条数据都打上100以内的随机前缀。JavaPairRDD<String, String> mappedRDD = rdd2.mapToPair(new PairFunction<Tuple2<Long,String>, String, String>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, String> call(Tuple2<Long, String> tuple)throws Exception {Random random = new Random();int prefix = random.nextInt(100);return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);}});// 将两个处理后的RDD进行join即可。JavaPairRDD<String, Tuple2<String, Row>> joinedRDD = mappedRDD.join(expandedRDD);
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。