当前位置:   article > 正文

Flink SQL 流计算可视化 UI 平台_com.flink.streaming.core.jobapplication

com.flink.streaming.core.jobapplication

一、简介

flink-streaming-platform-web系统是基于flink封装的一个可视化的web系统,用户只需在web界面进行sql配置就能完成流计算任务,主要功能包含任务配置、启/停任务、告警、日志等功能。目的是减少开发,完全实现 flink-sql 流计算任务,flink 任务支持单流、双流、单流与维表等,支持本地模式、yarn-per模式、STANDALONE模式。

支持udf、自定义连接器等,完全兼容官方连接器

目前flink版本已经升级到1.12

效果图

图片

图片

 

图片

图片

图片

图片

图片

图片

图片

图片

图片

图片

二、环境以及安装

1、环境

  • 操作系统:linux  (不支持win系统)

  • hadoop版本 2+

  • flink 版本 1.12.0  官方地址: https://ci.apache.org/projects/flink/flink-docs-release-1.12/

  • jdk版本 jdk1.8

  • scala版本 2.11

  • kafka版本 1.0+

  • mysql版本 5.6+

2、应用安装

1、flink客户端安装

下载对应版本

https://www.apache.org/dyn/closer.lua/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz

然后解压

a: /flink-1.12.0/conf

1、YARN_PER模式

文件下面放入hadoop客户端配置文件

  1. core-site.xml 
  2. yarn-site.xml 
  3. hdfs-site.xml

2、LOCAL模式

3、STANDALONE模式

以上三种模式都需要修改  flink-conf.yaml   开启 classloader.resolve-order 并且设置

classloader.resolve-order: parent-first

b: /flink-1.11.1/lib  hadoop集成

  1. 下载 flink-shaded-hadoop-2-uber-${xxx}.jar 到lib 
  2. 地址  https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

完毕后执行  export HADOOP_CLASSPATH=hadoop classpath

export HADOOP_CLASSPATH=hadoop classpath

2、flink-streaming-platform-web安装

a:下载最新版本 并且解压 https://github.com/zhp8341/flink-streaming-platform-web/releases/

  1.  tar -xvf   flink-streaming-platform-web.tar.gz

b:执行mysql语句

  1. mysql 版本5.6+以上
  2.  创建数据库 数据库名:flink_web
  3.  
  4.  执行表语句
  5.  语句地址 https://github.com/zhp8341/flink-streaming-platform-web/blob/master/docs/sql/flink_web.sql

c:修改数据库连接配置

  1. /flink-streaming-platform-web/conf/application.properties  
  2. 改成上面建好的mysql地址

关于数据库连接配置 需要看清楚你 useSSL=true 你的mysql是否支持

d:启动web

  1. cd  /XXXX/flink-streaming-platform-web/bin 
  2. 一定要到bin目录下再执行
  3. 启动 : sh deploy.sh  start
  4. 停止 :  sh deploy.sh  stop
  5. 日志目录地址: /XXXX/flink-streaming-platform-web/logs/

e:登录

  1. http://${ip或者hostname}:9084/  如 : http://hadoop003:9084/
  2. 登录号:admin  密码 123456

f:集群

如果需要集群部署模式 简单参考图

图片

图片

备注:flink客户端必须和flink-streaming-platform-web应用部署在一起

三、功能介绍

1、新增任务配置说明

a: 任务名称(*必选)

任务名称不能超过50个字符 并且 任务名称仅能含数字,字母和下划线

b: 运行模式

YARN_PER( yarn独立模式 https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn)

STANDALONE(独立集群 https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/cluster_setup.html)

LOCAL(本地集群 https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/local.html )

LOCAL 需要在本地单机启动flink 服务  ./bin/start-cluster.sh

c: flink运行配置

1、YARN_PER模式

  1. 参数(和官方保持一致)但是只支持 -yD -p -yjm -yn -ytm -ys -yqu(必选)  
  2.  -ys slot个数。
  3.  -yn task manager 数量。
  4.  -yjm job manager 的堆内存大小。
  5.  -ytm task manager 的堆内存大小。
  6.  -yqu yarn队列明
  7.  -p 并行度
  8.  -yD 如-yD  taskmanager.heap.mb=518
  9.  详见官方文档
  10. 如: -yqu flink   -yjm 1024m -ytm 2048m  -p 1  -ys 1

2、LOCAL模式

无需配置

3、STANDALONE模式

  1. -d,--detached                        If present, runs the job in detached
  2.                                           mode
  3. -p,--parallelism <parallelism>       The parallelism with which to run the
  4.                                           programOptional flag to override the
  5.                                           default value specified in the
  6.                                           configuration.
  7. -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job
  8.                                           from (for example
  9.                                           hdfs:///flink/savepoint-1537).
  10. 其他运行参数可通过 flink -h查看

d: Checkpoint信息

  1. 不填默认不开启checkpoint机制 参数只支持 
  2. -checkpointInterval 
  3. -checkpointingMode 
  4. -checkpointTimeout 
  5. -checkpointDir 
  6. -tolerableCheckpointFailureNumber 
  7. -asynchronousSnapshots 
  8. 如:  -asynchronousSnapshots true  -checkpointDir   hdfs://hcluster/flink/checkpoints/   
  9. (注意目前权限)
参数说明
checkpointInterval整数 (如 1000)默认每60s保存一次checkpoint  单位毫秒
checkpointingModeEXACTLY_ONCE  或者 AT_LEAST_ONCE一致性模式 默认EXACTLY_ONCE  单位字符
checkpointTimeout6000默认超时10 minutes 单位毫秒
checkpointDir 保存地址 如  hdfs://hcluster/flink/checkpoints/ 注意目录权限
tolerableCheckpointFailureNumber1设置失败次数 默认一次
asynchronousSnapshotstrue 或者 false是否异步

e: 三方地址

  1. 填写连接器或者udf等jar 
  2.  如: 
  3. http://ccblog.cn/jars/flink-connector-jdbc_2.11-1.12.0.jar
  4. http://ccblog.cn/jars/flink-sql-connector-kafka_2.11-1.12.0.jar
  5. http://ccblog.cn/jars/flink-streaming-udf.jar
  6. http://ccblog.cn/jars/mysql-connector-java-5.1.25.jar
  7.  
  8.  地址填写后 udf可以在sql语句里面直接写
  9. CREATE   FUNCTION jsonHasKey as 'com.xx.udf.JsonHasKeyUDF';

图片

多个url使用换行

udf 开发demo 详见  https://github.com/zhp8341/flink-streaming-udf

2、系统设置

  1.     系统设置有三个必选项
  2.     1、flink-streaming-platform-web应用安装的目录(必选) 
  3.      这个是应用的安装目录
  4.       如 /root/flink-streaming-platform-web/
  5.     2、flink安装目录(必选)
  6.       --flink客户端的目录 如: /usr/local/flink-1.12.0/
  7.     3、yarn的rm Http地址
  8.      --hadoop yarn的rm Http地址  http://hadoop003:8088/
  9.     4、flink_rest_http_address
  10.      LOCAL模式使用 flink http的地址
  11.     5、flink_rest_ha_http_address
  12.      STANDALONE模式 支持HA的   可以填写多个地址 ;用分隔

图片

3、报警设置

  1.     报警设置用于: 当运行的任务挂掉的时候会告警
  2.    
  3.     资料:钉钉报警设置官方文档:https://help.aliyun.com/knowledge_detail/106247.html
  4.  

安全设置 关键词必须填写: 告警

图片

图片

效果图

图片

三、配置demo

请使用一下sql进行环境测试

  1.   CREATE TABLE source_table (
  2.   f0 INT,
  3.   f1 INT,
  4.   f2 STRING
  5.  ) WITH (
  6.   'connector' = 'datagen',
  7.   'rows-per-second'='5'
  8.  );
  9.   
  10.   
  11.  CREATE TABLE print_table (
  12.   f0 INT,
  13.   f1 INT,
  14.   f2 STRING
  15.  ) WITH (
  16.   'connector' = 'print'
  17.  );
  18.   
  19.   
  20.   insert into print_table select f0,f1,f2 from source_table;
  21.  

以下语法是按照flink1.10写的 有时间重新写

demo1 单流kafka写入mysqld 参考

demo2 双流kafka写入mysql 参考

demo3 kafka和mysql维表实时关联写入mysql 参考

demo4 滚动窗口

demo5 滑动窗口

  1. CREATE   FUNCTION jsonHasKey as 'com.xx.udf.JsonHasKeyUDF';
  2. -- 如果使用udf 函数必须配置 udf地址
  3.      create table flink_test_6 ( 
  4.   id BIGINT,
  5.   day_time VARCHAR,
  6.   amnount BIGINT,
  7.   proctime AS PROCTIME ()
  8. )
  9.  with ( 
  10.  'connector.properties.zookeeper.connect'='hadoop001:2181',
  11.   'connector.version'='universal',
  12.   'connector.topic'='flink_test_6',
  13.   'connector.startup-mode'='earliest-offset',
  14.   'format.derive-schema'='true',
  15.   'connector.type'='kafka',
  16.   'update-mode'='append',
  17.   'connector.properties.bootstrap.servers'='hadoop003:9092',
  18.   'connector.properties.group.id'='flink_gp_test1',
  19.   'format.type'='json'
  20.  );
  21. create table flink_test_6_dim ( 
  22.   id BIGINT,
  23.   coupon_amnount BIGINT
  24. )
  25.  with ( 
  26.    'connector.type' = 'jdbc',
  27.    'connector.url' = 'jdbc:mysql://127.0.0.1:3306/flink_web?characterEncoding=UTF-8',
  28.    'connector.table' = 'test_dim',
  29.    'connector.username' = 'flink_web_test',
  30.    'connector.password' = 'flink_web_test_123',
  31.    'connector.lookup.max-retries' = '3'
  32.  );
  33. CREATE TABLE sync_test_3 (
  34.                    day_time string,
  35.                    total_gmv bigint
  36.  ) WITH (
  37.    'connector.type' = 'jdbc',
  38.    'connector.url' = 'jdbc:mysql://127.0.0.1:3306/flink_web?characterEncoding=UTF-8',
  39.    'connector.table' = 'sync_test_3',
  40.    'connector.username' = 'flink_web_test',
  41.    'connector.password' = 'flink_web_test_123'
  42.  );
  43. INSERT INTO sync_test_3 
  44. SELECT 
  45.   day_time
  46.   SUM(amnount - coupon_amnount) AS total_gmv 
  47. FROM 
  48.   (
  49.     SELECT 
  50.       a.day_time as day_time
  51.       a.amnount as amnount, 
  52.       b.coupon_amnount as coupon_amnount 
  53.     FROM 
  54.       flink_test_6 as a 
  55.       LEFT JOIN flink_test_6_dim  FOR SYSTEM_TIME AS OF  a.proctime  as b
  56.      ON b.id = a.id
  57.   ) 
  58. GROUP BY day_time;

官方相关预发和连接下载

请移步 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/

四、支持flink sql官方语法

完全按照flink1.12的连接器相关的配置 详见

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/

如果需要使用到连接器请去官方下载 如:kafka 连接器 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html

第一种下载连接器后直接放到 flink/lib/目录下就可以使用了

  1. 1、该方案存在jar冲突可能,特别是连接器多了以后
  2. 2、在非yarn模式下每次新增jar需要重启flink集群服务器

第二种放到http的服务下填写到三方地址

  1. 公司内部建议放到内网的某个http服务
  2. http://ccblog.cn/jars/flink-connector-jdbc_2.11-1.12.0.jar
  3. http://ccblog.cn/jars/flink-sql-connector-kafka_2.11-1.12.0.jar
  4. http://ccblog.cn/jars/flink-streaming-udf.jar
  5. http://ccblog.cn/jars/mysql-connector-java-5.1.25.jar

图片

多个url使用换行

自定义连接器打包的时候需要打成shade 并且解决jar的冲突

个人建议使用第二种方式,每个任务之间jar独立,如果把所有连接器放到lib 可能会和其他任务的jar冲突公用的可以放到flink/lib目录里面  如:mysql驱动 kafka连接器等

五、其他

1、由于hadoop集群环境不一样可能导致部署出现困难,整个搭建比较耗时.

2、由于es 、hbase等版本不一样可能需要下载源码重新选择对应版本 源码地址 https://github.com/zhp8341/flink-streaming-platform-web

六、问题

1、

  1. Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
  2. Could not build the program from JAR file.
  3. Use the help option (-h or --help) to get help on the command.
  4. 解决
  5.    export HADOOP_HOME=/etc/hadoop
  6.    export HADOOP_CONF_DIR=/etc/hadoop/conf
  7.    export HADOOP_CLASSPATH=`hadoop classpath`
  8.    source /etc/profile
  9.   最好配置成全局变量

2

  1. 2020-10-02 14:48:22,060 ERROR com.flink.streaming.core.JobApplication                       - 任务执行失败:
  2. java.lang.IllegalStateException: Unable to instantiate java compiler
  3.         at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:434)
  4.         at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:375)
  5.         at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:109)
  6.         at org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:149)
  7.         at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542)
  8.         at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323)
  9.         at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286)
  10.         at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
  11.         at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3953)
  12.         at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3957)
  13.         at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4875)
  14.         at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.create(JaninoRelMetadataProvider.java:475)
  15.         at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:488)
  16.         at org.apache.calcite.rel.metadata.RelMetadataQuery.revise(RelMetadataQuery.java:193)
  17.         at org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:797)
  18.         at org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:298)
  19.         at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
  20.         at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
  21.         at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
  22.         at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
  23.         at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
  24.         at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
  25.         at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
  26.         at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
  27.         at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
  28.         at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
  29.         at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
  30.         at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  31.         at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  32.         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  33.         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  34.         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  35.         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  36.         at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
  37.         at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
  38.         at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
  39.         at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
  40.         at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
  41.         at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
  42.         at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
  43.         at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
  44.         at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
  45.         at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
  46.         at com.flink.streaming.core.JobApplication.callDml(JobApplication.java:138)
  47.         at com.flink.streaming.core.JobApplication.main(JobApplication.java:85)
  48.         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  49.         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  50.         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  51.         at java.lang.reflect.Method.invoke(Method.java:498)
  52.         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
  53.         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
  54.         at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
  55.         at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
  56.         at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
  57.         at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
  58.         at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
  59.         at java.security.AccessController.doPrivileged(Native Method)
  60.         at javax.security.auth.Subject.doAs(Subject.java:422)
  61.         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
  62.         at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
  63.         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
  64. Caused by: java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory
  65.         at org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
  66.         at org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)
  67.         at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:432)
  68.         ... 60 more
  69. conf/flink-conf.yaml 

配置里面 设置  classloader.resolve-order: parent-first

主要日志目录

1、web系统日志

/{安装目录}/flink-streaming-platform-web/logs/

2 、flink客户端命令

${FLINK_HOME}/log/flink-${USER}-client-.log

七、RoadMap

  • 支持除官方以外的连接器  如:阿里云的sls

  • 任务告警自动拉起

  • 支持Application模式

  • 完善文档

 

本文转自:https://github.com/zhp8341/flink-streaming-platform-web

 

 

图片

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/690812
推荐阅读
相关标签
  

闽ICP备14008679号