当前位置:   article > 正文

Flink (二)经典用例 WordCount 之实时流处理和批处理实验_github flink-study

github flink-study

在这里插入图片描述

批处理实验(DataSet API)

TestWordCount.java

/***
 * flink 批处理测试  从文件中读取单词,计数
 */
public class TestWordCount {
    public static void main(String[] args) throws Exception {
        // 获取执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 读取文件
        String inputPath = "E:\\Projects\\bigdata\\flink\\flink-study\\demo01\\src\\main\\resources\\textfile.txt";
        DataSource<String> dataSource = env.readTextFile(inputPath);
        // 处理数据
        DataSet<Tuple2<String, Integer>> wordCount = dataSource
                .flatMap(new LineToWordOne())
                .groupBy(0)
                .sum(1);
        // output
        wordCount.print();
    }

    // 处理数据:每一条记录用空格隔开,每个单词组成一个二元组(word , 1)
    public static class LineToWordOne implements FlatMapFunction<String, Tuple2<String,Integer>> {

        @Override
        public void flatMap(String line, Collector<Tuple2<String,Integer>> out) throws Exception {
            String[] split = line.split(" ");
            for (String wold: split){
                out.collect(new Tuple2<String,Integer>(wold,1));
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

流处理实验 (DataStream API)

有界流处理

将文件中的数据读取成数据流,然后提交任务到 Flink 执行,以此来模拟有界流数据处理。

StreamWordCount .java

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 获取启动参数
        ParameterTool parameters = ParameterTool.fromArgs(args);
        boolean isNetcat = parameters.getBoolean("isNetcat");
		
        // 获取流处理环境
        StreamExecutionEnvironment streamEnv 
        	= StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        streamEnv.setParallelism(3);

		// 读取数据流
        String filePath = parameters.get("filepath");
        DataStreamSource<String> dataSource = streamEnv.readTextFile(filePath);

        // 处理流数据
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream 
        	= dataSource.flatMap(new LineToWordOne())
                .keyBy(0)
                .sum(1);

        resultStream.print();

        /**
         * 上面都是在定义一个数据处理的逻辑流程
         * 执行任务
         */
        streamEnv.execute();
    }

    // 处理数据:每一条记录用空格隔开,每个单词组成一个二元组(word , 1)
    public static class LineToWordOne implements FlatMapFunction<String, Tuple2<String,Integer>> {

        @Override
        public void flatMap(String line, Collector<Tuple2<String,Integer>> out) throws Exception {
            String[] split = line.split(" ");
            for (String wold: split){
                out.collect(new Tuple2<String,Integer>(wold,1));
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

对比批处理的代码逻辑可以看出,流处理需要先定义好一个数据处理的流程,然后将任务提交到 Flink 中执行,数据是一个一个实时处理的,即使是有界流,也是根据定义好的逻辑读取数据流,然后攒到界限后一个一个处理,而批处理中,使用 DataSetAPI 数据先汇聚到一个数据集再分组计算:

无界流数据处理实验

修改 StreamWordCount.java 类,让它可选择的处理有界流或者无界流

package org.flink.study.practice01;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * 流处理 word count
 * 有界流模拟:读取文件流  (--isNetcat false)
 * 无界流模拟:利用远程Linux主机的 netcat 工具开启一个 socket 端口,实时的输入数据  命令: nc -lk 9999
 * 故障恢复后不漏算演示:关闭程序,在 netcat 开启的 socket 中继续追加输入,然后启动程序,发现程序会从之前的位置继续处理数据
 *
 * @param 配置启动参数
 *    --isNetcat true
 *    --filepath E:\Projects\bigdata\flink\flink-study\demo01\src\main\resources\textfile.txt
 *    --host 192.168.116.100
 *    --port 9999
 */
public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 获取启动参数
        ParameterTool parameters = ParameterTool.fromArgs(args);
        boolean isNetcat = parameters.getBoolean("isNetcat");

        // 获取流处理环境
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        streamEnv.setParallelism(3);

        DataStreamSource<String> dataSource;
        if (!isNetcat ){
            System.out.println("测试 1:从文件中读取文件流,其实还是一个有界流");
            String filePath = parameters.get("filepath");
            dataSource = streamEnv.readTextFile(filePath);
        } else {
            System.out.println("测试 2:从中 socket 中读取流,无界流,实时处理");
            String host = parameters.get("host");
            int port = parameters.getInt("port");
            dataSource = streamEnv.socketTextStream(host,port);
        }

        // 处理流数据
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream = dataSource.flatMap(new LineToWordOne())
                .keyBy(0)
                .sum(1);

        resultStream.print();

        //执行任务
        streamEnv.execute();
    }

    // 处理数据:每一条记录用空格隔开,每个单词组成一个二元组(word , 1)
    public static class LineToWordOne implements FlatMapFunction<String, Tuple2<String,Integer>> {

        @Override
        public void flatMap(String line, Collector<Tuple2<String,Integer>> out) throws Exception {
            String[] split = line.split(" ");
            for (String wold: split){
                out.collect(new Tuple2<String,Integer>(wold,1));
            }
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

idea开发环境启动参数,使用无界流数据:

在虚拟机上开启一个 socket 监听 9999 端口,用来发数据:

nc -lk 9999

启动应用程序,向 Socket 中实时输入数据

输入数据:

程序控制台实时输出:

不漏算测试

关闭程序,在 netcat 开启的 socket 中继续追加输入,然后启动程序,发现程序会从之前的位置继续处理数据:
在这里插入图片描述
在这里插入图片描述
可以看到数据不会漏算,如果要实现故障恢复,还需要将状态定期持久化,并设置检查点。

实验代码地址

git地址:https://github.com/hubo-admin/flink-study

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/696147
推荐阅读
相关标签
  

闽ICP备14008679号