当前位置:   article > 正文

flink读取mysql表中的时间字段error java.time.LocalDateTime cannot be cast to java.sql.Timestamp_flowable的 java.time.localdatetime cannot be cast t

flowable的 java.time.localdatetime cannot be cast to java.lang.string

版本说明
flink 版本 1.13.5
mysql 版本 8.0.26

操作

使用mysql table api 读取mysql8中的一个表,这个表的时间字段类型是datetime,在创建数据源的时候,对应mysql 的时间类型,设置成 datatime,String 都不行,执行都报错。
这让我emo好久,找不出问题所在。这应该是mysql的一个bug。

具体错误信息

Exception in thread "main" java.lang.RuntimeException: Failed to fetch next result
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
	at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
	at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:152)
	at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:160)
	at com.quant.flowcalculation.flinkapi.msql.ReadsMysql.main(ReadsMysql.java:69)
Caused by: java.io.IOException: Failed to fetch job execution result
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
	at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
	... 5 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
	... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
	at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
	at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134)
	at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:174)
	... 7 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
	at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at akka.actor.Actor.aroundReceive(Actor.scala:517)
	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.sql.Timestamp
	at org.apache.flink.api.common.typeutils.base.SqlTimestampSerializer.copy(SqlTimestampSerializer.java:30)
	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
	at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
	at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79

解决办法

目前定位问题是 mysql8的驱动问题,服务器mysql的版本是8.0.26,我试了驱动 8.0.26 8.0.25 好几个版本,最后发现低版本的可以解决这个问题,我使用mysql 8.0.16 ,设置时间类型为 timestamp 就可以查询了。

复现测试

public class ReadsMysql {
    public static void main(String[] args) throws Exception {
        // 连接mysql 数据源
        String sourceDDL = "CREATE TABLE c_ex_stock (\n" +
                "  EndDate TIMESTAMP,\n" +
                "  InnerCode INT,\n" +
                "  StatisPeriod INT,\n" +
                "  StatisType VARCHAR,\n" +
                "  SecuCode VARCHAR,\n" +
                "  SecuAbbr VARCHAR\n" +
                "  CalDate TIMESTAMP\n" +
                ") WITH (\n" +
                "'connector.type' = 'jdbc'," +
                "'connector.url' = 'jdbc:mysql://xx.xx.120.201:3306/fl?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT&2B8'," +
                "'connector.driver' = 'com.mysql.cj.jdbc.Driver'," +
                "'connector.table' = 'c_ex_stock'," +
                " 'connector.username' = 'xxx'," +
                " 'connector.password' = 'xxxx'"+
                ")";

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // 注册source和sink
        tEnv.executeSql(sourceDDL);
 
        // 数据提取
        Table sourceTable = tEnv.from("c_ex_stock");

        TableResult tableResult = tEnv.executeSql("select * from " + sourceTable);
        tableResult.print();

        // 执行作业
        env.execute("flink-read-mysql");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/560581
推荐阅读
相关标签
  

闽ICP备14008679号