
## Flink CDC: Syncing MySQL Data to StarRocks 2.5.0

Create the source table in MySQL:

CREATE TABLE `t_user` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `user_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1;
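Flink CDC reads the MySQL binlog, so the source instance must have binlog enabled in ROW format. A quick sanity check from the MySQL client (variable names assume MySQL 5.7+/8.x):

SHOW VARIABLES LIKE 'log_bin';        -- should be ON
SHOW VARIABLES LIKE 'binlog_format';  -- should be ROW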

Create the StarRocks table

The default StarRocks user is root, with no password:

mysql -P9030 -h127.0.0.1 -uroot --prompt="StarRocks > "

You can also change the password:

SET PASSWORD FOR 'root' = PASSWORD('root');
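After changing it, subsequent logins need the password:

mysql -P9030 -h127.0.0.1 -uroot -proot --prompt="StarRocks > "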

Create the table t_user_sink:


StarRocks > use flink;
StarRocks > CREATE TABLE `t_user_sink` (
    ->   `id` bigint NOT NULL,
    ->   `user_name` varchar(255) DEFAULT NULL,
    ->   `age` int DEFAULT NULL
    -> )
    -> PRIMARY KEY(`id`)
    -> DISTRIBUTED BY HASH(id) BUCKETS 3
    -> PROPERTIES
    -> (
    ->     "replication_num" = "1" 
    -> );
Query OK, 0 rows affected (0.08 sec)

StarRocks > select * from t_user_sink;
Empty set (0.02 sec)

StarRocks > SHOW PROC '/frontends'\G
*************************** 1. row ***************************
             Name: 192.168.16.2_9010_1686905244720
               IP: 192.168.16.2
      EditLogPort: 9010
         HttpPort: 8030
        QueryPort: 9030
          RpcPort: 9020
             Role: LEADER
        ClusterId: 776853271
             Join: true
            Alive: true
ReplayedJournalId: 53824
    LastHeartbeat: 2023-06-21 02:01:41
         IsHelper: true
           ErrMsg: 
        StartTime: 2023-06-20 11:50:07
          Version: 2.5.0-0ee1b3b8c
1 row in set (0.02 sec)

# check the BE nodes
StarRocks > SHOW PROC '/backends'\G

**Note the HttpPort (8030) and QueryPort (9030); the sink configuration below uses them in `load-url` and `jdbc-url` respectively.**

The same DDL as plain SQL, for copy-paste:

CREATE TABLE `t_user_sink` (
  `id` bigint NOT NULL,
  `user_name` varchar(255) DEFAULT NULL,
  `age` int DEFAULT NULL
)
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(id) BUCKETS 3
PROPERTIES
(
    "replication_num" = "1" 
);
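A quick check that the schema came out as expected (standard MySQL-client syntax, which StarRocks supports):

DESC t_user_sink;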

The program

Dependencies (pom.xml):

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example.db</groupId>
    <artifactId>flink-cdc-starrocks</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>flink-cdc-starrocks</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.15.4</flink.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <log4j.version>2.20.0</log4j.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
        <!--        cdc-->
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jdk8</artifactId>
            <version>2.15.2</version>
        </dependency>
        <dependency>
            <groupId>com.starrocks</groupId>
            <artifactId>flink-connector-starrocks</artifactId>
            <version>1.2.7_flink-1.15</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-guava</artifactId>
            <version>30.1.1-jre-15.0</version>
        </dependency>
          <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-table-api-java-bridge</artifactId>
              <version>${flink.version}</version>
          </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--        cdc-->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.33</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <scope>compile</scope>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <scope>compile</scope>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <scope>compile</scope>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>flink-cdc-starrocks</finalName>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
<!-- bind to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Maven Assembly Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <!-- get all project dependencies -->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <!-- bind to the packaging phase -->
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
<!-- copy dependencies to a lib directory outside the jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>3.5.0</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
<!-- copy project dependencies into lib/ -->
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            <overWriteReleases>false</overWriteReleases>
                            <overWriteSnapshots>false</overWriteSnapshots>
                            <overWriteIfNewer>true</overWriteIfNewer>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

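Build with Maven. Given the assembly and dependency plugins configured above, one command produces both a fat jar (named after the pom's finalName) and a lib/ directory of copied dependencies under target/:

mvn clean package
# target/flink-cdc-starrocks-jar-with-dependencies.jar   <- fat jar
# target/lib/                                            <- copied dependencies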
MysqlDbCdc

The job uses the Flink Table API:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MysqlDbCdc {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      /*  env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        // Flink CDC stores the binlog position as checkpoint state; to resume from
        // where it left off, restart the job from a checkpoint or savepoint.
        // Checkpoint every 5 seconds
        env.enableCheckpointing(5000L);
        // Checkpoint consistency semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Restart strategy: 3 attempts, 2 s apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));*/
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // source table (MySQL, read via CDC)
        String sourceDDL =
                "CREATE TABLE `t_user` (\n" +
                "  `id` bigint,\n" +
                "  `user_name` varchar(255),\n" +
                "  `age` int,\n" +
                "  PRIMARY KEY (`id`) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'mysql-cdc',\n" +
                "  'hostname' = '192.168.x.xx',\n" +
                "  'port' = '3306',\n" +
                "  'username' = 'root',\n" +
                "  'password' = 'root',\n" +
                "  'database-name' = 'flink-db',\n" +
                "  'server-time-zone' = 'Asia/Shanghai',\n" +
                "  'table-name' = 't_user'\n" +
                ")";
        // sink table (StarRocks; load-url uses the FE HttpPort, jdbc-url the QueryPort)
        String sinkDDL =
                "CREATE TABLE `t_user_sink` (\n" +
                "  `id` bigint,\n" +
                "  `user_name` varchar(255),\n" +
                "  `age` int,\n" +
                "  PRIMARY KEY (`id`) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'sink.properties.format' = 'json',\n" +
                "  'username' = 'root',\n" +
                "  'password' = '',\n" +
                "  'sink.max-retries' = '10',\n" +
                "  'sink.buffer-flush.max-rows' = '1000000',\n" +
                "  'sink.buffer-flush.max-bytes' = '300000000',\n" +
                "  'sink.properties.strip_outer_array' = 'true',\n" +
                "  'sink.buffer-flush.interval-ms' = '15000',\n" +
                "  'load-url' = '192.168.x.xx:8030',\n" +
                "  'database-name' = 'flink',\n" +
                "  'jdbc-url' = 'jdbc:mysql://192.168.x.xx:9030/flink?useUnicode=true" +
                "&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai',\n" +
                "  'connector' = 'starrocks',\n" +
                "  'table-name' = 't_user_sink'" +
                ")";

        String transformSQL =
                "INSERT INTO t_user_sink SELECT * FROM t_user";
        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        // executeSql submits the INSERT job by itself; a pure Table API job needs no
        // env.execute() (calling it here would fail: no DataStream operators defined).
        TableResult tableResult = tableEnv.executeSql(transformSQL);
        tableResult.print();
    }

}


Logging configuration, log4j2.xml:

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="error">
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>

Upload the Flink job

Upload the jar to the Flink dashboard together with its dependency jars; otherwise Flink will be missing the jars it needs at runtime. You can also submit from the command line, as sketched below.
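A command-line submission sketch, assuming the fat jar built above and that MysqlDbCdc sits in the default package (adjust -c to your actual package path):

flink run -c MysqlDbCdc target/flink-cdc-starrocks-jar-with-dependencies.jar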

Insert some rows into MySQL:

INSERT INTO `flink-db`.`t_user` (`id`, `user_name`, `age`) VALUES (1, 'hello3', 12);
INSERT INTO `flink-db`.`t_user` (`id`, `user_name`, `age`) VALUES (2, 'abc', 1);
INSERT INTO `flink-db`.`t_user` (`id`, `user_name`, `age`) VALUES (3, 'dsd', 23);

Check the Flink dashboard logs:

(screenshot of the Flink dashboard logs)

If the logs show no errors, the sync has succeeded.

Check the StarRocks database:

StarRocks > select * from t_user_sink;
+------+-----------+------+
| id   | user_name | age  |
+------+-----------+------+
|    2 | abc       |    1 |
|    3 | dsd       |   23 |
|    1 | hello3    |   12 |
+------+-----------+------+
3 rows in set (0.01 sec)

After deleting rows in MySQL, the deletes are synced to StarRocks as well; the PRIMARY KEY table model supports deletes, not just upserts.
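For example, delete one of the rows inserted earlier and the row disappears from t_user_sink after the next sink flush (up to 15 s with the buffer-flush interval configured above):

DELETE FROM `flink-db`.`t_user` WHERE `id` = 2;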

good luck!
