[mysqld]
# ... other existing configuration above ...
# Added for CDC
server-id = 12345
log-bin = mysql-bin
# Must be ROW
binlog_format = ROW
# Must be FULL (this parameter is available in MySQL 5.6 and later)
binlog_row_image = FULL
expire_logs_days = 10
systemctl restart mysqld
show variables like '%log_bin%';      -- after running this, log_bin should be ON
show variables like 'binlog_format';  -- after running this, binlog_format should be ROW
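If you prefer to check these settings from code, the following minimal sketch (not from the original article; it reuses the host and credentials assumed in the examples below) queries them over JDBC with the mysql-connector-java driver that is already declared in the pom:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BinlogCheck {
    public static void main(String[] args) throws Exception {
        // Host, port and credentials are assumptions matching the CDC examples below.
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:mysql://10.0.0.12:3306/", "root", "123456");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image')")) {
            while (rs.next()) {
                // Expected: log_bin = ON, binlog_format = ROW, binlog_row_image = FULL
                System.out.println(rs.getString("Variable_name") + " = " + rs.getString("Value"));
            }
        }
    }
}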
<properties>
    <java.version>11</java.version>
    <flink.version>1.13.6</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.25</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.20</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>11</source>
                <target>11</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <!-- packages the dependencies into the jar -->
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.3.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
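With the assembly plugin configured as above, packaging the project (for example with mvn clean package) should produce a *-jar-with-dependencies.jar under target/; that is the fat jar submitted to the cluster later in this article. The exact file name depends on your project's artifactId and version.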
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataStreamExample {
    public static void main(String[] args) throws Exception {
        // 1. Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Build the SourceFunction via Flink CDC
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("10.0.0.12")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("db_inventory_cdc")              // database(s) to subscribe to
                .tableList("db_inventory_cdc.tb_products_cdc") // table(s) to subscribe to
                .deserializer(new JsonDebeziumDeserializationSchema()) // deserialization
                // initial:  snapshot the database on startup, so historical data is read as well
                // earliest: read from the beginning of the binlog
                // latest:   read from the end of the binlog
                .startupOptions(StartupOptions.initial())
                .build();

        // 3. Print the data
        env.addSource(sourceFunction).print();

        // 4. Start the job
        env.execute();
    }
}
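If you want structured fields rather than the raw JSON string, a small variation on step 3 of the example above (not part of the original code) is to parse each record with fastjson, which is already on the classpath. The field names follow the standard Debezium change-event envelope (op, before, after); adjust them if your deserializer is configured differently. Extra imports needed: com.alibaba.fastjson.JSON, com.alibaba.fastjson.JSONObject, org.apache.flink.api.common.typeinfo.Types.

// Replaces "3. Print the data" in the example above
env.addSource(sourceFunction)
        .map(value -> {
            JSONObject record = JSON.parseObject(value);
            String op = record.getString("op");           // c = insert, u = update, d = delete, r = snapshot read
            JSONObject after = record.getJSONObject("after");
            return op + " -> " + (after == null ? "null" : after.toJSONString());
        })
        .returns(Types.STRING) // explicit output type, since the lambda's generics are erased
        .print();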
./bin/flink run -m hadoop102:8081 -c com.tianyi.FlinkCDC ./flink-cdc-1.0-SNAPSHOT-jar-with-dependencies.jar
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkSQLExample {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Create the Flink-MySQL-CDC source table
        tableEnv.executeSql("CREATE TABLE flink_sql (" +
                " id STRING PRIMARY KEY NOT ENFORCED," + // Flink SQL requires NOT ENFORCED on primary keys
                " name STRING," +
                " sex STRING" +
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'scan.startup.mode' = 'latest-offset'," +
                " 'hostname' = '10.0.0.12'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = '123456'," +
                " 'database-name' = 'db_inventory_cdc'," +
                " 'table-name' = 'flink_sql'" +
                ")");

        // 3. Query the data and convert the result to a stream for output
        Table table = tableEnv.sqlQuery("select * from flink_sql");
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();

        env.execute();
    }
}
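As a small variation on step 3 (not in the original code), the Boolean flag of the retract stream can be used to keep only the add messages and drop retractions before printing:

// true = add message, false = retract message
retractStream
        .filter(change -> change.f0)
        .print();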
The DataStream API works on both Flink 1.12 and 1.13, while the Flink SQL approach above can only be used on Flink 1.13.
The DataStream API can monitor multiple databases and multiple tables at the same time, whereas Flink SQL can only monitor a single table (see the sketch below).
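To illustrate the multi-database / multi-table point, the DataStream builder accepts several entries at once; the second database and the order table below are hypothetical names used only for illustration:

DebeziumSourceFunction<String> multiSource = MySqlSource.<String>builder()
        .hostname("10.0.0.12")
        .port(3306)
        .username("root")
        .password("123456")
        .databaseList("db_inventory_cdc", "db_orders_cdc")    // several databases (db_orders_cdc is hypothetical)
        .tableList("db_inventory_cdc.tb_products_cdc",
                   "db_orders_cdc.tb_orders_cdc")             // several tables
        .deserializer(new JsonDebeziumDeserializationSchema())
        .startupOptions(StartupOptions.latest())
        .build();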