The following code counts records per (province, browser) pair and writes the result to MySQL over JDBC:

// Compose the SQL statement
val sql = "insert into province_browser_cnt(`province`,`browser`,`operator_cnt`) values(?,?,?)"
// Get a MySQL connection
val conn = JDBCUtil.getConn()
// Write the aggregated result to the database
userRDD.map(line => ((line(6), line(7)), 1))
  .reduceByKey(_ + _)
  .foreach(line => {
    // Create the PreparedStatement
    val ps = conn.prepareStatement(sql)
    ps.setString(1, line._1._1) // province
    ps.setString(2, line._1._2) // browser
    ps.setInt(3, line._2)       // count
    // Execute the insert
    ps.execute()
  })
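JDBCUtil itself is never shown in the post. A minimal sketch of what such a helper might look like, assuming a plain DriverManager-based implementation with placeholder URL and credentials:

import java.sql.{Connection, DriverManager}

object JDBCUtil {
  // Hypothetical connection settings -- replace with your own.
  private val url      = "jdbc:mysql://localhost:3306/useraction?useSSL=false"
  private val user     = "root"
  private val password = "root"

  // Returns a new connection on every call; callers are responsible for closing it.
  def getConn(): Connection = DriverManager.getConnection(url, user, password)
}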
Running this throws a Task not serializable exception. The function passed to foreach executes on the executors, so Spark must serialize the closure on the driver, ship it to each node, and deserialize it there. The closure above captures conn, and for a Java object to be (de)serialized its class must implement the java.io.Serializable interface. The MySQL ConnectionImpl does not (nor does the PreparedStatement created from it), so serialization fails; the stack trace below shows the captured conn$1 field and the non-serializable DatabaseMetaData held inside the connection:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:971)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:970)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:970)
    at edu.nanzhi.core.useraction.UserActionExternalPro$.main(UserActionExternalPro.scala:68)
    at edu.nanzhi.core.useraction.UserActionExternalPro.main(UserActionExternalPro.scala)
Caused by: java.io.NotSerializableException: com.mysql.cj.jdbc.DatabaseMetaData
Serialization stack:
    - object not serializable (class: com.mysql.cj.jdbc.DatabaseMetaData, value: com.mysql.cj.jdbc.DatabaseMetaData@59c70ceb)
    - field (class: com.mysql.cj.jdbc.ConnectionImpl, name: dbmd, type: interface java.sql.DatabaseMetaData)
    - object (class com.mysql.cj.jdbc.ConnectionImpl, com.mysql.cj.jdbc.ConnectionImpl@210d2a6c)
    - field (class: edu.nanzhi.core.useraction.UserActionExternalPro$$anonfun$main$18, name: conn$1, type: interface java.sql.Connection)
    - object (class edu.nanzhi.core.useraction.UserActionExternalPro$$anonfun$main$18, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
    ... 11 more
The first fix is to add a collect() call, which gathers the result data back to the driver. Both conn and the collected data then live on the driver, so the foreach below is a plain local loop: no closure is shipped to the executors and nothing has to be serialized, so the error disappears.
userRDD.map(line => ((line(6), line(7)), 1))
  .reduceByKey(_ + _)
  .collect()
  .foreach(line => {
    // Create the PreparedStatement (on the driver)
    val ps = conn.prepareStatement(sql)
    ps.setString(1, line._1._1) // province
    ps.setString(2, line._1._2) // browser
    ps.setInt(3, line._2)       // count
    // Execute the insert
    ps.execute()
  })
With this change the code no longer throws the serialization error.
However, because this first solution relies on collect(), a large result set will consume a lot of driver memory and hurt overall job performance; in the worst case the driver crashes and the whole job fails. The recommended approach is therefore the per-partition loop foreachPartition. It is similar to foreach, but where foreach invokes its function once per record, foreachPartition invokes it once per RDD partition and hands it an iterator over that partition's records, which usually makes it more efficient. For example, when writing all of an RDD's data to a database over JDBC, foreach would require a connection for every single element, which is very expensive, whereas foreachPartition needs only one connection per partition. And because that connection is created inside the closure, on the executor, the closure no longer captures anything that would have to be serialized and deserialized:
userRDD.map(line => ((line(6), line(7)), 1))
  .reduceByKey(_ + _)
  .foreachPartition(it => {
    // One connection per partition, created on the executor
    val conn = JDBCUtil.getConn()
    try {
      // Get the PreparedStatement object
      val ps = conn.prepareStatement(sql)
      // Turn off auto-commit so the whole batch commits as one transaction
      conn.setAutoCommit(false)
      // Loop over this partition's records via the iterator
      it.foreach(line => {
        ps.setString(1, line._1._1) // province
        ps.setString(2, line._1._2) // browser
        ps.setInt(3, line._2)       // count
        // Queue the insert in the batch
        ps.addBatch()
      })
      // Execute the batch once the partition is exhausted
      ps.executeBatch()
      // Commit manually -- auto-commit (the default) was disabled above
      conn.commit()
    } finally {
      // Not in the original snippet: release the connection even if the batch fails
      conn.close()
    }
  })
This version also avoids the serialization error, and it is more efficient than the collect()-based fix in the first solution.
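As an aside that goes beyond the original post: on Spark 2.x and later the same table can also be filled through the DataFrame JDBC writer, which manages one connection per partition internally. A hedged sketch, assuming a SparkSession value named spark and the same placeholder URL/credentials as above:

import java.util.Properties

val props = new Properties()
props.setProperty("user", "root")     // placeholder credentials
props.setProperty("password", "root")

import spark.implicits._              // assumes a SparkSession value named `spark`

userRDD.map(line => (line(6), line(7)))
  .toDF("province", "browser")
  .groupBy("province", "browser")
  .count()                            // same aggregation as reduceByKey(_ + _)
  .withColumnRenamed("count", "operator_cnt")
  .write
  .mode("append")
  .jdbc("jdbc:mysql://localhost:3306/useraction", "province_browser_cnt", props)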