当前位置:   article > 正文

Hudi系列25: Flink SQL使用checkpoint恢复job异常

Hudi系列25: Flink SQL使用checkpoint恢复job异常

一. 通过Flink SQL将MySQL数据写入Hudi

启动Yarn Session

$FLINK_HOME/bin/yarn-session.sh -jm 16384 -tm 16384 -d  2>&1 &

/home/flink-1.14.5/bin/sql-client.sh embedded -s yarn-session 
  • 1
  • 2
  • 3

Flink SQL 代码:

-- 设置checkpoint的时间间隔
set execution.checkpointing.interval=60sec;
-- 设置任务结束后不清空checkpoint文件,便于后续恢复
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;
-- 同时只能有一个checkpoint进程
set execution.checkpointing.max-concurrent-checkpoints=1;

CREATE TABLE flink_mysql_cdc1 (
    id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
    name varchar(100)
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'hp8',
    'port' = '3306',
    'username' = 'root',
    'password' = 'abc123',
    'database-name' = 'test',
    'table-name' = 'mysql_cdc',
    'server-id' = '5409-5415',
    'scan.incremental.snapshot.enabled'='true'
);

CREATE TABLE flink_hudi_mysql_cdc1(
    id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
    name varchar(100)
  ) WITH (
   'connector' = 'hudi',
   'path' = 'hdfs://hp5:8020/tmp/hudi/flink_hudi_mysql_cdc1',
   'table.type' = 'MERGE_ON_READ',
   'changelog.enabled' = 'true',
   'hoodie.datasource.write.recordkey.field' = 'id',
   'write.precombine.field' = 'name',
   'compaction.async.enabled' = 'true',
   'hive_sync.enable' = 'true',
   'hive_sync.table' = 'flink_hudi_mysql_cdc1',
   'hive_sync.db' = 'test',
   'hive_sync.mode' = 'hms',
   'hive_sync.metastore.uris' = 'thrift://hp5:9083',
   'hive_sync.conf.dir'='/home/apache-hive-3.1.2-bin/conf'
);


set table.exec.resource.default-parallelism=4;

insert into flink_hudi_mysql_cdc1 select * from flink_mysql_cdc1;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

Flink web界面:
2000w数据初始化已经完成
image.png

checkpoint日志量真的多
image.png

hdfs查看checkpoint日志量
image.png

二. 模拟Flink任务异常

2.1 手工停止job

在Flink web界面将Flink SQL任务手工结束掉
image.png

2.2 指定checkpoint来恢复数据

查找最近的checkpoint:
image.png

代码:

set 'execution.savepoint.path'='hdfs://hp5:8020/vmcluster/flink-checkpoints/a2874606453b4aebfdaca2f627355f99/chk-23';

insert into flink_hudi_mysql_cdc1 select * from flink_mysql_cdc1;
  • 1
  • 2
  • 3

image.png

image.png

2.3 整个yarn-session上的任务恢复

待测试:
如果是整个yarn-session异常,也可以启动yarnsession的时候指定checkpoint。

$FLINK_HOME/bin/yarn-session.sh -jm 8192 -tm 8192 -d -s hdfs://hp5:8020/vmcluster/flink-checkpoints/c12eb538c2e8965d2d94c170b67641f2/chk-1/_metadata

/home/flink-1.14.5/bin/sql-client.sh embedded -s yarn-session 
  • 1
  • 2
  • 3

三. 模拟源端异常

3.1 手工关闭源端 MySQL 服务

service mysqld stop
  • 1

3.2 FLink任务查看

Flink可以自己重试,这个还是比较不错,无需人工干预。

mysql启动成功之后,任务又可以继续衔接上。

image.png

FAQ:

1. checkpoint未写入数据

别人的checkpoint
image.png

我的checkpoint
看来是我的checkpoint都没成功
image.png

修改checkpoint时间间隔:

-- 修改前:
set execution.checkpointing.interval=10sec;

-- 修改后:
set execution.checkpointing.interval=60sec;
  • 1
  • 2
  • 3
  • 4
  • 5

2. checkpoint 失败

报错信息:

Checkpoint Coordinator is suspending.
  • 1

image.png

解决方案:
把yarn-session的资源由8G提升到16G问题解决。

对于一些大表,最好还是先通过Spark进行初始化,然后在接增量。

3. 手工取消Flink job后,checkpoint文件自动删除

https://developer.aliyun.com/ask/435979
http://events.jianshu.io/p/032396543ceb

在网上看到的资源都是针对代码级别的,没有看到Flink SQL级别的
还是得上官网查找
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/

set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;
  • 1

参考:

  1. https://blog.csdn.net/qq_31866793/article/details/103069646
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/876397
推荐阅读
相关标签
  

闽ICP备14008679号