当前位置:   article > 正文

基于Flink CDC datastream mysql to mysql 序列化sql 数据同步_mysql to mysql数据同步使用datastream api

mysql to mysql数据同步使用datastream api

基于Flink CDC datastream mysql to mysql 序列化sql 数据同步

Flink CDC有两种方式同步数据库:

1. 一种是通过FlinkSQL直接输入两表数据库映射进行数据同步,缺点是只能单表进行同步;
2. 一种是通过DataStream开发一个maven项目,打成jar包上传到服务器运行,可以多表多库。

本方案使用DataStream方法,同步两表中的数据。
不需要部署Flink,可单独使用。

主要工作

工程下载连接,点击进入

  • 运行简单,只需要配置源数据库与目标数据库信息,运行MysqlCDC文件中的main函数即可使用。
  • 修复了mysql数据时区问题、修复了deatatime同步到数据库报错问题
  • 使用分为以下三步:

一、 修改源数据库信息

在这里插入图片描述

二、 修改目标数据库信息

在这里插入图片描述

三、 启动服务

在这里插入图片描述
主要源码:

package org.apache.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.jvnet.hk2.annotations.Service;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

/**
 * @author dake
 */
@Service
public class MysqlSink extends RichSinkFunction<String> {
    private Connection connection = null;
    Statement sqlExecute;



    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        try {
            if (connection == null) {
                Class.forName("com.mysql.jdbc.Driver");//加载数据库驱动
                connection = DriverManager.getConnection("jdbc:mysql://IP地址:3306/test_target?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=true"
                        , "账号", "密码");//获取连接
                sqlExecute = connection.createStatement();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @Override
    public void invoke(String value, Context context) {
        try {
            sqlExecute.execute(value);
        }catch (Exception e){
            e.printStackTrace();
        }
        System.out.println("value = " + value);
    }

    @Override
    public void close() throws Exception {
        super.close();

        if (connection != null) {
            connection.close();
        }
        super.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
package org.apache.flink;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.config.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;

public class MysqlCDC {
    public static void main(String[] args) throws Exception {

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("IP地址")
                .port(3306)
                .startupOptions(StartupOptions.initial()) //全量同步
                .scanNewlyAddedTableEnabled(true) // 开启支持新增表
                .databaseList("test_master_resource") // set captured database
                .tableList("test_master_resource.test1,test_master_resource.test2") // set captured table
                .username("账号")
                .password("密码")
                .debeziumProperties(getDebeziumProperties())
                .serverTimeZone("Asia/Shanghai")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        Configuration configuration = new Configuration();
        // 生产环境夏下,改成参数传进来
//        configuration.setString("execution.savepoint.path","file:///tmp/flink-ck/1980d53f557a886f885172bcdf4be8e8/chk-21");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);

        // enable checkpoint
        env.enableCheckpointing(3000);
        // 设置本地
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-ck");
        env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(4)
                .addSink(new MysqlSink());
//                .print("==>").setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("Print MySQL Snapshot + Binlog");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/811555
推荐阅读
相关标签
  

闽ICP备14008679号