当前位置:   article > 正文

Spark java.io.NotSerializableException

Spark java.io.NotSerializableException

源代码

//编写SQL语句
val sql = "insert into province_browser_cnt(`province`,`browser`,`operator_cnt`) values(?,?,?)"
//获取mysql的连接
val conn = JDBCUtil.getConn()
//将如下的结果写入数据库
userRDD.map(line => ((line(6), line(7)), 1))
  .reduceByKey(_ + _)
  .foreach(line=> {
    //创建ps
    val ps = conn.prepareStatement(sql)
    ps.setString(1, line._1._1) //设置省份
    ps.setString(2, line._1._2) //设置浏览器
    ps.setInt(3, line._2) //设置统计值
    //提交
    ps.execute()
  })
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

错误原因

因为ps是一个PrepareStatement对象,这个对象无法序列化,而传入foreach中的对象是需要分布式传送到各个节点上,传送前先序列化,到达相应服务器上后再反序列化,PrepareStatement是个Java类,如果一个Java类想(反)序列化,必须实现Serialize接口,PrepareStatement并没有实现这个接口。

错误信息

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:971)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:970)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.foreach(RDD.scala:970)
	at edu.nanzhi.core.useraction.UserActionExternalPro$.main(UserActionExternalPro.scala:68)
	at edu.nanzhi.core.useraction.UserActionExternalPro.main(UserActionExternalPro.scala)
Caused by: java.io.NotSerializableException: com.mysql.cj.jdbc.DatabaseMetaData
Serialization stack:
	- object not serializable (class: com.mysql.cj.jdbc.DatabaseMetaData, value: com.mysql.cj.jdbc.DatabaseMetaData@59c70ceb)
	- field (class: com.mysql.cj.jdbc.ConnectionImpl, name: dbmd, type: interface java.sql.DatabaseMetaData)
	- object (class com.mysql.cj.jdbc.ConnectionImpl, com.mysql.cj.jdbc.ConnectionImpl@210d2a6c)
	- field (class: edu.nanzhi.core.useraction.UserActionExternalPro$$anonfun$main$18, name: conn$1, type: interface java.sql.Connection)
	- object (class edu.nanzhi.core.useraction.UserActionExternalPro$$anonfun$main$18, <function1>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
	... 11 more
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

方案一

添加collect()函数将结果数据收集到driver端。对象ps在driver端,collect后的数据也在driver端,就不需ps序列化数据传到各个节点了,因此就不会出现序列化错误。

方案一具体代码

userRDD.map(line => ((line(6), line(7)), 1))
  .reduceByKey(_ + _)
  .collect()
  .foreach(line=> {
    //创建ps
    val ps = conn.prepareStatement(sql)
    ps.setString(1, line._1._1) //设置省份
    ps.setString(2, line._1._2) //设置浏览器
    ps.setInt(3, line._2) //设置统计值
    //提交
    ps.execute()
  })
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

如上代码改正后将不会再报序列化错误。

方案二

因为方案一采用collect(),如果数据量大,将会消耗大量Driver端内存,影响整个作业性能,严重者甚至会导致Driver宕掉,导致整个作业失败。所以建议采用分区循环方式foreachPartition实现。foreachPartition函数和foreach函数类似,foreachPartition是将rdd的每一个分区循环一次,而foreach函数是上游每行数据循环一次。foreachPatition可以将每个分区的数据以迭代器方式迭代操作,而foreach函数则不能,通常foreachPatition比foreach要高效。比如,将RDD中的所有数据通过JDBC连接写入数据库,如果使用foreach函数,可能要为每一个元素都创建一个connection,这样开销很大,如果使用foreachPartition,那么只需要针对每一个分区建立一个connection。同时,foreachPartitioin可以迭代进行处理,但foreach里的对象仍然还是需要反序列化

方案二具体代码

userRDD.map(line => ((line(6), line(7)), 1))
  .reduceByKey(_ + _)
  .foreachPartition(it => {
    val conn = JDBCUtil.getConn()
    try {
      //获取ps对象
      val ps = conn.prepareStatement(sql)
      //链接自动提交关闭
      conn.setAutoCommit(false)
      //循环迭代器
      it.foreach(line => {
        ps.setString(1, line._1._1) //设置省份
        ps.setString(2, line._1._2) //设置浏览器
        ps.setInt(3, line._2) //设置统计值
        //提交
        ps.addBatch()
      })
      //循环完后批次执行
      ps.executeBatch()
      //链接提交--->手动提交必须要关闭自动提交,默认事自动提交
      conn.commit()
    }
  })
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

如上代码改正后将不会再报序列化错误,同时比方案一更加高效。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/485098
推荐阅读
相关标签
  

闽ICP备14008679号