赞
踩
1、下载flink-1.18.1-bin-scala_2.12.tgz,linux通过:
wget https://archive.apache.org/dist/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz
2、oracle11g客户端安装,下载:
instantclient-basic-linux.x64-11.2.0.4.0.zip
instantclient-sdk-linux.x64-11.2.0.4.0.zip
instantclient-sqlplus-linux.x64-11.2.0.4.0.zip
以上文件,在ORACLE网站下载。
3、配置oracle客户端:
- [root@wn1 ~]# ls /usr/local/
- [root@wn1 ~]# cp instantclient-* /usr/local
- [root@wn1 ~]# cd /usr/local
- [root@wn1 ~]# unzip instantclient-basic-linux.x64-11.2.0.4.0.zip
- [root@wn1 ~]# unzip instantclient-sdk-linux.x64-11.2.0.4.0.zip
- [root@wn1 ~]# unzip instantclient-sqlplus-linux.x64-11.2.0.4.0.zip
- [root@wn1 ~]# mv instantclient_11_2 oracle_11
- [root@wn1 ~]# rm instantclient-*
- [root@wn1 ~]# vi /etc/profile
- #增加以下内容
- export ORACLE_HOME=/usr/local/oracle_11
- export PATH=.:${PATH}:$ORACLE_HOME
- export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$ORACLE_HOME
-
- #保存退出,执行
- [root@wn1 ~]# source /etc/profile
-
- #修改ld配置
- [root@wn1 ~]# vi /etc/ld.so.conf.d/oracle.conf
- #写入内容
- /usr/local/oracle_11
- #保存退出,执行
- [root@wn1 ~]# ldconfig
- #配置oracle连接参数
- [root@wn1 ~]# mkdir -p $ORACLE_HOME/network/admin/
- [root@wn1 ~]# cd $ORACLE_HOME/network/admin/
- #找一个tnsnames.ora文件,直接上传到服务器
- [root@wn1 ~]# cp /root/tnsnames.ora ./
-
- #测试连接
- [root@wn1 ~]# sqlplus sys/manager@//192.168.56.1/orcl
- SQL*Plus: Release 11.2.0.4.0 Production on Sun Mar 24 18:04:15 2024
-
- Copyright (c) 1982, 2013, Oracle. All rights reserved.
-
-
- Connected to:
- Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production
- With the Partitioning, OLAP, Data Mining and Real Application Testing options
-
- SQL>
-
-
4、配置oracle数据库,启用归档日志,这步需要参考:https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/connectors/legacy-flink-cdc-sources/oracle-cdc/
5、下载oracle cdc 连接器
wget https://maven.aliyun.com/repository/public/com/ververica/flink-sql-connector-oracle-cdc/3.0.1/flink-sql-connector-oracle-cdc-3.0.1.jar
解压第1步下载的Flink安装包:
tar zxvf flink-1.18.1-bin-scala_2.12.tgz
将flink-sql-connector-oracle-cdc-3.0.1.jar复制到flink-1.18.1/lib目录中
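6、下载 flink-connector-jdbc-3.1.1-1.17.jar 和 postgresql-42.7.3.jar:
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/flink-connector-jdbc-3.1.1-1.17.jar
wget https://jdbc.postgresql.org/download/postgresql-42.7.3.jar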
将jar包复制到flink-1.18.1/lib目录中
7、PostgreSQL的安装此处不再赘述,假定你已经有可用的数据库。注意:需要提前在PostgreSQL中创建好目标表sys_dic_dept(字段与后文的Sink表定义对应),Flink JDBC连接器不会自动建表。
8、修改Flink的配置文件 /home/flink/flink-1.18.1/conf/flink-conf.yaml ,主要是各种服务的绑定地址,默认为localhost,统统改为0.0.0.0,如:rest.address: 0.0.0.0 #localhost
9、启动
- [flink@cn1 bin]$ ./start-cluster.sh
- [flink@cn1 bin]$ ./sql-client.sh
- Flink SQL>
- #创建ORACLE源表
-
- SET execution.checkpointing.interval = 3s;
-
- create table SYS_DIC_DEPT
- (
- DEPT_CODE STRING,
- DEPT_NAME STRING,
- DEPT_ADDR STRING,
- DEPT_MEMO STRING,
- DEPT_FLAG STRING,
- DEPT_GZYZLTJFLAG STRING,
- DEPT_UPPER STRING,
- PRIMARY KEY (DEPT_CODE) NOT ENFORCED
- )
- WITH (
- 'connector' = 'oracle-cdc',
- 'hostname' = '192.168.56.1',
- 'port' = '1521',
- 'username' = 'username',
- 'password' = '123456',
- 'database-name' = 'ORCL',
- 'schema-name' = 'schema-name',
- 'table-name' = 'table-name',
- 'debezium.log.mining.strategy'='online_catalog',
- 'debezium.log.mining.continuous.mine'='true',
- 'debezium.snapshot.mode' = 'initial',
- 'debezium.database.tablename.case.insensitive'='true'
- );
-
- Flink SQL> select * from SYS_DIC_DEPT;
如果查询不到数据,请检查ORACLE表的字段名是否全部为大写,并确认Flink建表语句中的字段名与ORACLE中的字段名大小写一致。
10、创建PG Sink:
- Flink SQL>
- create table sys_dic_dept_sink
- (
- dept_code STRING,
- dept_name STRING,
- dept_addr STRING,
- dept_memo STRING,
- dept_flag STRING,
- dept_gzyzltjflag STRING,
- dept_upper STRING,
- PRIMARY KEY (dept_code) NOT ENFORCED
- )
- with(
- 'connector' = 'jdbc',
- 'url' = 'jdbc:postgresql://192.168.56.90:5432/postgres?currentSchema=public',
- 'username' = 'postgres',
- 'password' = '123456',
- 'table-name' = 'sys_dic_dept'
- );
11、抽数据
Flink SQL> insert into sys_dic_dept_sink select * from SYS_DIC_DEPT;
12、查看任务执行 http://192.168.56.90:8081/#/job/running
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。