赞
踩
TestWordCount.java
/***
* flink 批处理测试 从文件中读取单词,计数
*/
public class TestWordCount {
public static void main(String[] args) throws Exception {
// 获取执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 读取文件
String inputPath = "E:\\Projects\\bigdata\\flink\\flink-study\\demo01\\src\\main\\resources\\textfile.txt";
DataSource<String> dataSource = env.readTextFile(inputPath);
// 处理数据
DataSet<Tuple2<String, Integer>> wordCount = dataSource
.flatMap(new LineToWordOne())
.groupBy(0)
.sum(1);
// output
wordCount.print();
}
// 处理数据:每一条记录用空格隔开,每个单词组成一个二元组(word , 1)
public static class LineToWordOne implements FlatMapFunction<String, Tuple2<String,Integer>> {
@Override
public void flatMap(String line, Collector<Tuple2<String,Integer>> out) throws Exception {
String[] split = line.split(" ");
for (String wold: split){
out.collect(new Tuple2<String,Integer>(wold,1));
}
}
}
}
将文件中的数据读取成数据流,然后提交任务到 Flink 执行,以此来模拟有界流数据处理。
StreamWordCount .java
public class StreamWordCount {
public static void main(String[] args) throws Exception {
// 获取启动参数
ParameterTool parameters = ParameterTool.fromArgs(args);
boolean isNetcat = parameters.getBoolean("isNetcat");
// 获取流处理环境
StreamExecutionEnvironment streamEnv
= StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度
streamEnv.setParallelism(3);
// 读取数据流
String filePath = parameters.get("filepath");
DataStreamSource<String> dataSource = streamEnv.readTextFile(filePath);
// 处理流数据
SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream
= dataSource.flatMap(new LineToWordOne())
.keyBy(0)
.sum(1);
resultStream.print();
/**
* 上面都是在定义一个数据处理的逻辑流程
* 执行任务
*/
streamEnv.execute();
}
// 处理数据:每一条记录用空格隔开,每个单词组成一个二元组(word , 1)
public static class LineToWordOne implements FlatMapFunction<String, Tuple2<String,Integer>> {
@Override
public void flatMap(String line, Collector<Tuple2<String,Integer>> out) throws Exception {
String[] split = line.split(" ");
for (String wold: split){
out.collect(new Tuple2<String,Integer>(wold,1));
}
}
}
}
对比批处理的代码逻辑可以看出,流处理需要先定义好一个数据处理的流程,然后将任务提交到 Flink 中执行,数据是一个一个实时处理的,即使是有界流,也是根据定义好的逻辑读取数据流,然后攒到界限后一个一个处理,而批处理中,使用 DataSetAPI 数据先汇聚到一个数据集再分组计算:
修改 StreamWordCount.java 类,让它可选择的处理有界流或者无界流
package org.flink.study.practice01;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* 流处理 word count
* 有界流模拟:读取文件流 (--isNetcat false)
* 无界流模拟:利用远程Linux主机的 netcat 工具开启一个 socket 端口,实时的输入数据 命令: nc -lk 9999
* 故障恢复后不漏算演示:关闭程序,在 netcat 开启的 socket 中继续追加输入,然后启动程序,发现程序会从之前的位置继续处理数据
*
* @param 配置启动参数
* --isNetcat true
* --filepath E:\Projects\bigdata\flink\flink-study\demo01\src\main\resources\textfile.txt
* --host 192.168.116.100
* --port 9999
*/
public class StreamWordCount {
public static void main(String[] args) throws Exception {
// 获取启动参数
ParameterTool parameters = ParameterTool.fromArgs(args);
boolean isNetcat = parameters.getBoolean("isNetcat");
// 获取流处理环境
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度
streamEnv.setParallelism(3);
DataStreamSource<String> dataSource;
if (!isNetcat ){
System.out.println("测试 1:从文件中读取文件流,其实还是一个有界流");
String filePath = parameters.get("filepath");
dataSource = streamEnv.readTextFile(filePath);
} else {
System.out.println("测试 2:从中 socket 中读取流,无界流,实时处理");
String host = parameters.get("host");
int port = parameters.getInt("port");
dataSource = streamEnv.socketTextStream(host,port);
}
// 处理流数据
SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream = dataSource.flatMap(new LineToWordOne())
.keyBy(0)
.sum(1);
resultStream.print();
//执行任务
streamEnv.execute();
}
// 处理数据:每一条记录用空格隔开,每个单词组成一个二元组(word , 1)
public static class LineToWordOne implements FlatMapFunction<String, Tuple2<String,Integer>> {
@Override
public void flatMap(String line, Collector<Tuple2<String,Integer>> out) throws Exception {
String[] split = line.split(" ");
for (String wold: split){
out.collect(new Tuple2<String,Integer>(wold,1));
}
}
}
}
idea开发环境启动参数,使用无界流数据:
在虚拟机上开启一个 socket 监听 9999 端口,用来发数据:
nc -lk 9999
启动应用程序,向 Socket 中实时输入数据
输入数据:
程序控制台实时输出:
关闭程序,在 netcat 开启的 socket 中继续追加输入,然后启动程序,发现程序会从之前的位置继续处理数据:
可以看到数据不会漏算,如果要实现故障恢复,还需要将状态定期持久化,并设置检查点。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。