当前位置:   article > 正文

flink-WordCount批处理和流处理案例

wordcount批处理

N.1 编程流程简介

1)每个Flink程序都包含以下的若干流程:

(1)获取一个执行环境(Execution Environment)

(2)加载、创建初始数据(source)

(3)转换这些数据:(Transformation)

(4)放置计算结果的位置(Sink)

(5)触发程序执行(execute)

2)开发依赖

在后面 学习中 我们都使用scala编写。如下是开发用到的依赖,如果要打成jar包运行,就要上传到 flink的webUI上面,进行远行。

<dependencies>

<!-- 这是1.11.6中间版本的依赖 ,目前新版本已经很高了

但是本笔记很多代码原本是用的是1.7.2低版本,出现了很多的新功能,建议不要在用旧版的了,先试下1.11.6 ,如果报错那就用就版本吧 -->

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-scala_2.11</artifactId>

<version>1.11.6</version>

</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-streaming-scala_2.11</artifactId>

<version>1.11.6</version>

</dependency>

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-clients_2.12</artifactId>

<version>1.11.6</version>

</dependency>

<!-- scala打jar 编译要使用的类库 功能有不足的地方 不建议用了 -->

<dependency>

<groupId>org.scala-lang</groupId>

<artifactId>scala-library</artifactId>

<!-- self : 这里的版本还是和windos的sdk2.11.8的版本一样为好-->

<version>2.11.8</version>

<scope>compile</scope>

</dependency>

</dependencies>

N.2 批处理案例

import org.apache.flink.api.scala._

/**

*/

// 批处理代码

object WordCount {

def main(args: Array[String]): Unit = {

// 创建一个批处理的执行环境

val env = ExecutionEnvironment.getExecutionEnvironment

// 从文件中读取数据

val inputPath = "D:\\WorkCache_IDEA\\FlinkTutorial\\src\\main\\resources\\hello.txt"

val inputDataSet = env.readTextFile(inputPath)

// 分词之后做count

val wordCountDataSet = inputDataSet.flatMap(_.split(" "))

.map( (_, 1) )

.groupBy(0) //批处理有groupBy

.sum(1)

// 打印输出

wordCountDataSet.print()

}

}

————————————————————————

————————————————————————

N.3 流处理案例

import org.apache.flink.api.java.utils.ParameterTool

import org.apache.flink.streaming.api.scala._

object StreamWordCount {

def main(args: Array[String]): Unit = {

val params = ParameterTool.fromArgs(args)

val host: String = params.get("host")

val port: Int = params.getInt("port")

// 创建一个流处理的执行环境

val env = StreamExecutionEnvironment.getExecutionEnvironment

// 接收socket数据流

val textDataStream = env.socketTextStream(host, port) // 当然 这样是可以直接输入的

// 逐一读取数据,分词之后进行wordcount

val wordCountDataStream = textDataStream.flatMap(_.split("\\s")) // 按空格分隔

.filter(_.nonEmpty)

.map( (_, 1) )

.keyBy(0) //流处理有keyBy

.sum(1)

// 打印输出

wordCountDataStream.print()

// 执行任务

env.execute("stream word count job")

}

}

运行结果:

————————————————————————

————————————————————————

————————————————————————

————————————————————————

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/562822
推荐阅读
相关标签
  

闽ICP备14008679号