赞
踩
object TestRDDSerializable { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ser") val sc = new SparkContext(sparkConf) val rdd = sc.makeRDD(List(1,2,3,4), 2) val user = new User() rdd.foreach(num => { println("age = " + (user.age + num)) }) /* 结果:程序执行抛出异常 NotSerializableException 分析: 1.foreach 算子外部的执行是在 Driver 端,内部的操作是在 Executor 端执行 2.foreach 算子的内部操作使用到了 user 对象的属性,所以 user 对象需要从 Driver 发送到 Executor,涉及到网络传输 3.由于 User 类没有混入序列化特质,所以抛出异常 4.解决方法:class User extends Serializable {} 或 case class User {},样例类在编译时会自动混入序列化 */ println("=================") val rdd1 = sc.makeRDD(List[Int](), 2) val user1 = new User() rdd1.foreach(num => { println("age = " + (user1.age + num)) }) /* 期望:由于rdd1没有数据,foreach 算子不会实际执行,即使 User 没有混入序列化也不会报错 结果:程序执行抛出异常 NotSerializableException 分析: 1.RDD 算子中如果传递的是函数参数,则会涉及到闭包操作,内部会调用 sc.clean(f) 2.clean 方法底层会进行闭包检测,其中就包含序列化的检测,如果检测到使用的对象没有混入序列化特质,就会抛出异常 */ } } class User { val age: Int = 30 }
参考地址:https://github.com/EsotericSoftware/kryo
与 Java 序列化的对比:
自定义 Kryo 序列化
/* 简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化 */ object TestKryoSerializable { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[*]").setAppName("Ser") // 替换默认的序列化机制 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // 注册需要使用 kryo 序列化的自定义类,该类必须混入 Serializable 特质 conf.registerKryoClasses(Array(classOf[Searcher])) val sc = new SparkContext(conf) val rdd: RDD[String] = sc.makeRDD(Array( "hello world", "hello spark", "kafka", "hive" ), 2) val searcher = new Searcher("h") val result: RDD[String] = searcher.getMatchedRDD1(rdd) result.collect.foreach(println) } } case class Searcher(val query: String) { def isMatch(s: String) = { s.contains(query) // this.query } def getMatchedRDD1(rdd: RDD[String]) = { rdd.filter(isMatch) } def getMatchedRDD2(rdd: RDD[String]) = { val q = query rdd.filter(_.contains(q)) } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。