当前位置:   article > 正文

FLINK CDC近期使用小结_flink cdc安装

flink cdc安装

1、Flink cdc是什么?

        CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术.

2、Flink cdc 安装部署

2.1、安装包要求

  1. flink版本: flink-1.13.0-bin-scala_2.11.tgz
  2. java版本:jdk-8u271-linux-x64.tar.gz
  3. mysql版本:mysql-5.7.40-linux-glibc2.12-x86_64.tar.gz

2.2、 部署安装

2.2.1、安装过程

  1. 1、解压flink cdc安装包
  2. tar -zxvf flink-1.13.0-bin-scala_2.11.tgz
  3. 2、安装Java以及Mysql
  4. 百度自行解决,注意配置环境变量⚠️
  5. 3、flink cdc中java配置
  6. cd flink-1.13.0
  7. vim conf/flink-conf.yaml
  8. 添加配置:env.java.home=/usr/local/java/jdk1.8.0_271 ---- 版本号需要对应
  9. 4、上传驱动包,放在flink1.13.0/lib 目录下
  10. 注意⚠️:以上过程仅使用mysql数据源,若涉及其他数据源需要对应的驱动包
  11. flink-connector-jdbc_2.11-1.13.6.jar
  12. flink-sql-connector-mysql-cdc-2.1.0.jar
  13. mysql-connector-java-8.0.27.jar
  14. 5、启动服务
  15. 注意⚠️:服务启动后可通过8081端口查看是否正常
  16. /bin/start-cluster.sh -- 启动集群,如果安装包或者配置改变,需要重启服务
  17. /bin/sql-client.sh --启动SQL客户端

2.2.2、注意事项

  1. 1、数据库用户权限,最好不用root角色,但是需要相应的权限。如果发生权限报错,可百度或者评论问我
  2. 2、数据库需要开启bin-log,并且是row模式
  3. 3、各个软件包版本、驱动版本对应关系,否则会启动报错
  4. 4、java版本需要在1.7以上,mysql版本测试适配是5.7

2.2.3、flink cdc 配置

进入/conf       

vim  flink-conf

  1. #任务存储,建议调大一点
  2. jobmanager.memory.process.size: 3200m
  3. #任务存储,建议调大一点
  4. taskmanager.memory.process.size: 3200m
  5. #任务槽数:不同任务所需槽,根据实际情况配置
  6. #默认是1 - 如果配置1,导致无法执行2个或者2个以上任务
  7. #看一下电脑cpu的核数,建议调成内核数的12倍,有利于cpu发挥最大效能
  8. taskmanager.numberOfTaskSlots: 50
  9. #任务并行度:涉及到读取效率,默认是1
  10. parallelism.default: 5
  11. #checkponit、savepoint存储方式,这种能降低内存使用,面临大数据的问题;做测试用默认就可以
  12. #注意:需要在/home/admin 下建立 /state/ckp /state/savepoint的两个文件夹
  13. state.backend: rocksdb
  14. state.checkpoints.dir: file:///home/admin/state/ckp
  15. state.savepoints.dir: file:///home/admin/state/savepoint

3、读写测试

3.1、建Mysql表

  1. 首先需要在mysql中建立相应的映射表
  2. 如:static_pay_order_his
  3. CREATE TABLE `static_pay_order_his` (
  4. `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
  5. `all_amount` bigint(20) comment '历史成交总金额',
  6. `all_count` bigint(20) comment '历史成交总数',
  7. PRIMARY KEY (`id`)
  8. ) ENGINE=InnoDB AUTO_INCREMENT=106 DEFAULT CHARSET=utf8 COMMENT='支付平台统计值 --- 历史'
  9. 建立:pay_order
  10. CREATE TABLE `pay_order` (
  11. `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
  12. PRIMARY KEY (`id`)
  13. ) ENGINE=InnoDB AUTO_INCREMENT=106 DEFAULT CHARSET=utf8 COMMENT='支付订单'

3.2、建cdc映射表

  1. #设置ck快照时间,时间越长越能降低存储消耗
  2. SET execution.checkpointing.interval = 3000s;
  3. #建立表单表
  4. CREATE TABLE pay_order (
  5. id bigint NOT NULL,
  6. amount bigint,
  7. state STRING,
  8. isv_no STRING,
  9. create_time timestamp,
  10. update_time timestamp,
  11. PRIMARY KEY (`id`) NOT ENFORCED
  12. ) WITH (
  13. 'scan.incremental.snapshot.enabled' = 'true', ---如果配置了false,则会锁表
  14. 'connector' = 'mysql-cdc', ---cdc是source,读表
  15. 'hostname' = 'localhost',
  16. 'port' = '3306',
  17. 'username' = '',
  18. 'password' = '',
  19. 'database-name' = '',
  20. 'table-name' = '',
  21. 'server-id' = '5502'
  22. );
  23. ---交易总金额
  24. CREATE TABLE static_pay_order_his
  25. (
  26. id bigint,
  27. all_amount bigint,
  28. all_count bigint,
  29. PRIMARY KEY (`id`) NOT ENFORCED
  30. )
  31. WITH
  32. (
  33. 'connector' = 'jdbc', --- jdbc则是sink,写表
  34. 'url' = '',
  35. 'username' = '',
  36. 'password' = '',
  37. 'table-name' = 'static_pay_order_his'
  38. );
  39. ---写sink表
  40. insert into static_pay_order_his (id, all_amount, all_count)
  41. select
  42. cast(replace(cast(current_date as string), '-', '') as bigint) as id,
  43. sum(amount) as all_amount,
  44. count(*) as all_count
  45. from pay_order
  46. where isv_no='62f9e7d9e4b0cb4057de15ae'
  47. and cast(create_time as string) >= '2023-06-01'
  48. ;

3.3、运行状态查看

 4、应用场景

1、实时大屏(双11大屏)

2、数据湖 & 实时数仓(olap)

5、优化思路:

  1. 1、看一下电脑cpu的核数,建议调成内核数的12倍,有利于cpu发挥最大效能
  2. taskmanager.numberOfTaskSlots: cpu内核数*(1或者2)的系数
  3. 比如8内核cpu,taskmanager.numberOfTaskSlots 可以设置为8或者16
  4. 2、并行度影响slot消耗
  5. 3、minibatch优化
  6. 如果数据量较大且造成了一定的吞吐压力,可以调整minbatch参数,实现在满足设置的数据数量或者延迟时间后统一处理,降低资源消耗;但是会造成一定的延时,需要折中
  7. configuration.set("table.exec.mini-batch.enabled", "true"); // enable mini-batch optimization
  8. configuration.set("table.exec.mini-batch.allow-latency", "5 s"); // use 5 seconds to buffer input records
  9. configuration.set("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/381091
推荐阅读
相关标签
  

闽ICP备14008679号