赞
踩
要将Flink数据实时传入MySQL,可以使用Flink的JDBC sink连接器。下面是一个简单的代码示例:
- 1. 导入依赖:
-
- ```java
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
- import java.sql.Connection;
- import java.sql.DriverManager;
- import java.sql.PreparedStatement;
- ```
-
- 2. 创建Flink的执行环境:
-
- ```java
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- ```
-
- 3. 创建输入数据流:
-
- ```java
- DataStream<String> inputDataStream = env.fromElements("data1", "data2", "data3");
- ```
-
- 4. 创建JDBC sink连接器并将数据发送到MySQL:
-
- ```java
- String jdbcUrl = "jdbc:mysql://localhost:3306/my_database";
- String username = "root";
- String password = "password";
-
- inputDataStream.addSink(new RichSinkFunction<String>() {
- private Connection connection;
- private PreparedStatement preparedStatement;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- connection = DriverManager.getConnection(jdbcUrl, username, password);
- preparedStatement = connection.prepareStatement("INSERT INTO my_table (data) VALUES (?)");
- }
-
- @Override
- public void invoke(String value, Context context) throws Exception {
- preparedStatement.setString(1, value);
- preparedStatement.executeUpdate();
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- if (preparedStatement != null) {
- preparedStatement.close();
- }
- if (connection != null) {
- connection.close();
- }
- }
- });
- ```
-
- 在上述代码中,我们使用了Flink的`RichSinkFunction`作为sink连接器,并在`open`方法中创建了MySQL的连接,`invoke`方法中执行了插入数据的SQL语句。
-
- 5. 执行Flink作业:
-
- ```java
- env.execute("Sink to MySQL");
- ```
以上代码将创建一个Flink的作业,将输入数据实时写入到MySQL数据库中。你可以根据自己的需求对JDBC连接进行配置,并根据具体的数据表结构修改SQL语句。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。