当前位置:   article > 正文

流处理实现WordCount_streamexecutionenvironment wordcount

streamexecutionenvironment wordcount

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

object ffff {

      def main(args: Array[String]): Unit = {

    //创建执行环境

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //接受socket文本流

    val inputdatastream = env.socketTextStream(hostname = "bigdata1",port = 7777)

    //对数据进行转换处理

    val resultdatastream = inputdatastream

      .flatMap(_.split(" "))

      .filter(_.nonEmpty)

      .map((_,1))//.setParallelism(2)

      .keyBy(0)

      .sum(1)

    //打印输出

    resultdatastream.print()

    //触发程序,启动任务执行程序

    env.execute()

  }

}

注意:在运行的代码前,在终端输入:

nc -lk 7777

如果报 -bash:nc:command not found

可以使用yum安装,安装完成再输入 nc -lk 7777

终端输入

idea输出

  1. val resultdatastream = inputdatastream: 定义了一个新的 DataStream 变量 resultdatastream,并将其初始化为 inputdatastream。inputdatastream 是一个已经存在的 Flink DataStream。

  2. .flatMap(_.split(" ")): 使用 flatMap 操作符对 inputdatastream 中的每个元素应用一个函数,该函数通过空格(" ")分割元素(假设元素是字符串),然后返回分割后的单词的迭代器。这会将所有的单词平坦化成一个连续的单词流。

  3. .filter(_.nonEmpty): 使用 filter 操作符过滤掉上一步中产生的所有空字符串。只有非空字符串才会被保留在数据流中。

  4. .map((_,1)): 使用 map 操作符将每个非空字符串映射为一个元组,该元组的第一个元素是字符串本身,第二个元素是数字 1。这通常是为了进行后续的聚合操作,比如计算每个单词的出现次数。

    注意:这里原本有一行注释掉的代码 //.setParallelism(2),如果取消注释,它会设置该 DataStream 的并行度为 2,意味着 Flink 会尝试在两个并行任务上处理这个数据流。但是,由于它被注释掉了,所以不会影响当前的数据流。

  5. .keyBy(0): 使用 keyBy 操作符根据元组的第一个元素(即单词)对流进行逻辑分区,以确保所有相同的单词都进入同一个分区,从而可以在后续操作中进行聚合。

  6. .sum(1): 使用 sum 操作符对每个单词的所有数字 1 进行求和,从而计算出每个单词的出现次数。

最终,resultdatastream 会是一个新的 DataStream,其中包含元组 (word, count),其中 word 是输入流中的单词,count 是该单词在输入流中的出现次数。注意,这段代码是一个流处理应用程序的一部分,它需要嵌入在一个更大的 Flink 应用程序中才能运行。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/562829
推荐阅读
相关标签
  

闽ICP备14008679号