赞
踩
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
object ffff {
def main(args: Array[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//接受socket文本流
val inputdatastream = env.socketTextStream(hostname = "bigdata1",port = 7777)
//对数据进行转换处理
val resultdatastream = inputdatastream
.flatMap(_.split(" "))
.filter(_.nonEmpty)
.map((_,1))//.setParallelism(2)
.keyBy(0)
.sum(1)
//打印输出
resultdatastream.print()
//触发程序,启动任务执行程序
env.execute()
}
}
注意:在运行的代码前,在终端输入:
nc -lk 7777
如果报 -bash:nc:command not found
可以使用yum安装,安装完成再输入 nc -lk 7777
终端输入
idea输出
val resultdatastream = inputdatastream: 定义了一个新的 DataStream 变量 resultdatastream,并将其初始化为 inputdatastream。inputdatastream 是一个已经存在的 Flink DataStream。
.flatMap(_.split(" ")): 使用 flatMap 操作符对 inputdatastream 中的每个元素应用一个函数,该函数通过空格(" ")分割元素(假设元素是字符串),然后返回分割后的单词的迭代器。这会将所有的单词平坦化成一个连续的单词流。
.filter(_.nonEmpty): 使用 filter 操作符过滤掉上一步中产生的所有空字符串。只有非空字符串才会被保留在数据流中。
.map((_,1)): 使用 map 操作符将每个非空字符串映射为一个元组,该元组的第一个元素是字符串本身,第二个元素是数字 1。这通常是为了进行后续的聚合操作,比如计算每个单词的出现次数。
注意:这里原本有一行注释掉的代码 //.setParallelism(2),如果取消注释,它会设置该 DataStream 的并行度为 2,意味着 Flink 会尝试在两个并行任务上处理这个数据流。但是,由于它被注释掉了,所以不会影响当前的数据流。
.keyBy(0): 使用 keyBy 操作符根据元组的第一个元素(即单词)对流进行逻辑分区,以确保所有相同的单词都进入同一个分区,从而可以在后续操作中进行聚合。
.sum(1): 使用 sum 操作符对每个单词的所有数字 1 进行求和,从而计算出每个单词的出现次数。
最终,resultdatastream 会是一个新的 DataStream,其中包含元组 (word, count),其中 word 是输入流中的单词,count 是该单词在输入流中的出现次数。注意,这段代码是一个流处理应用程序的一部分,它需要嵌入在一个更大的 Flink 应用程序中才能运行。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。