赞
踩
业务部门最近使用Flink来做数据实时同步,通过同步工具把CDC消息接入Kafka,其中上百张表同步到单个topic里,然后通过Flink来消费Kafka,做数据解析、数据分发、然后发送到目标数据库(mysql/oracle),整个链路相对比较简单,之前通过Jstorm来实现,最近才迁移到Flink,通过Flink DataStream API来实现。代码里仅用到Kafka Source、Map、Process几个简单的算子,发送目标库的逻辑在Process的逻辑里实现,因此process的逻辑里涉及数据库连接的创建与清理、通过队列来缓存数据,创建额外线程来启动发送和消费队列的逻辑,先不说整个逻辑是否合理,本文主要基于此案例来阐述遇到的问题和排查思路以及解决方法。
业务部门使用的Flink版本为1.11.2,部署模式采用Standalone
。出问题的是单机环境,即有一个JobManager
进程和一个TaskManager
进程。
问题现象是通过web页面观察发现启动任务后很短时间任务就发生重启,同时还会出现重启去Cancel
任务的时候无法Cancel
,一直处于CANCELING
状态(正常会很快变成CANCELED
)。并且过一段时候后TaskManager
进程挂掉,导致任务一直处于无法申请Slot
的状态,最终导致数据无法正常同步。因此,问题主要有以下几个现象:
Checkpoint
超时CANCELING
,任务长时间处于RESTARTING
状态TaskManager
进程挂掉看到问题的第一反应是首先看TaskManager
进程为什么会挂掉,这个问题比较严重,因为涉及到集群层面而不单单是任务了。查看Taskmanager
日志,发现有以下片段:
// 日志1
ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Task did not exit gracefully within 180 + seconds.
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1572) [flink-dist_2.11-1.11.2.jar:1.11.2]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
2021-03-05 04:09:30,816 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds.
at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1572) [flink-dist_2.11-1.11.2.jar:1.11.2]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
// 日志2
WARN org.apache.flink.runtime.taskmanager.Task [] - Task 'Source: Custom Source -> Map -> Process (1/1)' did not react to cancelling signal for 30 seconds, but is stuck in method:
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:91)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:282)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
java.lang.Thread.run(Thread.java:748)
看日志1发现有关键日志Task did not exit gracefully within 180 + seconds
打印,于是查看Flink源码,查看包含此日志的代码。
private static class TaskCancelerWatchDog implements Runnable { @Override public void run() { try { final long hardKillDeadline = System.nanoTime() + timeoutMillis * 1_000_000; long millisLeft; while (executerThread.isAlive() && (millisLeft = (hardKillDeadline - System.nanoTime()) / 1_000_000) > 0) { try { executerThread.join(millisLeft); } catch (InterruptedException ignored) { } } if (executerThread.isAlive()) { String msg = "Task did not exit gracefully within " + (timeoutMillis / 1000) + " + seconds."; taskManager.notifyFatalError(msg, new FlinkRuntimeException(msg)); } } catch (Throwable t) { } } }
可以看到TaskCancelerWatchDog
是用来监听Cancel任务是否成功的线程,如果超过timeoutMillis
执行线程还处理alive状态,则向TaskManager
进程抛出FatalError
,而这个timeoutMillis
是通过task.cancellation.timeout
参数来指定,默认是180s,如果指定为0则不开启这个功能。
日志2涉及的源码如下:
private static final class TaskInterrupter implements Runnable { @Override public void run() { try { executerThread.join(interruptIntervalMillis); while (task.shouldInterruptOnCancel() && executerThread.isAlive()) { // log.warn("Task '{}' did not react to cancelling signal for {} seconds, but is stuck in method:\n {}", taskName, (interruptIntervalMillis / 1000), bld); executerThread.interrupt(); try { executerThread.join(interruptIntervalMillis); } catch (InterruptedException e) { // we ignore this and fall through the loop } } } catch (Throwable t) { // FatalError } } }
TaskInterrupter
是用来中断执行线程的线程,这个日志可以看出需要Cancel但还在执行的线程的堆栈信息。从以上两段日志可以看出,TaskManager
进程挂掉的原因是由于任务在180s内没被正常Cancel导致。为了防止TaskManager
进程挂掉,我们添加参数task.cancellation.timeout: 0
显然,问题1是由于问题2导致的,那问题2是什么原因导致的呢,从问题1的日志堆栈可以看到在执行StreamTask.invoke
,此堆栈好像也没提供比较有用的信息。我们只能猜测,某个Task在执行Cancel
的时候未被Cancel
掉,可能是因为某种原因hang住导致,下面再进一步分析。但还有一个问题是这个Task是什么原因需要去执行Cancel
操作呢?因为没有人为的去执行Cancel
操作,所以肯定是Flink自己去Cancel
的,具体的原因继续往下看,我们发现standalonesession
(JobManager
日志)日志里存在checkpoint
超时的错误日志,关键信息为expired before completing
:
INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 58 of job f46ee0d14fe0e6f91253e78487796f5b expired before completing.
INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1673) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1650) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:91) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1783) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_181]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_181]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_181]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_181]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
因此接着看第三个问题
expired before completing
的日志意味着checkpoint
发生超时,确认任务配置参数,配置如下:
execution.checkpointing.interval: 60000
execution.checkpointing.timeout: 60000
execution.checkpointing.max-concurrent-checkpoints: 500
execution.checkpointing.min-pause: 500
一开始以为checkpoint
超时时间设置太短,于是增大超时时间到30分钟,但从web界面发现,大量checkpoint
处于pendding状态,最终还会超时。因为未设置execution.checkpointing.tolerable-failed-checkpoints
,因此一旦发生超时,任务将会发生重启。
看代码和日志都看不出个所以然,只能查看TaskManager
进程的堆栈来排查了,目的是看下发生checkpoint
超时的时候内部线程运行情况是怎么样的。Flink1.11.2也提供了web界面查看stack的功能,但相比jstack命令打印的还是有点区别,这里还是采用jstack -l id > id.jstack的方式来进行排查。
查看stack发现有处于BLOCKED
状态的线程
"Source: Custom Source -> Map -> mysqlmsg (1/1)" #77 prio=5 os_prio=0 tid=0x00007fb560080800 nid=0x7c3b waiting for monitor entry [0x00007fb539609000]
java.lang.Thread.State: BLOCKED (on object monitor)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:91)
- waiting to lock <0x0000000702a61a58> (a java.lang.Object)
接着查看0x0000000702a61a58
对应的线程
"Legacy Source Thread - Source: Custom Source -> Map -> mysqlmsg (1/1)" #82 prio=5 os_prio=0 tid=0x00007fb48c485000 nid=0x7c4e in Object.wait() [0x00007fb47f3fd000] java.lang.Thread.State: TIMED_WAITING (on object monitor) at java.lang.Object.wait(Native Method) at org.apache.ibatis.datasource.pooled.PooledDataSource.popConnection(PooledDataSource.java:451) - locked <0x000000070396d180> (a org.apache.ibatis.datasource.pooled.PoolState) at org.apache.ibatis.datasource.pooled.PooledDataSource.getConnection(PooledDataSource.java:90) at org.apache.ibatis.transaction.jdbc.JdbcTransaction.openConnection(JdbcTransaction.java:139) at org.apache.ibatis.transaction.jdbc.JdbcTransaction.getConnection(JdbcTransaction.java:61) at org.apache.ibatis.executor.BaseExecutor.getConnection(BaseExecutor.java:338) at org.apache.ibatis.executor.SimpleExecutor.prepareStatement(SimpleExecutor.java:84) at org.apache.ibatis.executor.SimpleExecutor.doQuery(SimpleExecutor.java:62) at org.apache.ibatis.executor.BaseExecutor.queryFromDatabase(BaseExecutor.java:326) at org.apache.ibatis.executor.BaseExecutor.query(BaseExecutor.java:156) at org.apache.ibatis.executor.CachingExecutor.query(CachingExecutor.java:109) at org.apache.ibatis.executor.CachingExecutor.query(CachingExecutor.java:83) at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:148) at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:141) at org.apache.ibatis.session.defaults.DefaultSqlSession.selectOne(DefaultSqlSession.java:77) at org.apache.ibatis.binding.MapperMethod.execute(MapperMethod.java:83) at org.apache.ibatis.binding.MapperProxy.invoke(MapperProxy.java:59) at com.sun.proxy.$Proxy35.selectByPrimaryKey(Unknown Source) //省略 org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) - locked <0x0000000702a61a58> (a java.lang.Object) at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) - locked <0x0000000702a61a58> (a java.lang.Object) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) Locked ownable synchronizers: - None
可以看到PooledDataSource.popConnection
一直在阻塞,即在获取连接时阻塞了。于是查看初始化连接池的配置,没有配置poolMaximumActiveConnections
,即默认最大连接数为10,而代码在ProcessFunction
的processElement
方法里采用短连接方式获取数据库连接,每次来一波数据都创建连接,发送完断开连接。因此很容易因为获取不到连接而使得processElement
方法处于阻塞状态。而processElement
方法阻塞进而影响Barrier
的流动,所以导致了Checkpoint
发生超时。
基于以上几个问题的定位,这个问题就很好解释了,首先由于阻塞导致了Checkpoint
发生超时(问题3),然后导致任务重启,在重启时由于阻塞的线程hang住无法Cancel
(问题2),由于TaskCancelerWatchDog
的存在导致超过默认时间180s后TaskManager
挂掉(问题1)。最后导致了问题4数据无法正常同步。
原因定位清楚了,离解决问题就近在咫尺了,可以采用几种方式:
poolMaximumActiveConnections
;open
时初始化连接,close
方法销毁连接;flink-jdbc-connector
来发送数据,因为数据源涉及上百张表,需要有分流的操作。本文基于实时同步任务遇到无法正常同步的问题进行排查分析,旨在提供一种当遇到Flink Checkpoint
超时问题时的排查思路,同时也顺便介绍了在Standalone
部署模式下运行Flink任务的一种典型问题-TaskManager
无缘无故挂掉的问题,希望给正在使用Flink的同学提供一种思路,避免踩坑。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。