赞
踩
将flink-1.14.4.tar.gz上传到需要安装的路径、解压(flink-1.14.4.tar.gz 自行在官网下载相应的压缩包)
解压后,将lib文件夹的jar包上传到解压后的路径的lib下
修改配置文件
具体的修改如下:
jobmanager.memory.process.size: 2600m # The total process memory size for the TaskManager. # # Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead. taskmanager.memory.process.size: 2728m # To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'. # It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory. # # taskmanager.memory.flink.size: 1280m # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline. #槽的个数 taskmanager.numberOfTaskSlots: 10 # Directory for checkpoints filesystem, when using any of the default bundled # state backends. #作用是使用文件存储checkpoint,而不是内存,避免某些时刻checkpoint失败 state.checkpoints.dir: file:///opt/flink/checkpoints # Flag to enable/disable incremental checkpoints for backends that # support incremental checkpoints (like the RocksDB state backend). #开启增量监听 state.backend.incremental: true # submit jars dir #使用java代码模型运行flink sql的打包后的代码的上传存储位置 web.upload.dir: /opt/flink/jars
启动flink
##cd 到flink安装目录下的bin路径,直接执行
./start-cluster.sh
安装成功后,就可以在浏览器中输入:http://ip:8081来访问flink控制台
上传jar包
将我们提供的jar包通过flink控制台上传
选择左边的Submit New Job选项,然后点击Add New按钮上传flink-cdc-java.jar
上传所需的jar后,就可以通过自己界面或者postman新建作业任务来运行flink job,其实就是通过flink cdc的接口,将指定的jar 和 flink sql 结合去跑自己的作业。
(附上flink1.14.4的官方接口文档:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/ops/rest_api/#api)
添加作业名称和监听代码,提示代码如下:
oralce同步到mysql代码:
CREATE TABLE `FLINK_TEST_DATA_SRC` ( ID STRING NOT NULL, NAME STRING, ADDR STRING, AGE INT, PRIMARY KEY (ID) NOT ENFORCED ) WITH ( 'connector' = 'oracle-cdc', 'hostname' = '*.*.*.*', 'username' = 'root', 'password' = '****', 'database-name' = 'orcl', 'schema-name' = 'flink_test', 'table-name' = 'FLINK_TEST_DATA' ); CREATE TABLE `FLINK_TEST_DATA_DST` ( `id` STRING NOT NULL, `name` STRING, `addr` STRING, `age` INT, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'driver' = 'com.mysql.cj.jdbc.Driver', 'url' = 'jdbc:mysql://*.*.*.*:3306/flink_test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&createDatabaseIfNotExist=true&useSSL=false&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=true', 'username' = 'root', 'password' = '*****', 'table-name' = 'flink_test_data' ); insert into FLINK_TEST_DATA_DST select ID,NAME,ADDR,AGE from FLINK_TEST_DATA_SRC;
mysql同步到oralce代码:
CREATE TABLE `FLINK_TEST_DATA_SRC` ( ID STRING NOT NULL, NAME STRING, ADDR STRING, AGE INT, PRIMARY KEY (ID) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '*.*.*.*', 'username' = 'root', 'password' = '*****', 'database-name' = 'flink_test', 'table-name' = 'flink_test_data' ); CREATE TABLE FLINK_TEST_DATA_DST ( ID STRING NOT NULL, NAME STRING, ADDR STRING, AGE INT, PRIMARY KEY (ID) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'driver' = 'oracle.jdbc.driver.OracleDriver', 'url' = 'jdbc:oracle:thin:@*.*.*.*:1521:orcl', 'username' = 'root', 'password' = '*****', 'table-name' = 'FLINK_TEST_DATA2' ); insert into FLINK_TEST_DATA_DST select * from FLINK_TEST_DATA_SRC;
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。