Based on Flink 1.15.4.
The mongodb-cdc connector used here is not the official Apache Flink one; Flink only ships an official MongoDB connector from 1.16 onward, so this project uses Ververica's flink-connector-mongodb-cdc instead.
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example.db</groupId>
    <artifactId>flink-mong-starrocks</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>flink-mong-starrocks</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.15.4</flink.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <log4j.version>2.20.0</log4j.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <!-- mongo-cdc -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mongodb-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mongodb-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>1.15.4</version>
        </dependency>
        <!-- mongo-cdc end -->

        <!-- cdc -->
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jdk8</artifactId>
            <version>2.15.2</version>
        </dependency>
        <dependency>
            <groupId>com.starrocks</groupId>
            <artifactId>flink-connector-starrocks</artifactId>
            <version>1.2.7_flink-1.15</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-guava</artifactId>
            <version>30.1.1-jre-15.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- cdc end -->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <scope>compile</scope>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <scope>compile</scope>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <scope>compile</scope>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>flink-mong-starrocks</finalName>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <!-- bind to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Maven Assembly Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <!-- get all project dependencies -->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <!-- bind to the packaging phase -->
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- copy dependencies into a lib directory outside the jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>3.5.0</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <!-- copy project dependencies into lib/ -->
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            <overWriteReleases>false</overWriteReleases>
                            <overWriteSnapshots>false</overWriteSnapshots>
                            <overWriteIfNewer>true</overWriteIfNewer>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
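With this POM, a plausible build-and-submit sequence looks like the following (a sketch: the fat-jar name follows the assembly-plugin defaults and the finalName above, and the entry class is the MongoDbCdc job below; adjust paths for your cluster):

mvn clean package
flink run -c MongoDbCdc target/flink-mong-starrocks-jar-with-dependencies.jar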
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MongoDbCdc {

    /**
     * flink-cdc-mongo
     * https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mongodb-cdc.html#data-type-mapping
     * https://github.com/ververica/flink-cdc-connectors/blob/master/docs/content/connectors/mongodb-cdc(ZH).md
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        // Flink CDC stores its read position in checkpoint state; to resume from where it
        // left off, restart the job from a checkpoint or savepoint.
        // Checkpoint every 5 seconds.
        env.enableCheckpointing(5000L);
        // Exactly-once checkpoint semantics.
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Retain the last checkpoint when the job is cancelled.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Fixed-delay restart strategy: 3 attempts, 2 s apart.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        */

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source table (MongoDB). The primary key must be the _id field.
        String sourceDDL = "CREATE TABLE `test_article` (\n" +
                "  `_id` STRING,\n" +
                "  `articleId` INT,\n" +
                "  `title` STRING,\n" +
                "  PRIMARY KEY (`_id`) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mongodb-cdc',\n" +
                "  'hosts' = '192.x.x.x:27017',\n" + // multiple hosts are comma-separated
                "  'username' = 'flinkuser',\n" +
                "  'password' = 'flinkpw',\n" +
                "  'database' = 'test',\n" +
                "  'collection' = 'test_article'\n" +
                ")";

        // Sink table (StarRocks).
        String sinkDDL = "CREATE TABLE `t_article` (\n" +
                "  `articleId` BIGINT,\n" +
                "  `title` VARCHAR(255),\n" +
                "  PRIMARY KEY (`articleId`) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'starrocks',\n" +
                "  'load-url' = '192.x.xx.x:8030',\n" + // FE HTTP port (8030)
                "  'jdbc-url' = 'jdbc:mysql://192.xx.xx.x:9030/test?useUnicode=true" + // FE query port (9030)
                "&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai',\n" +
                "  'database-name' = 'test',\n" +
                "  'table-name' = 't_article',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'root',\n" +
                "  'sink.max-retries' = '10',\n" +
                "  'sink.buffer-flush.max-rows' = '1000000',\n" +
                "  'sink.buffer-flush.max-bytes' = '300000000',\n" +
                "  'sink.buffer-flush.interval-ms' = '1000',\n" +
                "  'sink.properties.format' = 'json',\n" +
                "  'sink.properties.strip_outer_array' = 'true'\n" +
                ")";

        String transformSQL = "INSERT INTO t_article (articleId, title) " +
                "SELECT articleId, title FROM test_article";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        // executeSql() on an INSERT submits the job itself; calling env.execute()
        // afterwards would fail with "no operators defined in streaming topology".
        TableResult tableResult = tableEnv.executeSql(transformSQL);
        tableResult.print();
    }
}
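For reference, the connector also exposes a DataStream API with incremental snapshotting. The following is a minimal sketch under the same assumptions as above (flink-connector-mongodb-cdc 2.3.0, identical hosts and credentials); it simply prints raw change events as Debezium-style JSON rather than writing to StarRocks:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.mongodb.source.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MongoDbCdcDataStream {
    public static void main(String[] args) throws Exception {
        // Incremental-snapshot MongoDB source; emits each change event as a JSON string.
        MongoDBSource<String> source = MongoDBSource.<String>builder()
                .hosts("192.x.x.x:27017")
                .databaseList("test")
                .collectionList("test.test_article") // fully qualified: database.collection
                .username("flinkuser")
                .password("flinkpw")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000L); // the incremental source relies on checkpointing
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB CDC Source")
           .print();
        env.execute("mongodb-cdc-datastream");
    }
}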
use admin;
db.createRole(
    {
        role: "flinkrole",
        privileges: [{
            // grant privileges on all non-system collections in all databases
            resource: { db: "", collection: "" },
            actions: [
                "splitVector",
                "listDatabases",
                "listCollections",
                "collStats",
                "find",
                "changeStream"
            ]
        }],
        roles: [
            // read config.collections and config.chunks,
            // used for snapshot splitting on sharded clusters
            { role: 'read', db: 'config' }
        ]
    }
);
db.createUser(
    {
        user: 'flinkuser',
        pwd: 'flinkpw',
        roles: [
            { role: 'flinkrole', db: 'admin' }
        ]
    }
);
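Once everything is wired up, a quick way to exercise the pipeline end to end is to insert a test document; the row should show up in StarRocks within one flush interval. A sketch, assuming the test database and collection from the job above:

use test;
// run as an account with write privileges; flinkrole above is read-only
db.test_article.insertOne({ articleId: NumberInt(1), title: "hello" });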
The target table in StarRocks is a primary-key table:

CREATE TABLE `t_article` (
    `articleId` bigint NOT NULL,
    `title` varchar(255) DEFAULT NULL
)
PRIMARY KEY(`articleId`)
DISTRIBUTED BY HASH(`articleId`) BUCKETS 3
PROPERTIES (
    "replication_num" = "1"
);
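With the Flink job running, the sync can be checked from any MySQL client pointed at the StarRocks FE query port (9030):

SELECT * FROM test.t_article ORDER BY articleId LIMIT 10;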
PS: managing the jar dependencies for a Flink sync job is truly painful; not friendly at all.