当前位置:   article > 正文

Flink mysql cdc 读取_flinkcdc 如何读取mysql增量的

flinkcdc 如何读取mysql增量的
  • Flink1.11 读取mysql cdc
  • 返回DataStream[(Boolean, Row)],可以根据元组第一个值为True or false判定数据是弃用或者更新插入
#mysql 权限报错考虑修改,然后重启msyql
# Client does not support authentication protocol requested by server
ALTER USER 'root'@'%' IDENTIFIED BY '123456' PASSWORD EXPIRE NEVER;
ALTER USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY '123456';
#pom冲突,会导致任务没反应,注释掉(mysql-connector-java)解决
  • 1
  • 2
  • 3
  • 4
  • 5
package com.cdc

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row
object MysqlCDC {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance.inStreamingMode.useBlinkPlanner.build
    val sTableEnv = StreamTableEnvironment.create(env,bsSettings)
    //env.enableCheckpointing(10000) cdc2版本需要开启
    val userDDL =
      s"""
         |CREATE TABLE t_user (
         | uid int,
         | name string
         |) WITH (
         | 'connector' = 'mysql-cdc',
         | 'hostname' = 'jeff200',
         | 'port' = '3306',
         | 'username' = 'root',
         | 'password' = 'root',
         | 'database-name' = 'test_db',
         | 'table-name' = 't_user'
         |)
         |""".stripMargin
    sTableEnv.executeSql(userDDL)
    val filterSql =
      s"""
         |SELECT uid, name
         |FROM t_user
         |WHERE uid > 0
       """.stripMargin
    val table: Table = sTableEnv.sqlQuery(filterSql)
    // 回撤流方式输出
    val kafkaStream:DataStream[(Boolean, Row)] = sTableEnv.toRetractStream(table)
    kafkaStream.print()
    env.execute("mysql cdc")
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 开启binlog
#在/etc/my.cnf中的[mysqld]下面直接增加内容
vi /etc/my.cnf
	server_id=1
	log_bin = mysql-bin
	binlog_format = ROW
#退出并保存
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 在navicat或者sql客户端执行,建表,插入数
USE test_db;
CREATE TABLE t_user
(
`uid` INT(11),
`name` VARCHAR(25),
`age` INT(11),
`sex` VARCHAR(25),
`ts` timestamp default CURRENT_TIMESTAMP()
);

use test_db;
INSERT INTO t_user (uid, `name`, age, sex) VALUES (1, "小明", 20, '男');
INSERT INTO t_user (uid, `name`, age, sex) VALUES (2, "小丽", 34, '女');

SELECT * FROM t_user
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 添加mysql cdc 依赖 和table blink
<!-- flink table api -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>${scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>${scope}</scope>
        </dependency>

        <!-- Flink-CDC -->
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.1.0</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 结果
    在这里插入图片描述
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/1005196
推荐阅读
相关标签
  

闽ICP备14008679号