当前位置:   article > 正文

Flink入门及实战(1)_flink入门与实战

flink入门与实战

1 Flink 和 storm,spark 对比

在这里插入图片描述


  • 要求消息投递语义为 Exactly Once 的场景;数据量较大,要求高吞吐低延迟的场景;需要进行状态管理或者窗口统计的场景,建议使用 flink

2 入门案例

创建空的 maven 工程

 <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.6.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.6.1</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.6.1</version>
            <scope>provided</scope>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

2.1 需求分析

手工通过 socket实时产生一些单词,使用 flink 实时接收数据,对指定时间窗口内的数据进行聚合统计。

  • 每隔1秒统计最近2秒内的数据

2.2 flink 程序开发步骤

  1. 获取执行环境;
  2. 加载/创建 初始化数据;
  3. 指定操作数据的 transaction 算子;
  4. 指定把计算好的数据放在哪里;
  5. 调用 execution 出发执行程序;

  • 注意: flink 程序是延迟计算的,只有最后调用 execute() 方法的时候才会真正出发执行程序;
  • 延迟计算的好处: 针对复杂的程序,flink 可以将其转成一个 Plan,将 Plan 作为一个整体单元执行。

package com.tzb.demo;


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * 滑动窗口计算
 * 通过 socket 模拟产生数据
 */
public class SocketWindowWordCountJava {

    public static void main(String[] args) throws Exception {

        // 获取端口号
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            port = 9000;
            e.printStackTrace();
            System.err.println("没有指定端口,使用默认的端口号9000");
        }

        // 获取 flink 运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String hostname = "master";
        String delimiter = "\n";

        // 连接 socket,获取数据
        DataStreamSource<String> text = env.socketTextStream(hostname, port,delimiter);

        // a a c
        // a 2
        // c 1
        SingleOutputStreamOperator<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String s, Collector<WordWithCount> out) throws Exception {
                String[] splits = s.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        }).keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1))// 时间窗口大小是 2s , 间隔是 1s
                .sum("count");// 使用 sum 或者 reduce 都可以
        /*.reduce(new ReduceFunction<WordWithCount>() {
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });*/

        // 数据打印到控制台,设置并行度
        windowCount.print().setParallelism(1);

        // 没有这一步,程序不执行
        env.execute("socket window count");

    }


    public static class WordWithCount{
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90

在这里插入图片描述
在这里插入图片描述

  • provided 只在编译时发挥作用,但是后期打包时要加上,这里运行时先注释掉
    在这里插入图片描述
package com.tzb.demo;


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * 滑动窗口计算
 * 通过 socket 模拟产生数据
 */
public class SocketWindowWordCountJava {

    public static void main(String[] args) throws Exception {

        // 获取端口号
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            port = 9000;
            e.printStackTrace();
            System.err.println("没有指定端口,使用默认的端口号9000");
        }

        // 获取 flink 运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String hostname = "master";
        String delimiter = "\n";

        // 连接 socket,获取数据
        DataStreamSource<String> text = env.socketTextStream(hostname, port,delimiter);

        // a a c
        // a 2
        // c 1
        SingleOutputStreamOperator<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String s, Collector<WordWithCount> out) throws Exception {
                String[] splits = s.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        }).keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1))// 时间窗口大小是 2s , 间隔是 1s
                .sum("count");// 使用 sum 或者 reduce 都可以
        /*.reduce(new ReduceFunction<WordWithCount>() {
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
                });*/

        // 数据打印到控制台,设置并行度
        windowCount.print().setParallelism(1);

        // 没有这一步,程序不执行
        env.execute("socket window count");

    }


    public static class WordWithCount{
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90

在这里插入图片描述
在这里插入图片描述

3 单词统计(scala版本)

  • IDEA 右键添加 scala 菜单
  • 选择 scala 安装的目录
    在这里插入图片描述

在这里插入图片描述


package com.tzb.scalademo


import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time


/**
 * 每隔1s统计最近2s内的数据
 */
object SocketWindowWordCountScala {
    def main(args: Array[String]): Unit = {

        //获取端口号
        val port: Int = try {
            ParameterTool.fromArgs(args).getInt("port")
        } catch {
            case e: Exception => {
                System.err.println("没有设置端口,使用默认端口9000")
            }
                9000
        }

        // 获取运行环境
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        // 连接 socket 获取输入数据
        val text = env.socketTextStream("master", port, '\n')

        // 解析数据,分组,窗口,窗口计算
        val windowCounts = text.flatMap(line => line.split("\\s"))
          .map(word => WordWithCount(word, 1))
          .keyBy("word")
          .timeWindow(Time.seconds(2), Time.seconds(1))
          //.sum("count")
          .reduce((a, b) => WordWithCount(a.word, a.count + b.count))

        windowCounts.print().setParallelism(1)

        // 执行任务
        env.execute("Socket window count")
    }

    case class WordWithCount(word: String, count: Long)

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

在这里插入图片描述

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/677262
推荐阅读
相关标签
  

闽ICP备14008679号