当前位置:   article > 正文

Flink入门及运行_flink collect 是如何运行的

flink collect 是如何运行的

入门级程序


public class SocketWindowWordCountJava {

    public static void main(String[] args) throws Exception {
        //获取 flink 的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String host="localhost";
        int port=9000;
        String delimiter="\n";
        //
        DataStreamSource<String> text = env.socketTextStream(host, port, delimiter);

        SingleOutputStreamOperator<WordCount> wcs = text.flatMap(new FlatMapFunction<String, WordCount>() {
            @Override
            public void flatMap(String value, Collector<WordCount> out) throws Exception {
                String[] vs = value.split("\\s");
                for (String word : vs) {
                    out.collect(new WordCount(word, 1L));
                }
            }
        }).keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1))
                .sum("count");

        wcs.print().setParallelism(1);

        //下面这行很重要,不然程序不会执行
        env.execute(" word count...");
    }

    @Data
    @NoArgsConstructor
    public static class WordCount{
        String word;
        long  count;

        public WordCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

1.在终端输入 nc -l 9000
2.启动程序
3.在终端输入单词
在这里插入图片描述

如何将程序打包,放到 flink 运行呢?

1.打包


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>2.7</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <archive>
                        <manifestEntries>
                            <Main-Class>frame.flink.learn.SocketWindowWordCountJava</Main-Class>
                        </manifestEntries>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

2.启动 flink 程序
下载 flink 程序包。flink-1.6.1-bin-hadoop27-scala_2.11.tgz,然后解压
进入到 bin 目录 运行下面的命令:
./start-cluster.sh

./flink run ~/Desktop/hadoop-stream-frame-0.0.1-SNAPSHOT.jar --port 9000

3.查看运行结果
在终端输入文字以后,运行下面的命令 查看 日志:
tail -500f …/log/flink-yuzhihao-taskexecutor-0-xxx.out

在这里插入图片描述

也可以在 Flink web 页面查看任务运行状态:
http://localhost:8081/#/running-jobs
在这里插入图片描述

也可以上传 jar 包 到 flink

在这里插入图片描述

查看运行结果
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/560601
推荐阅读
相关标签
  

闽ICP备14008679号