赞
踩
着重点在本地调试flink-cdc踩坑
选择 flink-cdc 的原因是为了简化 CDC 过程中依赖的工具链:flink-cdc 内部复用了 debezium 的 connector 和 kafka-connect 的接口,可以直连 flink,无需再单独部署中间组件;再者,可以借助 flink 平台适配的各种 source、sink 和 SQL client 轻松实现数据源同步。
本地调试也需要 flink 的依赖,注意版本冲突,此处使用 1.13.6 [国内仓库下载](https://mirrors.huaweicloud.com/apache/flink/)
# MySQL server settings that enable the binary log read by the CDC source.
[mysqld]
# Server ID of this MySQL instance.
server-id = 1
# Turn on binary logging; binlog files use the base name "mysql-bin".
log_bin = mysql-bin
# Record changes row by row rather than as SQL statements.
binlog_format = ROW
# Log the full row image for every change.
binlog_row_image = FULL
# Retention period for binlog files, in days.
expire_logs_days = 10
本地调试flink非常重要的点,很多错误不加日志配置根本看不到
# Log4j 2 configuration in properties format (presumably saved as
# log4j2.properties on the classpath, e.g. src/main/resources -- confirm).
# Each key must be on its own line: a properties parser treats everything
# after the first '=' on a line as the value of that single key.

# Re-check this configuration file for changes every 30 seconds.
monitorInterval=30

# This affects logging for both user code and Flink
rootLogger.level=INFO
rootLogger.appenderRef.file.ref=MainAppender

# Per-package log levels.
logger.aaron.name=org.apache.flink
logger.aaron.level=INFO
logger.akka.name=akka
logger.akka.level=INFO
logger.kafka.name=org.apache.kafka
logger.kafka.level=INFO
logger.hadoop.name=org.apache.hadoop
logger.hadoop.level=INFO
logger.zookeeper.name=org.apache.zookeeper
logger.zookeeper.level=INFO
logger.shaded_zookeeper.name=org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level=INFO

# Log all infos in the given file
appender.main.name=MainAppender
appender.main.type=RollingFile
appender.main.append=true
appender.main.fileName=console.log
appender.main.filePattern=console.log.%i
appender.main.layout.type=PatternLayout
appender.main.layout.pattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.main.policies.type=Policies
# Roll over when the file reaches 100MB and on every start-up.
appender.main.policies.size.type=SizeBasedTriggeringPolicy
appender.main.policies.size.size=100MB
appender.main.policies.startup.type=OnStartupTriggeringPolicy
appender.main.strategy.type=DefaultRolloverStrategy
# Number of rolled files to keep; defaults to 10 when MAX_LOG_FILE_NUMBER is unset.
appender.main.strategy.max=${env:MAX_LOG_FILE_NUMBER:-10}

# Silence this logger completely.
logger.netty.name=org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level=OFF
import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; public class Application { public static void main(String[] args) throws Exception { mysql_cdc(); } public static void mysql_cdc() throws Exception { Properties properties = new Properties(); properties.setProperty("decimal.handling.mode", "double"); //debezium 小数转换处理策略 properties.setProperty("database.serverTimezone", "GMT+8"); //debezium 配置以database. 开头的属性将被传递给jdbc url MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .hostname("192.168.31.233") .port(3306) .databaseList("test_database") // set captured database .tableList("test_database.test_table") // set captured table .username("root") .password("123456") .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String .debeziumProperties(properties) .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // enable checkpoint env.enableCheckpointing(3000); env.getCheckpointConfig().setCheckpointStorage( new FileSystemCheckpointStorage("file:///flink-ck/checkpoints")); env .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source") // set 4 parallel source tasks .setParallelism(4) .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering env.execute("Print MySQL Snapshot + Binlog"); } }
依赖参考
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-cdc</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.6</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <!-- 2.14.1 is affected by CVE-2021-44228 (Log4Shell); 2.17.1 still supports Java 8. -->
        <log4j.version>2.17.1</log4j.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Base dependencies: flink-streaming-java and flink-clients are enough to run locally. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.22</version>
        </dependency>

        <!-- Logging framework -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.example.cdc.Application</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
第一次启动时先读取全量快照,之后按 binlog 的 offset 增量拉取;offset 随 Flink Checkpoint 持久化,从而实现断点续传。
{"before":null,"after":{"id":1,"name":"zhangsan"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1640677288811,"snapshot":"false","db":"test_database","sequence":null,"table":"test_table","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1640677288815,"transaction":null}
{"before":null,"after":{"id":3,"name":"wangwu"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1640677288816,"snapshot":"false","db":"test_database","sequence":null,"table":"test_table","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1640677288816,"transaction":null}
{"before":null,"after":{"id":2,"name":"lisi"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1640677288816,"snapshot":"false","db":"test_database","sequence":null,"table":"test_table","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1640677288816,"transaction":null}
十二月 28, 2021 3:41:30 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect
信息: Connected to 192.168.31.233:3308 at mysql-bin.000003/2526 (sid:6257, cid:36)
{"before":null,"after":{"id":5,"name":"qianqi"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1640677322000,"snapshot":"false","db":"test_database","sequence":null,"table":"test_table","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":2724,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1640677323291,"transaction":null}
{"before":{"id":5,"name":"qianqi"},"after":{"id":5,"name":"钱七"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1640677338000,"snapshot":"false","db":"test_database","sequence":null,"table":"test_table","server_id":1,"gtid":null,"file":"mysql-bin.000003","pos":3001,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1640677338533,"transaction":null}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。