Start the YARN session
$FLINK_HOME/bin/yarn-session.sh -jm 16384 -tm 16384 -d 2>&1 &
/home/flink-1.14.5/bin/sql-client.sh embedded -s yarn-session
Flink SQL code:
-- Set the checkpoint interval
set execution.checkpointing.interval=60sec;
-- Keep checkpoint files after the job is cancelled, so the job can be restored from them later
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;
-- Allow only one checkpoint in progress at a time
set execution.checkpointing.max-concurrent-checkpoints=1;
CREATE TABLE flink_mysql_cdc1 (
id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
name varchar(100)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'hp8',
'port' = '3306',
'username' = 'root',
'password' = 'abc123',
'database-name' = 'test',
'table-name' = 'mysql_cdc',
'server-id' = '5409-5415',
'scan.incremental.snapshot.enabled'='true'
);
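A quick sanity check on 'server-id': as I understand the mysql-cdc docs, with 'scan.incremental.snapshot.enabled' each parallel source reader takes its own server id from the range, so the range should contain at least as many IDs as the source parallelism (4 here, set later with table.exec.resource.default-parallelism). The helper below is a hypothetical sketch of that arithmetic, not part of Flink or Flink CDC; '5409-5415' gives 7 IDs, which is enough.

```shell
#!/usr/bin/env bash
# Hypothetical helper: check that a mysql-cdc 'server-id' range such as
# '5409-5415' holds at least as many IDs as the source parallelism.
# Assumption: each parallel source reader uses one distinct server id.
check_server_id_range() {
  local range=$1 parallelism=$2
  # A single ID like '5400' has no '-', so start and end are both 5400.
  local start=${range%-*} end=${range#*-}
  local size=$(( end - start + 1 ))
  if (( size >= parallelism )); then
    echo "ok: ${size} ids for parallelism ${parallelism}"
  else
    echo "too small: ${size} ids for parallelism ${parallelism}" >&2
    return 1
  fi
}

check_server_id_range 5409-5415 4
```

If the parallelism is later raised above 7, the range has to grow with it.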
CREATE TABLE flink_hudi_mysql_cdc1(
id BIGINT NOT NULL PRIMARY KEY NOT ENFORCED,
name varchar(100)
) WITH (
'connector' = 'hudi',
'path' = 'hdfs://hp5:8020/tmp/hudi/flink_hudi_mysql_cdc1',
'table.type' = 'MERGE_ON_READ',
'changelog.enabled' = 'true',
'hoodie.datasource.write.recordkey.field' = 'id',
'write.precombine.field' = 'name',
'compaction.async.enabled' = 'true',
'hive_sync.enable' = 'true',
'hive_sync.table' = 'flink_hudi_mysql_cdc1',
'hive_sync.db' = 'test',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://hp5:9083',
'hive_sync.conf.dir'='/home/apache-hive-3.1.2-bin/conf'
);
set table.exec.resource.default-parallelism=4;
insert into flink_hudi_mysql_cdc1 select * from flink_mysql_cdc1;
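Flink web UI:
The initial load of 20 million rows has completed.
Checkpoints really do produce a lot of data.
Checking the checkpoint data size on HDFS:
Manually cancel the Flink SQL job from the Flink web UI.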
Find the most recent checkpoint:
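Besides the web UI, the latest checkpoint can also be picked from an HDFS listing of the job's checkpoint directory. The function below is a hypothetical sketch: it reads `hdfs dfs -ls` style lines on stdin and prints the chk-N path with the largest N. It does not check for the `_metadata` file; as far as I know Flink writes `_metadata` only once a checkpoint completes, so confirm it exists (e.g. `hdfs dfs -test -e <chk-path>/_metadata`) before restoring from it.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the chk-N directory with the largest N from an
# `hdfs dfs -ls` listing read on stdin (the path is the last column).
latest_chk() {
  awk '{
         p = $NF
         if (p ~ /\/chk-[0-9]+$/) {
           n = p
           sub(/.*\/chk-/, "", n)   # keep only the numeric suffix
           print n, p
         }
       }' | sort -k1,1n | tail -n 1 | cut -d' ' -f2-
}

# Usage (<job-id> is a placeholder for the job's checkpoint directory):
# hdfs dfs -ls hdfs://hp5:8020/vmcluster/flink-checkpoints/<job-id> | latest_chk
```

The sort is numeric on purpose: a plain lexical sort would rank chk-9 above chk-23.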
Code:
set 'execution.savepoint.path'='hdfs://hp5:8020/vmcluster/flink-checkpoints/a2874606453b4aebfdaca2f627355f99/chk-23';
insert into flink_hudi_mysql_cdc1 select * from flink_mysql_cdc1;
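To be tested:
If the whole yarn-session fails, it may also be possible to specify the checkpoint when starting the yarn-session:
$FLINK_HOME/bin/yarn-session.sh -jm 8192 -tm 8192 -d -s hdfs://hp5:8020/vmcluster/flink-checkpoints/c12eb538c2e8965d2d94c170b67641f2/chk-1/_metadata
Caveat: in yarn-session.sh, -s is the --slots option (number of slots per TaskManager), not a restore path, so this command will most likely be rejected rather than restore anything. Restoring through SET 'execution.savepoint.path' in the SQL client, as shown above, is the safer route.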
/home/flink-1.14.5/bin/sql-client.sh embedded -s yarn-session
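Next, stop MySQL to simulate a source outage:
service mysqld stop
Flink retries on its own, which is quite nice: no manual intervention is needed.
Once MySQL is running again, the job picks up where it left off.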
Someone else's checkpoints:
My checkpoints:
It looks like none of my checkpoints succeeded.
Change the checkpoint interval:
-- Before:
set execution.checkpointing.interval=10sec;
-- After:
set execution.checkpointing.interval=60sec;
Error message:
Checkpoint Coordinator is suspending.
Solution:
Raising the yarn-session memory (-jm/-tm) from 8 GB to 16 GB fixed the problem.
For large tables, it is better to do the initial load with Spark first and then switch to incremental sync.
https://developer.aliyun.com/ask/435979
http://events.jianshu.io/p/032396543ceb
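The resources I found online all set this at the code level (in the job's Java/Scala code); none covered it at the Flink SQL level.
So I had to look it up in the official documentation: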
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;