赞
踩
目录
1.首先在linux服务器上安装nc工具,nc是netcat的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据
yum install -y nc
2.启动一个服务端并开放9999端口,等一下往这个端口发数据
nc -lk 9999
3.发送数据
- //创建sparkConf
- val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test01")
-
- //创建SparkContext
- val sc = new SparkContext(conf)
- sc.setLogLevel("WARN")
-
- //创建StreamingContext
- var ssc = new StreamingContext(sc, Seconds(5))
-
- //接收数据
- val dataDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)
-
- //根据业务逻辑进行计算
- val wordCount: DStream[(String, Int)] = dataDStream.flatMap(x => x.split(" ")).map(line => (line, 1)).reduceByKey(_ + _)
-
- //打印数据
- wordCount.print()
-
- //开启计算任务
- ssc.start()
-
- //等待关闭任务
- ssc.awaitTermination()
问题:每个批次的单词次数都被正确的统计出来,但是结果不能累加
如果需要累加需要使用updateStateByKey(func)来更新状态
- def main(args: Array[String]): Unit = {
- //创建sparkConf
- val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test01")
-
- //创建SparkContext
- val sc = new SparkContext(conf)
- sc.setLogLevel("WARN")
-
- //创建StreamingContext
- var ssc = new StreamingContext(sc, Seconds(5))
-
- //设置临时数据存放位置
- ssc.checkpoint("./tmpCount")
-
- //接收数据
- val dataDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)
-
- //根据业务逻辑进行计算
- val wordAndOne: DStream[(String, Int)] = dataDStream.flatMap(x => x.split(" ")).map(line => (line, 1))
- val wordCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunc)
-
- //打印数据
- wordCount.print()
-
- //开启计算任务
- ssc.start()
-
- //等待关闭任务
- ssc.awaitTermination()
- }
-
- //计算的方法
- def updateFunc(currentValues:Seq[Int], historyValue:Option[Int] ):Option[Int] ={
- // currentValues当前值
- // historyValue历史值
- val result: Int = currentValues.sum + historyValue.getOrElse(0)
- Some(result)
- }
滑动窗口转换操作的计算过程如下图所示,我们可以事先设定一个滑动窗口的长度(也就是窗口的持续时间),并且设定滑动窗口的时间间隔(每隔多长时间执行一次计算),比如设置滑动窗口的长度(也就是窗口的持续时间)为24H,设置滑动窗口的时间间隔(每隔多长时间执行一次计算)为1H,那么意思就是:每隔1H计算最近24H的数据
- //创建sparkConf
- val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("test01")
-
- //创建SparkContext
- val sc = new SparkContext(conf)
- sc.setLogLevel("WARN")
-
- //创建StreamingContext
- var ssc = new StreamingContext(sc, Seconds(5))
-
- //设置临时数据存放位置
- ssc.checkpoint("./tmpCount")
-
- //接收数据
- val dataDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)
-
- //根据业务逻辑进行计算
- val wordAndOne: DStream[(String, Int)] = dataDStream.flatMap(x => x.split(" ")).map(line => (line, 1))
-
- //第一个参数为需要执行的操作
- //第二个参数为窗口宽度
- //第三个参数为滑动距离
- val wordCount: DStream[(String, Int)] = wordAndOne.reduceByKeyAndWindow((x: Int, y: Int) => x + y,Seconds(10),Seconds(5) )
-
- //打印数据
- wordCount.print()
-
- //开启计算任务
- ssc.start()
-
- //等待关闭任务
- ssc.awaitTermination()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。