
Java for Flink learning series: from beginner to proficient

Official Flink documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.8/

Official downloads: https://flink.apache.org/downloads.html

A. Running and deploying Flink

Generally speaking, Flink is deployed in one of three common modes. Each is illustrated below by launching the simplest example, WordCount.

The third mode is by far the most widely used.

  •  Single-machine (local) mode on Linux/Windows

 1) Flink getting-started series 1: installing and starting Flink on Windows

 2) Flink getting-started series 2: installing and starting Flink on Linux

 

  • Standalone cluster mode on Linux

 1) Flink getting-started series 3: deploying a multi-machine standalone cluster

 2) Detailed steps for deploying a Flink standalone cluster

 

  • Running on Hadoop YARN

On YARN, Flink can run (and be configured for HA) in two modes: session mode and per-job mode.
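The walkthrough below covers session mode. For reference, a per-job submission can be sketched roughly as follows; the flags are the Flink 1.8 CLI ones, while the FLINK_HOME variable and the wrapper function are this sketch's own conventions, not something the article uses:

```shell
# Per-job mode: the job is submitted directly against YARN without a
# standing session. Each job becomes its own YARN application and
# releases its resources when it finishes.
# Assumes FLINK_HOME points at the Flink installation directory.
submit_per_job() {
    "$FLINK_HOME"/bin/flink run -m yarn-cluster \
        -yjm 1024m -ytm 1024m \
        "$FLINK_HOME"/examples/batch/WordCount.jar
}
```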

  1. Session mode

First, you must set the YARN_CONF_DIR or HADOOP_CONF_DIR environment variable to point at the Hadoop configuration directory.

    # vi /etc/profile           # add the line below, then save
    export HADOOP_CONF_DIR=/etc/hadoop/conf
    # . /etc/profile            # reload the profile

Alternatively, simply run the following in your shell:

#  export HADOOP_CONF_DIR=/etc/hadoop/conf

The point is to let the Flink YARN client find the Hadoop cluster's configuration.
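A small pre-flight check in this spirit can fail fast before the session is even launched. This is only an illustrative sketch: the precedence between the two variables is this function's own choice, not something the Flink client documents.

```shell
# Resolve the Hadoop configuration directory before starting
# yarn-session.sh: prefer HADOOP_CONF_DIR, fall back to YARN_CONF_DIR,
# and fail if neither names an existing directory.
resolve_hadoop_conf() {
    dir="${HADOOP_CONF_DIR:-$YARN_CONF_DIR}"
    if [ -z "$dir" ] || [ ! -d "$dir" ]; then
        echo "set HADOOP_CONF_DIR or YARN_CONF_DIR before yarn-session.sh" >&2
        return 1
    fi
    echo "$dir"
}
```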

 

Second, if you are on CDH, set the user to hdfs (CDH uses this user to manage HDFS; otherwise you will hit HDFS read/write permission errors):

 

    # vi yarn-session.sh
    # add user begin
    export HADOOP_USER_NAME=hdfs
    #add user end
    JVM_ARGS="$JVM_ARGS -Xmx512m"
    CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
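If you would rather not edit the shipped script, the same effect can likely be achieved by exporting the variable in the launching shell, since HADOOP_USER_NAME is read by Hadoop's client-side simple authentication (the session log below does show "Hadoop user set to hdfs (auth:SIMPLE)"). The launch command is left commented out here as a reminder:

```shell
# Run the session as the hdfs user without modifying yarn-session.sh;
# the exported variable is inherited by the child process.
export HADOOP_USER_NAME=hdfs
# bin/yarn-session.sh -s 3 -jm 1024 -tm 1024 -nm pinpoint-flink-job
```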

  Step 1: Start the session

    # cd <your Flink installation directory>, for example:
    # cd flink-1.8.1
    # bin/yarn-session.sh -n 3 -s 3 -jm 1024 -tm 1024 -nm pinpoint-flink-job

Here -s sets the slots per TaskManager, -jm and -tm the JobManager and TaskManager memory in MB, and -nm the YARN application name; -n (number of TaskManagers) is deprecated in Flink 1.8 and ignored, as the log below notes.

Example output:

    [root@sltuenym6bh flink-1.8.1]# bin/yarn-session.sh -n 3 -s 3 -jm 1024 -tm 1024 -nm pinpoint-flink-job
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/home/tanghb2/flink/flink-1.8.1/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-6.1.0-1.cdh6.1.0.p0.770702/jars/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    2019-08-25 11:02:49,561 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
    2019-08-25 11:02:49,564 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
    2019-08-25 11:02:49,564 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
    2019-08-25 11:02:49,564 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
    2019-08-25 11:02:49,564 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 1
    2019-08-25 11:02:49,564 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
    2019-08-25 11:02:50,121 INFO org.apache.flink.runtime.security.modules.HadoopModule - Hadoop user set to hdfs (auth:SIMPLE)
    2019-08-25 11:02:50,346 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - The argument n is deprecated in will be ignored.
    2019-08-25 11:02:50,494 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=3, slotsPerTaskManager=3}
    2019-08-25 11:02:50,950 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    2019-08-25 11:02:50,967 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/home/tanghb2/flink/flink-1.8.1/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
    2019-08-25 11:02:52,178 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1565019835797_0516
    2019-08-25 11:02:52,204 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1565019835797_0516
    2019-08-25 11:02:52,204 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
    2019-08-25 11:02:52,206 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
    2019-08-25 11:02:57,250 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
    2019-08-25 11:02:57,771 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
    Flink JobManager is now running on sltlrdwgty2.novalocal:44648 with leader id 00000000-0000-0000-0000-000000000000.
    JobManager Web Interface: http://sltlrdwgty2.novalocal:44648

Note that the output above gives the JobManager web UI address:

Flink JobManager is now running on sltlrdwgty2.novalocal:44648 with leader id 00000000-0000-0000-0000-000000000000.
JobManager Web Interface: http://sltlrdwgty2.novalocal:44648

Your output will depend on your YARN cluster configuration. You can open this URL to view the Flink JobManager UI.

 

Note: after deploying a long-running Flink-on-YARN session, the TaskManager and slot counts shown in the Flink web UI are both 0. Resources are only allocated to a job when it is actually submitted.

Step 2: Submit a job to the long-running Flink-on-YARN session

    # cd <your Flink installation directory>
    # wget -O LICENSE-2.0.txt http://www.apache.org/licenses/LICENSE-2.0.txt
    # hadoop fs -put ./LICENSE-2.0.txt hdfs://nameservice1:/billflink/
    # bin/flink run ./examples/batch/WordCount.jar --input hdfs://nameservice1:/billflink/LICENSE-2.0.txt --output hdfs://nameservice1:/billflink/wordcount_result.txt

 

The output looks like this:

    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/home/tanghb2/flink/flink-1.8.1/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-6.1.0-1.cdh6.1.0.p0.770702/jars/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    2019-08-25 11:22:22,965 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-root.
    2019-08-25 11:22:22,965 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-root.
    2019-08-25 11:22:23,385 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 9
    2019-08-25 11:22:23,385 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN properties set default parallelism to 9
    YARN properties set default parallelism to 9
    2019-08-25 11:22:23,523 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
    2019-08-25 11:22:23,523 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
    2019-08-25 11:22:23,588 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'sltlrdwgty2.novalocal' and port '44648' from supplied application id 'application_1565019835797_0516'
    Starting execution of program
    Program execution finished
    Job with JobID f391357c52e0be2d518d743f858fb15e has finished.
    Job Runtime: 10516 ms

Note this line in the output: 2019-08-25 11:33:40,546 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Found application JobManager host name 'sltlrdwgty2.novalocal' and port '44648' from supplied application id 'application_1565019835797_0516'.

So when submitting the job above, there was no need to specify the JobManager address; the Flink client can find it on its own. The JobManager and the YARN application master run on the same host.
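The mechanism is the properties file mentioned in the log: yarn-session.sh writes the session's application id into a per-user file under /tmp, and bin/flink reads it back. A small sketch for locating that file follows; the path layout matches the "/tmp/.yarn-properties-root" line in the log above, and the helper name is made up here:

```shell
# yarn-session.sh records the YARN application id of the running session
# in a per-user properties file; bin/flink reads it to find the
# JobManager, which is why no address needs to be passed on submission.
yarn_props_file() {
    echo "/tmp/.yarn-properties-$(whoami)"
}

# Inspect the current session's properties, if any.
cat "$(yarn_props_file)" 2>/dev/null || echo "no active session properties"
```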

If you check the Flink JobManager UI, you can see that a job has been submitted and completed. The result can be read back from HDFS:

    # hadoop fs -text hdfs://nameservice1:/billflink/wordcount_result.txt
    0 3
    1 2
    2 4
    2004 1
    3 1
    4 1
    5 1

The word counts are there.

Next, delete the previous result and submit the job again:

    # hadoop fs -rm hdfs://nameservice1:/billflink/wordcount_result.txt
    # bin/flink run ./examples/batch/WordCount.jar --input hdfs://nameservice1:/billflink/LICENSE-2.0.txt --output hdfs://nameservice1:/billflink/wordcount_result.txt
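The two commands above can be folded into one sketch. The delete is needed because the batch example's file sink refuses to overwrite an existing output path; adding -f to the delete lets the first run succeed when no old result exists. The FLINK_HOME variable and the wrapper function are this sketch's own conventions (the article itself uses relative paths):

```shell
# Re-run the batch WordCount example against the running session.
# Assumes FLINK_HOME points at the Flink installation directory.
rerun_wordcount() {
    in=hdfs://nameservice1:/billflink/LICENSE-2.0.txt
    out=hdfs://nameservice1:/billflink/wordcount_result.txt
    # Remove the previous result first; -f ignores a missing file.
    hadoop fs -rm -f "$out"
    "$FLINK_HOME"/bin/flink run \
        "$FLINK_HOME"/examples/batch/WordCount.jar \
        --input "$in" --output "$out"
}
```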

At this point the Flink JobManager UI shows two completed jobs.

That concludes the demo; for more information, see the articles below.

1. Official guide to configuring Flink YARN HA

2. Official guide to running Flink on YARN

A reasonably good write-up:

3) Deploying a highly available (HA) Apache Flink on YARN cluster

 

Please note: the client must set the YARN_CONF_DIR or HADOOP_CONF_DIR environment variable, through which it reads the YARN and HDFS configuration; otherwise startup fails. In testing, having HADOOP_HOME set also works: any one of HADOOP_HOME, YARN_CONF_DIR, or HADOOP_CONF_DIR is sufficient.
 

Flink also supports running on Kubernetes (k8s), Docker, AWS, Aliyun, and more; see the official documentation for details.

 

B. Systematic study

For a full, systematic course of study, see the articles below; they are quite good and worth bookmarking.

Learning Flink, part 1: preparation

Learning Flink, part 2: basic concepts

Learning Flink, part 3: integrating Spring

Learning Flink, part 4: using Kafka as a data source

Learning Flink, part 5: persisting data to MySQL

Learning Flink, part 6: persisting data to Kafka

Learning Flink, part 8: keyBy & reduce

Learning Flink, part 9: understanding Window & Time concepts

Learning Flink, part 10: a Window & ProcessingTime example

Learning Flink, part 11: a Window & EventTime example

Learning Flink, part 12: watermarks in event time

Official material:

https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html#start-a-long-running-flink-cluster-on-yarn

How Flink runs:

https://www.jianshu.com/p/c47e8f438291

C. Flink 1.8 troubleshooting notes

1. Flink 1.8 pitfalls on a Hadoop YARN cluster

 

 
