
Getting Started with Flink: Environment Setup


I. Introduction to Flink

  • 1. Why Flink

The evolution of big data compute engines:

  1. First generation: Hadoop MapReduce — batch processing
  2. Second generation: frameworks with DAG (directed acyclic graph) support, such as Tez and Oozie — batch processing
  3. Third generation: Spark — in-memory computing, supports both batch and stream processing, DAG support inside a job (not across jobs), up to 100x faster than MapReduce
  4. Fourth generation: Flink — batch and stream processing, a high-level SQL API, native DAG support, higher-performance and more reliable stream computing

  • 2. What is Flink

Flink is a framework and distributed processing engine for stateful computations over unbounded (streaming) and bounded (batch) data streams. Flink runs on clusters, computes in memory, and scales to workloads of any size.

  • 3. Flink Features

  1. High throughput, low latency, high performance
  2. Window operations with event-time semantics
  3. Stateful computation
  4. In-memory computing
  5. Iterative computation

  • 4. The Four Cornerstones of Flink

  1. Checkpoint
  2. State
  3. Time
  4. Window
  • 5. Batch Processing vs. Stream Processing

  1. Batch processing: bounded, persistent, large volumes — Spark SQL, Flink DataSet
  2. Stream processing: unbounded, real-time, continuous — Spark Streaming, Flink DataStream

II. Flink Architecture

1. JobManager

Also called the Master. It coordinates the distributed execution: it schedules tasks, coordinates checkpoints, and coordinates recovery on failure. It can be configured for high availability, in which case only one JobManager is the leader and the others are standbys.

2. TaskManager

Also called the Worker. It executes the tasks of a dataflow, buffers data, and exchanges data streams. There must be at least one TaskManager.

3. Flink's Dataflow Programming Model

III. Flink Cluster Setup

1. Installation Modes

  1. local: single-machine mode, rarely used
  2. standalone: Flink's own cluster manager, used for development and testing
  3. yarn: compute resources are managed by Hadoop YARN, used in production

2. Prerequisites

  1. JDK 1.8
  2. Passwordless SSH login between the nodes of the cluster

3. Local Mode Installation

  • 3.1 Download the binary package

  • 3.2 Unpack it

  • 3.3 Configure the environment variables

source /etc/profile 
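The variables themselves are not shown above; a minimal sketch of the /etc/profile entries, assuming Flink was unpacked to /export/servers/flink-1.12.2 (the install path used later in this article):

export FLINK_HOME=/export/servers/flink-1.12.2
export PATH=$PATH:$FLINK_HOME/bin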

  • 3.4 Start the interactive Scala shell

  • 3.5 Command-line example: word count

  • 1. Prepare the data file word.txt and place it under /root

  • 2. Run the command

  • 3.6 Start the local Flink "cluster"
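A hedged sketch of this step, assuming $FLINK_HOME/bin is on the PATH:

start-cluster.sh   # starts a local JobManager and TaskManager
jps                # should show something like StandaloneSessionClusterEntrypoint and TaskManagerRunner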

  • 3.7 Open the Flink web UI

  • 3.8 Run a test job on the cluster: word count
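A possible submission command for this step, reusing word.txt from 3.5 and the WordCount example bundled with Flink (the output path is an assumption):

flink run $FLINK_HOME/examples/batch/WordCount.jar --input /root/word.txt --output /root/result.txt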

Check the result:

  • 3.9 Check the execution in the web UI

  • 3.10 How Flink's local mode works

4. Standalone Mode Installation

4.1 Cluster plan

JobManager (master): hadoop01

TaskManager (worker): hadoop01, hadoop02, hadoop03

4.2 Download the binary package

4.3 Unpack it

4.4 Configure the environment variables

  

4.5 Apply the environment variables

4.6 Modify the Flink configuration files

flink-conf.yaml

 

Note: there must be a space between the key and the value (after the colon).

masters

workers
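The file contents are not shown here; the fully commented flink-conf.yaml appears in section 5.5. A minimal sketch for the plain (non-HA) standalone cluster planned in 4.1:

flink-conf.yaml (key entries):

jobmanager.rpc.address: hadoop01
taskmanager.numberOfTaskSlots: 2
web.submit.enable: true

masters:

hadoop01:8081

workers:

hadoop01
hadoop02
hadoop03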

4.7 Add the environment variables to the profile

vi /etc/profile

4.8 Distribute Flink to the other nodes
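A hedged sketch of the distribution step, assuming the install directory /export/servers/flink-1.12.2 and the three-node plan from 4.1:

scp -r /export/servers/flink-1.12.2 hadoop02:/export/servers/
scp -r /export/servers/flink-1.12.2 hadoop03:/export/servers/
scp /etc/profile hadoop02:/etc/profile
scp /etc/profile hadoop03:/etc/profile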

4.9 Apply the environment variables

source /etc/profile

4.10 Start the cluster and check the processes
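A sketch of this step, run on hadoop01 with $FLINK_HOME/bin on the PATH:

start-cluster.sh   # starts the JobManager on hadoop01 and a TaskManager on every host in the workers file
jps                # hadoop01: StandaloneSessionClusterEntrypoint + TaskManagerRunner; hadoop02/03: TaskManagerRunner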

4.11 Start the history server
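The history server is started with the script shipped in Flink's bin directory (the script name is also mentioned in the configuration file shown in 5.5):

historyserver.sh start
jps                # a HistoryServer process should appear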

4.12 Flink web UI

4.13 History server web UI

4.14 Test

Start the Hadoop cluster; if Hadoop HA is configured, start ZooKeeper first.
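Before submitting the job below, HDFS must be running and the input file must exist at the path used in the command; a hedged sketch using the paths from 3.5 and the command that follows:

zkServer.sh start                      # only if Hadoop HA is configured, on every ZooKeeper node
start-dfs.sh
hdfs dfs -mkdir -p /wordcount/input
hdfs dfs -put /root/word.txt /wordcount/input/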

flink run examples/batch/WordCount.jar --input hdfs://hadoop01:9000/wordcount/input/word.txt --output hdfs://hadoop01:9000/wordcount/output/result.txt --parallelism 2

 

4.15 View completed jobs

http://hadoop01:50070/explorer.html#/flink/completed-jobs

http://hadoop01:8082/#/overview

 

4.16 Stop the cluster

stop-cluster.sh

4.17 How it works

5. Standalone HA Mode Installation

5.1 Cluster plan

  1. JobManager (master): hadoop01, hadoop02
  2. TaskManager (worker): hadoop01, hadoop02, hadoop03

5.2 Start ZooKeeper

5.3 Start the Hadoop cluster

start-dfs.sh

5.4 Stop the Flink cluster

stop-cluster.sh

5.5 Modify the Flink configuration file

  1. The final configuration file (flink-conf.yaml):
  2. ################################################################################
  3. # Licensed to the Apache Software Foundation (ASF) under one
  4. # or more contributor license agreements. See the NOTICE file
  5. # distributed with this work for additional information
  6. # regarding copyright ownership. The ASF licenses this file
  7. # to you under the Apache License, Version 2.0 (the
  8. # "License"); you may not use this file except in compliance
  9. # with the License. You may obtain a copy of the License at
  10. #
  11. # http://www.apache.org/licenses/LICENSE-2.0
  12. #
  13. # Unless required by applicable law or agreed to in writing, software
  14. # distributed under the License is distributed on an "AS IS" BASIS,
  15. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  16. # See the License for the specific language governing permissions and
  17. # limitations under the License.
  18. ################################################################################
  19. #==============================================================================
  20. # Common
  21. #==============================================================================
  22. # The external address of the host on which the JobManager runs and can be
  23. # reached by the TaskManagers and any clients which want to connect. This setting
  24. # is only used in Standalone mode and may be overwritten on the JobManager side
  25. # by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable.
  26. # In high availability mode, if you use the bin/start-cluster.sh script and setup
  27. # the conf/masters file, this will be taken care of automatically. Yarn/Mesos
  28. # automatically configure the host name based on the hostname of the node where the
  29. # JobManager runs.
  30. jobmanager.rpc.address: hadoop01
  31. # The RPC port where the JobManager is reachable.
  32. jobmanager.rpc.port: 6123
  33. # The total process memory size for the JobManager.
  34. #
  35. # Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.
  36. jobmanager.memory.process.size: 1600m
  37. # The total process memory size for the TaskManager.
  38. #
  39. # Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
  40. taskmanager.memory.process.size: 1728m
  41. # To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
  42. # It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
  43. #
  44. # taskmanager.memory.flink.size: 1280m
  45. # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
  46. taskmanager.numberOfTaskSlots: 2
  47. # The parallelism used for programs that did not specify and other parallelism.
  48. parallelism.default: 1
  49. # The default file system scheme and authority.
  50. #
  51. # By default file paths without scheme are interpreted relative to the local
  52. # root file system 'file:///'. Use this to override the default and interpret
  53. # relative paths relative to a different file system,
  54. # for example 'hdfs://mynamenode:12345'
  55. #
  56. # fs.default-scheme
  57. #==============================================================================
  58. # High Availability
  59. #==============================================================================
  60. # The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
  61. #
  62. high-availability: zookeeper
  63. # The path where metadata for master recovery is persisted. While ZooKeeper stores
  64. # the small ground truth for checkpoint and leader election, this location stores
  65. # the larger objects, like persisted dataflow graphs.
  66. #
  67. # Must be a durable file system that is accessible from all nodes
  68. # (like HDFS, S3, Ceph, nfs, ...)
  69. #
  70. high-availability.storageDir: hdfs://hadoop01:9000/flink/ha/
  71. # The list of ZooKeeper quorum peers that coordinate the high-availability
  72. # setup. This must be a list of the form:
  73. # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
  74. #
  75. high-availability.zookeeper.quorum: hadoop01:2181,hadoop02:2181,hadoop03:2181
  76. # ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
  77. # It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
  78. # The default value is "open" and it can be changed to "creator" if ZK security is enabled
  79. #
  80. # high-availability.zookeeper.client.acl: open
  81. #==============================================================================
  82. # Fault tolerance and checkpointing
  83. #==============================================================================
  84. # The backend that will be used to store operator state checkpoints if
  85. # checkpointing is enabled.
  86. #
  87. # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
  88. # <class-name-of-factory>.
  89. #
  90. state.backend: filesystem
  91. # Directory for checkpoints filesystem, when using any of the default bundled
  92. # state backends.
  93. #
  94. state.checkpoints.dir: hdfs://hadoop01:9000/flink-checkpoints
  95. # Default target directory for savepoints, optional.
  96. #
  97. state.savepoints.dir: hdfs://hadoop01:9000/flink-savepoints
  98. # Flag to enable/disable incremental checkpoints for backends that
  99. # support incremental checkpoints (like the RocksDB state backend).
  100. #
  101. # state.backend.incremental: false
  102. # The failover strategy, i.e., how the job computation recovers from task failures.
  103. # Only restart tasks that may have been affected by the task failure, which typically includes
  104. # downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption.
  105. jobmanager.execution.failover-strategy: region
  106. #==============================================================================
  107. # Rest & web frontend
  108. #==============================================================================
  109. # The port to which the REST client connects to. If rest.bind-port has
  110. # not been specified, then the server will bind to this port as well.
  111. #
  112. #rest.port: 8081
  113. # The address to which the REST client will connect to
  114. #
  115. #rest.address: 0.0.0.0
  116. # Port range for the REST and web server to bind to.
  117. #
  118. #rest.bind-port: 8080-8090
  119. # The address that the REST & web server binds to
  120. #
  121. #rest.bind-address: 0.0.0.0
  122. # Flag to specify whether job submission is enabled from the web-based
  123. # runtime monitor. Uncomment to disable.
  124. web.submit.enable: true
  125. #==============================================================================
  126. # Advanced
  127. #==============================================================================
  128. # Override the directories for temporary files. If not specified, the
  129. # system-specific Java temporary directory (java.io.tmpdir property) is taken.
  130. #
  131. # For framework setups on Yarn or Mesos, Flink will automatically pick up the
  132. # containers' temp directories without any need for configuration.
  133. #
  134. # Add a delimited list for multiple directories, using the system directory
  135. # delimiter (colon ':' on unix) or a comma, e.g.:
  136. # /data1/tmp:/data2/tmp:/data3/tmp
  137. #
  138. # Note: Each directory entry is read from and written to by a different I/O
  139. # thread. You can include the same directory multiple times in order to create
  140. # multiple I/O threads against that directory. This is for example relevant for
  141. # high-throughput RAIDs.
  142. #
  143. # io.tmp.dirs: /tmp
  144. # The classloading resolve order. Possible values are 'child-first' (Flink's default)
  145. # and 'parent-first' (Java's default).
  146. #
  147. # Child first classloading allows users to use different dependency/library
  148. # versions in their application than those in the classpath. Switching back
  149. # to 'parent-first' may help with debugging dependency issues.
  150. #
  151. # classloader.resolve-order: child-first
  152. # The amount of memory going to the network stack. These numbers usually need
  153. # no tuning. Adjusting them may be necessary in case of an "Insufficient number
  154. # of network buffers" error. The default min is 64MB, the default max is 1GB.
  155. #
  156. # taskmanager.memory.network.fraction: 0.1
  157. # taskmanager.memory.network.min: 64mb
  158. # taskmanager.memory.network.max: 1gb
  159. #==============================================================================
  160. # Flink Cluster Security Configuration
  161. #==============================================================================
  162. # Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
  163. # may be enabled in four steps:
  164. # 1. configure the local krb5.conf file
  165. # 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
  166. # 3. make the credentials available to various JAAS login contexts
  167. # 4. configure the connector to use JAAS/SASL
  168. # The below configure how Kerberos credentials are provided. A keytab will be used instead of
  169. # a ticket cache if the keytab path and principal are set.
  170. # security.kerberos.login.use-ticket-cache: true
  171. # security.kerberos.login.keytab: /path/to/kerberos/keytab
  172. # security.kerberos.login.principal: flink-user
  173. # The configuration below defines which JAAS login contexts
  174. # security.kerberos.login.contexts: Client,KafkaClient
  175. #==============================================================================
  176. # ZK Security Configuration
  177. #==============================================================================
  178. # Below configurations are applicable if ZK ensemble is configured for security
  179. # Override below configuration to provide custom ZK service name if configured
  180. # zookeeper.sasl.service-name: zookeeper
  181. # The configuration below must match one of the values set in "security.kerberos.login.contexts"
  182. # zookeeper.sasl.login-context-name: Client
  183. #==============================================================================
  184. # HistoryServer
  185. #==============================================================================
  186. # The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)
  187. # Directory to upload completed jobs to. Add this directory to the list of
  188. # monitored directories of the HistoryServer as well (see below).
  189. jobmanager.archive.fs.dir: hdfs://hadoop01:9000/flink/completed-jobs/
  190. # The address under which the web-based HistoryServer listens.
  191. historyserver.web.address: hadoop01
  192. # The port under which the web-based HistoryServer listens.
  193. historyserver.web.port: 8082
  194. # Comma separated list of directories to monitor for completed jobs.
  195. historyserver.archive.fs.dir: hdfs://hadoop01:9000/flink/completed-jobs/
  196. # Interval in milliseconds for refreshing the monitored directories.
  197. #historyserver.archive.fs.refresh-interval: 10000

5.6 Modify the masters file
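A probable content of conf/masters for the HA plan in 5.1 (two JobManagers, default web UI port 8081):

hadoop01:8081
hadoop02:8081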

5.7 Modify the workers file

No changes are needed.

Distribute the configuration to the other nodes.

5.8 Sync the configuration files

5.10 Restart the Flink cluster

5.11 Check the processes

1. If no Flink processes start, the jar that integrates Flink with Hadoop is missing and must be downloaded from the Flink download page (see the sketch after this list)

2. Put the jar into Flink's lib directory

3. Distribute it to the lib directory of the Flink installation on the other nodes

4. Restart the Flink cluster and check the processes
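A hedged sketch of steps 1–3; the exact jar name depends on your Hadoop version (the name below is only an example), and since Flink 1.11 an alternative is to export HADOOP_CLASSPATH=`hadoop classpath` on every node instead of copying a shaded jar:

cd /export/servers/flink-1.12.2/lib
# download a matching flink-shaded-hadoop uber jar here, e.g. flink-shaded-hadoop-3-uber-<version>.jar
scp flink-shaded-hadoop-3-uber-*.jar hadoop02:/export/servers/flink-1.12.2/lib/
scp flink-shaded-hadoop-3-uber-*.jar hadoop03:/export/servers/flink-1.12.2/lib/
start-cluster.sh
jps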

 

5.12 Test

1. Open the web UIs

http://hadoop01:8081/#/overview

http://hadoop02:8081/#/overview

2. Run the WordCount example

flink run /export/servers/flink-1.12.2/examples/batch/WordCount.jar

 

3. Kill one of the masters (JobManagers)

 

4. Run the WordCount example again and check that it still executes correctly

5.13 Stop the cluster

5.14 How it works

 

6. Flink on YARN Installation

6.1 Why Flink on YARN

  1. Resources are allocated on demand, which improves cluster resource utilization
  2. Jobs have priorities, so they can be scheduled according to priority
  3. YARN's scheduler automatically handles failover of the individual roles

6.2 Cluster plan

It can stay the same as the standalone plan.

6.3 Configure YARN

Disable YARN's memory checks by editing yarn-site.xml of the Hadoop cluster:

<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>

 

Distribute it:

scp /export/servers/hadoop/etc/hadoop/yarn-site.xml hadoop02:/export/servers/hadoop/etc/hadoop/

scp /export/servers/hadoop/etc/hadoop/yarn-site.xml hadoop03:/export/servers/hadoop/etc/hadoop/

6.4 Start YARN

start-yarn.sh

6.5 Test

Jobs can be submitted in two modes: session mode and per-job mode.

(1) Session mode

1. Start a session (allocate the resources)

yarn-session.sh -n 2 -tm 800 -s 1 -d

Where -n is the number of containers to request, i.e. the number of workers

-tm: the amount of memory (in MB) for each worker (TaskManager)

-s: the number of slots per worker (TaskManager)

-d: run detached, in the background

 

2. Check the YARN web UI

 

3. Submit a job

flink run /export/servers/flink-1.12.2/examples/batch/WordCount.jar

 

 

 

4. Submit another job

 

The session keeps running.

 

5. Shut down the yarn-session

yarn application -kill application_1620298469313_0001

 

(2) Per-job mode

1. Submit the job directly

flink run -m yarn-cluster -yjm 1024 -ytm 1024 /export/servers/flink-1.12.2/examples/batch/WordCount.jar

Where:

-m: the JobManager address (yarn-cluster submits to YARN)

-yjm: the amount of memory for the JobManager

-ytm: the amount of memory for each TaskManager

2. Check the web UI

 

3. Submit another job

flink run -m yarn-cluster -yjm 1024 -ytm 1024 /export/servers/flink-1.12.2/examples/batch/WordCount.jar 

(3) Parameter summary

flink run --help

IV. DataSet API Development

The DataSet API performs batch processing, but it is no longer recommended.


1. Getting-started example

Implement a word count

1.1 Create the project

 

 

 

1.2 The pom file
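The pom contents are not shown at this point (a full pom appears in section VI.3); a minimal sketch of the dependency this DataSet example needs, using the same version as the rest of the article:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.12.2</version>
</dependency>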

 

1.3 Create the package and class

1.4 Code

package cn.edu.hgu.flnkl;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @desc Word count with Flink's DataSet API (no longer recommended)
 * @author 007
 * @date 2021-5-9 (Mother's Day)
 */
public class WordCount {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the environment (env)
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // singleton
        // 2. Prepare the data (source)
        DataSet<String> lineDS = env.fromElements("spark sqoop hadoop", "spark flink", "hadoop fink spark");
        // 3. Process the data (transformation)
        // 3.1 Split each line into individual words
        DataSet<String> wordsDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                // s is one line of input; split it into words
                String[] words = s.split(" ");
                for (String word : words) {
                    // collect each word and return it
                    collector.collect(word);
                }
            }
        });
        // 3.2 Map each word to a (word, 1) tuple
        DataSet<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                // s is a single word; pair it with 1
                return Tuple2.of(s, 1);
            }
        });
        // 3.3 Group the tuples by key
        UnsortedGrouping<Tuple2<String, Integer>> groupedDS = wordAndOnesDS.groupBy(0);
        // 3.4 Aggregate (sum) the values inside each group
        DataSet<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);
        // 3.5 Sort the result
        DataSet<Tuple2<String, Integer>> result = aggResult.sortPartition(1, Order.DESCENDING).setParallelism(1);
        // 4. Output the result (sink)
        result.print();
        // 5. Trigger execution (execute)
        // Note: with print(), a DataSet job does not need execute(); a DataStream job does.
    }
}

1.5 Run and check the result

2. Rewrite the code with the DataStream API, run it, and check the result

Full code:

package cn.edu.hgu.flnkl;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @desc Word count with Flink's DataStream API
 * @author 007
 * @date 2021-5-9 (Mother's Day)
 */
public class WordCountStream {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the environment (env)
        // The new unified batch/stream API supports both stream and batch processing
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.setRuntimeMode(RuntimeExecutionMode.BATCH);     // batch mode
        // env.setRuntimeMode(RuntimeExecutionMode.STREAMING); // streaming mode
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);    // pick the mode automatically
        // 2. Prepare the data (source)
        DataStream<String> lineDS = env.fromElements("spark sqoop hadoop", "spark flink", "hadoop fink spark");
        // 3. Process the data (transformation)
        // 3.1 Split each line into individual words
        DataStream<String> wordsDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                // s is one line of input; split it into words
                String[] words = s.split(" ");
                for (String word : words) {
                    // collect each word and return it
                    collector.collect(word);
                }
            }
        });
        // 3.2 Map each word to a (word, 1) tuple
        DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                // s is a single word; pair it with 1
                return Tuple2.of(s, 1);
            }
        });
        // 3.3 Group the tuples by key (keyBy instead of groupBy in the DataStream API)
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
        // 3.4 Aggregate (sum) the values inside each group
        DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);
        // 3.5 (no sortPartition here; that operation belongs to the DataSet API)
        // 4. Output the result (sink)
        result.print();
        // 5. Trigger execution (execute)
        // Note: with print(), a DataSet job does not need execute(); a DataStream job does.
        env.execute();
    }
}

Run and check the result.

3. Running on YARN

3.1 Modify the code

package cn.edu.hgu.flnkl;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @desc Word count with Flink's DataStream Java API, to be run on YARN
 * @author 007
 * @date 2021-5-13
 */
public class WordCountYarn {
    public static void main(String[] args) throws Exception {
        // 0. Get the command-line parameters
        ParameterTool params = ParameterTool.fromArgs(args);
        String output = null;
        if (params.has("output")) {
            // if an output directory was given on the command line, use it
            output = params.get("output");
        } else {
            // otherwise use a default output directory with a timestamp to avoid name clashes
            output = "hdfs://hadoop01:9000/wordcount/output_" + System.currentTimeMillis();
        }
        // 1. Prepare the environment (env)
        // The new unified batch/stream API supports both stream and batch processing
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.setRuntimeMode(RuntimeExecutionMode.BATCH);     // batch mode
        // env.setRuntimeMode(RuntimeExecutionMode.STREAMING); // streaming mode
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);    // pick the mode automatically
        // 2. Prepare the data (source)
        DataStream<String> lineDS = env.fromElements("spark sqoop hadoop", "spark flink", "hadoop fink spark");
        // 3. Process the data (transformation)
        // 3.1 Split each line into individual words
        DataStream<String> wordsDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                // s is one line of input; split it into words
                String[] words = s.split(" ");
                for (String word : words) {
                    // collect each word and return it
                    collector.collect(word);
                }
            }
        });
        // 3.2 Map each word to a (word, 1) tuple
        DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                // s is a single word; pair it with 1
                return Tuple2.of(s, 1);
            }
        });
        // 3.3 Group the tuples by key
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
        // 3.4 Aggregate (sum) the values inside each group
        DataStream<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);
        // 4. Output the result (sink): print it and also write it to the output directory
        aggResult.print();
        aggResult.writeAsText(output).setParallelism(1);
        // 5. Trigger execution (execute)
        env.execute();
    }
}

3.2 Package the jar (see the referenced blog post)

Using the IDE:

Flink实例-Wordcount详细步骤 - 紫轩弦月 - 博客园 (cnblogs)

Modify the pom file and add the plugins for building the jar:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <configuration>
                <archive>
                    <manifest>
                        <addClasspath>true</addClasspath>
                        <useUniqueVersions>false</useUniqueVersions>
                        <classpathPrefix>lib/</classpathPrefix>
                        <mainClass>cn.edu.hgu.flnkl.WordCountYarn</mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>
    </plugins>
</build>

3.3 Upload the jar to the cluster and rename it

3.4 Submit and run

Without arguments:

flink run -Dexecution.runtime-mode=BATCH -m yarn-cluster -yjm 1024 -ytm 1024 -c cn.edu.hgu.flnkl.WordCountYarn /root/wc.jar

 

3.5 Check the processes in the web UIs

  • On YARN

  • On Hadoop (HDFS)

http://hadoop01:50070/explorer.html#/wordcount

  • On Flink

V. DataStream API Development (Unified Batch and Stream)

1. Programming model

 

2. Sources: where the data comes from

a) Predefined sources

  • Collection-based sources
  1. env.fromElements(): from individual elements
  2. env.fromCollection(): from a collection
  3. env.generateSequence(): generate a sequence of numbers
  4. env.fromSequence(): from a sequence of numbers
package cn.edu.hgu.flnkl;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * @desc Demo of Flink's collection-based sources
 * @author 007
 * @date 2021/5/14
 */
public class FlinkSourceDemo {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the environment (env)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2. Source
        // env.fromElements(): from individual elements
        DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink", "hbase");
        // env.fromCollection(): from a collection
        DataStream<String> ds2 = env.fromCollection(Arrays.asList("hadoop", "spark", "flink", "hbase"));
        // env.generateSequence(): generate a sequence of numbers
        DataStream<Long> ds3 = env.generateSequence(1, 10);
        // env.fromSequence(): from a sequence of numbers
        DataStream<Long> ds4 = env.fromSequence(1, 10);
        // 3. Transformation (none)
        // 4. Sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();
        // 5. Execute
        env.execute();
    }
}

b) File-based sources

env.readTextFile(local or HDFS file / directory / compressed archive). To read files or directories on HDFS, add the Hadoop client dependency to the pom file and copy the Hadoop configuration files hdfs-site.xml and core-site.xml into the project's resources folder.

pom dependency:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.4</version>
</dependency>

The resources folder:

Source code:

package cn.edu.hgu.flnkl;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @desc Demo of Flink's file-based sources
 * @author 007
 * @date 2021/5/14
 */
public class FlinkSourceDemo1 {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the environment (env)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2. Source
        DataStream<String> ds1 = env.readTextFile("D:\\data\\input\\text1.txt"); // local file
        DataStream<String> ds2 = env.readTextFile("D:\\data\\input");            // local directory
        DataStream<String> ds3 = env.readTextFile("hdfs://hadoop01:9000/wordcount/input/word.txt");     // file on HDFS
        DataStream<String> ds4 = env.readTextFile("hdfs://hadoop01:9000/wordcount/input/words.txt.gz"); // compressed file on HDFS
        // 3. Transformation (none)
        // 4. Sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();
        // 5. Execute
        env.execute();
    }
}

c) Socket-based sources

env.socketTextStream("hostname or IP address", port)

Install nc (netcat) on hadoop01; nc can continuously send data to a given port of a host:

yum install -y nc

Once installed, run:

nc -lk 9999

Source code:

package cn.edu.hgu.flnkl;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @desc Demo of Flink's socket-based source
 * @author 007
 * @date 2021/5/14
 */
public class FlinkSourceDemo2 {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the environment (env)
        // The new unified batch/stream API supports both stream and batch processing
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.setRuntimeMode(RuntimeExecutionMode.BATCH);     // batch mode
        // env.setRuntimeMode(RuntimeExecutionMode.STREAMING); // streaming mode
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);    // pick the mode automatically
        // 2. Prepare the data (source): read lines from the socket
        DataStream<String> lineDS = env.socketTextStream("hadoop01", 9999);
        // 3. Process the data (transformation)
        // 3.1 Split each line into individual words
        DataStream<String> wordsDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                // s is one line of input; split it into words
                String[] words = s.split(" ");
                for (String word : words) {
                    // collect each word and return it
                    collector.collect(word);
                }
            }
        });
        // 3.2 Map each word to a (word, 1) tuple
        DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                // s is a single word; pair it with 1
                return Tuple2.of(s, 1);
            }
        });
        // 3.3 Group the tuples by key
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
        // 3.4 Aggregate (sum) the values inside each group
        DataStream<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);
        // 4. Output the result (sink)
        aggResult.print();
        // 5. Trigger execution (execute)
        env.execute();
    }
}

Start the program and check the result.

 2. Custom (user-defined) sources

  1. SourceFunction: non-parallel source (parallelism = 1)
  2. RichSourceFunction: non-parallel source with rich functions (parallelism = 1)
  3. ParallelSourceFunction: parallel source (parallelism >= 1)
  4. RichParallelSourceFunction: parallel source with rich functions (parallelism >= 1)

a) Randomly generated data

b) MySQL

A custom source that reads data from MySQL:

  • Install the Lombok plugin in IDEA

  • Add the lombok and mysql dependencies to the pom file (see the sketch after this list)

  • Connect to MySQL and create the table (see the sketch after this list)

  • Create the Student entity class

  • Create the custom MySQL source class
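A sketch of the two pom dependencies mentioned above (versions taken from the full pom in section VI.3), followed by a possible MySQL definition of the test.student table matching the Student entity; the article does not show the actual DDL:

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.16</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.22</version>
</dependency>

-- assumed table definition in the test database
CREATE TABLE student (
  id   INT PRIMARY KEY AUTO_INCREMENT,
  name VARCHAR(50),
  age  INT
);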

The earlier version of the code:

package cn.edu.hgu.flink.config;

import cn.edu.hgu.flink.entity.Student;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;

/**
 * @desc Custom source that reads from MySQL
 */
public class MySQLSource extends RichParallelSourceFunction<Student> {
    private Connection connection = null;
    private PreparedStatement preparedStatement = null;
    private boolean flag = true;

    @Override
    public void run(SourceContext<Student> sourceContext) throws Exception {
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useSSL=false&characterEncoding=utf-8&serverTimezone=UTC", "root", "root");
        String sql = "select * from student";
        preparedStatement = connection.prepareStatement(sql);
        while (flag) {
            ResultSet rs = preparedStatement.executeQuery();
            while (rs.next()) {
                int id = rs.getInt("id");
                String name = rs.getString("name");
                int age = rs.getInt("age");
                sourceContext.collect(new Student(id, name, age));
            }
            TimeUnit.SECONDS.sleep(5); // re-query every 5 seconds
        }
    }

    @Override
    public void cancel() {
        // preparedStatement.close();
        // connection.close();
        flag = false;
    }
}

The new version:

package cn.edu.hgu.flink.config;

import cn.edu.hgu.flink.entity.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;

/**
 * @desc Custom source that reads from MySQL
 */
public class MySQLSource extends RichParallelSourceFunction<Student> {
    private Connection connection = null;
    private PreparedStatement preparedStatement = null;
    private boolean flag = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // open the connection and prepare the statement once, when the source starts
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useSSL=false&characterEncoding=utf-8&serverTimezone=UTC", "root", "root");
        String sql = "select * from student";
        preparedStatement = connection.prepareStatement(sql);
    }

    @Override
    public void run(SourceContext<Student> sourceContext) throws Exception {
        while (flag) {
            ResultSet rs = preparedStatement.executeQuery();
            while (rs.next()) {
                int id = rs.getInt("id");
                String name = rs.getString("name");
                int age = rs.getInt("age");
                sourceContext.collect(new Student(id, name, age));
            }
            TimeUnit.SECONDS.sleep(5); // re-query every 5 seconds
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }

    @Override
    public void close() throws Exception {
        super.close();
        preparedStatement.close();
        connection.close();
    }
}
  • Write the main class

  • Run and check the result

3. Transformations: computing on the data

3.1 Basic operations

  • a) flatMap

Turns each element of the collection into zero, one, or more elements and returns the flattened result.

  • b) map

Applies a function to every element of the collection and returns the results.

  • c) keyBy

Groups the elements of the stream by the given key; note that stream processing has no groupBy, only keyBy.

  • d) filter

Filters the elements of the collection by the given predicate and keeps the ones that match.

  • e) sum

Sums the elements of the collection over the given field.

  • f) reduce

Aggregates the elements of the collection.

  • g) Example

Count the words in a stream, excluding the sensitive word "heihei".

The full code:

package cn.edu.hgu.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @desc Flink transformations: word count with a sensitive-word filter
 * @author 007
 * @date 2021-5-21
 */
public class FlinkTransformationDemo1 {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the environment (env)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // pick batch or streaming automatically
        // 2. Prepare the data (source)
        DataStream<String> lineDS = env.fromElements("spark heihei sqoop hadoop", "spark flink", "hadoop fink heihei spark");
        // 3. Process the data (transformation)
        // 3.1 Split each line into individual words
        DataStream<String> wordsDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] words = s.split(" ");
                for (String word : words) {
                    collector.collect(word);
                }
            }
        });
        // 3.1.5 Filter out the sensitive word
        DataStream<String> filterDS = wordsDS.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return !s.equals("heihei");
            }
        });
        // 3.2 Map each word to a (word, 1) tuple
        DataStream<Tuple2<String, Integer>> wordAndOnesDS = filterDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return Tuple2.of(s, 1);
            }
        });
        // 3.3 Group the tuples by key
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
        // 3.4 Aggregate (sum) the values inside each group
        DataStream<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);
        // 3.5 Aggregate with reduce (adds the counts of the two incoming tuples)
        DataStream<Tuple2<String, Integer>> redResult = groupedDS.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                return Tuple2.of(t1.f0, t1.f1 + t2.f1);
            }
        });
        // 4. Output the results (sink)
        aggResult.print();
        redResult.print();
        // 5. Trigger execution (execute)
        env.execute();
    }
}

3.2 Splitting and merging

  • a) union and connect

union merges several data streams of the same type into one new stream of the same type; connect joins two data streams that may be of different types.

Example:

package cn.edu.hgu.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Arrays;

/**
 * @desc Flink transformations: merging streams with union and connect
 * @author 007
 * @date 2021-5-21
 */
public class FlinkTransformationDemo2 {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the environment (env)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2. Source
        DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink", "hbase");
        DataStream<String> ds2 = env.fromCollection(Arrays.asList("hadoop", "spark", "flink", "hbase"));
        DataStream<Long> ds3 = env.generateSequence(1, 10);
        DataStream<Long> ds4 = env.fromSequence(1, 10);
        // 3. Transformation
        // union merges streams of the same type (without deduplication)
        DataStream<String> union1 = ds1.union(ds2);
        // connect joins two streams that may be of different types
        ConnectedStreams<String, Long> connect1 = ds1.connect(ds3);
        DataStream<String> connect2 = connect1.map(new CoMapFunction<String, Long, String>() {
            @Override
            public String map1(String s) throws Exception {
                return "String->String" + s;
            }

            @Override
            public String map2(Long aLong) throws Exception {
                return "Long->String" + aLong.toString();
            }
        });
        // 4. Sink
        // union1.print();
        connect2.print();
        // 5. Execute
        env.execute();
    }
}
  • b) Split, Select and Side Outputs

split splits one stream into several streams (deprecated and removed), and select picks the sub-streams produced by split (also deprecated and removed). With side outputs, a process function examines each element and collects it into different OutputTags depending on the result.

  • c) Example

Code:

package cn.edu.hgu.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @desc Flink transformations: splitting a stream with side outputs
 * @author 007
 * @date 2021-5-21
 */
public class FlinkTransformationDemo3 {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the environment (env)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2. Source
        DataStreamSource<Integer> ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // 3. Transformation
        // Side outputs: define the output tags
        OutputTag<Integer> tag_even = new OutputTag<Integer>("even", TypeInformation.of(Integer.class));
        OutputTag<Integer> tag_odd = new OutputTag<Integer>("odd", TypeInformation.of(Integer.class));
        // route every element of ds to one of the tags
        SingleOutputStreamOperator<Integer> tagResult = ds.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer integer, Context context, Collector<Integer> collector) throws Exception {
                if (integer % 2 == 0) { // even
                    context.output(tag_even, integer);
                } else {                // odd
                    context.output(tag_odd, integer);
                }
            }
        });
        // retrieve the tagged data
        DataStream<Integer> evenResult = tagResult.getSideOutput(tag_even); // elements tagged as even
        DataStream<Integer> oddResult = tagResult.getSideOutput(tag_odd);   // elements tagged as odd
        // 4. Sink
        evenResult.print();
        oddResult.print();
        // 5. Execute
        env.execute();
    }
}

 

3.3 Partitioning

  • rebalance: rebalancing the partitions

Similar to repartition in Spark; it counters data skew, i.e. the situation where most of the data is concentrated on a single node while the other nodes are only lightly loaded.

Example:

The full code:

package cn.edu.hgu.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @desc Flink transformations: rebalance repartitioning
 * @author 007
 * @date 2021-5-21
 */
public class FlinkTransformationDemo4 {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the environment (env)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2. Source
        DataStreamSource<Long> longDS = env.fromSequence(0, 10000);
        // 3. Transformation
        // filter the data; afterwards the elements may be distributed unevenly (data skew)
        DataStream<Long> filterDS = longDS.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long aLong) throws Exception {
                return aLong > 10;
            }
        });
        // process directly: data skew may show up (some subtasks count far more elements)
        DataStream<Tuple2<Integer, Integer>> result1 = filterDS.map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(Long aLong) throws Exception {
                int id = getRuntimeContext().getIndexOfThisSubtask(); // subtask index
                return Tuple2.of(id, 1);
            }
        }).keyBy(t -> t.f0).sum(1);
        // rebalance the partitions before the map to even out the skew
        DataStream<Tuple2<Integer, Integer>> result2 = filterDS.rebalance().map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(Long aLong) throws Exception {
                int id = getRuntimeContext().getIndexOfThisSubtask(); // subtask index
                return Tuple2.of(id, 1);
            }
        }).keyBy(t -> t.f0).sum(1);
        // 4. Sink
        // result1.print();
        result2.print();
        // 5. Execute
        env.execute();
    }
}

 

  • Other partitioning strategies

3.4 Sinks: where the data goes

  • 1) Predefined sinks
  1. ds.print(): print to the console
  2. ds.printToErr(): print to the console in red (stderr)
  3. ds.writeAsText("local path or HDFS path", WriteMode.OVERWRITE).setParallelism(n): write to the local file system or HDFS; with n = 1 the path is a single file, with n > 1 it is a directory

Code demo:

Project structure:

Code:

package cn.edu.hgu.flink.sink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @desc Demo of Flink's predefined sinks
 * @author 007
 * @date 2021/5/28
 */
public class FlinkSinkDemo1 {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the environment (env)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2. Source
        DataStream<String> ds1 = env.readTextFile("D:\\data\\input\\text1.txt"); // local file
        // 3. Transformation (none)
        // 4. Sink
        // ds1.print();
        // ds1.printToErr();
        // ds1.writeAsText("d:/data/output/test", FileSystem.WriteMode.OVERWRITE).setParallelism(1); // output is a single file
        ds1.writeAsText("d:/data/output/test", FileSystem.WriteMode.OVERWRITE).setParallelism(2);    // output is a directory
        // 5. Execute
        env.execute();
    }
}

 

  • 2) Custom sinks

MySQL

A custom sink that writes the data into MySQL.

Project structure:

The Student entity class:

package cn.edu.hgu.flink.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Student entity class
 */
@Data               // generates getters and setters
@NoArgsConstructor  // generates a no-argument constructor
@AllArgsConstructor // generates an all-arguments constructor
public class Student {
    private Integer id;
    private String name;
    private Integer age;
}

The sink class that writes the data into MySQL:

package cn.edu.hgu.flink.config;

import cn.edu.hgu.flink.entity.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * @desc Custom sink that writes to MySQL
 */
public class MySQLSink extends RichSinkFunction<Student> {
    private Connection connection = null;
    private PreparedStatement preparedStatement = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        // call the parent implementation (can be omitted)
        super.open(parameters);
        // load the MySQL driver and open the connection
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useSSL=false&characterEncoding=utf-8&serverTimezone=UTC", "root", "root");
        String sql = "insert into student(name,age) values(?,?)";
        // create the PreparedStatement
        preparedStatement = connection.prepareStatement(sql);
    }

    @Override
    public void invoke(Student value, Context context) throws Exception {
        // bind concrete values to the ? placeholders
        preparedStatement.setString(1, value.getName()); // name
        preparedStatement.setInt(2, value.getAge());     // age
        // execute the SQL
        preparedStatement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        super.close();
        preparedStatement.close();
        connection.close();
    }
}

The main class:

package cn.edu.hgu.flink.sink;

import cn.edu.hgu.flink.config.MySQLSink;
import cn.edu.hgu.flink.entity.Student;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @desc Write data into MySQL with a custom Flink sink
 * @author 007
 * @date 2021-5-28
 */
public class FlinkSinkMysqlDemo {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the environment (env)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Source
        DataStream<Student> studentDS = env.fromElements(new Student(null, "tony", 28));
        // 3. Transformation (none)
        // 4. Sink
        studentDS.addSink(new MySQLSink());
        // 5. Execute
        env.execute();
    }
}

 

3.5 Connectors

VI. Table API and SQL Development

1. Overview

2. Why Table API and SQL

Flink's Table module consists of the Table API and SQL:

The Table API is an SQL-like API; with it users can manipulate data the same way they manipulate tables, which is intuitive and convenient.

SQL is a declarative language, essentially the same SQL as in relational databases such as MySQL; users can process data without caring about the underlying implementation.

Characteristics:

  1. Declarative: users only describe what to do, not how to do it
  2. High performance: queries are optimized, which gives better performance
  3. Unified batch and stream processing
  4. Standard and stable: follows the SQL standard
  5. Easy to understand

3. Add the dependencies to the pom file

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>org.example</groupId>
  7. <artifactId>Flink-dataset-api-demo</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. <properties>
  10. <maven.compiler.source>8</maven.compiler.source>
  11. <maven.compiler.target>8</maven.compiler.target>
  12. </properties>
  13. <dependencies>
  14. <dependency>
  15. <groupId>org.apache.flink</groupId>
  16. <artifactId>flink-clients_2.12</artifactId>
  17. <version>1.12.2</version>
  18. </dependency>
  19. <dependency>
  20. <groupId>org.apache.hadoop</groupId>
  21. <artifactId>hadoop-client</artifactId>
  22. <version>3.1.4</version>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.projectlombok</groupId>
  26. <artifactId>lombok</artifactId>
  27. <version>1.18.16</version>
  28. </dependency>
  29. <dependency>
  30. <groupId>mysql</groupId>
  31. <artifactId>mysql-connector-java</artifactId>
  32. <version>8.0.22</version>
  33. </dependency>
  34. <dependency>
  35. <groupId>org.apache.flink</groupId>
  36. <artifactId>flink-table-api-java-bridge_2.12</artifactId>
  37. <version>1.12.2</version>
  38. </dependency>
  39. <dependency>
  40. <groupId>org.apache.flink</groupId>
  41. <artifactId>flink-table-common</artifactId>
  42. <version>1.12.2</version>
  43. </dependency>
  44. <dependency>
  45. <groupId>org.apache.flink</groupId>
  46. <artifactId>flink-table-planner-blink_2.12</artifactId>
  47. <version>1.12.2</version>
  48. </dependency>
  49. <dependency>
  50. <groupId>org.apache.flink</groupId>
  51. <artifactId>flink-streaming-java_2.12</artifactId>
  52. <version>1.12.2</version>
  53. </dependency>
  54. <dependency>
  55. <groupId>org.apache.flink</groupId>
  56. <artifactId>flink-csv</artifactId>
  57. <version>1.12.2</version>
  58. </dependency>
  59. <dependency>
  60. <groupId>org.apache.flink</groupId>
  61. <artifactId>flink-connector-jdbc_2.12</artifactId>
  62. <version>1.12.2</version>
  63. </dependency>
  64. </dependencies>
  65. <build>
  66. <plugins>
  67. <plugin>
  68. <groupId>org.apache.maven.plugins</groupId>
  69. <artifactId>maven-compiler-plugin</artifactId>
  70. <configuration>
  71. <source>1.8</source>
  72. <target>1.8</target>
  73. </configuration>
  74. </plugin>
  75. <plugin>
  76. <groupId>org.apache.maven.plugins</groupId>
  77. <artifactId>maven-jar-plugin</artifactId>
  78. <configuration>
  79. <archive>
  80. <manifest>
  81. <addClasspath>true</addClasspath>
  82. <useUniqueVersions>false</useUniqueVersions>
  83. <classpathPrefix>lib/</classpathPrefix>
  84. <mainClass>cn.edu.hgu.flink.dataset.WordCountYarn</mainClass>
  85. </manifest>
  86. </archive>
  87. </configuration>
  88. </plugin>
  89. </plugins>
  90. </build>
  91. </project>

4. Case 1: reading and processing data from a CSV file

4.1 Prepare the data
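The article does not show the file contents; a possible d:\student.csv matching the schema declared in the code below (the rows are made up for illustration):

1,tony,28
2,jack,30
3,rose,18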

4.2 Create a new class

Full code:

package cn.edu.hgu.flink.table;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

/**
 * @desc Operating on CSV data with the Flink Table API
 * @author 007
 * @date 2021-6-17
 */
public class FlinkTableCSVDemo {
    public static void main(String[] args) {
        // 1. Create a TableEnvironment for batch or streaming execution
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                //.inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        // 2. Create an input table
        tEnv.executeSql("CREATE TABLE student (\n" +
                "  id INT,\n" +
                "  name STRING,\n" +
                "  age INT\n" +
                ") WITH (\n" +
                "  'connector' = 'filesystem',\n" +
                "  'path' = 'd:\\student.csv',\n" +
                "  'format' = 'csv',\n" +
                "  'csv.ignore-parse-errors' = 'true',\n" +
                "  'csv.allow-comments' = 'true',\n" +
                "  'csv.field-delimiter' = ','\n" +
                ")");
        // 3. Register an output table (not needed here)
        //tEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )");
        // 4. Create a Table object from a Table API query
        Table table = tEnv.from("student");
        // or create a Table object from a SQL query
        //Table table3 = tEnv.sqlQuery("SELECT ... FROM table1 ... ");
        // 5. Emit the Table API result, same for a SQL result
        TableResult tableResult = table.execute();
        tableResult.print();
        // table.printSchema();
    }
}

4.3 Execution result

5. Case 2: reading and processing data from a MySQL table with Flink

5.1 Prepare the data

5.2 Create a new class

Full code:

package cn.edu.hgu.flink.table;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

/**
 * @desc Reading and processing a MySQL table with the Flink Table API
 * @author 007
 * @date 2021-6-17
 */
public class FlinkTableJDBCDemo {
    public static void main(String[] args) {
        // 1. Create a TableEnvironment for batch or streaming execution
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                //.inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        // 2. Create an input table
        tEnv.executeSql("CREATE TABLE student (\n" +
                "  id INT,\n" +
                "  name STRING,\n" +
                "  age INT,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = 'jdbc:mysql://localhost:3306/test?serverTimezone=UTC',\n" +
                "  'table-name' = 'student',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'root'\n" +
                ")");
        // 3. Register an output table (not needed here)
        //tEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )");
        // 4. Create a Table object from a Table API query
        Table table = tEnv.from("student").select("id,name");
        // or create a Table object from a SQL query
        //Table table3 = tEnv.sqlQuery("SELECT ... FROM table1 ... ");
        // 5. Emit the result
        // print the table schema
        table.printSchema();
        // print the table data
        TableResult tableResult = table.execute();
        tableResult.print();
    }
}

5.3 Execution result

 

 

 
