赞
踩
上一篇《Spring Boot + Flink CDC —— MySQL 同步 Elasticsearch(DataStream 方式)》采用 DataStream 方式实现,本篇改用 Table API 方式。
环境:JDK 17 + Spring Boot 3.0.2 + Elasticsearch 7.17.9 + Flink 1.16.0 + Flink CDC 2.3.0
<!-- Build properties: Java level, encodings, and the versions pinned for this project. -->
<properties>
<java.version>17</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- NOTE(review): not referenced by any dependency in this fragment; presumably consumed by a parent/BOM declaration elsewhere in the POM - confirm. -->
<spring-boot.version>3.0.2</spring-boot.version>
<!-- Shared by the org.apache.flink artifacts below that use ${flink.version}. -->
<flink.version>1.16.0</flink.version>
</properties>
<!-- Dependencies for the Spring Boot application and the Flink CDC (MySQL -> Elasticsearch) Table API job. -->
<dependencies>
<!-- Spring web starter; no version is declared here, so it must be managed outside this fragment. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- MySQL JDBC driver, needed at runtime only. -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Flink CDC connector for MySQL -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<!-- Elasticsearch 7 sink connector; versioned separately from ${flink.version} (3.0.0-1.16). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7</artifactId>
<version>3.0.0-1.16</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Java API, same version as the other Flink modules. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/**
 * Submits a Flink Table API job at application start-up that copies the MySQL table
 * {@code db01.system_dept} (read through the {@code mysql-cdc} connector) into the
 * Elasticsearch index {@code system_dept_search}.
 *
 * <p>Any failure while building or submitting the job is logged and not rethrown, so a
 * broken pipeline does not prevent the Spring Boot application from starting.
 */
@Slf4j
@Component
public class MysqlEventListener implements ApplicationRunner {

    /** Name under which the job is submitted to Flink. */
    private static final String JOB_NAME = "mysql-cdc-es";

    /**
     * Source table backed by the MySQL CDC connector.
     *
     * <p>NOTE(review): Flink documents TIMESTAMP precision as 0-9; the 19 used for
     * {@code create_time} looks like MySQL's display width - confirm the intended precision.
     */
    private static final String SOURCE_DDL =
            """
            CREATE TABLE IF NOT EXISTS system_dept (
            id BIGINT,
            name VARCHAR(30),
            sort INT,
            leader_user_id BIGINT,
            phone VARCHAR(11),
            email VARCHAR(50),
            status TINYINT,
            creator VARCHAR(64),
            create_time TIMESTAMP(19),
            PRIMARY KEY(id) NOT ENFORCED
            ) WITH (
            'connector' = 'mysql-cdc',
            'hostname' = 'localhost',
            'port' = '3306',
            'username' = 'flinkcdc',
            'password' = 'flinkcdc',
            'database-name' = 'db01',
            'table-name' = 'system_dept'
            )
            """;

    /** Sink table backed by the Elasticsearch 7 connector; flushes after every single action. */
    private static final String SINK_DDL =
            """
            CREATE TABLE IF NOT EXISTS system_dept_es (
            id BIGINT,
            name VARCHAR(30),
            sort INT,
            leader_user_id BIGINT,
            phone VARCHAR(11),
            email VARCHAR(50),
            status TINYINT,
            creator VARCHAR(64),
            create_time TIMESTAMP(19),
            PRIMARY KEY(id) NOT ENFORCED
            ) WITH (
            'connector' = 'elasticsearch-7',
            'hosts' = 'http://localhost:9200',
            'index' = 'system_dept_search',
            'sink.bulk-flush.max-actions' = '1'
            )
            """;

    /** Straight column-for-column copy from the source table into the sink table. */
    private static final String TRANSFORM_SQL =
            "INSERT INTO system_dept_es SELECT * FROM system_dept";

    /**
     * Spring Boot start-up hook; kicks off the MySQL to Elasticsearch synchronisation job.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        mysql2es();
    }

    /**
     * Registers the source and sink tables and submits the INSERT statement as a Flink job.
     *
     * <p>{@code executeSql} on an INSERT statement submits the job by itself, so no
     * additional {@code env.execute*()} call is made: the environment holds no DataStream
     * operators and such a call fails with "No operators defined in streaming topology".
     */
    private void mysql2es() {
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Run the pipeline with a single parallel task.
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            // The job is submitted by executeSql, so its name is supplied through configuration.
            tableEnv.getConfig().set("pipeline.name", JOB_NAME);

            tableEnv.executeSql(SOURCE_DDL);
            tableEnv.executeSql(SINK_DDL);

            // Submits the continuous job. An INSERT result carries no query rows worth
            // printing, so only the job id is reported.
            TableResult result = tableEnv.executeSql(TRANSFORM_SQL);
            result.getJobClient()
                    .ifPresent(client -> log.info("mysql --> es, job submitted, jobId={}", client.getJobID()));
        } catch (Exception e) {
            // Start-up boundary: log with the full cause and let the application keep running.
            log.error("mysql --> es, Exception=", e);
        }
    }
}
到此就大功告成啦!
代码地址:https://gitee.com/qianxkun/lakudouzi-components/tree/master/flink-cdc-mysql2es-2
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。