当前位置:   article > 正文

flink数据实时传入mysql_flink mysql同步到mysql

flink mysql同步到mysql

要将Flink数据实时传入MySQL,可以使用Flink的JDBC sink连接器。下面是一个简单的代码示例:

  1. 1. 导入依赖:
  2. ```java
  3. import org.apache.flink.configuration.Configuration;
  4. import org.apache.flink.streaming.api.datastream.DataStream;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
  7. import java.sql.Connection;
  8. import java.sql.DriverManager;
  9. import java.sql.PreparedStatement;
  10. ```
  11. 2. 创建Flink的执行环境:
  12. ```java
  13. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  14. ```
  15. 3. 创建输入数据流:
  16. ```java
  17. DataStream<String> inputDataStream = env.fromElements("data1", "data2", "data3");
  18. ```
  19. 4. 创建JDBC sink连接器并将数据发送到MySQL:
  20. ```java
  21. String jdbcUrl = "jdbc:mysql://localhost:3306/my_database";
  22. String username = "root";
  23. String password = "password";
  24. inputDataStream.addSink(new RichSinkFunction<String>() {
  25.     private Connection connection;
  26.     private PreparedStatement preparedStatement;
  27.     @Override
  28.     public void open(Configuration parameters) throws Exception {
  29.         super.open(parameters);
  30.         connection = DriverManager.getConnection(jdbcUrl, username, password);
  31.         preparedStatement = connection.prepareStatement("INSERT INTO my_table (data) VALUES (?)");
  32.     }
  33.     @Override
  34.     public void invoke(String value, Context context) throws Exception {
  35.         preparedStatement.setString(1, value);
  36.         preparedStatement.executeUpdate();
  37.     }
  38.     @Override
  39.     public void close() throws Exception {
  40.         super.close();
  41.         if (preparedStatement != null) {
  42.             preparedStatement.close();
  43.         }
  44.         if (connection != null) {
  45.             connection.close();
  46.         }
  47.     }
  48. });
  49. ```
  50. 在上述代码中,我们使用了Flink的`RichSinkFunction`作为sink连接器,并在`open`方法中创建了MySQL的连接,`invoke`方法中执行了插入数据的SQL语句。
  51. 5. 执行Flink作业:
  52. ```java
  53. env.execute("Sink to MySQL");
  54. ```

以上代码将创建一个Flink的作业,将输入数据实时写入到MySQL数据库中。你可以根据自己的需求对JDBC连接进行配置,并根据具体的数据表结构修改SQL语句。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/926246
推荐阅读
相关标签
  

闽ICP备14008679号