赞
踩
项目可以成功运行,并且成功监听数据库pigxx_user.user表打,但点击页面随便一个查询时报错:
报错展示:
- 2023-08-21 09:44:43.082 INFO 22880 --- [td. Out (1/1)#0] c.v.c.d.internal.DebeziumChangeFetcher : Database snapshot phase can't perform checkpoint, acquired Checkpoint lock.
- SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1692582283, file=mysql-bin.000004, pos=2058, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.pigxx_user.user', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.pigxx_user.user.Key:STRUCT}, value=Struct{after=Struct{id=1,username=1,password=1,del_flag=0},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1692582283000,snapshot=true,db=pigxx_user,table=user,server_id=0,file=mysql-bin.000004,pos=2058,row=0},op=r,ts_ms=1692582283003}, valueSchema=Schema{mysql_binlog_source.pigxx_user.user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
- SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1692582283, file=mysql-bin.000004, pos=2058, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.pigxx_user.user', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_binlog_source.pigxx_user.user.Key:STRUCT}, value=Struct{after=Struct{id=2,username=8,password=8,del_flag=0},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1692582283006,snapshot=true,db=pigxx_user,table=user,server_id=0,file=mysql-bin.000004,pos=2058,row=0},op=r,ts_ms=1692582283006}, valueSchema=Schema{mysql_binlog_source.pigxx_user.user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
- SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1692582283, file=mysql-bin.000004, pos=2058, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.pigxx_user.user', kafkaPartition=null, key=Struct{id=1681914600579715074}, keySchema=Schema{mysql_binlog_source.pigxx_user.user.Key:STRUCT}, value=Struct{after=Struct{id=1681914600579715074,username=【测试】张三,password=2222333,del_flag=0},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1692582283007,snapshot=true,db=pigxx_user,table=user,server_id=0,file=mysql-bin.000004,pos=2058,row=0},op=r,ts_ms=1692582283007}, valueSchema=Schema{mysql_binlog_source.pigxx_user.user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
- SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1692582283, file=mysql-bin.000004, pos=2058}} ConnectRecord{topic='mysql_binlog_source.pigxx_user.user', kafkaPartition=null, key=Struct{id=1691718995043532801}, keySchema=Schema{mysql_binlog_source.pigxx_user.user.Key:STRUCT}, value=Struct{after=Struct{id=1691718995043532801,username=aa,password=22333,del_flag=0},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1692582283007,snapshot=last,db=pigxx_user,table=user,server_id=0,file=mysql-bin.000004,pos=2058,row=0},op=r,ts_ms=1692582283007}, valueSchema=Schema{mysql_binlog_source.pigxx_user.user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
- 2023-08-21 09:44:43.085 INFO 22880 --- [td. Out (1/1)#0] c.v.c.d.internal.DebeziumChangeFetcher : Received record from streaming binlog phase, released checkpoint lock.
- 2023-08-21 09:44:43.156 INFO 22880 --- [rce-coordinator] i.d.c.m.MySqlStreamingChangeEventSource : Keepalive thread is running
- 2023-08-21 09:44:48.081 WARN 22880 --- [tp820854836-191] org.eclipse.jetty.server.HttpChannel : /user/page
- java.lang.NoSuchMethodError: org.eclipse.jetty.http.pathmap.PathMappings.getMatched(Ljava/lang/String;)Lorg/eclipse/jetty/http/pathmap/MatchedResource;
- at org.eclipse.jetty.servlet.ServletHandler.getMatchedServlet(ServletHandler.java:577) ~[jetty-servlet-9.4.51.v20230217.jar:9.4.51.v20230217]
- at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:473) ~[jetty-servlet-9.4.51.v20230217.jar:9.4.51.v20230217]
- at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1582) ~[flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
- at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) ~[flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
- at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1349) ~[flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
- at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) ~[flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
- at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) ~[flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
- at org.eclipse.jetty.server.Server.handle(Server.java:516) ~[flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
- at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383) ~[flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
- at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:556) ~[flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
- at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375) ~[flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
- at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:273) [flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
- at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) [flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
- at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105) [flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
- at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) [flink-connector-mysql-cdc-2.0.0.jar:2.0.0]
- at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883) [jetty-util-9.4.51.v20230217.jar:9.4.51.v20230217]
- at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034) [jetty-util-9.4.51.v20230217.jar:9.4.51.v20230217]
- at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181]
我的依赖:
- <!-- CDC -->
- <!-- Apache Flink Java API,用于编写 Flink 作业 -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>1.13.0</version>
- </dependency>
- <!-- Apache Flink 流处理 Java API,用于实时数据流处理 -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_2.12</artifactId>
- <version>1.13.0</version>
- </dependency>
- <!-- Apache Flink 客户端库,用于提交和管理 Flink 作业 -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_2.12</artifactId>
- <version>1.13.0</version>
- </dependency>
- <!-- Apache Hadoop 客户端库,用于访问 HDFS 和其他 Hadoop 功能 -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>3.1.3</version>
- </dependency>
- <!-- MySQL 数据库 Java 驱动程序,用于连接和操作 MySQL 数据库 -->
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>8.0.21</version>
- </dependency>
- <!-- Apache Flink 表计划器库 -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table-planner-blink_2.12</artifactId>
- <version>1.12.0</version>
- </dependency>
- <!-- Apache Flink MySQL CDC 连接器,用于捕获 MySQL 数据库变更 -->
- <dependency>
- <groupId>com.ververica</groupId>
- <artifactId>flink-connector-mysql-cdc</artifactId>
- <version>2.0.0</version>
- </dependency>
- <!-- 阿里巴巴 Fastjson 库,用于 JSON 数据序列化和反序列化 -->
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>1.2.75</version>
- </dependency>
Flink CDC监听:
- @Component
- public class FlinkCDC implements ApplicationRunner, Serializable {
-
- @Override
- public void run(ApplicationArguments args) throws Exception {
- //1.获取Flink执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- //通过FlinkCDC构建SourceFunction
- DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
- .hostname("localhost")
- .port(3306)
- .username("root")
- .password("666666")
- .databaseList("pigxx_user") //监控的数据库
- .tableList("pigxx_user.user") //监控的数据库下的表
- .deserializer(new StringDebeziumDeserializationSchema())//反序列化
- .startupOptions(StartupOptions.initial())
- .serverTimeZone("GMT-8")
- .build();
- DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
-
- //3.数据打印
- dataStreamSource.print();
-
- //4.启动任务
- env.execute("FlinkCDC");
- }
- }
分析:ServletHandler这个类找不到方法getMatchedServlet,编码阶段通过,说明编译阶段这个方法存在,运行时找不到这个方法。说明可能存在两个ServletHandler类。
点击查看:
当时第一反应排除flink-connector-mysql-cdc依赖里的jetty,结果任何作用都没有,后来仔细观察pom文件,注释掉Hadoop客户端后就没有报错了。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。