赞
踩
Flink CDC 是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API)。 该工具使得用户能够以 YAML 配置文件的形式实现数据库同步,同时也提供了Flink CDC Source Connector API。 Flink CDC 在任务提交过程中进行了优化,并且增加了一些高级特性,如表结构变更自动同步(Schema Evolution)、数据转换(Data Transformation)、整库同步(Full Database Synchronization)以及 精确一次(Exactly-once)语义。
本文通过flink-connector-oracle-cdc来实现Oracle数据库的数据同步。
1)数据库服务器终端,使用sysdba角色连接数据库
sqlplus / as sysdba
或
sqlplus /nolog
CONNECT sys/password AS SYSDBA;
2)检查归档日志是否开启
archive log list;
(“Database log mode: No Archive Mode”,日志归档未开启)
(“Database log mode: Archive Mode”,日志归档已开启)
3)启用归档日志
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
注意:
启用归档日志需要重启数据库。
归档日志会占用大量的磁盘空间,应定期清除过期的日志文件
4)启动完成后重新执行 archive log list; 查看归档打开状态
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT ANALYZE ANY TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
-- need not to execute if set scan.incremental.snapshot.enabled=true(default)
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to flinkuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;
GRANT LOGMINING TO flinkuser CONTAINER=ALL;
GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;
-- need not to execute if set scan.incremental.snapshot.enabled=true(default)
GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;
-- 指定表启用补充日志记录:
ALTER TABLE databasename.tablename ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- 为数据库的所有表启用
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- 指定数据库启用补充日志记录
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>
<version>2.4.0</version>
</dependency>
package test.datastream.cdc.oracle;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;
import test.datastream.cdc.oracle.function.CacheDataAllWindowFunction;
import test.datastream.cdc.oracle.function.CdcString2RowMap;
import test.datastream.cdc.oracle.function.DbCdcSinkFunction;
import java.util.Properties;
public class OracleCdcExample {
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
//数字类型数据 转换为字符
properties.setProperty("decimal.handling.mode", "string");
SourceFunction<String> sourceFunction = OracleSource.<String>builder()
// .startupOptions(StartupOptions.latest()) // 从最晚位点启动
.url("jdbc:oracle:thin:@localhost:1521:orcl")
.port(1521)
.database("ORCL") // monitor XE database
.schemaList("c##flink_user") // monitor inventory schema
.tableList("c##flink_user.TEST2") // monitor products table
.username("c##flink_user")
.password("flinkpw")
.debeziumProperties(properties)
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.addSource(sourceFunction).setParallelism(1);// use parallelism 1 for sink to keep message ordering
SingleOutputStreamOperator<Row> mapStream = source.flatMap(new CdcString2RowMap());
SingleOutputStreamOperator<Row[]> winStream = mapStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new CacheDataAllWindowFunction());
//批量同步
winStream.addSink(new DbCdcSinkFunction(null));
env.execute();
}
}
package test.datastream.cdc.oracle.function;
import cn.com.victorysoft.common.configuration.VsConfiguration;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import test.datastream.cdc.CdcConstants;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* @desc cdc json解析,并转换为Row
*/
public class CdcString2RowMap extends RichFlatMapFunction<String, Row> {
private Map<String,Integer> columnMap =new HashMap<>();
@Override
public void open(Configuration parameters) throws Exception {
columnMap.put("ID",0);
columnMap.put("NAME",1);
columnMap.put("DESCRIPTION",2);
columnMap.put("AGE",3);
columnMap.put("CREATE_TIME",4);
columnMap.put("SCORE",5);
columnMap.put("C_1",6);
columnMap.put("B_1",7);
}
@Override
public void flatMap(String s, Collector<Row> collector) throws Exception {
System.out.println("receive: "+s);
VsConfiguration conf=VsConfiguration.from(s);
String op = conf.getString(CdcConstants.K_OP);
VsConfiguration before = conf.getConfiguration(CdcConstants.K_BEFORE);
VsConfiguration after = conf.getConfiguration(CdcConstants.K_AFTER);
Row row =null;
if(CdcConstants.OP_C.equals(op)){
//插入,使用after数据
row = convertToRow(after);
row.setKind(RowKind.INSERT);
}else if(CdcConstants.OP_U.equals(op)){
//更新,使用after数据
row = convertToRow(after);
row.setKind(RowKind.UPDATE_AFTER);
}else if(CdcConstants.OP_D.equals(op)){
//删除,使用before数据
row = convertToRow(before);
row.setKind(RowKind.DELETE);
}else {
//r 操作,使用after数据
row = convertToRow(after);
row.setKind(RowKind.INSERT);
}
collector.collect(row);
}
private Row convertToRow(VsConfiguration data){
Set<String> keys = data.getKeys();
int size = keys.size();
Row row=new Row(8);
int i=0;
for (String key:keys) {
Integer index = this.columnMap.get(key);
Object value=data.get(key);
if(key.equals("CREATE_TIME")){
//long日期转timestamp
value=long2Timestamp((Long)value);
}
row.setField(index,value);
}
return row;
}
private static java.sql.Timestamp long2Timestamp(Long time){
Timestamp timestamp = new Timestamp(time/1000);
System.out.println(timestamp);
return timestamp;
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。