当前位置:   article > 正文

flink-cdc安装使用教程(java)_flink cdc安装

flink cdc安装

Flink安装使用教程

1.安装

  • 将flink-1.14.4.tar.gz上传到需要安装的路径、解压(flink-1.14.4.tar.gz 自行在官网下载相应的压缩包)

  • 解压后,将lib文件夹的jar包上传到解压后的路径的lib下

  • 修改配置文件


    在这里插入图片描述

    具体的修改如下:

jobmanager.memory.process.size: 2600m


# The total process memory size for the TaskManager.
#
# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.

taskmanager.memory.process.size: 2728m

# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
#
# taskmanager.memory.flink.size: 1280m

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
#槽的个数
taskmanager.numberOfTaskSlots: 10

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#作用是使用文件存储checkpoint,而不是内存,避免某些时刻checkpoint失败
state.checkpoints.dir: file:///opt/flink/checkpoints

# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend). 
#开启增量监听
state.backend.incremental: true

# submit jars dir
#使用java代码模型运行flink sql的打包后的代码的上传存储位置
web.upload.dir: /opt/flink/jars
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 启动flink

    ##cd 到flink安装目录下的bin路径,直接执行
    ./start-cluster.sh
    
    • 1
    • 2

2.上传

安装成功后,就可以在浏览器中输入:http://ip:8081来访问flink控制台

  • 上传jar包

    将我们提供的jar包通过flink控制台上传

    选择左边的Submit New Job选项,然后点击Add New按钮上传flink-cdc-java.jar
    在这里插入图片描述

3.使用

上传所需的jar后,就可以通过自己界面或者postman新建作业任务来运行flink job,其实就是通过flink cdc的接口,将指定的jar 和 flink sql 结合去跑自己的作业。

(附上flink1.14.4的官方接口文档:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/ops/rest_api/#api)

添加作业名称和监听代码,提示代码如下:

oralce同步到mysql代码:

CREATE TABLE `FLINK_TEST_DATA_SRC` ( 
  ID STRING NOT NULL, 
  NAME STRING, 
  ADDR STRING, 
  AGE INT, 
  PRIMARY KEY (ID) NOT ENFORCED
) WITH ( 
  'connector' = 'oracle-cdc', 
  'hostname' = '*.*.*.*', 
  'username' = 'root', 
  'password' = '****', 
  'database-name' = 'orcl', 
  'schema-name' = 'flink_test',
  'table-name' = 'FLINK_TEST_DATA'
);

CREATE TABLE `FLINK_TEST_DATA_DST` ( 
  `id` STRING NOT NULL, 
  `name` STRING, 
  `addr` STRING, 
  `age` INT, 
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc', 
  'driver' = 'com.mysql.cj.jdbc.Driver', 
  'url' = 'jdbc:mysql://*.*.*.*:3306/flink_test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf-8&createDatabaseIfNotExist=true&useSSL=false&nullCatalogMeansCurrent=true&allowPublicKeyRetrieval=true', 
  'username' = 'root', 
  'password' = '*****', 
  'table-name' = 'flink_test_data'
);

insert into FLINK_TEST_DATA_DST select ID,NAME,ADDR,AGE from FLINK_TEST_DATA_SRC;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

mysql同步到oralce代码:

CREATE TABLE `FLINK_TEST_DATA_SRC` ( 
  ID STRING NOT NULL, 
  NAME STRING, 
  ADDR STRING, 
  AGE INT, 
  PRIMARY KEY (ID) NOT ENFORCED
) WITH ( 
  'connector' = 'mysql-cdc', 
  'hostname' = '*.*.*.*', 
  'username' = 'root', 
  'password' = '*****', 
  'database-name' = 'flink_test', 
  'table-name' = 'flink_test_data'
);

CREATE TABLE FLINK_TEST_DATA_DST ( 
  ID STRING NOT NULL, 
  NAME STRING, 
  ADDR STRING, 
  AGE INT, 
  PRIMARY KEY (ID) NOT ENFORCED
) WITH (
  'connector' = 'jdbc', 
  'driver' = 'oracle.jdbc.driver.OracleDriver', 
  'url' = 'jdbc:oracle:thin:@*.*.*.*:1521:orcl', 
  'username' = 'root', 
  'password' = '*****', 
  'table-name' = 'FLINK_TEST_DATA2'
);

insert into FLINK_TEST_DATA_DST
select * from FLINK_TEST_DATA_SRC;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
注意点:
1.输入端需要使用cdc,也就是说输入的connector应该填写mysql-cdc或者oralce-cdc
2.但source为oralce-cdc时,需要增加一个属性schema-name,它的值为需要访问的数据库schema
3.CREATE TABLE中的字段建议都使用大写,因为在oralce中默认是大写的,不然可能会导致后台报错

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/381127
推荐阅读
相关标签
  

闽ICP备14008679号