Place the following dependency JAR in flink/lib:
flink-sql-connector-kafka-1.16.2
Create the MySQL mapping table
- CREATE TABLE if not exists mysql_user (
- id int,
- name STRING,
- birth STRING,
- gender STRING,
- PRIMARY KEY (`id`) NOT ENFORCED
- ) WITH (
- 'connector'= 'mysql-cdc',
- 'hostname'= '192.168.0.1',
- 'port'= '3306',
- 'username'= 'user',
- 'password'='password',
- 'server-time-zone'= 'Asia/Shanghai',
- 'debezium.snapshot.mode'='initial',
- 'database-name'= 'bigdata',
- 'table-name'= 'user'
- );
-
- select * from mysql_user;
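The mapping table above assumes that a `user` table already exists in the `bigdata` database on the MySQL side. The original does not show that table's DDL, so the following is only a sketch: the column types and lengths are assumptions chosen to line up with the Flink columns, and the sample row is made up. These statements are meant to be run in MySQL itself, not in the Flink SQL client.

```sql
-- Run in MySQL (not in Flink).
-- Column types and lengths are assumptions; the original does not show this DDL.
CREATE DATABASE IF NOT EXISTS bigdata;
USE bigdata;

CREATE TABLE IF NOT EXISTS `user` (
  id     INT NOT NULL,
  name   VARCHAR(255),
  birth  VARCHAR(32),
  gender VARCHAR(16),
  PRIMARY KEY (id)
);

-- Hypothetical sample changes to exercise insert, update, and delete capture.
INSERT INTO `user` (id, name, birth, gender) VALUES (1, 'Alice', '1990-01-01', 'F');
UPDATE `user` SET name = 'Alicia' WHERE id = 1;
DELETE FROM `user` WHERE id = 1;
```

With the `select * from mysql_user;` query still running in the Flink SQL client, each of these changes should appear in its result as it is captured.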
Create the upsert-kafka table
- CREATE TABLE kafka_user_upsert(
- id int,
- name string,
- birth string,
- gender string,
- PRIMARY KEY (`id`) NOT ENFORCED
- ) WITH (
- 'connector' = 'upsert-kafka',
- 'topic' = 'flink-cdc-user',
- 'properties.bootstrap.servers' = '192.168.0.4:6668',
- 'properties.group.id' = 'flink-cdc-kafka-group',
- 'key.format' = 'json',
- 'value.format' = 'json'
- );
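The Kafka topic specified here is created automatically if it does not exist (this relies on the broker permitting automatic topic creation); you can also create it yourself beforehand.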
- insert into kafka_user_upsert select * from mysql_user;
-
- select * from kafka_user_upsert;
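The `insert into ... select *` statement above relies on the two tables declaring their columns in the same order. A slightly more defensive variant, sketched below, lists the columns explicitly. It also enables checkpointing first, as the Flink CDC examples commonly do. The `3s` interval is an assumed value, not something the original specifies.

```sql
-- Run in the Flink SQL client.
-- The checkpoint interval is an assumed value; tune it for your job.
SET 'execution.checkpointing.interval' = '3s';

-- Same sync job as above, with the columns listed explicitly
-- so it does not depend on column order.
INSERT INTO kafka_user_upsert
SELECT id, name, birth, gender
FROM mysql_user;
```

Because `kafka_user_upsert` declares `id` as its primary key, updates to a MySQL row are written as a new record with the same key, and deletes are written as a record with a null value (a tombstone).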
Articles in this series
Flink CDC Data Synchronization (1): Environment Deployment https://blog.csdn.net/weixin_44586883/article/details/136017355?spm=1001.2014.3001.5502
Flink CDC Data Synchronization (2): MySQL Data Synchronization https://blog.csdn.net/weixin_44586883/article/details/136017472?spm=1001.2014.3001.5501
Flink CDC Data Synchronization (3): Integrating Flink with Hive https://blog.csdn.net/weixin_44586883/article/details/136017571?spm=1001.2014.3001.5501
Flink CDC Data Synchronization (4): Syncing MySQL Data to Kafka https://blog.csdn.net/weixin_44586883/article/details/136023747?spm=1001.2014.3001.5501
Flink CDC Data Synchronization (5): Syncing Kafka Data to Hive https://blog.csdn.net/weixin_44586883/article/details/136023837?spm=1001.2014.3001.5501