当前位置:   article > 正文

Flink connector Oracle CDC 实时同步数据到MySQL(Oracle19c)_flinkcdc oracle19c

flinkcdc oracle19c

准备工作

在这一步需要配置Oracle。主要包含。

  1. 开启Archive log
  2. 开启数据库和数据表的supplemental log
  3. 创建CDC用户并赋予权限

注意:不要使用Oracle的SYS和SYSTEM用户做为CDC用户。因为这两个用户能够捕获到大量Oracle数据库内部的变更信息,对于业务数据来说是不必要的。Debezium会过滤掉这两个用户捕获到的变更内容。

下面开始配置步骤。在安装Oracle的机器上执行:

  1. su - oracle
  2. sqlplus / as sysdba

进入Sqlplus。然后开启Archive log。

  1. alter system set db_recovery_file_dest_size = 10G;
  2. alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
  3. shutdown immediate;
  4. startup mount;
  5. alter database archivelog;
  6. alter database open;
  7. # 检查Archive log是否成功开启
  8. archive log list;

注意:'/opt/oracle/oradata/recovery_area' 这个路径如果不存在的话需要自己手动去创建(Oracle用户下创建)

  1. 本步骤需要重启数据库,请选择在合适的时间操作。
  2. 例子中的/opt/oracle/oradata/recovery_area目录oracle用户需要有读写权限。
  3. 如果执行alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;的时候报ORA-32001: write to SPFILE requested but no SPFILE is in use。需要检查spfile文件。
  1. show parameter spfile;
  2. # 如果输出value为空,说明没有创建spfile,执行下面SQL创建
  3. create spfile from pfile;
  4. # 关闭并重启
  5. shutdown immediate;
  6. startup;
  7. # 检查spfile是否成功创建
  8. show parameter spfile;

接下来进行相关重要的配置:

  1. -- 检查日志归档是否开启
  2. archive log list;
  3. -- 为捕获的数据库启用补充日志记录,以便数据更改捕获更改的数据库行之前的状态,下面说明了如何在数据库级别进行配置。
  4. ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
  5. - 创建表空间
  6. CREATE TABLESPACE logminer_tbs DATAFILE '/home/oracle/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  7. -- 创建用户family绑定表空间LOGMINER_TBS
  8. CREATE USER C##family IDENTIFIED BY zyhcdc DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
  9. -- 授予family用户dba的权限
  10. grant connect,resource,dba to C##family;
  11. - 并授予权限
  12. GRANT CREATE SESSION TO C##family;
  13. GRANT SELECT ON V_$DATABASE to C##family;
  14. GRANT FLASHBACK ANY TABLE TO C##family;
  15. GRANT SELECT ANY TABLE TO C##family;
  16. GRANT SELECT_CATALOG_ROLE TO C##family;
  17. GRANT EXECUTE_CATALOG_ROLE TO C##family;
  18. GRANT SELECT ANY TRANSACTION TO C##family;
  19. GRANT EXECUTE ON SYS.DBMS_LOGMNR TO C##family;
  20. GRANT SELECT ON V_$LOGMNR_CONTENTS TO C##family;
  21. GRANT CREATE TABLE TO C##family;
  22. GRANT LOCK ANY TABLE TO C##family;
  23. GRANT ALTER ANY TABLE TO C##family;
  24. GRANT CREATE SEQUENCE TO C##family;
  25. GRANT EXECUTE ON DBMS_LOGMNR TO C##family;
  26. GRANT EXECUTE ON DBMS_LOGMNR_D TO C##family;
  27. GRANT SELECT ON V_$LOG TO C##family;
  28. GRANT SELECT ON V_$LOG_HISTORY TO C##family;
  29. GRANT SELECT ON V_$LOGMNR_LOGS TO C##family;
  30. GRANT SELECT ON V_$LOGMNR_CONTENTS TO C##family;
  31. GRANT SELECT ON V_$LOGMNR_PARAMETERS TO C##family;
  32. GRANT SELECT ON V_$LOGFILE TO C##family;
  33. GRANT SELECT ON V_$ARCHIVED_LOG TO C##family;
  34. GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO C##family;

本地用Navicate连接Oracle:

  1. -- 创建 STUDENT_INFO 表
  2. create table student_info (
  3. sid number(10) constraint pk_sid primary key,
  4. sname varchar2(10),
  5. sex varchar2(2)
  6. );
  7. -- 修改STUDENT_INFO表让其支持增量日志,先在Oracle里创建STUDENT_INFO表再执行下面的语句
  8. ALTER TABLE C##FAMILY.STUDENT_INFO ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

至此,Oracle配置完成;

Flink CDC 配置:

这里用Flink SQL CLI 做演示:

使用以下命令切换到 Flink 目录

cd flink-1.14.0

 使用以下命令启动 Flink SQL CLI:

./bin/sql-client.sh

我们应该看到 CLI 客户端的欢迎屏幕。

在 Flink SQL CLI 中使用 Flink DDL 创建表

创建从相应数据库表中捕获更改数据的表。

Oracle CDC 表可以定义如下:

  1. -- register an Oracle table 'student_info' in Flink SQL
  2. Flink SQL> CREATE TABLE student_info (
  3. SID INT NOT NULL,
  4. SNAME STRING,
  5. SEX STRING,
  6. PRIMARY KEY(SID) NOT ENFORCED
  7. ) WITH (
  8. 'connector' = 'oracle-cdc',
  9. 'hostname' = 'localhost',
  10. 'port' = '1521',
  11. 'username' = 'C##family',
  12. 'password' = 'zyhcdc',
  13. 'database-name' = 'ORCLCDB',
  14. 'schema-name' = 'C##FAMILY',
  15. 'table-name' = 'STUDENT_INFO');
  16. -- read snapshot and binlogs from products table
  17. Flink SQL> SELECT * FROM student_info;

在定义一个数据流出到MySQL的表:

  1. -- register an Oracle table 'mysql_user' in Flink SQL
  2. Flink SQL> CREATE TABLE mysql_user (
  3. SID INT ,
  4. SNAME STRING,
  5. SEX STRING,
  6. PRIMARY KEY(SID) NOT ENFORCED
  7. ) WITH (
  8. 'connector' = 'jdbc',
  9. 'url' = 'jdbc:mysql://localhost:3306/test_cdc',
  10. 'username' = 'root',
  11. 'password' = 'root123',
  12. 'table-name' = 'user'
  13. );
  14. Flink SQL> insert into mysql_user select SID,SNAME,SEX from student_info;
  15. [INFO] Submitting SQL update statement to the cluster...
  16. [INFO] SQL update statement has been successfully submitted to the cluster:
  17. Job ID: 1966fdd63bb36c14908fe8e31408db58

完成以上配置;Flink connector Oracle CDC实时数据同步到MySQL的操作就完成了;后续可以自行测试一下。

如果有主键的话会自动进行merge。 

更多文章请扫码关注公众号,有问题的小伙伴也可以在公众号上提出。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/797074
推荐阅读
相关标签
  

闽ICP备14008679号