赞
踩
遵循历史,依然要感谢各位在网上的输出!
首先要明确是否需要自己开启BinLog:在正式环境中一般不用自己开启,运维侧通常为了数据恢复等需求已经开启了BinLog;我这里的操作仅供个人测试使用!
Linux:更改my.cnf文件
Windows:更改my.ini文件(可以从“服务”入手查看my.ini文件地址)
在my.ini(Linux下为my.cnf)中的[mysqld]下添加以下语句,修改后需要重启MySQL服务才能生效:
# log-bin: directory and base file name of the binary log (backup) files
# binlog_do_db: name of the database to log; leave it out to log all databases
log-bin="D:/mysql/data/log"
#binlog_do_db=test
# Debezium-based Flink CDC reads row-level change events, so row-based logging
# with full row images is required.
binlog_format=ROW
binlog_row_image=FULL
# A server id is required once binary logging is enabled (MySQL 5.7 will not
# start with log-bin set and no server-id).
server-id=1
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>1.13.3</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.13.3</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.13.3</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba.ververica/flink-connector-mysql-cdc -->
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.78</version>
    </dependency>
    <!-- Kept on the same Flink version as the other org.apache.flink artifacts;
         mixing Flink releases on one classpath can cause runtime linkage errors. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.13.3</version>
    </dependency>
</dependencies>
这里的代码示例与参考资料(refer)中的示例类似,也可以直接借鉴那位作者的写法。
package com.example.flinkcdc; import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource; import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; public class mysqlBinlogSource { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); SourceFunction<String> sourceFunction = MySQLSource.<String>builder() .hostname("localhost") .port(3306) .databaseList("20210906_test") .tableList("20210906_test.city") .username("root") .password("123456") .serverTimeZone("Asia/Shanghai") // 可以自定义反序列化器,类似于binlog的不同数据处理方式的处理办法 .deserializer(new StringDebeziumDeserializationSchema()) .build(); // //TODO 2.检查点配置 // //2.1 开启检查点 // env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);//5秒执行一次,模式:精准一次性 // //2.2 设置检查点超时时间 // env.getCheckpointConfig().setCheckpointTimeout(60*1000); // //2.3 设置重启策略 // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2*1000));//两次,两秒执行一次 // //2.4 设置job取消后检查点是否保留 // env.getCheckpointConfig().enableExternalizedCheckpoints( // CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//保留 // //2.5 设置状态后端-->保存到hdfs // env.setStateBackend(new FsStateBackend("hdfs://192.168.231.121:8020/ck")); // //2.6 指定操作hdfs的用户 // System.setProperty("HADOOP_USER_NAME", "gaogc"); //3.2 从源端获取数据 DataStreamSource<String> sourceDS = env.addSource(sourceFunction); //打印测试 sourceDS.print(); 
//执行 env.execute(); } }
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=binlog.000001, pos=1572}} ConnectRecord{topic='mysql_binlog_source.20210906_test.city', kafkaPartition=null, key=Struct{ID=110011}, keySchema=Schema{mysql_binlog_source.20210906_test.city.Key:STRUCT}, value=Struct{after=Struct{ID=110011,Name=TIANJIN,CountryCode=IDN,District=WEST JAVA,Population=123123123},source=Struct{version=1.2.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=last,db=20210906_test,table=city,server_id=0,file=binlog.000001,pos=1572,row=0},op=c,ts_ms=1639388695105}, valueSchema=Schema{mysql_binlog_source.20210906_test.city.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1639388718, file=binlog.000001, pos=1637, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.20210906_test.city', kafkaPartition=null, key=Struct{ID=110011}, keySchema=Schema{mysql_binlog_source.20210906_test.city.Key:STRUCT}, value=Struct{before=Struct{ID=110011,Name=TIANJIN,CountryCode=IDN,District=WEST JAVA,Population=123123123},after=Struct{ID=110011,Name=beijing,CountryCode=IDN,District=WEST JAVA,Population=123123123},source=Struct{version=1.2.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1639388718000,db=20210906_test,table=city,server_id=1,file=binlog.000001,pos=1784,row=0,thread=5},op=u,ts_ms=1639388718331}, valueSchema=Schema{mysql_binlog_source.20210906_test.city.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1639388757, file=binlog.000001, pos=1978, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.20210906_test.city', kafkaPartition=null, key=Struct{ID=10012}, keySchema=Schema{mysql_binlog_source.20210906_test.city.Key:STRUCT}, value=Struct{after=Struct{ID=10012,Name=dongbei,CountryCode=IDN,District=WEST JAVA,Population=1231231231},source=Struct{version=1.2.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1639388757000,db=20210906_test,table=city,server_id=1,file=binlog.000001,pos=2125,row=0,thread=5},op=c,ts_ms=1639388757341}, valueSchema=Schema{mysql_binlog_source.20210906_test.city.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1639388768, file=binlog.000001, pos=2287, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.20210906_test.city', kafkaPartition=null, key=Struct{ID=10012}, keySchema=Schema{mysql_binlog_source.20210906_test.city.Key:STRUCT}, value=Struct{before=Struct{ID=10012,Name=dongbei,CountryCode=IDN,District=WEST JAVA,Population=1231231231},source=Struct{version=1.2.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1639388768000,db=20210906_test,table=city,server_id=1,file=binlog.000001,pos=2434,row=0,thread=5},op=d,ts_ms=1639388768361}, valueSchema=Schema{mysql_binlog_source.20210906_test.city.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。