赞
踩
Flink CDC有两种方式同步数据库:
1. 一种是通过FlinkSQL,直接声明源表与目标表的映射关系来同步数据,缺点是每条映射只能同步单张表;
2. 一种是通过DataStream开发一个maven项目,打成jar包上传到服务器运行,可以多表多库。
本方案使用DataStream方法,同步两表中的数据。
不需要单独部署Flink集群,程序可以作为普通Java应用直接运行。
主要源码:
package org.apache.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.jvnet.hk2.annotations.Service;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

/**
 * Flink sink that executes every incoming record as a SQL statement against a
 * MySQL database over JDBC.
 *
 * <p>One JDBC connection and one {@link Statement} are created in
 * {@link #open(Configuration)} and released in {@link #close()}.
 *
 * <p>NOTE(review): each record is executed verbatim as SQL, so the upstream
 * operator must only ever emit trusted statements.
 *
 * @author dake
 */
@Service
public class MysqlSink extends RichSinkFunction<String> {

    // JDBC handles are created in open(); marked transient so they are never
    // part of the serialized function instance.
    private transient Connection connection = null;
    transient Statement sqlExecute;

    /**
     * Loads the MySQL JDBC driver and opens the connection and statement used
     * by {@link #invoke(String, Context)}.
     *
     * @param parameters configuration forwarded to the superclass
     * @throws Exception if the driver cannot be loaded or the connection or
     *     statement cannot be created; failures are propagated instead of being
     *     swallowed so the sink never runs with a null statement
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        if (connection == null) {
            // Load the database driver.
            Class.forName("com.mysql.jdbc.Driver");
            // Obtain the connection.
            connection = DriverManager.getConnection(
                    "jdbc:mysql://IP地址:3306/test_target?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=true",
                    "账号",
                    "密码");
            sqlExecute = connection.createStatement();
        }
    }

    /**
     * Executes {@code value} as a SQL statement and echoes it to standard output.
     *
     * <p>Execution errors are printed and otherwise ignored (best effort), which
     * is the behavior of the original implementation.
     *
     * @param value the SQL statement to execute
     * @param context sink context (unused)
     */
    @Override
    public void invoke(String value, Context context) {
        try {
            sqlExecute.execute(value);
        } catch (Exception e) {
            // Best effort: report the failure and continue with the next record.
            e.printStackTrace();
        }
        System.out.println("value = " + value);
    }

    /**
     * Closes the statement and the connection, then delegates to the superclass.
     *
     * <p>The nested {@code finally} blocks make sure the connection is closed
     * even when closing the statement fails.
     *
     * @throws Exception if closing a JDBC resource or the superclass fails
     */
    @Override
    public void close() throws Exception {
        try {
            if (sqlExecute != null) {
                sqlExecute.close();
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                sqlExecute = null;
                connection = null;
                super.close();
            }
        }
    }
}
package org.apache.flink;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.config.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * Job entry point: reads changes from a MySQL CDC source and hands every record
 * to {@link MysqlSink}.
 */
public class MysqlCDC {

    /**
     * Builds the source and the execution environment, wires the source to the
     * sink and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = createSource();
        StreamExecutionEnvironment env = createEnvironment();

        DataStreamSource<String> changeStream =
                env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");

        // NOTE(review): the sink is attached without an explicit parallelism; the
        // original carried a commented-out `.print("==>").setParallelism(1)` with
        // the remark "use parallelism 1 for sink to keep message ordering" --
        // confirm whether write ordering matters for this sink.
        changeStream
                .setParallelism(4)
                .addSink(new MysqlSink());

        env.execute("Print MySQL Snapshot + Binlog");
    }

    /** Creates the MySQL CDC source for the two captured tables. */
    private static MySqlSource<String> createSource() {
        // NOTE(review): getDebeziumProperties() is not defined in the visible
        // source; it is presumably declared elsewhere in this class -- confirm.
        return MySqlSource.<String>builder()
                .hostname("IP地址")
                .port(3306)
                // Full synchronization (per the original comment).
                .startupOptions(StartupOptions.initial())
                // Enable support for newly added tables.
                .scanNewlyAddedTableEnabled(true)
                // Captured database.
                .databaseList("test_master_resource")
                // Captured tables.
                .tableList("test_master_resource.test1,test_master_resource.test2")
                .username("账号")
                .password("密码")
                .debeziumProperties(getDebeziumProperties())
                .serverTimeZone("Asia/Shanghai")
                // Per the original comment: converts SourceRecord to a JSON String.
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
    }

    /** Creates the execution environment with checkpointing enabled. */
    private static StreamExecutionEnvironment createEnvironment() {
        Configuration configuration = new Configuration();
        // In production, pass the savepoint path in as a parameter, e.g.:
        // configuration.setString("execution.savepoint.path",
        //         "file:///tmp/flink-ck/1980d53f557a886f885172bcdf4be8e8/chk-21");
        StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration);

        // Enable checkpointing.
        environment.enableCheckpointing(3000);
        // Store checkpoints on the local file system.
        environment.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-ck");
        return environment;
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。