
Jetty error at runtime when integrating Flink CDC with Spring Boot

database snapshot phase can't perform checkpoint, acquired checkpoint lock.

1. The error

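The project starts successfully and listens to the pigxx_user.user table as expected, but running any query from the web page fails with the following error: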
 

2023-08-21 09:44:43.082 INFO 22880 --- [td. Out (1/1)#0] c.v.c.d.internal.DebeziumChangeFetcher : Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1692582283, file=mysql-bin.000004, pos=2058, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.pigxx_user.user', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.pigxx_user.user.Key:STRUCT}, value=Struct{after=Struct{id=1,username=1,password=1,del_flag=0},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1692582283000,snapshot=true,db=pigxx_user,table=user,server_id=0,file=mysql-bin.000004,pos=2058,row=0},op=r,ts_ms=1692582283003}, valueSchema=Schema{mysql_binlog_source.pigxx_user.user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1692582283, file=mysql-bin.000004, pos=2058, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.pigxx_user.user', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_binlog_source.pigxx_user.user.Key:STRUCT}, value=Struct{after=Struct{id=2,username=8,password=8,del_flag=0},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1692582283006,snapshot=true,db=pigxx_user,table=user,server_id=0,file=mysql-bin.000004,pos=2058,row=0},op=r,ts_ms=1692582283006}, valueSchema=Schema{mysql_binlog_source.pigxx_user.user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1692582283, file=mysql-bin.000004, pos=2058, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.pigxx_user.user', kafkaPartition=null, key=Struct{id=1681914600579715074}, keySchema=Schema{mysql_binlog_source.pigxx_user.user.Key:STRUCT}, value=Struct{after=Struct{id=1681914600579715074,username=【测试】张三,password=2222333,del_flag=0},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1692582283007,snapshot=true,db=pigxx_user,table=user,server_id=0,file=mysql-bin.000004,pos=2058,row=0},op=r,ts_ms=1692582283007}, valueSchema=Schema{mysql_binlog_source.pigxx_user.user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1692582283, file=mysql-bin.000004, pos=2058}} ConnectRecord{topic='mysql_binlog_source.pigxx_user.user', kafkaPartition=null, key=Struct{id=1691718995043532801}, keySchema=Schema{mysql_binlog_source.pigxx_user.user.Key:STRUCT}, value=Struct{after=Struct{id=1691718995043532801,username=aa,password=22333,del_flag=0},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1692582283007,snapshot=last,db=pigxx_user,table=user,server_id=0,file=mysql-bin.000004,pos=2058,row=0},op=r,ts_ms=1692582283007}, valueSchema=Schema{mysql_binlog_source.pigxx_user.user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
2023-08-21 09:44:43.085 INFO 22880 --- [td. Out (1/1)#0] c.v.c.d.internal.DebeziumChangeFetcher : Received record from streaming binlog phase, released checkpoint lock.
2023-08-21 09:44:43.156 INFO 22880 --- [rce-coordinator] i.d.c.m.MySqlStreamingChangeEventSource : Keepalive thread is running
2023-08-21 09:44:48.081 WARN 22880 --- [tp820854836-191] org.eclipse.jetty.server.HttpChannel : /user/page
java.lang.NoSuchMethodError: org.eclipse.jetty.http.pathmap.PathMappings.getMatched(Ljava/lang/String;)Lorg/eclipse/jetty/http/pathmap/MatchedResource;
    at org.eclipse.jetty.servlet.ServletHandler.getMatchedServlet(ServletHandler.java:577) ~[jetty-servlet-9.4.51.v20230217.jar:9.4.51.v20230217]
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:473) ~[jetty-servlet-9.4.51.v20230217.jar:9.4.51.v20230217]
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1582) ~[flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) ~[flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1349) ~[flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) ~[flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) ~[flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
    at org.eclipse.jetty.server.Server.handle(Server.java:516) ~[flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
    at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383) ~[flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
    at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556) ~[flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375) ~[flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:273) [flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) [flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105) [flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
    at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) [flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883) [jetty-util-9.4.51.v20230217.jar:9.4.51.v20230217]
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034) [jetty-util-9.4.51.v20230217.jar:9.4.51.v20230217]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181]

My dependencies:

<!-- CDC -->
<!-- Apache Flink Java API, used to write Flink jobs -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.0</version>
</dependency>
<!-- Apache Flink streaming Java API, used for real-time stream processing -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<!-- Apache Flink client library, used to submit and manage Flink jobs -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<!-- Apache Hadoop client library, used to access HDFS and other Hadoop features -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
</dependency>
<!-- MySQL JDBC driver, used to connect to and operate on MySQL databases -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.21</version>
</dependency>
<!-- Apache Flink table planner library -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<!-- Flink MySQL CDC connector, used to capture MySQL database changes -->
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.0.0</version>
</dependency>
<!-- Alibaba Fastjson library, used for JSON serialization and deserialization -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.75</version>
</dependency>

Flink CDC listener:
 

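import java.io.Serializable;

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

@Component
public class FlinkCDC implements ApplicationRunner, Serializable {
    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 1. Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Build the SourceFunction with Flink CDC
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .username("root")
                .password("666666")
                .databaseList("pigxx_user") // database to monitor
                .tableList("pigxx_user.user") // table to monitor, qualified with its database
                .deserializer(new StringDebeziumDeserializationSchema()) // deserializer
                .startupOptions(StartupOptions.initial())
                .serverTimeZone("GMT-8")
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
        // 3. Print the captured records
        dataStreamSource.print();
        // 4. Start the job
        env.execute("FlinkCDC");
    }
}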

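Analysis: the exception is a NoSuchMethodError for PathMappings.getMatched(String), thrown when ServletHandler.getMatchedServlet calls it. The code compiled, so the method existed at compile time but could not be found at runtime. This suggests that two different versions of the same Jetty classes are on the classpath. The stack trace points the same way: some Jetty frames come from the jetty-*-9.4.51.v20230217 jars, while others are loaded from flink-connector-mysql-cdc-2.0.0.jar.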
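To check a suspicion like this, it helps to list every classpath location that holds a copy of the class and see which one was actually loaded. The following is a minimal sketch, not part of the original project: the class name `ClassOrigin` and its methods are hypothetical helpers, and only standard JDK calls are used.

```java
import java.io.IOException;
import java.net.URL;
import java.security.CodeSource;
import java.util.Collections;
import java.util.List;

// Hypothetical diagnostic helper for tracking down duplicate classes.
public class ClassOrigin {

    /** Converts a class name such as "a.b.C" into the resource path "a/b/C.class". */
    static String resourcePath(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns every classpath location that contains a copy of the class. */
    static List<URL> findCopies(String className) throws IOException {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClassLoader.getSystemClassLoader();
        }
        return Collections.list(loader.getResources(resourcePath(className)));
    }

    /** Returns the jar or directory a class was loaded from, or null for JDK bootstrap classes. */
    static URL loadedFrom(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        return source == null ? null : source.getLocation();
    }

    public static void main(String[] args) throws Exception {
        // Defaults to the class named in the NoSuchMethodError above.
        String name = args.length > 0 ? args[0] : "org.eclipse.jetty.http.pathmap.PathMappings";
        List<URL> copies = findCopies(name);
        System.out.println(copies.size() + " location(s) contain " + name);
        for (URL url : copies) {
            System.out.println("  " + url);
        }
        try {
            System.out.println("loaded from: " + loadedFrom(Class.forName(name)));
        } catch (ClassNotFoundException e) {
            System.out.println(name + " is not on the classpath");
        }
    }
}
```

Run inside the application (for example from a temporary ApplicationRunner), more than one location in the output would confirm the duplicate, and the "loaded from" line shows which copy wins.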

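My first reaction was to exclude Jetty from the flink-connector-mysql-cdc dependency, but that had no effect at all. After looking at the pom file more carefully, I commented out the Hadoop client dependency, and the error went away.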
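Commenting out hadoop-client is only an option when the project does not need Hadoop. If it does, one alternative is to keep the dependency and exclude its transitive Jetty artifacts. This is an untested sketch: it assumes the conflicting Jetty classes arrive through hadoop-client, which `mvn dependency:tree -Dincludes=org.eclipse.jetty` can confirm, and that the Hadoop features in use do not need Jetty at runtime.

```xml
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
    <exclusions>
        <!-- Assumption: the older Jetty classes come in transitively through Hadoop -->
        <exclusion>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```

The wildcard artifactId requires Maven 3.2.1 or later; on older versions each Jetty artifact would have to be excluded by name.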
