
Real-time MySQL synchronization with Flink CDC

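1. Create the environment

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// useBlinkPlanner() is deprecated on Flink 1.14+, where Blink is the only planner; it can be omitted there.
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// Set the job name
tEnv.getConfig().getConfiguration().setString("pipeline.name", "FlinkCDCTest");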

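2. Create the DWD table on top of the source MySQL table

The source table uses 'connector' = 'mysql-cdc'.

// hostname, port, username, password, database and table below are placeholders for real values.
String createSqlDwd = "CREATE TABLE dwd_order (" +
        "  GUID VARCHAR," +
        "  tenant_guid VARCHAR," +
        "  order_no VARCHAR," +
        // .... (remaining columns elided in the original; they must include the
        // account_month and sales_territory_guid columns read by the INSERT statement)
        "  primary key(GUID) NOT ENFORCED" +
        ") WITH (" +
        "'connector' = 'mysql-cdc'," +
        "'hostname' = 'hostname'," +
        "'port' = 'port'," +
        "'username' = 'username'," +
        "'password' = 'password'," +
        "'database-name' = 'database'," +
        "'table-name' = 'table'" +
        ")";
tEnv.executeSql(createSqlDwd);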
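
A misplaced quote in a hand-concatenated WITH clause is easy to miss and only surfaces when the DDL is parsed. One way to reduce that risk is to render the clause from a map of options. The sketch below is plain Java with no Flink dependency; the class WithClause and its methods are hypothetical helpers introduced here for illustration, not part of Flink or Flink CDC.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not a Flink API): renders a Flink SQL WITH clause from a map. */
public class WithClause {

    /** Wraps a value in single quotes, doubling embedded quotes as SQL string literals require. */
    static String quote(String s) {
        return "'" + s.replace("'", "''") + "'";
    }

    /** Renders the options in insertion order as: WITH ('k1' = 'v1', 'k2' = 'v2'). */
    static String render(Map<String, String> options) {
        StringJoiner joiner = new StringJoiner(", ", "WITH (", ")");
        for (Map.Entry<String, String> e : options.entrySet()) {
            joiner.add(quote(e.getKey()) + " = " + quote(e.getValue()));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // Same placeholder values as in the DDL above; replace them with real ones.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "mysql-cdc");
        options.put("hostname", "hostname");
        options.put("port", "port");
        options.put("username", "username");
        options.put("password", "password");
        options.put("database-name", "database");
        options.put("table-name", "table");
        System.out.println(render(options));
    }
}
```

The rendered string can then be appended after the column list of the CREATE TABLE statement in place of the hand-written WITH block.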

3. Create the aggregated subject-area wide table (DWS) as the sink table

The sink table uses 'connector' = 'jdbc'.

// 'table-name' names an existing MySQL table; the JDBC connector does not create it.
String createSqlDws = "CREATE TABLE flink (" +
        "account_month varchar(6), " +
        "sales_territory_guid varchar(32), " +
        "primary key(account_month) NOT ENFORCED" +
        ") WITH (" +
        "'connector' = 'jdbc'," +
        "'url' = 'jdbc:mysql://ip:port/database'," +
        "'username' = 'root'," +
        "'password' = 'password'," +
        "'table-name' = 'flink_test'," +
        "'sink.buffer-flush.max-rows' = '1'," +
        "'sink.buffer-flush.interval' = '1s'," +
        "'sink.max-retries' = '3'" +
        ")";

tEnv.executeSql(createSqlDws);

4. Listen to the DWD table in real time and write the processed data to the DWS table

String insertSql = "INSERT INTO flink (account_month, sales_territory_guid) " +
        "SELECT account_month, " +
        "sales_territory_guid " +
        "FROM dwd_order";

tEnv.executeSql(insertSql);
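
Because the sink table declares account_month as its primary key, the JDBC connector writes in upsert mode: a row whose key already exists replaces the stored row instead of being appended. The plain-Java model below illustrates only that consequence, assuming the statement above is used as written (no GROUP BY). It involves no Flink code, and the class UpsertSinkSketch and the sample values are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical plain-Java model of an upsert sink keyed by account_month. */
public class UpsertSinkSketch {

    // Sink contents: primary key (account_month) -> sales_territory_guid.
    private final Map<String, String> rows = new LinkedHashMap<>();

    /** Inserts the row, or overwrites the existing row that has the same key. */
    void upsert(String accountMonth, String salesTerritoryGuid) {
        rows.put(accountMonth, salesTerritoryGuid);
    }

    /** Returns a copy of the current sink contents. */
    Map<String, String> snapshot() {
        return new LinkedHashMap<>(rows);
    }

    public static void main(String[] args) {
        UpsertSinkSketch sink = new UpsertSinkSketch();
        sink.upsert("202401", "territory-a"); // first order of the month
        sink.upsert("202402", "territory-b");
        sink.upsert("202401", "territory-c"); // second order of the same month overwrites the first
        System.out.println(sink.snapshot());  // prints {202401=territory-c, 202402=territory-b}
    }
}
```

If one row per order is wanted instead, the sink key would have to identify an order; if one row per month is wanted, the SELECT would typically aggregate by account_month.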

5. Set the main entry class in pom.xml

<build>
        <finalName>flink-test-job</finalName>
        <resources>
            <resource>
                <directory>src/main/java</directory>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <testResources>
            <testResource>
                <directory>src/test/java</directory>
            </testResource>
        </testResources>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <!-- Entry-point class of the jar (optional) -->
                            <mainClass>com.csbr.cloud.flinkserver.task.Test</mainClass>
                        </manifest>
                    </archive>

                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Project directory:
[Figure: project directory layout]
Related article, "Flink 使用之 MySQL CDC" (Using Flink: MySQL CDC):
https://www.jianshu.com/p/0a47e387de51
