Download, install, and start a Flink cluster
# Check the Java version
java -version
# The following output indicates that Java 8 is installed
openjdk version "1.8.0_322"
OpenJDK Runtime Environment (Temurin)(build 1.8.0_322-b06)
OpenJDK 64-Bit Server VM (Temurin)(build 25.322-b06, mixed mode)
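If Java 8 is not installed yet, it can be installed through the system package manager first. A minimal sketch, assuming a yum-based distribution such as CentOS (the package name is an assumption; Debian/Ubuntu use apt-get and a different package name):
# Install OpenJDK 8 (package name assumes a CentOS/RHEL-style system)
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
# Verify the installation
java -version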
Download and extract Flink. This example uses Flink 1.14.5.
# Download Flink
wget https://archive.apache.org/dist/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.11.tgz
# Extract Flink
tar -xzf flink-1.14.5-bin-scala_2.11.tgz
# Enter the Flink directory
cd flink-1.14.5
# Start the Flink cluster
./bin/start-cluster.sh
# The following output indicates that the Flink cluster started successfully
Starting cluster.
Starting standalonesession daemon on host.
Starting taskexecutor daemon on host.
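To confirm that the cluster is actually up, you can check the Flink web UI, which the JobManager serves on port 8081 by default, or look for the standalone cluster processes. A minimal sketch, assuming the default rest port has not been changed in conf/flink-conf.yaml:
# An HTML response here means the JobManager web UI is reachable
curl http://localhost:8081
# The standalone cluster runs as these two JVM processes
jps | grep -E 'StandaloneSessionClusterEntrypoint|TaskManagerRunner'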
Download the Flink CDC connector
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.0/flink-sql-connector-mysql-cdc-2.2.0.jar
Download flink-connector-starrocks; its version must match the version of Flink you are using.
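A minimal sketch of fetching flink-connector-starrocks from Maven Central and placing both connector JARs where the Flink SQL client can load them. The version string 1.2.3_flink-1.14_2.11 is an assumption for illustration; check Maven Central for the build that matches your Flink and Scala versions:
# Download flink-connector-starrocks (version string is an assumption; pick the build matching Flink 1.14 / Scala 2.11)
wget https://repo1.maven.org/maven2/com/starrocks/flink-connector-starrocks/1.2.3_flink-1.14_2.11/flink-connector-starrocks-1.2.3_flink-1.14_2.11.jar
# Copy both connector JARs into the lib directory of the Flink installation
cp flink-sql-connector-mysql-cdc-2.2.0.jar flink-connector-starrocks-1.2.3_flink-1.14_2.11.jar flink-1.14.5/lib/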
Download and extract SMT, and place it under the flink-1.14.5 directory.
# For Linux x86
wget https://releases.starrocks.io/resources/smt.tar.gz
# For macOS ARM64
wget https://releases.starrocks.io/resources/smt_darwin_arm64.tar.gz
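The downloaded archive still needs to be unpacked. A minimal sketch, assuming the Linux x86 package was downloaded into the flink-1.14.5 directory:
# Extract SMT; the archive is expected to unpack into an smt directory containing the starrocks-migrate-tool binary and its conf directory
tar -xzf smt.tar.gz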
Next, enable binlog on the source MySQL instance by adding the following settings to the MySQL configuration file:
# Enable binlog
log_bin = ON
# Binlog storage location
log_bin = /var/lib/mysql/mysql-bin
# Set server_id
# From MySQL 5.7.3 onward, MySQL will not start with binlog enabled unless server_id is set
server_id = 1
# Set the binlog format to ROW
binlog_format = ROW
# Base name of the binlog files; a suffix is appended to identify each binlog file
log_bin_basename = /var/lib/mysql/mysql-bin
# Index file that keeps track of all binlog files
log_bin_index = /var/lib/mysql/mysql-bin.index
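These options belong under the [mysqld] section of the MySQL configuration file (typically /etc/my.cnf, or /etc/mysql/mysql.conf.d/mysqld.cnf on Debian-based systems). A minimal sketch of the assembled section:
[mysqld]
server_id = 1
log_bin = /var/lib/mysql/mysql-bin
binlog_format = ROW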
After updating the configuration, restart MySQL.
# Restart using the service command
service mysqld restart
# Restart using the mysqld init script
/etc/init.d/mysqld restart
-- Connect to MySQL
mysql -h xxx.xx.xxx.xx -P 3306 -u root -pxxxxxx
-- Check whether MySQL binlog is enabled; ON means it is enabled
mysql> SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.00 sec)
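Flink CDC reads the binlog through a MySQL user, so it is also worth confirming the binlog format and making sure that user has the required privileges. A minimal sketch; user1 matches the user in the SMT configuration below, and the privilege list is an assumption based on common MySQL CDC setups:
-- Confirm that the binlog format is ROW
SHOW VARIABLES LIKE 'binlog_format';
-- Grant the CDC user read and replication access (privilege list is an assumption; adjust to your security policy)
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user1'@'%';
FLUSH PRIVILEGES;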
Configure SMT by editing its configuration file in the conf directory:
[db]
host = xxx.xx.xxx.xx
port = 3306
user = user1
password = xxxxxx

[other]
# number of backends in StarRocks
be_num = 1
# `decimal_v3` is supported since StarRocks-1.18.1
use_decimal_v3 = true
# file to save the converted DDL SQL
output_dir = ./result

[table-rule.1]
# pattern to match databases for setting properties
database = ^demo.*$
# pattern to match tables for setting properties
table = ^.*$

############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://<fe_host>:<fe_query_port>
flink.starrocks.load-url= <fe_host>:<fe_http_port>
flink.starrocks.username=user2
flink.starrocks.password=xxxxxx
flink.starrocks.sink.properties.format=csv
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=15000
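If several databases need different sink settings, additional numbered rule sections can be added; whether your SMT build supports this should be checked against its documentation, and the rule below is a hypothetical illustration only:
[table-rule.2]
# hypothetical second rule matching another set of databases
database = ^order.*$
table = ^.*$
flink.starrocks.jdbc-url=jdbc:mysql://<fe_host>:<fe_query_port>
flink.starrocks.load-url= <fe_host>:<fe_http_port>
flink.starrocks.username=user2
flink.starrocks.password=xxxxxx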
# Run SMT
./starrocks-migrate-tool
# Enter the result directory and list the generated files
cd result
ls
flink-create.1.sql smt.tar.gz starrocks-create.all.sql
flink-create.all.sql starrocks-create.1.sql
# Create the destination databases and tables in StarRocks from the DDL generated by SMT
[root@node01 ~]# mysql -hxxx -P9030 -uroot -proot < ./smt/result/starrocks-create.all.sql
# Submit the synchronization job to the Flink cluster through the SQL client
[root@node01 bin]# ./sql-client.sh -f /xxx/smt/result/flink-create.all.sql
If the connector JAR versions do not match the Flink version in use, the SQL client may fail with an error like the following:
[ERROR] Could not execute SQL statement. Reason:
java.io.StreamCorruptedException: unexpected block data
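A minimal remediation sketch, assuming the mismatch is in the connector JARs under lib/ (the JAR names and path are placeholders; use the builds that match your Flink version):
# Remove the mismatched connector JARs from flink-1.14.5/lib/ (names are placeholders)
rm lib/flink-sql-connector-mysql-cdc-*.jar lib/flink-connector-starrocks-*.jar
# Copy in connector builds that match the Flink version (path is a placeholder)
cp /path/to/matching-connector-jars/*.jar lib/
# Restart the cluster so the SQL client picks up the new JARs, then re-submit the job
./bin/stop-cluster.sh
./bin/start-cluster.sh
./bin/sql-client.sh -f /xxx/smt/result/flink-create.all.sql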