赞
踩
对于flink webUI的查看,首先还是根据先整体观看一下Task的划分,然后根据Exception去寻找对应的TaskManager相关信息,然后双向确定是什么原因造成的;作为metrics,虽然看起来花里胡哨,本质上对flink底层实现机制的基本逻辑弄懂,应该还是比较容易看的;
注:由大佬有这块比较好的分享,跪求评论区共享一下;
job代码:https://github.com/interestingcom/FlinkTest/blob/master/src/main/java/com/example/job/WordCount.java
以WordCount为例,说明Flink Webui界面参数解析,详细代码参考:
核心代码如下:
StreamExecutionEnvironment streamingEnvironment = FlinkEnvironmentFactory.createStreamingEnvironment();
streamingEnvironment.setParallelism(4);
SingleOutputStreamOperator<String> word = streamingEnvironment.addSource(new WordCountSource()).flatMap(new SplitMapFunction());
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = word.map(value -> Tuple2.of(value, 1)).returns(Types.TUPLE(Types.STRING,Types.INT));
KeyedStream<Tuple2<String, Integer>, String> tuple2StringKeyedStream = wordAndOne.keyBy(value -> value.f0);
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = tuple2StringKeyedStream.sum(1);
sum.print("flink计算结果===>");
streamingEnvironment.execute();
产生异常的代码:
if ( string.equals("world") && randomInt %5==0){
throw new Exception("随机数是randomInt"+randomInt+"遇到单词world...");
}
jar提交界面
位置2:传入main class;
进入Overview界面,位置1,黑色表示总共的subTask个数,绿色表示正在运行的subTask个数;
点击Running Jobs,进入WordCount的这个job
点入一个Task,我们设置的并行度为4,这里启动4个subTasks示例,运行在3个TaskManager上,Record received/sent表征数据在两个算子之间的传输条数(准确来说是已经处理的数据条数),Bytes received/sent表示数据传输的字节数(这里可能包括Barrier导致的字节数不准确);在Yarn集群上,这里的Bytes sent不为0
如果Job 运行时throw Exception,会抛出到Job运行的界面中,这里查看异常的时候,一般直接找最底层的Cause By即可;checkpoint后续补充,背压
对于task Manager来说,核心主要关注日志文件;位置1记录了TaskManager的内存状态,位置2是日志信息,对应在flink安装目录的日志文件里面,可以用于辅助排查运行时异常;位置3是信息,目前没有用到过;Metrice后续补充
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。