赞
踩
组件 | 版本 |
---|---|
Flink | 1.14.4 |
Flink CDC (MySQL CDC connector) | 2.2.1 |
MySQL | 5.7+ |
Dinky | 0.6.6 |
依赖图:
-- MySQL source schema for the first CDC test: database emp_1 with table employees_1.
-- IF NOT EXISTS on both the database and the table keeps this setup re-runnable.
CREATE DATABASE IF NOT EXISTS emp_1;
USE emp_1;
-- INT replaces the deprecated display-width form int(11).
-- utf8mb4 replaces the deprecated utf8 (utf8mb3) alias.
CREATE TABLE IF NOT EXISTS `employees_1` (
    `emp_no`     INT           NOT NULL,
    `birth_date` DATE          NOT NULL,
    `first_name` VARCHAR(50)   NOT NULL,
    `last_name`  VARCHAR(50)   NOT NULL,
    `gender`     ENUM('M','F') NOT NULL,
    `hire_date`  DATE          NOT NULL,
    PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Seed rows written to emp_1.employees_1 before the CDC job below is started.
-- Explicit column list, single-quoted string literals and integer keys avoid
-- positional coupling to the table layout, ANSI_QUOTES breakage and implicit casts.
INSERT INTO emp_1.employees_1 (emp_no, birth_date, first_name, last_name, gender, hire_date)
VALUES
    (10, '1992-09-12', 'cai',  'kunkun', 'M', '2022-09-22'),
    (11, '1990-09-15', 'wang', 'meimei', 'F', '2021-04-12');
-- Test-only settings; they are not required in a production environment.
SET pipeline.operator-chaining = false;
SET table.local-time-zone = Asia/Shanghai;
SET execution.runtime-mode = streaming;
-- NOTE(review): the CDCSOURCE option 'checkpoint' = '3000' below appears to set a
-- checkpoint interval as well; confirm which of the two values Dinky actually applies.
SET execution.checkpointing.interval = 6000;
SET execution.checkpointing.tolerable-failed-checkpoints = 10;
SET execution.checkpointing.timeout = 600000;
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET execution.checkpointing.unaligned = true;
SET execution.checkpointing.max-concurrent-checkpoints = 1;
SET state.checkpoints.num-retained = 3;
SET restart-strategy = fixed-delay;
SET restart-strategy.fixed-delay.attempts = 10;
SET restart-strategy.fixed-delay.delay = 20s;
SET table.exec.source.cdc-events-duplicate = true;
SET table.sql-dialect = default;
-- Optional job-name override, left disabled:
-- SET pipeline.name = hive_catalog_cdc_orders;
SET jobmanager.memory.process.size = 1600m;
SET taskmanager.memory.process.size = 1780m;
SET taskmanager.memory.managed.size = 200m;
SET taskmanager.numberOfTaskSlots = 1;
SET yarn.application.queue = default;

-- Whole-database sync of MySQL database emp_1 via Dinky's CDCSOURCE statement.
-- Despite "Doris" in the job name, this example writes to the 'print' sink.
-- 'table-name' is a regex: every table named employees_<digits> in emp_1
-- (the dot is escaped so it matches literally).
EXECUTE CDCSOURCE dinkySource_mysqlCDC_to_Doris WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'hadoop102',
    'port' = '3306',
    'username' = 'root',
    'password' = 'xxxxxx',
    'checkpoint' = '3000',
    'scan.startup.mode' = 'initial',
    'parallelism' = '1',
    'table-name' = 'emp_1\.employees_[0-9]+',
    'sink.connector' = 'print'
);
查看作业详情栏，如下图右上角所示，各按钮的含义分别为：
名称 | 含义 |
---|---|
智能停止 | 触发一次 Savepoint，并停止作业 |
Savepoint 触发 | 触发一次 Savepoint，作业继续运行 |
Savepoint 暂停 | 触发一次 Savepoint，并暂停作业 |
Savepoint 停止 | 触发一次 Savepoint，并停止作业 |
-- Row written after the emp_1 CDC job has been started, used to observe incremental capture.
-- Schema-qualified so it does not depend on the session's current database.
INSERT INTO emp_1.employees_1 (emp_no, birth_date, first_name, last_name, gender, hire_date)
VALUES (55, '2020-09-15', 'huang', 'meiji', 'F', '2022-04-12');
-- MySQL source schema for the second CDC test: database emp_2 with table employees_2.
-- IF NOT EXISTS on both the database and the table keeps this setup re-runnable.
CREATE DATABASE IF NOT EXISTS emp_2;
USE emp_2;
-- INT replaces the deprecated display-width form int(11).
-- utf8mb4 replaces the deprecated utf8 (utf8mb3) alias.
CREATE TABLE IF NOT EXISTS `employees_2` (
    `emp_no`     INT           NOT NULL,
    `birth_date` DATE          NOT NULL,
    `first_name` VARCHAR(50)   NOT NULL,
    `last_name`  VARCHAR(50)   NOT NULL,
    `gender`     ENUM('M','F') NOT NULL,
    `hire_date`  DATE          NOT NULL,
    PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Flink SQL (submitted through Dinky)
-- Test-only settings; they are not required in a production environment.
SET pipeline.operator-chaining = false;
SET table.local-time-zone = Asia/Shanghai;
SET execution.runtime-mode = streaming;
-- NOTE(review): the CDCSOURCE option 'checkpoint' = '3000' below appears to set a
-- checkpoint interval as well; confirm which of the two values Dinky actually applies.
SET execution.checkpointing.interval = 6000;
SET execution.checkpointing.tolerable-failed-checkpoints = 10;
SET execution.checkpointing.timeout = 600000;
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET execution.checkpointing.unaligned = true;
SET execution.checkpointing.max-concurrent-checkpoints = 1;
SET state.checkpoints.num-retained = 3;
SET restart-strategy = fixed-delay;
SET restart-strategy.fixed-delay.attempts = 10;
SET restart-strategy.fixed-delay.delay = 20s;
SET table.exec.source.cdc-events-duplicate = true;
SET table.sql-dialect = default;
-- Optional job-name override, left disabled:
-- SET pipeline.name = hive_catalog_cdc_orders;
SET jobmanager.memory.process.size = 1600m;
SET taskmanager.memory.process.size = 1780m;
SET taskmanager.memory.managed.size = 200m;
SET taskmanager.numberOfTaskSlots = 1;
SET yarn.application.queue = default;

-- Whole-database sync of MySQL database emp_2 via Dinky's CDCSOURCE statement.
-- Despite "Doris" in the job name, this example writes to the 'print' sink.
-- 'table-name' is a regex: every table named employees_<digits> in emp_2
-- (the dot is escaped so it matches literally).
EXECUTE CDCSOURCE dinkySource_mysqlCDC_to_Doris WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'hadoop102',
    'port' = '3306',
    'username' = 'root',
    'password' = 'xxxxxx',
    'checkpoint' = '3000',
    'scan.startup.mode' = 'initial',
    'parallelism' = '1',
    'table-name' = 'emp_2\.employees_[0-9]+',
    'sink.connector' = 'print'
);
-- Rows written to emp_2.employees_2 after the emp_2 CDC job above.
-- Explicit column list, single-quoted string literals and integer keys avoid
-- positional coupling to the table layout, ANSI_QUOTES breakage and implicit casts.
-- Schema-qualified so it does not depend on the session's current database.
INSERT INTO emp_2.employees_2 (emp_no, birth_date, first_name, last_name, gender, hire_date)
VALUES
    (10, '1992-09-12', 'cai',  'kunkun', 'M', '2022-09-22'),
    (11, '1990-09-15', 'wang', 'meimei', 'F', '2021-04-12'),
    (13, '1992-09-12', 'cai',  'kunkun', 'M', '2022-09-22');
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。