赞
踩
CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术.
- flink版本: flink-1.13.0-bin-scala_2.11.tgz
- java版本:jdk-8u271-linux-x64.tar.gz
- mysql版本:mysql-5.7.40-linux-glibc2.12-x86_64.tar.gz
- 1、解压flink cdc安装包
- tar -zxvf flink-1.13.0-bin-scala_2.11.tgz
-
- 2、安装Java以及Mysql
- 百度自行解决,注意配置环境变量⚠️
-
- 3、flink cdc中java配置
- cd flink-1.13.0
- vim conf/flink-conf.yaml
- 添加配置:env.java.home=/usr/local/java/jdk1.8.0_271 ---- 版本号需要对应
-
- 4、上传驱动包,放在flink1.13.0/lib 目录下
- 注意⚠️:以上过程仅使用mysql数据源,若涉及其他数据源需要对应的驱动包
-
- flink-connector-jdbc_2.11-1.13.6.jar
- flink-sql-connector-mysql-cdc-2.1.0.jar
- mysql-connector-java-8.0.27.jar
-
- 5、启动服务
- 注意⚠️:服务启动后可通过8081端口查看是否正常
-
- /bin/start-cluster.sh -- 启动集群,如果安装包或者配置改变,需要重启服务
- /bin/sql-client.sh --启动SQL客户端
- 1、数据库用户权限,最好不用root角色,但是需要相应的权限。如果发生权限报错,可百度或者评论问我
-
- 2、数据库需要开启bin-log,并且是row模式
-
- 3、各个软件包版本、驱动版本对应关系,否则会启动报错
-
- 4、java版本需要在1.7以上,mysql版本测试适配是5.7
进入/conf
vim flink-conf
- #任务存储,建议调大一点
- jobmanager.memory.process.size: 3200m
-
- #任务存储,建议调大一点
- taskmanager.memory.process.size: 3200m
-
-
- #任务槽数:不同任务所需槽,根据实际情况配置
- #默认是1 - 如果配置1,导致无法执行2个或者2个以上任务
- #看一下电脑cpu的核数,建议调成内核数的1~2倍,有利于cpu发挥最大效能
-
- taskmanager.numberOfTaskSlots: 50
-
-
- #任务并行度:涉及到读取效率,默认是1
- parallelism.default: 5
-
-
- #checkponit、savepoint存储方式,这种能降低内存使用,面临大数据的问题;做测试用默认就可以
- #注意:需要在/home/admin 下建立 /state/ckp /state/savepoint的两个文件夹
- state.backend: rocksdb
- state.checkpoints.dir: file:///home/admin/state/ckp
- state.savepoints.dir: file:///home/admin/state/savepoint
- 首先需要在mysql中建立相应的映射表
- 如:static_pay_order_his
-
- CREATE TABLE `static_pay_order_his` (
- `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
- `all_amount` bigint(20) comment '历史成交总金额',
- `all_count` bigint(20) comment '历史成交总数',
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB AUTO_INCREMENT=106 DEFAULT CHARSET=utf8 COMMENT='支付平台统计值 --- 历史'
-
- 建立:pay_order表
- CREATE TABLE `pay_order` (
- `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB AUTO_INCREMENT=106 DEFAULT CHARSET=utf8 COMMENT='支付订单'
- #设置ck快照时间,时间越长越能降低存储消耗
- SET execution.checkpointing.interval = 3000s;
-
- #建立表单表
- CREATE TABLE pay_order (
- id bigint NOT NULL,
- amount bigint,
- state STRING,
- isv_no STRING,
- create_time timestamp,
- update_time timestamp,
- PRIMARY KEY (`id`) NOT ENFORCED
- ) WITH (
- 'scan.incremental.snapshot.enabled' = 'true', ---如果配置了false,则会锁表
- 'connector' = 'mysql-cdc', ---cdc是source,读表
- 'hostname' = 'localhost',
- 'port' = '3306',
- 'username' = '',
- 'password' = '',
- 'database-name' = '',
- 'table-name' = '',
- 'server-id' = '5502'
- );
-
- ---交易总金额
- CREATE TABLE static_pay_order_his
- (
- id bigint,
- all_amount bigint,
- all_count bigint,
- PRIMARY KEY (`id`) NOT ENFORCED
- )
- WITH
- (
- 'connector' = 'jdbc', --- jdbc则是sink,写表
- 'url' = '',
- 'username' = '',
- 'password' = '',
- 'table-name' = 'static_pay_order_his'
- );
-
-
- ---写sink表
- insert into static_pay_order_his (id, all_amount, all_count)
- select
- cast(replace(cast(current_date as string), '-', '') as bigint) as id,
- sum(amount) as all_amount,
- count(*) as all_count
- from pay_order
- where isv_no='62f9e7d9e4b0cb4057de15ae'
- and cast(create_time as string) >= '2023-06-01'
- ;
1、实时大屏(双11大屏)
2、数据湖 & 实时数仓(olap)
- 1、看一下电脑cpu的核数,建议调成内核数的1~2倍,有利于cpu发挥最大效能
- taskmanager.numberOfTaskSlots: cpu内核数*(1或者2)的系数
- 比如8内核cpu,taskmanager.numberOfTaskSlots 可以设置为8或者16
-
- 2、并行度影响slot消耗
-
- 3、minibatch优化
- 如果数据量较大且造成了一定的吞吐压力,可以调整minbatch参数,实现在满足设置的数据数量或者延迟时间后统一处理,降低资源消耗;但是会造成一定的延时,需要折中
- configuration.set("table.exec.mini-batch.enabled", "true"); // enable mini-batch optimization
-
- configuration.set("table.exec.mini-batch.allow-latency", "5 s"); // use 5 seconds to buffer input records
-
- configuration.set("table.exec.mini-batch.size", "5000"); // the maximum number of records can be buffered by each aggregate operator task
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。