当前位置:   article > 正文

Kafka模拟器产生数据仿真-集成StructuredStreaming做到”毫秒“级实时响应StreamData落地到mysql

Kafka模拟器产生数据仿真-集成StructuredStreaming做到”毫秒“级实时响应StreamData落地到mysql

          这是仿真过程某图:

仿真场景kafkaStream
仿真实战kafka
 

 kafka消费sink端和StructuredStreaming集成通信成功 , 数据接收全部接收

数据落地情况: 

全部接收到并all存入mysql

下面就简单分享一下StructuredStreaming代码吧

  1. import org.apache.spark.sql.functions.{col, from_json}
  2. import org.apache.spark.sql.streaming.{ OutputMode, Trigger}
  3. import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
  4. import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
  5. val spark: SparkSession = SparkSession.builder()
  6. .appName("kafkaConsumer")
  7. .master("local[3]")
  8. .getOrCreate()
  9. import spark.implicits._
  10. // 定义json字段类型格式
  11. val Jsonschmea: StructType = new StructType()
  12. .add("id", dataType = IntegerType)
  13. .add("name", dataType = StringType)
  14. .add("sorce", dataType = IntegerType)
  15. val message: DataFrame = spark.readStream // message为从kafka读到的原数据
  16. .format("kafka")
  17. .option("kafka.bootstrap.servers", "xxxxx:9092,xxxx:9092,xxxx:9092")
  18. .option("subscribe", "xxxx")
  19. .option("startingOffsets", "latest")
  20. .load()
  21. // 将json字符串转化为结构化数据
  22. val streamData: DataFrame = message.selectExpr("cast(value as String) as message")
  23. .select(from_json($"message", Jsonschmea).alias("data"))
  24. // 将json结构化为新的df
  25. // 预加载mysql驱动
  26. // 实时写入 第二个参数预占位,want给每一批次加入唯一表示, but本次仅占位没有传参数
  27. def writeToMysql(batchDF: DataFrame, epochId: Long): Unit = {
  28. val sqlurl = "jdbc:mysql://localhost:xxxx/xxxx"
  29. val sqluser = "xxxx"
  30. val sqlpass = "xxxxx"
  31. Class.forName("com.mysql.cj.jdbc.Driver") // mysql 8.0后得驱动,旧版本去掉cj
  32. batchDF.foreachPartition {
  33. partitionOfRecords =>
  34. val connection = DriverManager.getConnection(sqlurl, sqluser, sqlpass)
  35. // 关闭自动提交以支持增量写入
  36. connection.setAutoCommit(false)
  37. // 创建预编译的插入语句
  38. val insertsql = "insert into jsonstream(id,name,sorce) values(?,?,?)"
  39. val preparedStatement = connection.prepareStatement(insertsql)
  40. partitionOfRecords.foreach {
  41. row =>
  42. // val id = row.getAs[Int]("data.id")
  43. // val name = row.getAs[String]("data.name")
  44. // val score = row.getAs[Int]("data.sorce")
  45. val id = row.getAs[Row]("data").getAs[Int]("id")
  46. val name = row.getAs[Row]("data").getAs[String]("name")
  47. val sorce = row.getAs[Row]("data").getAs[Int]("sorce")
  48. // 设置参数到预处理sql函数中
  49. preparedStatement.setInt(1, id)
  50. preparedStatement.setString(2, name)
  51. preparedStatement.setInt(3, sorce)
  52. // 执行添加到批次操作
  53. preparedStatement.addBatch()
  54. }
  55. preparedStatement.executeBatch()
  56. connection.commit() // 执行批处理后手动提交事务
  57. preparedStatement.close() // 手动GC
  58. connection.close()
  59. }
  60. }
  61. // 数据落地到数据库
  62. streamData.writeStream
  63. .outputMode(OutputMode.Append())
  64. .foreachBatch(writeToMysql _)
  65. .trigger(Trigger.ProcessingTime("1 millisecond")) // 1 毫秒每个batch
  66. .start()
  67. .awaitTermination()

存储按照一定批次量做存储   

友情提示 : 上述程序是经过脱敏处理的哦

----彩蛋----

如果你看到者你会知道scala在11更新之后也就是12版本如下:

batchDF.foreachPartition {
  partitionOfRecords => ... 这个位置

 Dataset的foreachPartition 里面不能处理 Row的Iterator, 所以需要转为rdd在做处理

所以更改后为

batchDF.rdd.foreachPartition { partitionOfRecords => ...

而且这里不能用foreach , 否则无法序列化就能存储到mysql, 不能被序列化的数据是不能在网络中进行传输的,通过二进制流的形式传出,在被反序列化回来转化为对象的形式存储

ok -----

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/239600
推荐阅读
相关标签
  

闽ICP备14008679号