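FlinkCDC-Hudi: Complete Guide to Real-Time MySQL Data Lake Ingestion, Part 1: First Attempt
FlinkCDC-Hudi: Complete Guide to Real-Time MySQL Data Lake Ingestion, Part 2: Exceptions Encountered When Integrating Hudi with Spark, and Their Solutions
FlinkCDC-Hudi: Complete Guide to Real-Time MySQL Data Lake Ingestion, Part 3: Exploring High Availability for FlinkCDC MySQL Master/Replica Synchronization
FlinkCDC-Hudi: Complete Guide to Real-Time MySQL Data Lake Ingestion, Part 4: Characteristics and Uses of the Two FlinkSQL Kafka Connectors
FlinkCDC-Hudi: Complete Guide to Real-Time MySQL Data Lake Ingestion, Part 5: Several Ways to Write from FlinkSQL to Both Kafka and Hudi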
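Having explored the features and basic usage of FlinkCDC-Hudi, we put our FlinkCDC-Hudi lake-ingestion job through extreme stress testing. That surfaced many pitfalls: some caused by insufficient memory, others by bugs. The pitfalls and their fixes are recorded below.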
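When memory is insufficient, the symptoms vary: assorted timeouts, GC overhead limit exceeded, OOM, and so on. When a memory shortage forces a TaskManager restart, data loss is quite likely, because the state after the restart is not guaranteed to match exactly the state at the moment of the failure; the only remedy then is to re-pull the data. A FlinkCDC-Hudi job must therefore be given enough memory, otherwise the resulting exceptions can compromise data availability. The fix for this class of problem is simply to increase memory.
The main cause of the memory shortage is that a Hudi merge-on-read table needs a large amount of extra memory during compaction.
Timeout under Full GC: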
S0 S1 E O M CCS YGC YGCT FGC FGCT GCT
0.00 0.00 100.00 99.92 95.40 92.78 2156 34.635 126 85.840 120.475
0.00 0.00 100.00 99.92 95.40 92.78 2156 34.635 126 85.840 120.475
0.00 0.00 100.00 99.92 95.40 92.78 2156 34.635 126 85.840 120.475
0.00 0.00 100.00 99.92 95.40 92.78 2156 34.635 126 85.840 120.475
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_e09_1636707402790_0448_01_000030 timed out.
at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1299)
at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
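The jstat rows above can be read mechanically. The following is a minimal sketch, assuming each row is `jstat -gcutil` output with the eleven columns shown in the header (S0 S1 E O M CCS YGC YGCT FGC FGCT GCT). The class name, method names, and the 99% threshold are illustrative choices, not part of the original job.

```java
import java.util.Locale;

public class JstatGcCheck {

    /** The fields of one jstat -gcutil row that matter for this check. */
    static final class GcSample {
        final double edenPct;       // column E
        final double oldPct;        // column O
        final long fullGcCount;     // column FGC
        final double fullGcSeconds; // column FGCT

        GcSample(double edenPct, double oldPct, long fullGcCount, double fullGcSeconds) {
            this.edenPct = edenPct;
            this.oldPct = oldPct;
            this.fullGcCount = fullGcCount;
            this.fullGcSeconds = fullGcSeconds;
        }
    }

    /** Parses one whitespace-separated data row (not the header). */
    static GcSample parse(String row) {
        String[] f = row.trim().split("\\s+");
        if (f.length != 11) {
            throw new IllegalArgumentException("expected 11 columns, got " + f.length);
        }
        return new GcSample(
                Double.parseDouble(f[2]),  // E
                Double.parseDouble(f[3]),  // O
                Long.parseLong(f[8]),      // FGC
                Double.parseDouble(f[9])); // FGCT
    }

    /** True when both Eden and Old are at or above the threshold (assumed: 99%). */
    static boolean heapExhausted(GcSample s, double thresholdPct) {
        return s.edenPct >= thresholdPct && s.oldPct >= thresholdPct;
    }

    /** Average time per full GC in seconds; 0 if no full GC has run. */
    static double avgFullGcSeconds(GcSample s) {
        return s.fullGcCount == 0 ? 0.0 : s.fullGcSeconds / s.fullGcCount;
    }

    public static void main(String[] args) {
        GcSample s = parse("0.00 0.00 100.00 99.92 95.40 92.78 2156 34.635 126 85.840 120.475");
        System.out.printf(Locale.ROOT, "heap exhausted: %b, avg full GC: %.3f s%n",
                heapExhausted(s, 99.0), avgFullGcSeconds(s));
    }
}
```

For the sampled row, Eden and Old are both at or above 99%, and 85.840 s spread over 126 full GCs is roughly 0.68 s per collection. A heap in this state spends most of its time collecting, which is consistent with the TaskManager heartbeat timing out.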
java.io.IOException: Could not perform checkpoint 17 for operator hoodie_stream_write (1/1)#13.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1048)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 17 for operator hoodie_stream_write (1/1)#13. Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:264)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1092)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1076)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1032)
... 19 more
Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upsetting bucketType UPDATE for partition :
at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.handleUpsertPartition(BaseFlinkCommitActionExecutor.java:199)
at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:108)
at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:70)
at org.apache.hudi.table.action.commit.FlinkWriteHelper.write(FlinkWriteHelper.java:72)
at org.apache.hudi.table.action.commit.delta.FlinkUpsertDeltaCommitActionExecutor.execute(FlinkUpsertDeltaCommitActionExecutor.java:49)
at org.apache.hudi.table.HoodieFlinkMergeOnReadTable.upsert(HoodieFlinkMergeOnReadTable.java:72)
at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:145)
at org.apache.hudi.sink.StreamWriteFunction.lambda$initWriteFunction$1(StreamWriteFunction.java:183)
at org.apache.hudi.sink.StreamWriteFunction.lambda$flushRemaining$7(StreamWriteFunction.java:460)
at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
at org.apache.hudi.sink.StreamWriteFunction.flushRemaining(StreamWriteFunction.java:453)
at org.apache.hudi.sink.StreamWriteFunction.snapshotState(StreamWriteFunction.java:130)
at org.apache.hudi.sink.common.AbstractStreamWriteFunction.snapshotState(AbstractStreamWriteFunction.java:150)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:218)
... 29 more
Caused by: java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.flink.table.data.binary.BinaryStringData.copy(BinaryStringData.java:362)
at org.apache.flink.table.runtime.typeutils.StringDataSerializer.copy(StringDataSerializer.java:56)
at org.apache.flink.table.runtime.typeutils.StringDataSerializer.copy(StringDataSerializer.java:34)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:188)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:134)
at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.emit(RowDataDebeziumDeserializeSchema.java:157)
at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:124)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:109)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:100)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:53)
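When the job rolls back after an exception, the rollback itself fails. The exception arises because the instant has already completed but a rollback is still requested for it. The problem was already present in 0.9; the 0.10.0 bug fix covered only one scenario, and it was fixed again in 0.10.1. The latest master code has since refactored the 0.10.1 code. After upgrading, we have not yet reproduced the problem.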
[BUG] ROLLBACK meet Cannot use marker based rollback strategy on completed error #4439
2022-01-28 14:30:27
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'hoodie_stream_write' (operator 3e96073598340713e5fe75c259385e9e).
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:553)
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$start$0(StreamWriteOperatorCoordinator.java:170)
at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$execute$0(NonThrownExecutor.java:103)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [initialize instant ] error
... 5 more
Caused by: org.apache.hudi.exception.HoodieRollbackException: Failed to rollback hdfs:///tmp/flink/cdcata/flinkcdc_user_cdc commits 20220127163259397
at org.apache.hudi.client.AbstractHoodieWriteClient.rollback(AbstractHoodieWriteClient.java:661)
at org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:957)
at org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:940)
at org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:928)
at org.apache.hudi.client.AbstractHoodieWriteClient.lambda$startCommitWithTime$97cdbdca$1(AbstractHoodieWriteClient.java:816)
at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:143)
at org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:815)
at org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:808)
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.startInstant(StreamWriteOperatorCoordinator.java:334)
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$initInstant$5(StreamWriteOperatorCoordinator.java:361)
at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$execute$0(NonThrownExecutor.java:93)
... 3 more
Caused by: java.lang.IllegalArgumentException: Cannot use marker based rollback strategy on completed instant:[20220127163259397__commit__COMPLETED]
at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:40)
at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.<init>(BaseRollbackActionExecutor.java:90)
at org.apache.hudi.table.action.rollback.BaseRollbackActionExecutor.<init>(BaseRollbackActionExecutor.java:71)
at org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor.<init>(MergeOnReadRollbackActionExecutor.java:48)
at org.apache.hudi.table.HoodieFlinkMergeOnReadTable.rollback(HoodieFlinkMergeOnReadTable.java:131)
at org.apache.hudi.client.AbstractHoodieWriteClient.rollback(AbstractHoodieWriteClient.java:646)
... 13 more
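The failure mode in the trace above (a rollback requested for an instant whose state is already COMPLETED) can be illustrated with a small guard. This is a sketch only: `Instant`, `State`, and `rollbackCandidates` are hypothetical types and names invented for illustration. They are not Hudi's API, and this is not the actual patch applied in 0.10.1 or master.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class RollbackGuardSketch {

    /** Hypothetical instant states, loosely mirroring the names seen in the trace. */
    enum State { REQUESTED, INFLIGHT, COMPLETED }

    /** Hypothetical stand-in for a timeline instant. */
    static final class Instant {
        final String timestamp;
        final State state;

        Instant(String timestamp, State state) {
            this.timestamp = timestamp;
            this.state = state;
        }
    }

    /** Keeps only instants that are still pending; completed ones are never rolled back. */
    static List<Instant> rollbackCandidates(List<Instant> timeline) {
        return timeline.stream()
                .filter(i -> i.state != State.COMPLETED)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Instant> timeline = Arrays.asList(
                new Instant("20220127163259397", State.COMPLETED),
                new Instant("20220127163400000", State.INFLIGHT)); // made-up timestamp
        for (Instant i : rollbackCandidates(timeline)) {
            System.out.println("would roll back " + i.timestamp);
        }
    }
}
```

The point of the sketch is only that the state check happens before a rollback executor is constructed, so a completed instant never reaches the validation that throws.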
AbstractHoodieWriteClient.getPendingRollbackInfos collects a stream into a map and hits a duplicate key; this is fixed in newer versions.
[SUPPORT] java.lang.IllegalStateException: Duplicate key Option #4227
2022-01-27 17:08:52,198 ERROR org.apache.hudi.sink.StreamWriteOperatorCoordinator [] - Executor executes action [initialize instant ] error
java.lang.IllegalStateException: Duplicate key Option{val=org.apache.hudi.common.HoodiePendingRollbackInfo@14a67ea4}
at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133) ~[?:1.8.0_202]
at java.util.HashMap.merge(HashMap.java:1254) ~[?:1.8.0_202]
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320) ~[?:1.8.0_202]
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) ~[?:1.8.0_202]
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_202]
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) ~[?:1.8.0_202]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[?:1.8.0_202]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[?:1.8.0_202]
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_202]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_202]
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) ~[?:1.8.0_202]
at org.apache.hudi.client.AbstractHoodieWriteClient.getPendingRollbackInfos(AbstractHoodieWriteClient.java:910) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
at org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:927) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
at org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:917) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
at org.apache.hudi.client.AbstractHoodieWriteClient.lambda$startCommitWithTime$97cdbdca$1(AbstractHoodieWriteClient.java:810) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:143) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
at org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:809) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
at org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:802) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.startInstant(StreamWriteOperatorCoordinator.java:334) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$initInstant$5(StreamWriteOperatorCoordinator.java:361) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$execute$0(NonThrownExecutor.java:93) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_202]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_202]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
2022-01-27 17:08:52,221 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'hoodie_stream_write' (operator 3e96073598340713e5fe75c259385e9e).
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:553) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$start$0(StreamWriteOperatorCoordinator.java:170) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$execute$0(NonThrownExecutor.java:103) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_202]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_202]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_202]
Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [initialize instant ] error
... 5 more
Caused by: java.lang.IllegalStateException: Duplicate key Option{val=org.apache.hudi.common.HoodiePendingRollbackInfo@14a67ea4}
at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133) ~[?:1.8.0_202]
at java.util.HashMap.merge(HashMap.java:1254) ~[?:1.8.0_202]
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320) ~[?:1.8.0_202]
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) ~[?:1.8.0_202]
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_202]
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) ~[?:1.8.0_202]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[?:1.8.0_202]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[?:1.8.0_202]
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_202]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_202]
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) ~[?:1.8.0_202]
at org.apache.hudi.client.AbstractHoodieWriteClient.getPendingRollbackInfos(AbstractHoodieWriteClient.java:910) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
at org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:927) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
at org.apache.hudi.client.AbstractHoodieWriteClient.rollbackFailedWrites(AbstractHoodieWriteClient.java:917) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
at org.apache.hudi.client.AbstractHoodieWriteClient.lambda$startCommitWithTime$97cdbdca$1(AbstractHoodieWriteClient.java:810) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
at org.apache.hudi.common.util.CleanerUtils.rollbackFailedWrites(CleanerUtils.java:143) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
at org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:809) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
at org.apache.hudi.client.AbstractHoodieWriteClient.startCommitWithTime(AbstractHoodieWriteClient.java:802) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.startInstant(StreamWriteOperatorCoordinator.java:334) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$initInstant$5(StreamWriteOperatorCoordinator.java:361) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$execute$0(NonThrownExecutor.java:93) ~[hudi-flink-bundle_2.11-0.10.0.jar:0.10.0]
... 3 more
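The `Collectors.toMap` frames in the trace above point at a general Java behavior that is easy to reproduce without Hudi: the two-argument `toMap` throws `IllegalStateException` as soon as two stream elements map to the same key. The sketch below uses plain string pairs as stand-in data; the class name, method names, and the "keep the first value" merge policy are illustrative assumptions, not the change Hudi actually made.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ToMapDuplicateKeyDemo {

    /** Two-argument toMap: throws IllegalStateException on a duplicate key. */
    static Map<String, String> strict(List<String[]> pairs) {
        return pairs.stream()
                .collect(Collectors.toMap(p -> p[0], p -> p[1]));
    }

    /** Three-argument toMap: the merge function decides which value survives. */
    static Map<String, String> lenient(List<String[]> pairs) {
        return pairs.stream()
                .collect(Collectors.toMap(p -> p[0], p -> p[1], (first, second) -> first));
    }

    public static void main(String[] args) {
        // Two entries sharing one key, e.g. two pending rollbacks for one instant.
        List<String[]> pairs = Arrays.asList(
                new String[] {"20220127163259397", "rollback-a"},
                new String[] {"20220127163259397", "rollback-b"});

        try {
            strict(pairs);
        } catch (IllegalStateException e) {
            System.out.println("strict failed: " + e.getMessage());
        }
        System.out.println("lenient kept: " + lenient(pairs).get("20220127163259397"));
    }
}
```

On Java 8 (the trace shows 1.8.0_202) the `Duplicate key` message prints the conflicting value rather than the key, which is likely why the log shows an `Option{val=...HoodiePendingRollbackInfo...}` instead of an instant time.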
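When FlinkCDC synchronizes the binlog and many update statements are written at high frequency, an ArrayIndexOutOfBoundsException occurs intermittently. The exception causes the job to restart abnormally, and the restart may fail or data may be lost. The issue has already been reported in the community, but the bug has not been fixed yet.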
java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:223)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:141)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:148)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:103)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onEventDeserializationFailure(MySqlStreamingChangeEventSource.java:1193)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:958)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
... 1 more
Caused by: io.debezium.DebeziumException
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1146)
... 5 more
Caused by: java.lang.ArrayIndexOutOfBoundsException
at com.github.shyiko.mysql.binlog.io.BufferedSocketInputStream.read(BufferedSocketInputStream.java:65)
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readWithinBlockBoundaries(ByteArrayInputStream.java:262)
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:241)
at java.io.InputStream.skip(InputStream.java:224)
at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.skipToTheEndOfTheBlock(ByteArrayInputStream.java:280)
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:305)
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:232)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:233)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:945)
... 3 more
These are the pitfalls surfaced by stress testing; any new ones will be added separately as they appear.