Create the source table `t_user` in MySQL (database `flink-db`):
CREATE TABLE `t_user` (
`id` bigint NOT NULL AUTO_INCREMENT,
`user_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
`age` int DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1;
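Flink CDC reads the MySQL binlog, so the binlog must be enabled and set to ROW format. A quick sanity check (assuming your MySQL user may read server variables):
-- should return ON and ROW respectively;
-- otherwise enable binlog in my.cnf and restart MySQL
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';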
The default StarRocks user is root with no password; connect through the MySQL client:
mysql -P9030 -h127.0.0.1 -uroot --prompt="StarRocks > "
You can also change the password:
SET PASSWORD FOR 'root' = PASSWORD('root');
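The session below uses a StarRocks database named flink; if it does not exist yet, create it first (a minimal sketch; the name just has to match the 'database-name' used in the Flink job later):
CREATE DATABASE IF NOT EXISTS flink;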
Create the table `t_user_sink`:
StarRocks > use flink;
StarRocks > CREATE TABLE `t_user_sink` (
    -> `id` bigint NOT NULL,
    -> `user_name` varchar(255) DEFAULT NULL,
    -> `age` int DEFAULT NULL
    -> )
    -> PRIMARY KEY(`id`)
    -> DISTRIBUTED BY HASH(id) BUCKETS 3
    -> PROPERTIES
    -> (
    -> "replication_num" = "1"
    -> );
Query OK, 0 rows affected (0.08 sec)

StarRocks > select * from t_user_sink;
Empty set (0.02 sec)

StarRocks > SHOW PROC '/frontends'\G
*************************** 1. row ***************************
             Name: 192.168.16.2_9010_1686905244720
               IP: 192.168.16.2
      EditLogPort: 9010
         HttpPort: 8030
        QueryPort: 9030
          RpcPort: 9020
             Role: LEADER
        ClusterId: 776853271
             Join: true
            Alive: true
ReplayedJournalId: 53824
    LastHeartbeat: 2023-06-21 02:01:41
         IsHelper: true
           ErrMsg:
        StartTime: 2023-06-20 11:50:07
          Version: 2.5.0-0ee1b3b8c
1 row in set (0.02 sec)

# check the BEs
StarRocks > SHOW PROC '/backends'\G
**Remember the HttpPort (8030) and QueryPort (9030); the Flink job below uses them for the 'load-url' and 'jdbc-url' options respectively.**
pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example.db</groupId>
    <artifactId>flink-cdc-starrocks</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>flink-cdc-starrocks</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.15.4</flink.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <log4j.version>2.20.0</log4j.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jdk8</artifactId>
            <version>2.15.2</version>
        </dependency>
        <!-- StarRocks sink connector -->
        <dependency>
            <groupId>com.starrocks</groupId>
            <artifactId>flink-connector-starrocks</artifactId>
            <version>1.2.7_flink-1.15</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-guava</artifactId>
            <version>30.1.1-jre-15.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- MySQL CDC source connector -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <scope>compile</scope>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <scope>compile</scope>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <scope>compile</scope>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>flink-cdc-starrocks</finalName>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <!-- bind to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Maven Assembly Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <!-- get all project dependencies -->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <!-- bind to the packaging phase -->
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- copy the dependencies into a lib directory next to the jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>3.5.0</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <!-- copy project dependencies into lib/ -->
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            <overWriteReleases>false</overWriteReleases>
                            <overWriteSnapshots>false</overWriteSnapshots>
                            <overWriteIfNewer>true</overWriteIfNewer>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
The job uses the Flink Table API:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlDbCdc {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        /*
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        // 2. Flink CDC stores the binlog position in checkpoint state; to resume from
        //    where it left off, restart the job from a checkpoint or savepoint.
        // 2.1 Enable checkpointing every 5 seconds
        env.enableCheckpointing(5000L);
        // 2.2 Set the checkpoint consistency semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 2.3 Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 2.4 Restart strategy when recovering from a checkpoint
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        */
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source table: MySQL, read via CDC
        String sourceDDL = "CREATE TABLE `t_user` (\n" +
                " `id` bigint,\n" +
                " `user_name` varchar(255),\n" +
                " `age` int,\n" +
                " PRIMARY KEY (`id`) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'mysql-cdc',\n" +
                " 'hostname' = '192.168.x.xx',\n" +
                " 'port' = '3306',\n" +
                " 'username' = 'root',\n" +
                " 'password' = 'root',\n" +
                " 'database-name' = 'flink-db',\n" +
                " 'server-time-zone' = 'Asia/Shanghai',\n" +
                " 'table-name' = 't_user'\n" +
                ")";

        // Sink table: StarRocks
        // load-url is the FE HttpPort (Stream Load); jdbc-url is the FE QueryPort
        String sinkDDL = "CREATE TABLE `t_user_sink` (\n" +
                " `id` bigint,\n" +
                " `user_name` varchar(255),\n" +
                " `age` int,\n" +
                " PRIMARY KEY (`id`) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'sink.properties.format' = 'json',\n" +
                " 'username' = 'root',\n" +
                " 'password' = '',\n" +
                " 'sink.max-retries' = '10',\n" +
                " 'sink.buffer-flush.max-rows' = '1000000',\n" +
                " 'sink.buffer-flush.max-bytes' = '300000000',\n" +
                " 'sink.properties.strip_outer_array' = 'true',\n" +
                " 'sink.buffer-flush.interval-ms' = '15000',\n" +
                " 'load-url' = '192.168.x.xx:8030',\n" +
                " 'database-name' = 'flink',\n" +
                " 'jdbc-url' = 'jdbc:mysql://192.168.x.xx:9030/flink?useUnicode=true" +
                "&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai',\n" +
                " 'connector' = 'starrocks',\n" +
                " 'table-name' = 't_user_sink'" +
                ")";

        String transformSQL = "INSERT INTO t_user_sink SELECT * FROM t_user";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        // executeSql submits the INSERT job by itself; no env.execute() is needed
        TableResult tableResult = tableEnv.executeSql(transformSQL);
        tableResult.print();
    }
}
Logging configuration, log4j2.xml:
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="error">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
Upload the jar through the Flink dashboard, and upload the dependency jars (the lib/ directory produced by the build) along with it; otherwise Flink will be missing the jars it needs at runtime.
Insert some data into MySQL:
INSERT INTO `flink-db`.`t_user` (`id`, `user_name`, `age`) VALUES (1, 'hello3', 12);
INSERT INTO `flink-db`.`t_user` (`id`, `user_name`, `age`) VALUES (2, 'abc', 1);
INSERT INTO `flink-db`.`t_user` (`id`, `user_name`, `age`) VALUES (3, 'dsd', 23);
Check the logs in the Flink dashboard; if they show no errors, the sync should have succeeded.
Check the StarRocks database:
StarRocks > select * from t_user_sink;
+------+-----------+------+
| id | user_name | age |
+------+-----------+------+
| 2 | abc | 1 |
| 3 | dsd | 23 |
| 1 | hello3 | 12 |
+------+-----------+------+
3 rows in set (0.01 sec)
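Updates propagate the same way; a quick hedged check (expected behavior given the CDC source and the StarRocks primary key table, with rows flushed within the configured 15-second interval):
-- in MySQL: change a row; t_user_sink should soon reflect the new age
UPDATE `flink-db`.`t_user` SET `age` = 30 WHERE `id` = 1;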
After deleting rows in MySQL, the deletes are synchronized to StarRocks as well.
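For example (the id is taken from the inserts above; after the next buffer flush the row should be gone from t_user_sink):
-- in MySQL:
DELETE FROM `flink-db`.`t_user` WHERE `id` = 2;
-- then in StarRocks, this should return an empty set:
-- StarRocks > select * from t_user_sink where id = 2;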
good luck!