赞
踩
#mysql 权限报错时,考虑执行下面的修改,然后重启 mysql
# Client does not support authentication protocol requested by server
# Reset the password of 'root'@'%' to '123456' and mark it as never expiring.
ALTER USER 'root'@'%' IDENTIFIED BY '123456' PASSWORD EXPIRE NEVER;
# Switch 'root'@'%' to the mysql_native_password authentication plugin, keeping the same password.
# NOTE(review): presumably this is what resolves the "Client does not support authentication protocol" error — confirm.
ALTER USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY '123456';
#pom冲突,会导致任务没反应,注释掉(mysql-connector-java)解决
package com.cdc

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.types.Row

/**
 * Minimal Flink SQL job that reads the MySQL table `test_db.t_user` through the
 * `mysql-cdc` connector, keeps the rows with `uid > 0`, and prints the resulting
 * retract stream to stdout.
 */
object MysqlCDC {

  /**
   * Job entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .useBlinkPlanner()
      .build()
    val sTableEnv = StreamTableEnvironment.create(env, bsSettings)

    // Checkpointing must be enabled when using the CDC 2.x connector:
    // env.enableCheckpointing(10000)

    // Source table backed by the MySQL binlog; only uid and name are mapped.
    // NOTE(review): the MySQL notes in this file set root's password to '123456',
    // while 'root' is used here — confirm the credentials match your server.
    val userDDL =
      """
        |CREATE TABLE t_user (
        |  uid int,
        |  name string
        |) WITH (
        |  'connector' = 'mysql-cdc',
        |  'hostname' = 'jeff200',
        |  'port' = '3306',
        |  'username' = 'root',
        |  'password' = 'root',
        |  'database-name' = 'test_db',
        |  'table-name' = 't_user'
        |)
        |""".stripMargin
    sTableEnv.executeSql(userDDL)

    val filterSql =
      """
        |SELECT uid, name
        |FROM t_user
        |WHERE uid > 0
        |""".stripMargin
    val table: Table = sTableEnv.sqlQuery(filterSql)

    // Emit as a retract stream: each element is (flag, row), where the Boolean
    // flag distinguishes an added row (true) from a retracted one (false).
    val kafkaStream: DataStream[(Boolean, Row)] = sTableEnv.toRetractStream[Row](table)
    kafkaStream.print()

    env.execute("mysql cdc")
  }
}
#在/etc/my.cnf中的[mysqld]下面直接增加内容
vi /etc/my.cnf
# ID of this MySQL server.
# NOTE(review): presumably must be unique among all servers/clients in the replication topology — confirm.
server_id=1
# Enable the binary log, using "mysql-bin" as the log file base name.
log_bin = mysql-bin
# Record changes in row-based format.
# NOTE(review): presumably required by the mysql-cdc connector — confirm.
binlog_format = ROW
#保存并退出,然后重启 mysql 使上述 binlog 配置生效
# Select the demo database (it must already exist).
USE test_db;
# Source table for the CDC demo; the Flink job in this file maps only uid and name.
# IF NOT EXISTS keeps the script re-runnable. Integer display widths (INT(11)) are
# omitted because they are deprecated in MySQL 8.0.17+ and do not affect storage.
CREATE TABLE IF NOT EXISTS t_user
(
    `uid`  INT,
    `name` VARCHAR(25),
    `age`  INT,
    `sex`  VARCHAR(25),
    # Populated automatically with the insertion time when not supplied.
    `ts`   TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
# Insert two sample rows and read them back; ts is filled in by its column default.
USE test_db;
# String literals use single quotes throughout: double-quoted text is parsed as an
# identifier when the ANSI_QUOTES SQL mode is enabled.
INSERT INTO t_user (uid, `name`, age, sex) VALUES (1, '小明', 20, '男');
INSERT INTO t_user (uid, `name`, age, sex) VALUES (2, '小丽', 34, '女');
SELECT * FROM t_user;
<!-- Flink Table API: Scala bridge and Blink planner (Scala 2.11 builds) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>${scope}</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>${scope}</scope>
</dependency>

<!-- Flink CDC: MySQL connector -->
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.1.0</version>
</dependency>
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。