当前位置:   article > 正文

最佳实践:MySQL CDC 同步数据到 ES_flink cdc mysql es

flink cdc mysql es

作者:于乐,腾讯 CSIG 工程师

一、 方案描述

1.1 概述

在线教育是一种利用大数据、人工智能等新型互联网技术与传统教育行业相结合的新型教育方式。发展在线教育可以更好的构建网络化、数字化、个性化、终生化的教育体系,有利于构建“人人皆学、处处能学、实时可学”的学习型社会。

本文针对某知名在线教育平台在腾讯云流计算 Oceanus 的业务案例,介绍了其中可能存在的一些性能问题,并针对这种问题进行了参数调优相关的介绍。

b41868e7edfd8b767b16adc93aabf637.png

1.2 方案架构

某知名在线教育平台在流计算 Oceanus 上主要有两个业务应用场景,其一:单表同步,使用 MySQL CDC 将 MySQL 数据取出存入  Elasticsearch;其二:双流 JOIN,两条 MySQL CDC 流 JOIN 后存入 Elasticsearch。本文主要针对这两种场景进行了一些实践,并指出可能存在的特殊场景以及参数调优思路。

3c811b564b0ffdc796bc20e31f443913.png

涉及产品列表:

  • 腾讯云流计算 Oceanus

  • 腾讯云 MySQL 数据库

  • 腾讯云 Elasticsearch

二、前置准备

2.1 创建私有网络 VPC

私有网络(VPC)是一块您在腾讯云上自定义的逻辑隔离网络空间,在构建 Oceanus、CKafka、COS、ClickHouse 集群等服务时选择的网络建议选择同一个 VPC,网络才能互通,否则需要使用对等连接、NAT 网关、VPN 等方式打通网络。私有网络 VPC 创建步骤请参考 帮助文档 。

2.2 创建流计算 Oceanus 集群

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

进入 Oceanus 控制台,点击左侧【计算资源】,单击左上角【新建集群】页面创建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等,VPC 及子网使用刚刚创建好的网络。创建完后 Oceanus 的集群如下:

0623780e1adc3a7fee149fab20c648a6.png

2.3 创建云数据库 MySQL

云数据库 MySQL(TencentDB for MySQL)是腾讯云基于开源数据库 MySQL 专业打造的高性能分布式数据存储服务,让用户能够在云中更轻松地设置、操作和扩展关系数据库。

  1. 登录 云数据库 TencentDB 控制台,单击【新建】,新建 MySQL 服务。注意网络选择需为上文创建的网络。 

  2. 创建完 MySQL 服务后,点击进入 MySQL 实例,单击上方【数据库管理】>【参数设置】,确保binlog_format = ROW以及binlog_row_image = FULL

34373b57f01c1ac578d864e8769c5846.png

  1. 检查完参数后,单击右上角【登录】进入数据库,创建数据库和表用户接收数据。

2.4 创建 Elasticsearch 实例

腾讯云 Elasticsearch Service(ES)是基于开源引擎打造的云端全托管 ELK 服务,集成 X-Pack 特性、独有高性能自研内核、QQ 分词、集群巡检、一键升级等优势能力,引入极致性价比的腾讯自研星星海服务器。助您轻松管理和运维集群,高效构建日志分析、运维监控、信息检索、数据分析等业务。

进入 Elasticsearch Service 控制台,单击左侧【Elasticsearch 集群】,点击左上角【新建】,注意选择之前创建好的私有网络和子网,并设置账户和密码,具体操作请参考 帮助文档 。

e3992419121f33d7e1934b0f7b4eed21.png

三、场景一:单表同步

本场景使用 MySQL CDC 将数据从云数据库 MySQL 中取出后存入 ES,中间并无复杂的业务逻辑的计算。

3.1 Source 端参数配置

  1. -- Source 端配置,从云数据库 MySQL 读取数据
  2. 'connector' = 'mysql-cdc', -- 固定值 'mysql-cdc'
  3. 'hostname' = 'xx.xx.xx.xx', -- 数据库的 IP
  4. 'port' = '3306', -- 数据库的访问端口
  5. 'username' = 'root', -- 数据库访问的用户名(需要提供 SHOW DATABASESREPLICATION SLAVEREPLICATION CLIENTSELECTRELOAD 权限)
  6. 'password' = 'xxxxxxxxxxxxxx', -- 数据库访问的密码
  7. 'database-name' = 'xxxx', -- 需要同步的数据库
  8. 'table-name' = 'xxxx' -- 需要同步的数据表名

Source 端重要参数或可选参数说明  

  • scan.incremental.snapshot.enabled:  增量快照。开启此参数必须保证 MySQL 表有主键,开启此配置后可以开启多并行度读取 MySQL 数据,相对应的还需调大【作业参数】里面的【算子默认并行度】,并配置server-id为一个范围,例如 5100-5110,则可以开启 10 个并行度同时读取 MySQL 表的数据。此参数默认开启。

  • server-id:  server-id在任务启动时会注册到 MySQL 服务端,相同 MySQL 实例内不允许 server-id 重复,因此不同任务里配置的 CREATE TABLE 不管表是相同还是不同,都必须要有不同的 server-id,server-id 建议配置一个范围,例如 5100-5110,这个范围可以支持任务扩展到 10 个并发。

  • server-time-zone:  例如Asia/Shanghai,该参数控制了 MySQL 中的 TIMESTAMP 类型如何转成 STRING 类型。

  • debezium.database.history.store.only.monitored.tables.ddl:  如果 MySQL 实例表非常多并且经常发生 schema 变更,那么增加这个配置项可以减少状态大小,注意正则模糊匹配表的场景不要使用这个配置项。

特殊场景优化  

  1. 如果MySQL CDC 同步的表数量较大(千万或亿级),建议:  (1) 增加全量同步时的并发度,亿级推荐 10 以上。  (2) 需要设置 scan.incremental.snapshot.chunk.size 到更大的值,例如 1 亿数据量推荐设置为 30000。

  2. 如果使用正则模糊匹配多表时,建议增加 JobManager 的 CU 数到 2CU。

3.2 Sink 端参数配置

  1. -- Sink 端配置,将数据写入 ES
  2. 'connector' = 'elasticsearch-7', -- 输出到 Elasticsearch 7
  3. 'username' = 'elastic', -- 选填 用户名
  4. 'password' = 'xxxxxxxx', -- 选填 密码
  5. 'hosts' = 'http://xx.xx.xx.xx:9200', -- Elasticsearch 的连接地址
  6. 'index' = 'xxxxxxxx', -- Elasticsearch 的 Index 名
  7. 'document-id.key-delimiter' = '$', -- 可选参数, 复合主键的连接字符 (默认是 _ 符号, 例如 key1_key2_key3)
  8. 'failure-handler' = 'ignore', -- 可选的错误处理。可选择 'fail' (抛出异常)、'ignore'(忽略任何错误)、'retry-rejected'(重试)
  9. 'sink.flush-on-checkpoint' = 'true', -- 可选参数, 快照时不允许批量写入(flush), 默认为 true
  10. 'sink.bulk-flush.max-actions' = '42', -- 可选参数, 每批次最多的条数
  11. 'sink.bulk-flush.max-size' = '42 mb', -- 可选参数, 每批次的累计最大大小 (只支持 mb)
  12. 'sink.bulk-flush.interval' = '1000', -- 可选参数, 批量写入的间隔 (ms)
  13. 'connection.max-retry-timeout' = '300', -- 每次请求的最大超时时间 (ms)
  14. 'format' = 'json' -- 输出数据格式, 目前只支持 'json'

Sink 端重要参数或可选参数说明  

  • sink.flush-on-checkpoint:  Flink 进行快照时,是否等待现有记录完全写入 Elasticsearch 。如果设置为 false,则可能造成恢复时部分数据丢失或者重复等异常情况,但快照速度会提升。默认为 true

  • sink.bulk-flush.max-actions:  批量写入的最大条数。设置为 0 则禁用批量功能。默认为1000 。 

  • sink.bulk-flush.max-size:  批量写入缓存的最大容量,必须以 mb 为单位。设置为 0 则禁用批量功能。默认为2mb

  • sink.bulk-flush.interval:批量写入的刷新周期。设置为 0 则禁用批量功能。默认为1s。 

性能测试  Source 端数据读取量大导致 Sink 端写入繁忙,从而引起反压问题导致作业不断重启,建议:适当调整sink.bulk-flush.max-actionssink.bulk-flush.max-sizesink.bulk-flush.interval这三个参数的值。默认情况下:

 sink.bulk-flush.max-actions = 1000  

sink.bulk-flush.max-size = 2mb  

sink.bulk-flush.interval = 1000ms    

针对 MySQL CDC 单表写入到 ES 这种场景,我们对sink.bulk-flush.interval参数做了简单的性能测试,包括吞吐量和写入 ES 延时。用户可以参考这些数据,根据自己实际业务情况做相应的调参。

测试条件:  

  • Elasticsearch:16核64G, 3个20GB SSD云硬盘 x 1

  • MySQL 版本:MySQL 5.7

  • 数据流入量级:每秒 40000 条

  • 作业中的参数sink.bulk-flush.max-actionssink.bulk-flush.max-size保持默认值

测试结果:  

并行度为 1:

daae67a6a7c39562d01cfd82fa50f9be.png

并行度为 4:

e6a2df2de765c070a178c7a998201974.png

结论:

  • sink.bulk-flush.max-actions的默认值是1000条,sink.bulk-flush.max-size的默认值是2mbsink.bulk-flush.interval默认1s,这三个满足满足任意一条即触发 ES 写操作。

  • 如果作业流量比较大,sink.bulk-flush.max-actions或者sink.bulk-flush.max-size会优先触发,此时调大sink.bulk-flush.interval的值,对写入 ES 的吞吐和延迟几乎没有影响。

  • 如果作业流量比较小,sink.bulk-flush.interval会优先触发,此时sink.bulk-flush.interval的大小会影响写入 ES 的延迟。

  • ES 的 refresh interval 参数影响数据在 ES 集群中可以查询的时延,比如 Kibana 可以查看到数据的时延在0~sink.bulk-flush.interval(如果bulk-flush优先触发)+ ES refresh interval区间。

  • sink.bulk-flush.interval不建议小于 500ms

  • 单并行度的时候,通过调节参数,对 ES 的吞吐量不再有影响的时候,增大作业的并行度后,ES 的吞吐有明显的提升。

建议:

  • sink.bulk-flush.max-actionssink.bulk-flush.max-size通常保持默认值即可,这两个条件满足其一,数据就会提交写入请求。

  • sink.bulk-flush.interval:定时提交数据的间隔,设置太小,ES 的吞吐量会变小,建议改值不小于500ms

  • sink.bulk-flush.max-actions = 1该设置表示每来一条数据就立即写入 ES,这会导致整个作业的吞吐降低,以本次的测试场景,吞吐最高只能到达 400条/秒,同时写入 ES 的平均延迟会增大到 1.5s,不建议该值设置太小。

  • 当作业中的参数对吞吐的影响极小的时候,可以通过增大作业的并行度来提高写入 ES 的吞吐。

  • 当并行度提高到一定的程度的时候,ES 的吞吐不再有变,此时可以检查一下 ES 的的写入性能。

四、场景二:双流 JOIN

这里总结一下双流 JOIN 常见的问题以及优化点。

SET 语句

  1. (一定要提前设置,避免状态不兼容导致无法恢复)如果存在多流 JOIN 或 GROUP BY,需要设置 SQL 状态保留时间(TTL),例如 :  SET execution.min-idle-state-retention = '5 h'  SET execution.max-idle-state-retention = '6 h'

  2. 如果快照时间较久(例如上述 JOIN 场景),需要调大快照过期时间到至少 30 min,例如:  SET CHECKPOINT_TIMEOUT = '30 min'

  3. 建议启用 Mini-Batch 功能以获取更好的性能(可能会导致较大延迟,请谨慎设置):  SET table.exec.mini-batch.enabled = true  SET table.exec.mini-batch.size = 5000  SET table.exec.mini-batch.allow-latency = 200 ms  

高级参数

  1. 如果作业快照较大,建议延长快照周期为至少 30 min

  2. 设置增量快照:  state.backend.incremental: true

  3. 建议调大作业重启阈值:  restart-strategy.fixed-delay.attempts: 10000

  4. 建议调大快照失败的容忍次数:  execution.checkpointing.tolerable-failed-checkpoints: 5

  5. 如果事件面板经常出现 TaskManager 异常退出事件,且错误信息包含 OOMKilled,则可以尝试设置如下参数,且提高 TaskManager 的规格为 2CU 或提升并行度:  taskmanager.memory.jvm-overhead.fraction: 0.3

总结

本文分析了某知名在线教育平台在流计算 Oceanus 上的两种业务场景:MySQL 单表同步到 Elasticsearch;两条 MySQL CDC 流 Regular JOIN。此外,本文还对两种场景的一些重要参数或可选参数进行了讲解,并对可能存在的特殊场景提供了参数调优思路。更多 Oceanus 最佳实践以及入门指引参见我们的 专栏文章,最后欢迎大家猛戳 一元购 试用 Oceanus,机不可失时不再来:)

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号