当前位置:   article > 正文

Spark-Streaming updateStateByKey用法(计算累加值)、并与kafka集成使用_sparkstreaming kafka 怎么使用累加器

sparkstreaming kafka 怎么使用累加器

说明

   Spark Streaming的updateStateByKey可以DStream中的数据进行按key做reduce操作,然后对各个批次的数据进行累加。

计算word count所有批次的累加值。

  1. import org.apache.log4j.{Level, Logger}
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.streaming.{Seconds, StreamingContext}
  4. import org.apache.spark.streaming.dstream.ReceiverInputDStream
  5. import org.apache.spark.streaming.kafka.KafkaUtils
  6. object sparkUpdateState {
  7. def main(args: Array[String]): Unit = {
  8. //由于日志信息较多,只打印错误日志信息
  9. Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
  10. val conf = new SparkConf().setAppName("dstream").setMaster("local[*]")
  11. val ssc = new StreamingContext(conf,Seconds(1))
  12. //使用updateStateByKey前需要设置checkpoint,将数据进行持久化保存,不然每次执行都是新的,不会与历史数据进行关联
  13. // ssc.checkpoint("f:/spark_out")
  14. //将数据保存在hdfs中
  15. ssc.checkpoint("hdfs://192.168.200.10:9000/spark_out")
  16. //与kafka做集成,使用KafkaUtils类进行参数配置
  17. val(zkQuorum,groupId,topics)=("192.168.200.10:2181","kafka_group",Map("sparkTokafka"->1))
  18. val value: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,zkQuorum,groupId,topics)
  19. //将updateFunc转换为算子
  20. val updateFunc2 = updateFunc _
  21. //统计输入的字符串,根据空格进行切割统计
  22. value.flatMap(_._2.split(" ")).map((_,1)).updateStateByKey[Int](updateFunc2).print()
  23. ssc.start()
  24. ssc.awaitTermination()
  25. }
  26. def updateFunc(seq:Seq[Int], option:Option[Int])={
  27. //sum统计一次批处理后,单词统计
  28. var sum=seq.sum;
  29. //i为已经累计的值,因为option可能为空,如果为空的话,返回的是None,所以如果为空则返回0
  30. val i = option.getOrElse(0)
  31. // 返回累加后的结果,是一个Option[Int]类型
  32. Some(sum+i)
  33. }
  34. }

运行效果:




声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/945557
推荐阅读
相关标签
  

闽ICP备14008679号