当前位置:   article > 正文

大数据(9a)Flink入门Java代码Windows和Linux上运行_!我明白了,executewithdispatcher() 方法在 flink 1.14 版本中被弃

!我明白了,executewithdispatcher() 方法在 flink 1.14 版本中被弃用并移除了。您可以通过使用新的方法 `

1、Flink简介

  • Apache Flink是为
    分布式、高性能、随时可用以及准确的流处理应用程序
    打造的开源流处理框架

1.1、事件驱动(event-driven)

  • 事件驱动型应用是一类具有状态的应用
    它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作

事件驱动型应用

1.2、无界数据流 和 有界数据流

  • 在Spark世界观中,一切皆由批次组成,离线数据是大批次,实时数据是小批次
  • 在Flink世界观中,一切皆由流组成,离线数据是有界限的流,实时数据是一个没有界限的流

1.3、分层API

Flink的分层API

  • 高等级API更容易使用,低等级API更灵活
  • DataStream API 较为常用

2、Windows环境上跑Flink

Flink应用程序支持批和流式处理分析

2.1、创建Maven工程,导入依赖

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
</dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

2.2、代码示例:词频统计

2.2.1、自定义数据处理函数

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        String[] words = value.split(" ");
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

2.2.2、批处理

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class WordCountBatch {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 从文件中读取数据
        DataSet<String> inputDataSet = env.readTextFile("src/main/resources/a.txt");
        // 分词,平化,分组,合计
        DataSet<Tuple2<String, Integer>> wordCountDataSet =
                inputDataSet.flatMap(new MyFlatMap())
                        .groupBy(0)  // 对Tuple2的0号位置进行分组
                        .sum(1);  // 对Tuple2的1号位置进行求和
        // 打印输出
        wordCountDataSet.print();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

打印结果
(dd,3)
(aa,4)
(bb,1)
(cc,2)

2.2.3、流式处理

2.2.3.1、读本地文件
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountStream {
    public static void main(String[] args) throws Exception {
        // 流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 从文件中读取数据
        DataStream<String> inputDataStream = env.readTextFile("src/main/resources/a.txt");
        // 分词,平化,分组,合计
        DataStream<Tuple2<String, Integer>> wordCountDataStream = inputDataStream
                .flatMap(new MyFlatMap())
                .keyBy(0)
                .sum(1);
        // 打印,设置并行度
        wordCountDataStream.print().setParallelism(1);
        // 执行
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

打印结果
(dd,1)
(cc,1)
(dd,2)
(cc,2)
(dd,3)
(aa,1)
(aa,2)
(aa,3)
(aa,4)
(bb,1)

2.2.3.2、读取网络数据

虚拟机输入命令

nc -lk 7777
  • 1

运行下面Java代码(只改了第10行)

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountStreamSocket {
    public static void main(String[] args) throws Exception {
        // 流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 从网络中读取数据
        DataStream<String> inputDataStream = env.socketTextStream("hadoop100", 7777);
        // 分词,平化,分组,合计
        DataStream<Tuple2<String, Integer>> wordCountDataStream = inputDataStream
                .flatMap(new MyFlatMap())
                .keyBy(0)
                .sum(1);
        // 打印,设置并行度
        wordCountDataStream.print().setParallelism(1);
        // 执行
        env.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

打印结果

3、Linux上跑Flink

3.1、下载、解压

https://archive.apache.org/dist/flink/

tar -zxvf flink-1.10.1-bin-scala_2.12.tgz -C $B_HOME/
cd $B_HOME
mv flink-1.10.1 flink
  • 1
  • 2
  • 3

3.2、环境变量

https://yellow520.blog.csdn.net/article/details/112692486

export FLINK_HOME=$B_HOME/flink
  • 1

3.3、打包上传一个Flink代码

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        String[] words = value.split(" ");
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCount {
    public static void main(String[] args) throws Exception {
        // 流执行环境
        StreamExecutionEnvironment e = StreamExecutionEnvironment.getExecutionEnvironment();
        // 从网络中读取数据
        DataStream<String> inputDataStream = e.socketTextStream("localhost", 7777);
        // 分词,平化,分组,合计
        DataStream<Tuple2<String, Integer>> ds;
        ds = inputDataStream.flatMap(new MyFlatMap()).keyBy(0).sum(1);
        // 打印,设置并行度
        ds.print();
        // 执行
        e.execute();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

3.4、开启网络数据传输端口

nc -lk 7777
  • 1

4、Flink的YARN模式(续3)

把Flink的Hadoop包放到lib

cp flink-shaded-hadoop-2-uber-2.8.3-10.0.jar $FLINK_HOME/lib
ll $FLINK_HOME/lib
  • 1
  • 2

启动Hadoop

hadoop.py start
  • 1

Flink的YARN模式架构图

4.1、会话模式(Session Mode)

在YARN中初始化一个Flink集群,开辟指定的资源,以后提交任务都向这里提交。
这个flink集群会常驻在YARN集群中

4.1.1、开启会话

$FLINK_HOME/bin/yarn-session.sh \
-s 2 \
-jm 1024 \
-tm 1024 \
-nm a1 \
-d
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
参数说明
-s(--slots)每个TaskManagerslot数量
-jmJobManager的内存(单位MB)
-tm每个TaskManager的内存(单位MB)
-nmYARN上应用程序名字
-d后台执行

4.1.2、运行jar包

$FLINK_HOME/bin/flink run -c WordCount FlinkPractise-1.0-SNAPSHOT.jar
  • 1

结果

如图示5个单词,Records也是5

4.1.3、关闭会话

yarn application --list
yarn application --kill application_1626235724262_0001
  • 1
  • 2

4.2、任务独立提交模式(Per-Job Cluster Mode)

每次提交都会创建一个新的Flink集群,任务之间互相独立
任务执行完成之后创建的集群也会消失

多了个-m yarn-cluster

$FLINK_HOME/bin/flink run \
-m yarn-cluster \
-c WordCount \
FlinkPractise-1.0-SNAPSHOT.jar
  • 1
  • 2
  • 3
  • 4

Appendix

en
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小桥流水78/article/detail/900848
推荐阅读
相关标签