当前位置:   article > 正文

详解 Spark 核心编程之 RDD 序列化

详解 Spark 核心编程之 RDD 序列化

一、问题引出

object TestRDDSerializable {
    def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ser")
        val sc = new SparkContext(sparkConf)
        
        val rdd = sc.makeRDD(List(1,2,3,4), 2)
        
        val user = new User()
        
        rdd.foreach(num => {
            println("age = " + (user.age + num))
        })
        
        /*
         结果:程序执行抛出异常 NotSerializableException
         分析:
          1.foreach 算子外部的执行是在 Driver 端,内部的操作是在 Executor 端执行
          2.foreach 算子的内部操作使用到了 user 对象的属性,所以 user 对象需要从 Driver 发送到 Executor,涉及到网络传输
          3.由于 User 类没有混入序列化特质,所以抛出异常
          4.解决方法:class User extends Serializable {} 或 case class User {},样例类在编译时会自动混入序列化
        */
        
        println("=================")
        
        val rdd1 = sc.makeRDD(List[Int](), 2)
        
        val user1 = new User()
        
        rdd1.foreach(num => {
            println("age = " + (user1.age + num))
        })
        
        /*
         期望:由于rdd1没有数据,foreach 算子不会实际执行,即使 User 没有混入序列化也不会报错
         结果:程序执行抛出异常 NotSerializableException
         分析:
          1.RDD 算子中如果传递的是函数参数,则会涉及到闭包操作,内部会调用 sc.clean(f)
          2.clean 方法底层会进行闭包检测,其中就包含序列化的检测,如果检测到使用的对象没有混入序列化特质,就会抛出异常
        */
    }
}

class User {
    val age: Int = 30
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

二、Kryo 序列化框架

  • 参考地址:https://github.com/EsotericSoftware/kryo

  • Java 序列化的对比:

    • Java 的序列化比较重,生成的文件字节比较多,而 Kryo 序列化是轻量级的,产生的字节较少,所以 Kryo 速度是 Serializable 的 10 倍
    • Java 的序列化中可以通过 transient 关键字限制不参与序列化的属性,而 transient 关键字在 Kryo 序列化中不产生作用
  • 自定义 Kryo 序列化

    /*
    简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化
    */
    object TestKryoSerializable {
        def main(args: Array[String]): Unit = {
        	val conf = new SparkConf().setMaster("local[*]").setAppName("Ser")
            
            // 替换默认的序列化机制
            conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            // 注册需要使用  kryo  序列化的自定义类,该类必须混入 Serializable 特质
            conf.registerKryoClasses(Array(classOf[Searcher]))
            
        	val sc = new SparkContext(conf)
            
        	val rdd: RDD[String] = sc.makeRDD(Array(
                "hello world", "hello spark", 
        		"kafka", "hive"
            ), 2)
            
        	val searcher = new Searcher("h")
            
        	val result: RDD[String] = searcher.getMatchedRDD1(rdd)
            
        	result.collect.foreach(println)
            
        }
    }
    
    case class Searcher(val query: String) { 
        def isMatch(s: String) = {
        	s.contains(query) // this.query
        }
        
        def getMatchedRDD1(rdd: RDD[String]) = {
        	rdd.filter(isMatch) 
        }
        
        def getMatchedRDD2(rdd: RDD[String]) = {
            val q = query
        	rdd.filter(_.contains(q))
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/663325
推荐阅读
相关标签
  

闽ICP备14008679号