当前位置:   article > 正文

【Spark Streaming】Spark Streaming案例_使用 spark streaming连接master 虚拟机的9999 端口获取数据,以下选项*正确

使用 spark streaming连接master 虚拟机的9999 端口获取数据,以下选项*正确的

目录

WordCount

需求&准备

代码

updateStateByKey

reduceByKeyAndWindow


  • WordCount

 

  • 需求&准备

1.首先在linux服务器上安装nc工具,nc是netcat的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据

yum install -y nc

 

 

2.启动一个服务端并开放9999端口,等一下往这个端口发数据

nc -lk 9999

 

3.发送数据

 

  • 代码

  1. //创建sparkConf
  2. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test01")
  3. //创建SparkContext
  4. val sc = new SparkContext(conf)
  5. sc.setLogLevel("WARN")
  6. //创建StreamingContext
  7. var ssc = new StreamingContext(sc, Seconds(5))
  8. //接收数据
  9. val dataDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)
  10. //根据业务逻辑进行计算
  11. val wordCount: DStream[(String, Int)] = dataDStream.flatMap(x => x.split(" ")).map(line => (line, 1)).reduceByKey(_ + _)
  12. //打印数据
  13. wordCount.print()
  14. //开启计算任务
  15. ssc.start()
  16. //等待关闭任务
  17. ssc.awaitTermination()

问题:每个批次的单词次数都被正确的统计出来,但是结果不能累加

 

  • updateStateByKey

如果需要累加需要使用updateStateByKey(func)来更新状态

  1. def main(args: Array[String]): Unit = {
  2. //创建sparkConf
  3. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test01")
  4. //创建SparkContext
  5. val sc = new SparkContext(conf)
  6. sc.setLogLevel("WARN")
  7. //创建StreamingContext
  8. var ssc = new StreamingContext(sc, Seconds(5))
  9. //设置临时数据存放位置
  10. ssc.checkpoint("./tmpCount")
  11. //接收数据
  12. val dataDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)
  13. //根据业务逻辑进行计算
  14. val wordAndOne: DStream[(String, Int)] = dataDStream.flatMap(x => x.split(" ")).map(line => (line, 1))
  15. val wordCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)
  16. //打印数据
  17. wordCount.print()
  18. //开启计算任务
  19. ssc.start()
  20. //等待关闭任务
  21. ssc.awaitTermination()
  22. }
  23. //计算的方法
  24. def updateFunc(currentValues:Seq[Int], historyValue:Option[Int] ):Option[Int] ={
  25. // currentValues当前值
  26. // historyValue历史值
  27. val result: Int = currentValues.sum + historyValue.getOrElse(0)
  28. Some(result)
  29. }

 

 

  • reduceByKeyAndWindow

滑动窗口转换操作的计算过程如下图所示,我们可以事先设定一个滑动窗口的长度(也就是窗口的持续时间),并且设定滑动窗口的时间间隔(每隔多长时间执行一次计算),比如设置滑动窗口的长度(也就是窗口的持续时间)为24H,设置滑动窗口的时间间隔(每隔多长时间执行一次计算)为1H,那么意思就是:每隔1H计算最近24H的数据

  1. //创建sparkConf
  2. val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test01")
  3. //创建SparkContext
  4. val sc = new SparkContext(conf)
  5. sc.setLogLevel("WARN")
  6. //创建StreamingContext
  7. var ssc = new StreamingContext(sc, Seconds(5))
  8. //设置临时数据存放位置
  9. ssc.checkpoint("./tmpCount")
  10. //接收数据
  11. val dataDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)
  12. //根据业务逻辑进行计算
  13. val wordAndOne: DStream[(String, Int)] = dataDStream.flatMap(x => x.split(" ")).map(line => (line, 1))
  14. //第一个参数为需要执行的操作
  15. //第二个参数为窗口宽度
  16. //第三个参数为滑动距离
  17. val wordCount: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((x: Int, y: Int) => x + y,Seconds(10),Seconds(5) )
  18. //打印数据
  19. wordCount.print()
  20. //开启计算任务
  21. ssc.start()
  22. //等待关闭任务
  23. ssc.awaitTermination()

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/975639
推荐阅读
相关标签
  

闽ICP备14008679号