CDC is short for Change Data Capture. With CDC we can capture committed changes from a database and send them downstream for consumption. These changes can include INSERT, DELETE, and UPDATE operations.
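In Flink SQL, CDC data is almost equivalent to a changelog. The core of the work is mapping each record's RowKind (+I/-U/+U/-D, i.e. insert, update-before, update-after, delete).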
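To make the four row kinds concrete, here is a minimal sketch in plain Java that replays a small changelog into a keyed table. It does not use Flink: the `Kind` enum is a hypothetical stand-in for Flink's `RowKind`, and `Change`, `materialize`, and `sampleChangelog` are names invented for this illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangelogDemo {

    // Hypothetical stand-in for Flink's RowKind; the short strings mirror +I/-U/+U/-D.
    public enum Kind {
        INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

        public final String shortString;

        Kind(String shortString) {
            this.shortString = shortString;
        }
    }

    // One changelog record: a row kind plus the row's primary key and value.
    public static final class Change {
        public final Kind kind;
        public final int id;
        public final String name;

        public Change(Kind kind, int id, String name) {
            this.kind = kind;
            this.id = id;
            this.name = name;
        }
    }

    // Replays the changelog in order and returns the resulting table keyed by id.
    public static Map<Integer, String> materialize(List<Change> changelog) {
        Map<Integer, String> table = new LinkedHashMap<>();
        for (Change c : changelog) {
            switch (c.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    table.put(c.id, c.name);   // new or updated row image
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    table.remove(c.id);        // retract the old row image
                    break;
            }
        }
        return table;
    }

    // Sample changelog: two inserts, one update (as a -U/+U pair), one delete.
    public static List<Change> sampleChangelog() {
        return Arrays.asList(
                new Change(Kind.INSERT, 1, "alice"),
                new Change(Kind.INSERT, 2, "bob"),
                new Change(Kind.UPDATE_BEFORE, 1, "alice"),
                new Change(Kind.UPDATE_AFTER, 1, "alicia"),
                new Change(Kind.DELETE, 2, "bob"));
    }

    public static void main(String[] args) {
        for (Change c : sampleChangelog()) {
            System.out.println(c.kind.shortString + " " + c.id + "," + c.name);
        }
        System.out.println(materialize(sampleChangelog()));
    }
}
```

An UPDATE arrives as a -U/+U pair, so a consumer that tracks rows by primary key can treat -U as a retraction of the old image and +U as the new one.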
To work with CDC data in Flink SQL, use a CDC connector. Flink's CDC connectors are not bundled in the core packages, so an extra dependency has to be added.
Example: use the flink-mysql-cdc connector to map a source table, run queries over it, and write the result back to a MySQL table.
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// The class name is illustrative; the original shows only the main method.
public class FlinkCdcDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint");
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // Create the source table, mapped onto MySQL through the mysql-cdc connector
        tenv.executeSql("CREATE TABLE flink_score ("
                + " id INT,"
                + " name string,"
                + " gender string,"
                + " score double,"
                + " PRIMARY KEY(id) NOT ENFORCED"
                + " ) WITH ("
                + " 'connector' = 'mysql-cdc',"
                + " 'hostname' = 'hdp01',"
                + " 'port' = '3306',"
                + " 'username' = 'root',"
                + " 'password' = 'root',"
                + " 'database-name' = 'flinktest',"
                + " 'table-name' = 'score'"
                + ")");

        // Queries (re-enable .print() to watch the changelog output)
        tenv.executeSql("select * from flink_score")/*.print()*/;
        tenv.executeSql("select gender, avg(score) as avg_score from flink_score group by gender")/*.print()*/;

        // Create a sink table for the query result:
        // the top 2 people by total score within each gender
        tenv.executeSql("create table flink_rank("
                + " gender string,"
                + " name string,"
                + " score_amt double,"
                + " rn bigint,"
                + " primary key(gender, rn) not enforced"
                + ") with ("
                + " 'connector' = 'jdbc',"
                + " 'url' = 'jdbc:mysql://hdp01:3306/flinktest',"
                + " 'table-name' = 'score_rank',"
                + " 'username' = 'root',"
                + " 'password' = 'root'"
                + ")");

        // Each fragment starts with a space so the concatenated SQL keeps
        // its tokens separated (e.g. "flink_rank SELECT", not "flink_rankSELECT").
        tenv.executeSql("insert into flink_rank"
                + " SELECT gender, name, score_amt, rn"
                + " from ("
                + "   SELECT gender, name, score_amt,"
                + "     row_number() over(partition by gender order by score_amt desc) as rn"
                + "   from ("
                + "     SELECT gender, name, sum(score) as score_amt"
                + "     from flink_score"
                + "     group by gender, name"
                + "   ) o1"
                + " ) o2"
                + " where rn <= 2");
    }
}
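To show what the final INSERT computes, the sketch below reproduces the same two steps in plain Java over an in-memory list: sum the scores per (gender, name), then number the rows within each gender by descending total and keep rn <= 2. It is only an illustration of the query's logic, not of how Flink runs it (Flink maintains the result incrementally and emits changelog updates). The names `TopNPerGender`, `Score`, `Ranked`, and `top2`, and the sample rows, are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TopNPerGender {

    // One input row, mirroring the gender/name/score columns of flink_score.
    public static final class Score {
        public final String gender;
        public final String name;
        public final double score;

        public Score(String gender, String name, double score) {
            this.gender = gender;
            this.name = name;
            this.score = score;
        }
    }

    // One output row, mirroring the columns of flink_rank.
    public static final class Ranked {
        public final String gender;
        public final String name;
        public final double scoreAmt;
        public final long rn;

        public Ranked(String gender, String name, double scoreAmt, long rn) {
            this.gender = gender;
            this.name = name;
            this.scoreAmt = scoreAmt;
            this.rn = rn;
        }
    }

    public static List<Ranked> top2(List<Score> rows) {
        // Step 1: sum(score) ... group by gender, name
        Map<String, Map<String, Double>> totals = new TreeMap<>();
        for (Score s : rows) {
            totals.computeIfAbsent(s.gender, g -> new LinkedHashMap<>())
                  .merge(s.name, s.score, Double::sum);
        }

        // Step 2: row_number() over (partition by gender order by score_amt desc), keep rn <= 2
        List<Ranked> out = new ArrayList<>();
        for (Map.Entry<String, Map<String, Double>> partition : totals.entrySet()) {
            List<Map.Entry<String, Double>> people = new ArrayList<>(partition.getValue().entrySet());
            people.sort(Map.Entry.<String, Double>comparingByValue().reversed());
            for (int i = 0; i < people.size() && i < 2; i++) {
                Map.Entry<String, Double> p = people.get(i);
                out.add(new Ranked(partition.getKey(), p.getKey(), p.getValue(), i + 1));
            }
        }
        return out;
    }

    // Made-up sample rows for illustration only.
    public static List<Score> sampleRows() {
        return Arrays.asList(
                new Score("F", "amy", 90), new Score("F", "amy", 5),
                new Score("F", "bea", 80), new Score("F", "cat", 70),
                new Score("M", "dan", 60), new Score("M", "eli", 85));
    }

    public static void main(String[] args) {
        for (Ranked r : top2(sampleRows())) {
            System.out.println(r.gender + " " + r.name + " " + r.scoreAmt + " " + r.rn);
        }
    }
}
```

Because (gender, rn) is the primary key of the sink table, a change in the ranking overwrites the row at that rank rather than appending a new one.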