
Flink: Reading and Writing MySQL Data (DataStream and Table API)


Flink provides a JDBC-based way to write data to MySQL. This article writes data into MySQL in two ways; other JDBC-based databases work along the same lines. In the Table API example, the catalog is a Hive Catalog, so the DDL is persisted.

The JDBC connector allows reading data from, and writing data to, any relational database that ships a JDBC driver. This section describes how to set up the JDBC connector to run SQL queries against a relational database.

If a primary key is defined in the DDL, the JDBC sink works in upsert mode and exchanges UPDATE/DELETE messages with the external system; otherwise it works in append mode and does not support consuming UPDATE/DELETE messages.

By default the JDBC sink provides an at-least-once guarantee. Effectively exactly-once semantics can be achieved with idempotent upsert writes (i.e. a primary key in the DDL), or with the XA-based JdbcSink.exactlyOnceSink together with checkpointing.
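For example, if the underlying MySQL table has a unique key, the Flink DDL can declare it so the connector switches to upsert mode. The following is only a sketch: the target table events_unique and the choice of `user` as the key are hypothetical, and the connection settings are the placeholders used throughout this article.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertModeSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // PRIMARY KEY ... NOT ENFORCED switches the JDBC sink to upsert mode,
        // so rows with the same key are updated instead of appended.
        // Hypothetical target table `events_unique`; credentials are placeholders.
        tableEnv.executeSql(
                "CREATE TABLE EventUpsertTable (" +
                "  `user` STRING," +
                "  url STRING," +
                "  `timestamp` BIGINT," +
                "  PRIMARY KEY (`user`) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://127.0.0.1:3306/flink'," +
                "  'table-name' = 'events_unique'," +
                "  'username' = 'root'," +
                "  'password' = 'xxx'" +
                ")");
    }
}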

MySQL setup

Create the events table in MySQL:

-- flink.events definition
CREATE TABLE `events` (
  `user` varchar(100) DEFAULT NULL,
  `url` varchar(200) DEFAULT NULL,
  `timestamp` bigint(20) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Insert a few rows:

INSERT INTO flink.events
(`user`, url, `timestamp`)
VALUES ('Alice', './home', 1000);

Maven dependencies, including those needed for the Hive Catalog:

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.14.4</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <hadoop.version>3.1.2</hadoop.version>
    <hive.version>3.1.2</hive.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink's Hive connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Hive dependency -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.calcite</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- Hadoop dependency -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- Hadoop dependency -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- Hadoop dependency -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>

Reading and writing MySQL data with the DataStream API

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.sql.PreparedStatement;
import java.sql.SQLException;

public class JdbcSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Event is a POJO with public fields user, url and timestamp (definition not shown)
        SingleOutputStreamOperator<Event> eventStream = env.fromElements(
                new Event("Alice", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=1", 5 * 1000L),
                new Event("Cary", "./home", 60 * 1000L),
                new Event("Bob", "./prod?id=3", 90 * 1000L),
                new Event("Alice", "./prod?id=1", 105 * 1000L)
        );

        // Write each Event to the events table, batching and retrying as configured below
        eventStream.addSink(JdbcSink.sink(
                "insert into events (user, url, timestamp) values (?, ?, ?)",
                new JdbcStatementBuilder<Event>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, Event event) throws SQLException {
                        preparedStatement.setString(1, event.user);
                        preparedStatement.setString(2, event.url);
                        preparedStatement.setLong(3, event.timestamp);
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://127.0.0.1:3306/flink")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("xxx")
                        .build()
        ));

        env.execute();
    }
}
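The example above covers only the write path. For the read path with the DataStream API, one option is the connector's JdbcInputFormat, which reads the table as a bounded source. The following is a minimal sketch against the same events table; the field types in the RowTypeInfo mirror the SELECT column order, and the connection settings are the same placeholders as above.

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class JdbcSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Field types follow the SELECT column order: user, url, timestamp
        RowTypeInfo rowTypeInfo = new RowTypeInfo(
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.LONG_TYPE_INFO);

        JdbcInputFormat inputFormat = JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://127.0.0.1:3306/flink")
                .setUsername("root")
                .setPassword("xxx")
                .setQuery("select `user`, url, `timestamp` from events")
                .setRowTypeInfo(rowTypeInfo)
                .finish();

        // Bounded source: emits the current contents of the table and then finishes
        DataStreamSource<Row> rows = env.createInput(inputFormat);
        rows.print();

        env.execute();
    }
}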

The Table API way of reading and writing MySQL, with a Hive Catalog used as Flink's catalog. Anyone familiar with big-data engines such as Flink or Spark will know that both have a common component called the Catalog. Below is the official definition of Flink's Catalog.

A Catalog provides metadata, such as databases, tables, partitions, views, and the functions and information stored in a database or other external systems.

The Table API and SQL API are Flink's unified approach to batch and stream processing. Their core concept is the TableEnvironment, whose methods register data sources and tables; the metadata for those sources and tables is stored in the Catalog, which makes the Catalog an essential part of the TableEnvironment.

After Apache Flink obtains a TableEnvironment, data sources and tables can be registered through it. Once registered, the metadata of those databases and tables is stored in the Catalog, which holds all table schemas, data directory information, and so on.

In short, the Catalog is the metadata management hub, and the metadata covers databases, tables, table schemas, and similar information.
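As a small, hedged illustration of the Catalog as a metadata store: the TableEnvironment can list the catalogs, databases, and tables it knows about. The fragment below assumes the tableEnv and the registered myhive catalog from the example that follows.

// Fragment only: assumes the tableEnv and HiveCatalog registration shown below
// (also requires java.util.Arrays for printing the arrays).
tableEnv.useCatalog("myhive");
tableEnv.useDatabase("default");
System.out.println(Arrays.toString(tableEnv.listCatalogs()));   // all registered catalogs
System.out.println(Arrays.toString(tableEnv.listDatabases()));  // databases in the current catalog
System.out.println(Arrays.toString(tableEnv.listTables()));     // tables in the current database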

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class JDBCTable {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> eventStream = env.fromElements(
                new Event("Alice", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=1", 5 * 1000L),
                new Event("Cary", "./home", 60 * 1000L),
                new Event("Bob", "./prod?id=3", 90 * 1000L),
                new Event("Alice", "./prod?id=1", 105 * 1000L)
        );

        // Create the table environment
        // StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "/opt/hive";

        // Create a HiveCatalog and register it with the table environment
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog("myhive", hive);
        // Use the HiveCatalog as the current session's catalog
        tableEnv.useCatalog("myhive");

        // The DDL is persisted in the Hive Metastore, so it survives across sessions
        TableResult tableResult = tableEnv.executeSql("CREATE TABLE IF NOT EXISTS EventTable (\n" +
                "`user` STRING,\n" +
                "url STRING,\n" +
                "`timestamp` BIGINT\n" +
                ") WITH (\n" +
                "'connector' = 'jdbc',\n" +
                "'url' = 'jdbc:mysql://127.0.0.1:3306/flink',\n" +
                "'table-name' = 'events',\n" +
                "'username'='root',\n" +
                "'password'='xxx'\n" +
                ")");

        tableEnv.executeSql("insert into EventTable values('Alice','./prod?id=3',106000)");
    }
}
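Since the EventTable definition is persisted in the Hive Catalog, the same table can also be read back through the JDBC connector. A minimal fragment, assuming the tableEnv and EventTable from the example above:

// Fragment only: read the MySQL-backed table back via the JDBC connector.
tableEnv.executeSql("SELECT `user`, url, `timestamp` FROM EventTable").print();

// Or with the Table API object model (requires org.apache.flink.table.api.Table):
Table clicksPerUser = tableEnv.sqlQuery("SELECT `user`, COUNT(url) AS cnt FROM EventTable GROUP BY `user`");
clicksPerUser.execute().print();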

The data in MySQL:
