
Apache Flink Official Documentation: Quick Start


Setup: Download and Start Flink

Flink runs on Linux, Mac OS X, and Windows. The only requirement for running Flink is a working Java 7.x (or higher) installation. Windows users should check the guide for running Flink on Windows.

You can check your currently installed Java version with the following command:

java -version

If you have Java 8 installed, the output will look like this:

java version "1.8.0_111"
Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)

Download and Unpack

  1. Download a binary package from the downloads page. You can pick any Hadoop/Scala combination you like. If you only plan to use the local file system, any Hadoop version will work.

  2. Go to the download directory.

  3. Unpack the downloaded archive.

$ cd ~/Downloads        # Go to download directory
$ tar xzf flink-*.tgz   # Unpack the downloaded archive
$ cd flink-1.2.0

MacOS X

For MacOS X users, Flink can be installed through Homebrew.

$ brew install apache-flink
...
$ flink --version
Version: 1.2.0, Commit ID: 1c659cf

Start a Local Flink Cluster

Start Flink with the following command:

$ ./bin/start-local.sh  # Start Flink

Check the JobManager web frontend at http://localhost:8081 and make sure everything is up and running. The web frontend should report a single available TaskManager instance.

Translator's note: the local machine needs a hosts mapping from localhost to 127.0.0.1.

You can also verify that the system is running by checking the log files in the log directory:

$ tail log/flink-*-jobmanager-*.log
INFO ... - Starting JobManager
INFO ... - Starting JobManager web frontend
INFO ... - Web frontend listening at 127.0.0.1:8081
INFO ... - Registered TaskManager at 127.0.0.1 (akka://flink/user/taskmanager)

Read the Code

You can find the complete source code of this SocketWindowWordCount example on GitHub, in both Java and Scala.

Scala

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {

    def main(args: Array[String]) : Unit = {

        // the port to connect to
        val port: Int = try {
            ParameterTool.fromArgs(args).getInt("port")
        } catch {
            case e: Exception => {
                System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
                return
            }
        }

        // get the execution environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        // get input data by connecting to the socket
        val text = env.socketTextStream("localhost", port, '\n')

        // parse the data, group it, window it, and aggregate the counts
        val windowCounts = text
            .flatMap { w => w.split("\\s") }
            .map { w => WordWithCount(w, 1) }
            .keyBy("word")
            .timeWindow(Time.seconds(5), Time.seconds(1))
            .sum("count")

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1)

        env.execute("Socket Window WordCount")
    }

    // Data type for words with count
    case class WordWithCount(word: String, count: Long)
}

Java

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // the port to connect to
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
            .flatMap(new FlatMapFunction<String, WordWithCount>() {
                @Override
                public void flatMap(String value, Collector<WordWithCount> out) {
                    for (String word : value.split("\\s")) {
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            })
            .keyBy("word")
            .timeWindow(Time.seconds(5), Time.seconds(1))
            .reduce(new ReduceFunction<WordWithCount>() {
                @Override
                public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                    return new WordWithCount(a.word, a.count + b.count);
                }
            });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
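
The call to keyBy("word") works because WordWithCount follows Flink's POJO conventions (a public no-argument constructor and public fields), so Flink can address the grouping field by name. As a minimal sketch, not part of the original example, the same grouping can be expressed with an explicit KeySelector; the class name WordKeySelector below is introduced only for illustration:

import org.apache.flink.api.java.functions.KeySelector;

// Illustrative helper (not in the original example): extracts the grouping key
// from WordWithCount explicitly instead of referring to the field by name.
public class WordKeySelector implements KeySelector<SocketWindowWordCount.WordWithCount, String> {
    @Override
    public String getKey(SocketWindowWordCount.WordWithCount wc) {
        return wc.word;
    }
}

With this in place, .keyBy(new WordKeySelector()) could replace .keyBy("word") in the pipeline above.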

Run the Example

Now we will run this Flink application. It will read text from a socket and, once every 5 seconds, print the number of occurrences of each distinct word during the previous 5 seconds, i.e. a tumbling window of processing time, as long as words keep floating in.
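
Note that the prose describes a tumbling processing-time window, while the example code above calls timeWindow(Time.seconds(5), Time.seconds(1)), which is a sliding window that emits a result every second. The following is a minimal sketch of the two variants, assuming a keyed stream like the one produced by keyBy("word") in the Java example; the class and method names are made up for illustration:

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.windowing.time.Time;

// Illustrative only: contrasts the two window definitions on a stream of
// WordWithCount records keyed by the "word" field.
public class WindowVariants {

    // Tumbling processing-time window: non-overlapping 5-second windows,
    // one count per word every 5 seconds (this matches the description above).
    public static DataStream<SocketWindowWordCount.WordWithCount> tumblingCounts(
            KeyedStream<SocketWindowWordCount.WordWithCount, Tuple> keyed) {
        return keyed.timeWindow(Time.seconds(5)).sum("count");
    }

    // Sliding window, as used in the example code: 5-second windows evaluated
    // every second, so each word contributes to up to five overlapping windows.
    public static DataStream<SocketWindowWordCount.WordWithCount> slidingCounts(
            KeyedStream<SocketWindowWordCount.WordWithCount, Tuple> keyed) {
        return keyed.timeWindow(Time.seconds(5), Time.seconds(1)).sum("count");
    }
}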

  • First of all, we use netcat to start a local server:

$ nc -l 9000

Submit the Flink program:

$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
Using address 127.0.0.1:6123 to connect to JobManager.
JobManager web interface address http://127.0.0.1:8081
Starting execution of program
Submitting job with JobID: 574a10c8debda3dccd0c78a3bde55e1b. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@127.0.0.1:6123/user/jobmanager#297388688]
11/04/2016 14:04:50     Job execution switched to status RUNNING.
11/04/2016 14:04:50     Source: Socket Stream -> Flat Map(1/1) switched to SCHEDULED
11/04/2016 14:04:50     Source: Socket Stream -> Flat Map(1/1) switched to DEPLOYING
11/04/2016 14:04:50     Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to SCHEDULED
11/04/2016 14:04:51     Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to DEPLOYING
11/04/2016 14:04:51     Fast TumblingProcessingTimeWindows(5000) of WindowedStream.main(SocketWindowWordCount.java:79) -> Sink: Unnamed(1/1) switched to RUNNING
11/04/2016 14:04:51     Source: Socket Stream -> Flat Map(1/1) switched to RUNNING

Translator's note: you can also submit a simpler job, examples/batch/WordCount.jar, or submit jobs through the web interface; in that case you need to select the Entry Class before submitting.

The program connects to the socket and waits for input. You can check the web interface to verify that the job is running as expected:


Words are counted in time windows of 5 seconds (processing time, tumbling windows) and printed to stdout. Monitor the JobManager's output file and write some text in nc (each line is sent to Flink as soon as you hit Enter):

$ nc -l 9000
lorem ipsum
ipsum ipsum ipsum
bye

Translator's note: on macOS, start the listening port with nc -l -p 9000. If you run into trouble, use telnet localhost 9000 to check whether the port is listening; if netcat fails to start, reinstall it with brew install netcat.

The .out file will print the counts for each time window:

$ tail -f log/flink-*-jobmanager-*.out
lorem : 1
bye : 1
ipsum : 4

Use the following command to stop Flink:

$ ./bin/stop-local.sh

Next Steps

Check out more examples to get a better feel for Flink's programming APIs. When you are done with that, go ahead and read the streaming guide.

(End of article)
