赞
踩
flink-connector-cdc 独立于 Flink 主项目单独发布,因此集成时务必注意两者的版本对应关系——注意版本,注意版本,注意版本!
flink-1.14.3 cdc jar 免费下载
上面的 flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar 就是适配 Flink 1.14 的依赖,需要把它放到 $FLINK_HOME/lib/ 目录下。
这个依赖需要自己编译:官方发布的版本只到 2.1.1(截至 2022-03-11 17:05 还没有更新的发布版)。编译方法如下:
官方提供的编译方法(见 flink-cdc 的 README):
# Build flink-cdc-connectors from source to obtain the 2.2-SNAPSHOT connector jars.
# NOTE(review): this builds whatever the default branch currently is; to reproduce the
# 2.2 line, check out the matching release branch/tag after cloning (confirm the ref name).
git clone https://github.com/ververica/flink-cdc-connectors.git
cd ./flink-cdc-connectors/
mvn -DskipTests clean install
编译完成后,按需取用对应模块生成的 jar 即可。编译过程中 Maven 会下载大量插件和依赖,所以速度很慢。
接下来进入 Flink 安装目录下的 bin/ 目录,启动集群:
# Run from the Flink bin/ directory; the ./ prefix is required unless bin/ is on PATH.
./start-cluster.sh
启动后可以打开 Flink Web UI 查看集群状态(默认地址为 http://localhost:8081)。
### Demo:用 Flink SQL 通过 CDC 读取 MySQL 数据
前提条件:MySQL 需要开启 binlog,并且要采集的表必须有主键。
3. 创建mysql表:
-- MySQL: create the source table `test` inside the `test` database.
-- Flink CDC needs the binlog enabled and a primary key on every captured table.
SHOW DATABASES;
USE test;
CREATE TABLE IF NOT EXISTS test (
    id   INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(32)
);
# Start the Flink SQL client from the same bin/ directory used to start the cluster;
# the ./ prefix is required unless bin/ is on PATH.
./sql-client.sh
-- Flink SQL: register the MySQL table as a `mysql-cdc` source named `test`.
-- Replace the placeholder username, password, database name and table name before running.
-- The primary key mirrors the MySQL table's key; Flink only supports NOT ENFORCED keys.
CREATE TABLE test (
    id   INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'     = 'mysql-cdc',
    'hostname'      = 'localhost',
    'port'          = '3306',
    'username'      = '用户名',
    'password'      = '密码',
    'database-name' = '数据库名',
    'table-name'    = '表名'
);
-- MySQL: insert rows so the Flink CDC source emits new change events.
-- `id` is AUTO_INCREMENT, so it is omitted and MySQL assigns the key. Passing 0 instead
-- only auto-generates a key when NO_AUTO_VALUE_ON_ZERO is absent from sql_mode.
-- String literals use single quotes; double quotes denote identifiers under ANSI_QUOTES.
INSERT INTO test (name) VALUES ('pjs');
INSERT INTO test (name) VALUES ('jyl');
6. 在 Flink SQL 客户端中查看输出:
-- Flink SQL: continuous query over the CDC table. Existing rows are read first as a
-- full snapshot, then subsequent binlog changes are applied incrementally.
SELECT id, name FROM test;
Maven pom.xml 依赖配置:
<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <flink.version>1.14.3</flink.version>
    <scala.version>2.11.12</scala.version>
    <scala.binary.version>2.11</scala.binary.version>
    <!-- Locally built snapshot: install it with mvn install from the flink-cdc-connectors sources first. -->
    <flink.cdc.version>2.2-SNAPSHOT</flink.cdc.version>
</properties>
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
    <!-- Declared exactly once: Maven warns about duplicate groupId:artifactId declarations. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink CDC MySQL connector used by the DataStream demo (MySqlSource). -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>${flink.cdc.version}</version>
    </dependency>
    <!-- NOTE(review): this is the shaded jar intended for the SQL client's lib/ directory.
         Together with flink-connector-mysql-cdc it may put duplicate classes on the classpath;
         consider keeping only one of the two for a DataStream job. -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>${flink.cdc.version}</version>
    </dependency>
    <!-- Provides the web UI for StreamExecutionEnvironment.createLocalEnvironmentWithWebUI. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
DataStream API 示例(完整代码见 gitee):
public static void main(String[] args) throws Exception { MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .hostname("localhost") .port(3306) .databaseList("test") // set captured database .tableList("test.test") // set captured table .username("kuro") .password("pwdsdfsa;_=sfds") .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JString .build(); Configuration configuration = Configuration.fromMap(Map.of("rest.port", "10010")); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration); // enable checkpoint env.enableCheckpointing(3000); env .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source") // set 4 parallel source tasks .setParallelism(4) .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering env.execute("Print MySQL Snapshot + Binlog"); }
输出:
{"before":null,"after":{"id":2,"name":"ljy"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"test","sequence":null,"table":"test","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1646993221523,"transaction":null}
{"before":null,"after":{"id":1,"name":"kuro"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"test","sequence":null,"table":"test","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1646993221522,"transaction":null}
{"before":null,"after":{"id":3,"name":"liyouqiang"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":0,"snapshot":"false","db":"test","sequence":null,"table":"test","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1646993221524,"transaction":null}
3月 11, 2022 6:07:05 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to localhost:3306 at mysql-bin.000001/3592 (sid:5536, cid:33)
{"before":{"id":1,"name":"kuro"},"after":{"id":1,"name":"LJY"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1646993370000,"snapshot":"false","db":"test","sequence":null,"table":"test","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":3813,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1646993370264,"transaction":null}
可以看到,flink-cdc 第一次启动时会先全量同步已有数据(快照读取,输出中 op 为 "r"),之后再基于 binlog 进行增量同步(例如上面的更新事件,op 为 "u")。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。