当前位置:   article > 正文

FlinkDCD同步MySQL到StarRocks_centos7 flink 实现 mysql 的数据实时同步至 starrocks

centos7 flink 实现 mysql 的数据实时同步至 starrocks

下载并安装同步工具

  • 同步时需要使用 SMT、 Flink、Flink CDC connector、flink-starrocks-connector,下载和安装步骤如下:

下载、安装并启动 Flink 集群

  • 您需要提前在操作系统中安装 Java 8 或者 Java 11,以正常运行 Flink。您可以通过以下命令来检查已经安装的 Java 版本。
# 查看java版本
java -version

# 如下显示已经安装 java 8
openjdk version "1.8.0_322"
OpenJDK Runtime Environment (Temurin)(build 1.8.0_322-b06)
OpenJDK 64-Bit Server VM (Temurin)(build 25.322-b06, mixed mode)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

下载并解压 Flink。本示例使用 Flink 1.14.5。

# 下载 Flink
wget https://archive.apache.org/dist/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.11.tgz
# 解压 Flink  
tar -xzf flink-1.14.5-bin-scala_2.11.tgz
# 进入 Flink 目录
cd flink-1.14.5

# 启动 Flink 集群
./bin/start-cluster.sh

# 返回如下信息,表示成功启动 flink 集群
Starting cluster.
Starting standalonesession daemon on host.
Starting taskexecutor daemon on host.

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

下载 Flink CDC connector

  • 本示例的数据源为 MySQL,因此下载 flink-sql-connector-mysql-cdc-x.x.x.jar。并且版本需支持对应的 Flink 版本,两者版本支持度,请参见 Supported Flink Versions。
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.0/flink-sql-connector-mysql-cdc-2.2.0.jar
  • 1

下载 flink-connector-starrocks,并且其版本需要对应 Flink 的版本。

  • 由于本文使用 Flink 版本号 1.14.5,Scala 版本号 2.11,因此可以下载 flink-connector-starrocks JAR 包 1.2.3_flink-1.14_2.11.jar
  • 将 Flink CDC connector、Flink-connector-starrocks 的 JAR 包 flink-sql-connector-mysql-cdc-2.2.0.jar、1.2.3_flink-1.14_2.11.jar 移动至 Flink 的 lib 目录

下载并解压 SMT,并将其放在 flink-1.14.5 目录下

# 适用于 Linux x86
wget https://releases.starrocks.io/resources/smt.tar.gz
# 适用于 macOS ARM64
wget https://releases.starrocks.io/resources/smt_darwin_arm64.tar.gz
  • 1
  • 2
  • 3
  • 4

开启 MySQL Binlog 日志

  • 编辑 MySQL 配置文件 my.cnf(默认路径为 /etc/my.cnf),以开启 MySQL Binlog。
# 开启 Binlog 日志
log_bin = ON
# 设置 Binlog 的存储位置
log_bin =/var/lib/mysql/mysql-bin
# 设置 server_id 
# 在 MySQL 5.7.3 及以后版本,如果没有 server_id,那么设置 binlog 后无法开启 MySQL 服务 
server_id = 1
# 设置 Binlog 模式为 ROW
binlog_format = ROW
# binlog 日志的基本文件名,后面会追加标识来表示每一个 Binlog 文件
log_bin_basename =/var/lib/mysql/mysql-bin
# binlog 文件的索引文件,管理所有 Binlog 文件的目录
log_bin_index =/var/lib/mysql/mysql-bin.index
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 执行如下命令,重启 MySQL,生效修改后的配置文件
 # 使用 service 启动
 service mysqld restart
 # 使用 mysqld 脚本启动
 /etc/init.d/mysqld restart
  • 1
  • 2
  • 3
  • 4
  • 连接 MySQL,执行如下语句确认是否已经开启 Binlog
-- 连接 MySQL
mysql -h xxx.xx.xxx.xx -P 3306 -u root -pxxxxxx

-- 检查是否已经开启 MySQL Binlog,`ON`就表示已开启
mysql> SHOW VARIABLES LIKE 'log_bin'; 
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.00 sec)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

同步库表结构

  • 配置 SMT 配置文件。 进入 SMT 的 conf 目录,编辑配置文件 config_prod.conf。例如源 MySQL 连接信息、待同步库表的匹配规则,flink-starrocks-connector 配置信息等
[db]
host = xxx.xx.xxx.xx
port = 3306
user = user1
password = xxxxxx

[other]
# number of backends in StarRocks
be_num = 1
# `decimal_v3` is supported since StarRocks-1.18.1
use_decimal_v3 = true
# file to save the converted DDL SQL
output_dir = ./result

[table-rule.1]
# pattern to match databases for setting properties
database = ^demo.*$
# pattern to match tables for setting properties
table = ^.*$

############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://<fe_host>:<fe_query_port>
flink.starrocks.load-url= <fe_host>:<fe_http_port>
flink.starrocks.username=user2
flink.starrocks.password=xxxxxx
flink.starrocks.sink.properties.format=csv
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=15000
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 执行如下命令,SMT 会读取 MySQL 中同步对象的库表结构,并且结合配置文件信息,在 result 目录生成 SQL 文件,用于 StarRocks 集群创建库表(starrocks-create.all.sql), 用于向 Flink 集群提交同步数据的 flink job(flink-create.all.sql)。 并且源表不同,则 starrocks-create.all.sql 中建表语句默认创建的数据模型不同
# 运行 SMT
./starrocks-migrate-tool

# 进入并查看 result 目录中的文件
cd result
ls result
flink-create.1.sql    smt.tar.gz              starrocks-create.all.sql
flink-create.all.sql  starrocks-create.1.sql
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 生成Flink table并开始同步
[root@node01 ~]# mysql -hxxx -P9030 -uroot -proot < ./smt/result/starrocks-create.all.sql

[root@node01 bin]# ./sql-client.sh -f /xxx/smt/result/flink-create.all.sql
  • 1
  • 2
  • 3
  • 可以通过 Flink WebUI 或者在 Flink 命令行执行命令bin/flink list -running,查看 Flink 集群中正在运行的 Flink job,以及 Flink job ID
    。Flink WebUI:http://193.168.9.113:8081/

注意执行Flink JOB报错

[ERROR] Could not execute SQL statement. Reason:
java.io.StreamCorruptedException: unexpected block data
  • 1
  • 2
  • 类加载顺序问题,flink默认是child-first,在flink的flink-conf.yaml文件中添加classloader.resolve-order: parent-first 改成parent-first,重启集群即可。

参考文档地址:

从 MySQL 实时同步

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/579471
推荐阅读
相关标签
  

闽ICP备14008679号