
Synchronizing Oracle Database Data with Flink CDC (Non-SQL)



Preface

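Flink CDC is a stream-based data integration tool that aims to give users a more fully featured programming interface (API). It lets users define database synchronization in a YAML configuration file, and it also provides the Flink CDC Source Connector API. Flink CDC has optimized job submission and added advanced features such as automatic synchronization of table schema changes (Schema Evolution), Data Transformation, Full Database Synchronization, and exactly-once semantics.
This article uses flink-connector-oracle-cdc from Java code, rather than Flink SQL, to synchronize data from an Oracle database.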


1. Enable Archive Logging

1) In a terminal on the database server, connect to the database with the sysdba role

 sqlplus / as sysdba
or
sqlplus /nolog
CONNECT sys/password AS SYSDBA;

2) Check whether archive logging is enabled

archive log list;

("Database log mode: No Archive Mode" means archive logging is not enabled.)
("Database log mode: Archive Mode" means archive logging is enabled.)
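
If this check is ever scripted, for example by capturing the SQL*Plus output, keep in mind that the text "Archive Mode" is also a substring of "No Archive Mode". The following is a minimal sketch of a safer text check. The class and method names are hypothetical, and it only assumes output shaped like the two messages quoted above.

```java
import java.util.Locale;

/** Hypothetical helper: interprets the "Database log mode" line printed by `archive log list`. */
final class ArchiveLogModeCheck {

    /** Returns true only when the log-mode line reports Archive Mode, not No Archive Mode. */
    static boolean isArchiveLogEnabled(String sqlplusOutput) {
        for (String line : sqlplusOutput.split("\\R")) {
            // Normalize case and collapse runs of whitespace so column padding does not matter.
            String normalized = line.trim().toLowerCase(Locale.ROOT).replaceAll("\\s+", " ");
            if (normalized.startsWith("database log mode")) {
                // "no archive mode" also contains "archive mode", so test the negative form first.
                if (normalized.contains("no archive mode")) {
                    return false;
                }
                return normalized.contains("archive mode");
            }
        }
        throw new IllegalArgumentException("No 'Database log mode' line found");
    }
}
```

Calling `ArchiveLogModeCheck.isArchiveLogEnabled(output)` on the captured text returns a boolean and throws if the expected line is missing, so a truncated capture is not silently read as "disabled".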
3) Enable archive logging

alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;

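Note:
Enabling archive logging requires a database restart.
Archive logs consume a large amount of disk space, so expired log files should be purged regularly.
4) Once the database is back up, run archive log list; again to confirm that archive logging is on.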

2. Create a Dedicated Flink CDC User

2.1 For a non-CDB Oracle database, run the following SQL (the LOGMINER_TBS tablespace must already exist)

  CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
  GRANT CREATE SESSION TO flinkuser;
  GRANT SET CONTAINER TO flinkuser;
  GRANT SELECT ON V_$DATABASE to flinkuser;
  GRANT FLASHBACK ANY TABLE TO flinkuser;
  GRANT SELECT ANY TABLE TO flinkuser;
  GRANT SELECT_CATALOG_ROLE TO flinkuser;
  GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
  GRANT SELECT ANY TRANSACTION TO flinkuser;
  GRANT LOGMINING TO flinkuser;
  GRANT ANALYZE ANY TO flinkuser;

  GRANT CREATE TABLE TO flinkuser;
  -- The next grant is not needed when scan.incremental.snapshot.enabled=true (the default)
  GRANT LOCK ANY TABLE TO flinkuser;
  GRANT ALTER ANY TABLE TO flinkuser;
  GRANT CREATE SEQUENCE TO flinkuser;

  GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
  GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;

  GRANT SELECT ON V_$LOG TO flinkuser;
  GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
  GRANT SELECT ON V_$LOGFILE TO flinkuser;
  GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;

2.2 For an Oracle CDB database, run the following SQL

  CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
  GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
  GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$DATABASE to flinkuser CONTAINER=ALL;
  GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;
  GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;
  GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
  GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
  GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;
  GRANT LOGMINING TO flinkuser CONTAINER=ALL;
  GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;
  -- The next grant is not needed when scan.incremental.snapshot.enabled=true (the default)
  GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;
  GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;

  GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;
  GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;

  GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;

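3. Enable Supplemental Logging at the Table or Database Level

-- Enable supplemental logging (all columns) for a specific table:
ALTER TABLE schema_name.table_name ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

-- Enable supplemental logging (all columns) for every table in the database
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

-- Enable minimal supplemental logging for the database
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;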
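
When several tables are captured, the table-level statement has to be repeated once per table. The sketch below generates those statements from a list of schema-qualified table names. The class and method names are hypothetical, and the identifier check is a simplifying assumption that covers only unquoted Oracle identifiers.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: builds one table-level supplemental logging statement per table. */
final class SupplementalLogSql {

    // Assumption: unquoted identifiers only, in the form SCHEMA.TABLE.
    private static final String QUALIFIED_NAME =
            "[A-Za-z][A-Za-z0-9_$#]*\\.[A-Za-z][A-Za-z0-9_$#]*";

    static List<String> forTables(List<String> qualifiedTables) {
        List<String> statements = new ArrayList<>();
        for (String table : qualifiedTables) {
            // Reject anything that is not a plain SCHEMA.TABLE name before building SQL from it.
            if (!table.matches(QUALIFIED_NAME)) {
                throw new IllegalArgumentException("Expected SCHEMA.TABLE but got: " + table);
            }
            statements.add("ALTER TABLE " + table + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
        }
        return statements;
    }
}
```

For instance, passing the table used later in this article, `c##flink_user.TEST2`, yields a single `ALTER TABLE ... ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS` statement that can be run in SQL*Plus with a trailing semicolon.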

4. Synchronize the Database with flink-connector-oracle-cdc

4.1 Add the pom dependency

 <dependency>
     <groupId>com.ververica</groupId>
     <artifactId>flink-connector-oracle-cdc</artifactId>
     <version>2.4.0</version>
 </dependency>

4.2 Main Java code

package test.datastream.cdc.oracle;


import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;
// CacheDataAllWindowFunction and DbCdcSinkFunction are project classes not shown in this article
import test.datastream.cdc.oracle.function.CacheDataAllWindowFunction;
import test.datastream.cdc.oracle.function.CdcString2RowMap;
import test.datastream.cdc.oracle.function.DbCdcSinkFunction;

import java.util.Properties;

public class OracleCdcExample {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        // Emit numeric (decimal) values as strings
        properties.setProperty("decimal.handling.mode", "string");
        SourceFunction<String> sourceFunction = OracleSource.<String>builder()
//                .startupOptions(StartupOptions.latest()) // start from the latest offset
                .url("jdbc:oracle:thin:@localhost:1521:orcl")
                .port(1521)
                .database("ORCL") // database to monitor
                .schemaList("c##flink_user") // schema to monitor
                .tableList("c##flink_user.TEST2") // table to monitor, as schema.table
                // Use the credentials of the user created in section 2
                .username("c##flink_user")
                .password("flinkpw")
                .debeziumProperties(properties)
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read with parallelism 1 to keep change events in order
        DataStreamSource<String> source = env.addSource(sourceFunction).setParallelism(1);
        SingleOutputStreamOperator<Row> mapStream = source.flatMap(new CdcString2RowMap());

        // Collect rows into 5-second batches
        SingleOutputStreamOperator<Row[]> winStream = mapStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new CacheDataAllWindowFunction());
        // Write each batch to the target database
        winStream.addSink(new DbCdcSinkFunction(null));
        env.execute();
    }
}
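
The url passed to the builder above uses the SID form of the Oracle thin-driver JDBC URL. Databases that are addressed by service name, which is common for pluggable databases, use a different syntax. The sketch below builds both forms; the class and method names are hypothetical and only illustrate the two URL layouts.

```java
/** Hypothetical helper: builds Oracle thin-driver JDBC URLs in SID and service-name form. */
final class OracleJdbcUrls {

    /** SID form, as used in the example above: jdbc:oracle:thin:@host:port:SID */
    static String forSid(String host, int port, String sid) {
        return "jdbc:oracle:thin:@" + host + ":" + port + ":" + sid;
    }

    /** Service-name form: jdbc:oracle:thin:@//host:port/serviceName */
    static String forServiceName(String host, int port, String serviceName) {
        return "jdbc:oracle:thin:@//" + host + ":" + port + "/" + serviceName;
    }
}
```

`OracleJdbcUrls.forSid("localhost", 1521, "orcl")` reproduces the URL used in the main code, so switching to a service name only means calling the other method with the same host and port.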

4.3 Converting JSON to Row

package test.datastream.cdc.oracle.function;

import cn.com.victorysoft.common.configuration.VsConfiguration;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import test.datastream.cdc.CdcConstants;

import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;

/**
 * Parses a CDC JSON event and converts it to a Row.
 * VsConfiguration (a JSON wrapper) and CdcConstants are project classes not shown in this article.
 */
public class CdcString2RowMap extends RichFlatMapFunction<String, Row> {
    // Column name -> position of that column in the output Row
    private final Map<String, Integer> columnMap = new HashMap<>();

    @Override
    public void open(Configuration parameters) throws Exception {
        columnMap.put("ID", 0);
        columnMap.put("NAME", 1);
        columnMap.put("DESCRIPTION", 2);
        columnMap.put("AGE", 3);
        columnMap.put("CREATE_TIME", 4);
        columnMap.put("SCORE", 5);
        columnMap.put("C_1", 6);
        columnMap.put("B_1", 7);
    }

    @Override
    public void flatMap(String s, Collector<Row> collector) throws Exception {
        System.out.println("receive: " + s);
        VsConfiguration conf = VsConfiguration.from(s);
        String op = conf.getString(CdcConstants.K_OP);
        VsConfiguration before = conf.getConfiguration(CdcConstants.K_BEFORE);
        VsConfiguration after = conf.getConfiguration(CdcConstants.K_AFTER);
        Row row;
        if (CdcConstants.OP_C.equals(op)) {
            // Insert: use the "after" image
            row = convertToRow(after);
            row.setKind(RowKind.INSERT);
        } else if (CdcConstants.OP_U.equals(op)) {
            // Update: use the "after" image
            row = convertToRow(after);
            row.setKind(RowKind.UPDATE_AFTER);
        } else if (CdcConstants.OP_D.equals(op)) {
            // Delete: use the "before" image
            row = convertToRow(before);
            row.setKind(RowKind.DELETE);
        } else {
            // "r" (snapshot read): use the "after" image
            row = convertToRow(after);
            row.setKind(RowKind.INSERT);
        }
        collector.collect(row);
    }

    private Row convertToRow(VsConfiguration data) {
        Row row = new Row(columnMap.size());
        for (String key : data.getKeys()) {
            Integer index = this.columnMap.get(key);
            if (index == null) {
                // Skip columns that are not mapped to a position in the target row
                continue;
            }
            Object value = data.get(key);
            if (value != null && key.equals("CREATE_TIME")) {
                // Convert the epoch-based long value to a Timestamp
                value = long2Timestamp(((Number) value).longValue());
            }
            row.setField(index, value);
        }
        return row;
    }

    // Assumes the value is in microseconds since the epoch; dividing by 1000 gives milliseconds
    private static Timestamp long2Timestamp(long time) {
        Timestamp timestamp = new Timestamp(time / 1000);
        System.out.println(timestamp);
        return timestamp;
    }

}
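
CdcConstants is not shown in the article. The sketch below is a guess at what it might contain, based on the field names and operation codes of the Debezium change-event envelope (`op`, `before`, `after`, and the codes `c`, `u`, `d`, `r`). The class name `CdcEnvelope`, the `ChangeKind` enum (a stand-in for Flink's RowKind so the block needs no Flink dependency), and both helper methods are hypothetical. Unlike the code above, `kindFor` rejects unknown operation codes instead of treating them as inserts, and `microsToTimestamp` keeps the sub-millisecond digits that `time / 1000` discards.

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

/** Hypothetical sketch of the constants and helpers used when decoding a change event. */
final class CdcEnvelope {
    // Field names of the Debezium change-event envelope
    static final String K_OP = "op";
    static final String K_BEFORE = "before";
    static final String K_AFTER = "after";

    // Operation codes: create, update, delete, snapshot read
    static final String OP_C = "c";
    static final String OP_U = "u";
    static final String OP_D = "d";
    static final String OP_R = "r";

    /** Stand-in for Flink's RowKind, limited to the kinds used in this article. */
    enum ChangeKind { INSERT, UPDATE_AFTER, DELETE }

    /** Maps an operation code to the kind of row it produces. */
    static ChangeKind kindFor(String op) {
        switch (op) {
            case OP_C:
            case OP_R:
                return ChangeKind.INSERT;       // both carry the new row in "after"
            case OP_U:
                return ChangeKind.UPDATE_AFTER; // new row image in "after"
            case OP_D:
                return ChangeKind.DELETE;       // old row image in "before"
            default:
                throw new IllegalArgumentException("Unknown op: " + op);
        }
    }

    /** Converts microseconds since the epoch to a Timestamp without losing precision. */
    static Timestamp microsToTimestamp(long epochMicros) {
        Instant instant = Instant.EPOCH.plus(epochMicros, ChronoUnit.MICROS);
        return Timestamp.from(instant);
    }
}
```

Whether a column arrives as milliseconds, microseconds, or nanoseconds depends on its declared precision, so it is worth checking one received event before settling on a conversion.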