当前位置:   article > 正文

从零开始构建第一个Flink项目_first flink

first flink

1、从零开始构建第一个Flink项目

1.1、构建项目

Flink项目依赖于Jdk8和maven环境,我们可以通过flink官方提供的maven archetype模板快速生成第一个项目

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.11.0 \
    -DgroupId=first-flink-project \
    -DartifactId=first-flink-project \
    -Dversion=0.1 \
    -Dpackage=com.first.flink \
    -DinteractiveMode=false \
    -DarchetypeCatalog=internal 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

mvn archetype:generate 构建模板项目速度慢得令人发指,增加-X显示debug级别的调试信息
[INFO] Generating project in Batch mode
[DEBUG] Searching for remote catalog: https://repo.maven.apache.org/maven2/archetype-catalog.xml

加上-DarchetypeCatalog=internal 运行参数,archetype-catalog.xml从本地获取就能解决这个问题
通过上诉命令生成的项目结构如下:

192:first-flink-project root$ tree
.
├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── first
        │           └── flink
        │               ├── BatchJob.java
        │               └── StreamingJob.java
        └── resources
            └── log4j2.properties

7 directories, 4 files
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

pom文件中已包含flink项目所需版本的依赖Jar包,打开Intellij IEDA编辑器根据提示导入项目:
在这里插入图片描述

1.2、编写第一个项目

在第一个项目中我们以FLINK提供的wordcount为例,可以学习到Flink 核心API提供的流式Stream和批式Batch编程的基本结构。

1.2.1、批式编程

在目录 src/main/com.first.flink.examples 下创建 WordCountBatchByJava 类,首先创建执行环境ExecutionEnvironment,执行环境提供了续作的方法用于控制任务的执行,比如设置parallelism等

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  • 1

下一步创建数据源,Flink 提供了DataSet API 用于处理批量数据,

        DataSet<String> text = env.fromElements("i live flink", "i love java", "i love scala");
  • 1

接下来将DataSet通过flink提供的各种算子如flatMap、grouby、sum等进行转换

        DataSet<Tuple2<String, Integer>> ds = text.flatMap(new LineSplitter()).groupBy(0).sum(1);
  • 1

LineSplitter 实现了 Flink 提供的 FlatMapFunction 接口。本例中我们统计每个单词出现的次数,因此需要将输入的字符串解析成单词和次数(Tuple2<String, Integer>),Tuple2 的第一个字段是单词,第二个字段是次数。通过 flatMap 算子来解析是因为输入的每一行字符串可能包含多个单词,字符串按照空格(这里比较简单)进行分解。接着grouBy(0)算子按照第一个字段(0是索引)也就是单词进行分组,sum(1)是按照第二个字段求和统计次数

    static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line,
                            Collector<Tuple2<String, Integer>> collector) throws Exception {
            for (String word : line.split(" ")) {
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

最后将数据输出到目的地控制台
ds.print();
输出结果如下:
(scala,1)
(flink,1)
(love,2)
(live,1)
(i,3)
(java,1)

1.2.2、流式编程

在目录 src/main/com.first.flink.examples 下创建 WordCountStreamByJava 类,首先创建流式Stream任务的执行环境StreamExecutionEnvironment,执行环境提供了续作的方法用于控制任务的执行,比如设置parallelism、容错机制配置、checkpoint配置等

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  • 1

下一步创建流式数据源,Flink 提供了DataStream API 用于处理流式数据,这里我们设置socket数据源,从9999端口读取数据,每一条数据以 "\n"为结尾

        DataStreamSource<String> source = env.socketTextStream("127.0.0.1", 9999, "\n");
        // 转化处理数据
        DataStream<WordWithCount> dataStream = source
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String line,
                                        Collector<WordWithCount> collector) throws Exception {
                        for (String word : line.split(" ")) {
                            collector.collect(new WordWithCount(word, 1));
                        }
                    }
                }).keyBy(event -> event.getWord())//以key分组统计
                .timeWindow(Time.seconds(5))//设置一个窗口函数,模拟数据流动
                .sum("count");//计算时间窗口内的词语个数

        // 输出数据到目的端
        dataStream.print();

        // 执行任务操作
        env.execute("Flink Streaming Word Count By Java");
        
 public static class WordWithCount {
        public String word;
        public int    count;
        //省略getter/setter
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

keyBy(0)按照WordWithCount类的word字段也就是单词字段进行分组,timeWindow用于设置了一个2S的固定窗口,sum(“count”)聚合函数对单词出现的次数进行求和。
最后,输出的结果是每隔5秒一次性输出5秒内每个单词出现的次数。
如果要运行程序,首先要通过netcat监听9999端口:nc -lk 9999。然后执行main函数后,在监听的端口上输入:

i love java
  • 1

输出如下结果:

2> WordWithCount{word='i', count=1}
6> WordWithCount{word='love', count=1}
2> WordWithCount{word='java', count=1}
  • 1
  • 2
  • 3

个人博客:http://www.geek-make.com

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/597942
推荐阅读
相关标签
  

闽ICP备14008679号