赞
踩
在这一步需要配置Oracle。主要包含。
注意:不要使用Oracle的SYS和SYSTEM用户做为CDC用户。因为这两个用户能够捕获到大量Oracle数据库内部的变更信息,对于业务数据来说是不必要的。Debezium会过滤掉这两个用户捕获到的变更内容。
下面开始配置步骤。在安装Oracle的机器上执行:
- su - oracle
- sqlplus / as sysdba
进入Sqlplus。然后开启Archive log。
- alter system set db_recovery_file_dest_size = 10G;
- alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
- shutdown immediate;
- startup mount;
- alter database archivelog;
- alter database open;
- # 检查Archive log是否成功开启
- archive log list;
注意:'/opt/oracle/oradata/recovery_area' 这个路径如果不存在的话需要自己手动去创建(Oracle用户下创建)
/opt/oracle/oradata/recovery_area
目录oracle用户需要有读写权限。alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
的时候报ORA-32001: write to SPFILE requested but no SPFILE is in use。需要检查spfile文件。- show parameter spfile;
- # 如果输出value为空,说明没有创建spfile,执行下面SQL创建
- create spfile from pfile;
- # 关闭并重启
- shutdown immediate;
- startup;
- # 检查spfile是否成功创建
- show parameter spfile;
接下来进行相关重要的配置:
- -- 检查日志归档是否开启
- archive log list;
-
- -- 为捕获的数据库启用补充日志记录,以便数据更改捕获更改的数据库行之前的状态,下面说明了如何在数据库级别进行配置。
- ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-
- - 创建表空间
- CREATE TABLESPACE logminer_tbs DATAFILE '/home/oracle/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
-
- -- 创建用户family绑定表空间LOGMINER_TBS
- CREATE USER C##family IDENTIFIED BY zyhcdc DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
-
- -- 授予family用户dba的权限
- grant connect,resource,dba to C##family;
-
- - 并授予权限
- GRANT CREATE SESSION TO C##family;
- GRANT SELECT ON V_$DATABASE to C##family;
- GRANT FLASHBACK ANY TABLE TO C##family;
- GRANT SELECT ANY TABLE TO C##family;
- GRANT SELECT_CATALOG_ROLE TO C##family;
- GRANT EXECUTE_CATALOG_ROLE TO C##family;
- GRANT SELECT ANY TRANSACTION TO C##family;
- GRANT EXECUTE ON SYS.DBMS_LOGMNR TO C##family;
- GRANT SELECT ON V_$LOGMNR_CONTENTS TO C##family;
- GRANT CREATE TABLE TO C##family;
- GRANT LOCK ANY TABLE TO C##family;
- GRANT ALTER ANY TABLE TO C##family;
- GRANT CREATE SEQUENCE TO C##family;
-
- GRANT EXECUTE ON DBMS_LOGMNR TO C##family;
- GRANT EXECUTE ON DBMS_LOGMNR_D TO C##family;
-
- GRANT SELECT ON V_$LOG TO C##family;
- GRANT SELECT ON V_$LOG_HISTORY TO C##family;
- GRANT SELECT ON V_$LOGMNR_LOGS TO C##family;
- GRANT SELECT ON V_$LOGMNR_CONTENTS TO C##family;
- GRANT SELECT ON V_$LOGMNR_PARAMETERS TO C##family;
- GRANT SELECT ON V_$LOGFILE TO C##family;
- GRANT SELECT ON V_$ARCHIVED_LOG TO C##family;
- GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO C##family;
本地用Navicate连接Oracle:
- -- 创建 STUDENT_INFO 表
- create table student_info (
- sid number(10) constraint pk_sid primary key,
- sname varchar2(10),
- sex varchar2(2)
- );
-
- -- 修改STUDENT_INFO表让其支持增量日志,先在Oracle里创建STUDENT_INFO表再执行下面的语句
- ALTER TABLE C##FAMILY.STUDENT_INFO ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-
至此,Oracle配置完成;
Flink CDC 配置:
这里用Flink SQL CLI 做演示:
使用以下命令切换到 Flink 目录
cd flink-1.14.0
使用以下命令启动 Flink SQL CLI:
./bin/sql-client.sh
我们应该看到 CLI 客户端的欢迎屏幕。
在 Flink SQL CLI 中使用 Flink DDL 创建表
创建从相应数据库表中捕获更改数据的表。
Oracle CDC 表可以定义如下:
- -- register an Oracle table 'student_info' in Flink SQL
- Flink SQL> CREATE TABLE student_info (
- SID INT NOT NULL,
- SNAME STRING,
- SEX STRING,
- PRIMARY KEY(SID) NOT ENFORCED
- ) WITH (
- 'connector' = 'oracle-cdc',
- 'hostname' = 'localhost',
- 'port' = '1521',
- 'username' = 'C##family',
- 'password' = 'zyhcdc',
- 'database-name' = 'ORCLCDB',
- 'schema-name' = 'C##FAMILY',
- 'table-name' = 'STUDENT_INFO');
-
- -- read snapshot and binlogs from products table
- Flink SQL> SELECT * FROM student_info;
在定义一个数据流出到MySQL的表:
- -- register an Oracle table 'mysql_user' in Flink SQL
- Flink SQL> CREATE TABLE mysql_user (
- SID INT ,
- SNAME STRING,
- SEX STRING,
- PRIMARY KEY(SID) NOT ENFORCED
- ) WITH (
- 'connector' = 'jdbc',
- 'url' = 'jdbc:mysql://localhost:3306/test_cdc',
- 'username' = 'root',
- 'password' = 'root123',
- 'table-name' = 'user'
- );
-
- Flink SQL> insert into mysql_user select SID,SNAME,SEX from student_info;
- [INFO] Submitting SQL update statement to the cluster...
- [INFO] SQL update statement has been successfully submitted to the cluster:
- Job ID: 1966fdd63bb36c14908fe8e31408db58
完成以上配置;Flink connector Oracle CDC实时数据同步到MySQL的操作就完成了;后续可以自行测试一下。
如果有主键的话会自动进行merge。
更多文章请扫码关注公众号,有问题的小伙伴也可以在公众号上提出。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。