
FlinkSQL Series 04: CDC Connectors


CDC

CDC is short for Change Data Capture. With CDC we can capture committed changes from a database and send them downstream for consumers to use. These changes include INSERT, DELETE, UPDATE, and so on.
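To make the idea concrete, here is a minimal Java sketch that is not tied to Flink or any CDC library: the class ChangeReplay, its Op enum and the ChangeEvent fields are all hypothetical. It models captured INSERT/UPDATE/DELETE events and replays them, in commit order, onto a downstream copy keyed by primary key, which is the essence of synchronizing data with CDC.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of CDC: committed row changes are captured as events
// and replayed downstream to keep a copy of the table in sync.
public class ChangeReplay {

    // The kinds of committed changes a CDC tool captures.
    enum Op { INSERT, UPDATE, DELETE }

    // One captured change: operation, primary key, and new row value (null for DELETE).
    // Real CDC events (e.g. read from the MySQL binlog) carry much more metadata.
    static final class ChangeEvent {
        final Op op;
        final int id;
        final String row;

        ChangeEvent(Op op, int id, String row) {
            this.op = op;
            this.id = id;
            this.row = row;
        }
    }

    // Applies events in commit order to a downstream "replica" keyed by id.
    static Map<Integer, String> replay(List<ChangeEvent> events) {
        Map<Integer, String> replica = new LinkedHashMap<>();
        for (ChangeEvent e : events) {
            switch (e.op) {
                case INSERT:
                case UPDATE:
                    replica.put(e.id, e.row);   // upsert by primary key
                    break;
                case DELETE:
                    replica.remove(e.id);
                    break;
            }
        }
        return replica;
    }

    public static void main(String[] args) {
        List<ChangeEvent> events = Arrays.asList(
                new ChangeEvent(Op.INSERT, 1, "zs,male,80.0"),
                new ChangeEvent(Op.INSERT, 2, "ls,female,90.0"),
                new ChangeEvent(Op.UPDATE, 1, "zs,male,85.0"),
                new ChangeEvent(Op.DELETE, 2, null));
        System.out.println(replay(events)); // {1=zs,male,85.0}
    }
}
```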

Use cases

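  • Data synchronization with Flink SQL: moving data from one system to another, for example from MySQL to Doris
  • Materializing an aggregate view over the source database in real time
  • Real-time, low-latency synchronization, because only incremental changes are shipped (after any initial snapshot) rather than the full data set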
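The second use case, an aggregate view kept up to date incrementally, can also be sketched in plain Java. The class AvgScoreView and its methods are hypothetical. It keeps a running sum and count per gender so the average can be refreshed from each row change without rescanning the table, and it treats an update as retracting the old row and then adding the new one.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: an "avg score per gender" view maintained from row changes only.
public class AvgScoreView {

    // gender -> {running sum, row count}; avg = sum / count
    private final Map<String, double[]> state = new HashMap<>();

    // A row was inserted: accumulate its score.
    void insert(String gender, double score) {
        double[] acc = state.computeIfAbsent(gender, g -> new double[2]);
        acc[0] += score;
        acc[1] += 1;
    }

    // A row was deleted: retract its previous contribution.
    void delete(String gender, double score) {
        double[] acc = state.get(gender);
        if (acc == null) {
            return; // nothing to retract
        }
        acc[0] -= score;
        acc[1] -= 1;
        if (acc[1] == 0) {
            state.remove(gender); // the group became empty
        }
    }

    // An update = retract the old row, then accumulate the new row.
    void update(String oldGender, double oldScore, String newGender, double newScore) {
        delete(oldGender, oldScore);
        insert(newGender, newScore);
    }

    // Current average for a gender, or NaN if the group is empty.
    double avg(String gender) {
        double[] acc = state.get(gender);
        return acc == null ? Double.NaN : acc[0] / acc[1];
    }

    public static void main(String[] args) {
        AvgScoreView view = new AvgScoreView();
        view.insert("male", 80);
        view.insert("male", 90);
        view.insert("female", 70);
        System.out.println(view.avg("male"));   // 85.0
        view.update("male", 90, "male", 100);
        System.out.println(view.avg("male"));   // 90.0
        view.delete("female", 70);
        System.out.println(view.avg("female")); // NaN
    }
}
```

Flink's group aggregation rests on the same accumulate/retract principle, although its state handling is far more involved than this map.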

Flink SQL support for CDC

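In Flink SQL, CDC data is essentially a changelog: the core of the support is mapping each record onto a RowKind (+I insert, -U update-before, +U update-after, -D delete).
Working with CDC data in Flink SQL only requires a CDC connector. Flink's CDC connectors are not bundled in the core distribution, so their dependency must be added separately.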
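The RowKind mapping described above can be modeled in a few lines. This is a hypothetical standalone sketch, not Flink's API: the strings +I, -U, +U and -D mirror the short names of Flink's RowKind, and toChangelog is an illustrative helper. It shows that one database UPDATE becomes two changelog rows (the old image retracted with -U, the new image emitted with +U), while INSERT and DELETE each map to a single row.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of turning one captured DB change into changelog rows.
// The prefixes mirror Flink's RowKind short strings; this is not Flink code.
public class ChangelogMapping {

    // op: INSERT / UPDATE / DELETE; before/after: row images (null when absent)
    static List<String> toChangelog(String op, String before, String after) {
        List<String> rows = new ArrayList<>();
        switch (op) {
            case "INSERT":
                rows.add("+I" + after);
                break;
            case "UPDATE":
                // an update is split into retracting the old row and adding the new one
                rows.add("-U" + before);
                rows.add("+U" + after);
                break;
            case "DELETE":
                rows.add("-D" + before);
                break;
            default:
                throw new IllegalArgumentException("unknown op: " + op);
        }
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(toChangelog("INSERT", null, "[1, zs, male, 80.0]"));
        // [+I[1, zs, male, 80.0]]
        System.out.println(toChangelog("UPDATE", "[1, zs, male, 80.0]", "[1, zs, male, 85.0]"));
        // [-U[1, zs, male, 80.0], +U[1, zs, male, 85.0]]
        System.out.println(toChangelog("DELETE", "[1, zs, male, 85.0]", null));
        // [-D[1, zs, male, 85.0]]
    }
}
```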

mysql-cdc-connector

Example: use the mysql-cdc connector to map a source table, run queries on it, and write the results back to a MySQL table

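import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlCdcToJdbcDemo { // class name is illustrative

	public static void main(String[] args) throws Exception {

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// Exactly-once checkpoints every 1 s; the CDC source keeps its read position in checkpointed state
		env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
		env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint");

		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		// Source table: maps MySQL table flinktest.score through the mysql-cdc connector
		tenv.executeSql("CREATE TABLE flink_score (" +
				"      id INT," +
				"      name string," +
				"      gender string," +
				"      score double," +
				"     PRIMARY KEY(id) NOT ENFORCED" +
				"     ) WITH (" +
				"     'connector' = 'mysql-cdc'," +
				"     'hostname' = 'hdp01'," +
				"     'port' = '3306'," +
				"     'username' = 'root'," +
				"     'password' = 'root'," +
				"     'database-name' = 'flinktest'," +
				"     'table-name' = 'score'" +
				")");

		// Ad-hoc queries for inspection. print() blocks on an unbounded CDC stream,
		// so uncomment only one at a time; the console shows each row's RowKind (+I/-U/+U/-D).
		// tenv.executeSql("select * from flink_score").print();
		// tenv.executeSql("select gender, avg(score) as avg_score from flink_score group by gender").print();

		// Target table for the query result: for each gender, the top 2 people by total score.
		// With a primary key declared, the JDBC sink writes in upsert mode.
		// The score_rank table must already exist in MySQL.
		tenv.executeSql(
				"create table flink_rank(" +
						"   gender    string," +
						"   name      string," +
						"   score_amt double," +
						"   rn        bigint," +
						"   primary key(gender,rn) not enforced" +
						") with (" +
						"  'connector' = 'jdbc'," +
						"  'url' = 'jdbc:mysql://hdp01:3306/flinktest'," +
						"  'table-name' = 'score_rank'," +
						"  'username' = 'root'," +
						"  'password' = 'root' " +
						")"
		);

		// Top-N query. Every fragment ends with a space so the concatenated SQL stays valid.
		tenv.executeSql("insert into flink_rank " +
				"SELECT " +
				"  gender, " +
				"  name, " +
				"  score_amt, " +
				"  rn " +
				"from ( " +
				"  SELECT " +
				"    gender, " +
				"    name, " +
				"    score_amt, " +
				"    row_number() over(partition by gender order by score_amt desc) as rn " +
				"  from ( " +
				"    SELECT " +
				"      gender, " +
				"      name, " +
				"      sum(score) as score_amt " +
				"    from flink_score " +
				"    group by gender, name " +
				"  ) o1 " +
				") o2 " +
				"where rn <= 2")
			// executeSql submits the job asynchronously; await() keeps main alive when run from the IDE
			.await();
	}
}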
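What the INSERT job maintains can be approximated in plain Java. The sketch below is a simplification under stated assumptions: the class TopNUpsertSketch and its members are hypothetical, only new score rows are handled (no updates or deletes from the source), and the sink is a map standing in for score_rank. It sums scores per (gender, name) like subquery o1, ranks names per gender like row_number() in o2, keeps rn <= 2, and writes each result under the key (gender, rn), so a rank change overwrites the existing row instead of appending a new one. That is why flink_rank declares primary key(gender, rn).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simplification of the Top-N job: totals per (gender, name),
// top 2 per gender, upserted into a "sink" keyed by (gender, rn).
public class TopNUpsertSketch {

    // gender -> (name -> score_amt), like subquery o1
    private final Map<String, Map<String, Double>> totals = new HashMap<>();
    // stand-in for score_rank: key "gender#rn" -> "name,score_amt"; same key overwrites
    final Map<String, String> sink = new TreeMap<>();

    // A new score row for (gender, name) arrives from the source.
    void addScore(String gender, String name, double score) {
        totals.computeIfAbsent(gender, g -> new HashMap<>()).merge(name, score, Double::sum);
        refreshRanks(gender);
    }

    // row_number() over (partition by gender order by score_amt desc), keep rn <= 2
    private void refreshRanks(String gender) {
        List<Map.Entry<String, Double>> ranked = new ArrayList<>(totals.get(gender).entrySet());
        ranked.sort(Map.Entry.<String, Double>comparingByValue().reversed());
        for (int rn = 1; rn <= Math.min(2, ranked.size()); rn++) {
            Map.Entry<String, Double> e = ranked.get(rn - 1);
            sink.put(gender + "#" + rn, e.getKey() + "," + e.getValue()); // upsert by (gender, rn)
        }
    }

    public static void main(String[] args) {
        TopNUpsertSketch job = new TopNUpsertSketch();
        job.addScore("male", "zs", 80);
        job.addScore("male", "ls", 90);
        job.addScore("male", "ww", 70);
        job.addScore("male", "ww", 30); // ww's total becomes 100 and takes rank 1
        System.out.println(job.sink);   // {male#1=ww,100.0, male#2=ls,90.0}
    }
}
```

Ties are ordered arbitrarily here; Flink's row_number() likewise gives no guarantee for equal score_amt values unless the ORDER BY includes a tie-breaker.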
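Disclaimer: This article was contributed by a user and does not represent the position of the wpsshop blog; copyright belongs to the original author, and this site assumes no legal responsibility for it. If you find infringing content, please contact us. When reposting, please cite the source: https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/937863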