赞
踩
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
//设置job名称
tEnv.getConfig().getConfiguration().setString("pipeline.name", "FlinkCDCTest");
connector = ‘mysql-cdc’
String createSqlDwd = "CREATE TABLE dwd_order (" + " GUID VARCHAR," + " tenant_guid VARCHAR," + " order_no VARCHAR," + .... " primary key(GUID) NOT ENFORCED" + ") WITH (" + "'connector' = 'mysql-cdc'," + "'hostname'=''hostname," + "'port'='port'," + "'username'='username'," + "'password'='password'," + "'database-name'='datebase'," + "'table-name'='table'" + ")"; tEnv.executeSql(createSqlDwd);
connector’ = ‘jdbc’
String createSqlDws = "CREATE TABLE flink (" + "account_month varchar(6), " + "sales_territory_guid varchar(32), " + "primary key(account_month) NOT ENFORCED" + ") WITH (" + "'connector' = 'jdbc'," + "'url' = 'jdbc:mysql://ip:port/database'," + "'username' = 'root'," + "'password' = 'password'," + "'table-name' = 'flink_test'," + "'sink.buffer-flush.max-rows' = '1'," + "'sink.buffer-flush.interval' = '1s'," + "'sink.max-retries' = '3'" + ")"; tEnv.executeSql(createSqlDws);
String insertSql = "INSERT INTO flink (account_month,sales_territory_guid) " +
"select account_month, " +
"sales_territory_guid " +
"from dwd_order ";
tEnv.executeSql(insertSql);
<build> <finalName>flink-test-job</finalName> <resources> <resource> <directory>src/main/java</directory> </resource> <resource> <directory>src/main/resources</directory> </resource> </resources> <testResources> <testResource> <directory>src/test/java</directory> </testResource> </testResources> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>2.6</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <addClasspath>true</addClasspath> <!-- 可以设置jar包的入口类(可选) --> <mainClass>com.csbr.cloud.flinkserver.task.Test</mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
项目目录:
Flink 使用之 MySQL CDC:
https://www.jianshu.com/p/0a47e387de51
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。