当前位置:   article > 正文

第 1 节 滑动窗口单词计数(Scala实现)_滑窗算法 统计字符串单词频率

滑窗算法 统计字符串单词频率

上篇:第 1 节 滑动窗口单词计数(Java实现)


具体代码实现:
pom文件引入:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.study.flink01</groupId>
    <artifactId>Flink_flink01</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!--java依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.6.1</version>
          <!-- <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.6.1</version>
        </dependency>
</dependencies>
</project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
package xuwei.tech

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * 需求:滑动窗口计算
 *需要实现每隔1秒对最近2秒的数据进行汇总计算
 */
object SocketWindowWordCountScala {
  def main(args: Array[String]): Unit = {

    //获取socket端口号
        val port:Int= try {
      ParameterTool.fromArgs(args).getInt("port")
    } catch {
          case e:Exception =>{
            System.err.println("No port set.use default port 9999")
          }
          9999
    }

    //获取flink的运行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //链接socket获取输入数字
  val text=  env.socketTextStream("flink102",port,'\n')
      //注意:必须要添加这一行隐式转换,否者下面的flatMap方法执行会报错
      import org.apache.flink.api.scala._

    //解析数据(把数据打平),分组、窗口计算,并且聚合求sum
   val windowCounts= text.flatMap(line =>line.split("\\s"))//打平,把每一行单词都切开
      .map(w => WordWindowCount(w,1)) //把单词转成word,1种形式
      .keyBy("word")
      .timeWindow(Time.seconds(2),Time.seconds(1))  //指定窗口大小,指定间隔时间
      .sum("count") //sum或reduce都可以
     // .reduce((a,b)=>WordWindowCount(a.word,a.count+b.count))
    windowCounts.print().setParallelism(1)  //打印到控制台

    //执行任务
    env.execute("Socket window count")
  }
  case class WordWindowCount(word:String,count: Long)

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

控制台打印的效果跟上篇:第 1 节 滑动窗口单词计数(Java实现)

的运行效果一样
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/274710
推荐阅读
相关标签