赞
踩
Doris中建表
-- Target table for the Flink job: one row per user record.
-- No key model is declared, so Doris applies its default table model.
CREATE TABLE IF NOT EXISTS demo.user
(
    `id`   INT NOT NULL,   -- hash distribution column (see DISTRIBUTED BY below)
    `name` VARCHAR(255),
    `age`  INT
)
DISTRIBUTED BY HASH(`id`)
PROPERTIES (
    -- a single replica; presumably intended for a one-node test cluster -- confirm before production use
    "replication_num" = "1"
);
Flink开发相关依赖
<properties>
    <flink.version>1.12.1</flink.version>
    <scala.version>2.12.13</scala.version>
    <mysql.version>8.0.25</mysql.version>
    <lombok.version>1.18.12</lombok.version>
</properties>

<dependencies>
    <!-- MySQL JDBC driver: the sink writes to Doris through a jdbc:mysql connection -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>

    <!-- Flink core APIs -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Lombok: generates the builder and accessors used by User -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
    </dependency>
</dependencies>
User.java
package com.daniel.bean;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * One user record flowing through the job; its fields map to the
 * {@code id}, {@code name} and {@code age} columns of {@code demo.user}.
 *
 * <p>The public no-arg constructor lets Flink treat this class as a POJO
 * instead of falling back to generic serialization. {@code @AllArgsConstructor}
 * is needed because {@code @Builder} otherwise conflicts with the explicit
 * no-arg constructor.
 *
 * @Author Daniel
 * @Date: 2023/7/3 15:35
 **/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User {
    /** Value bound to the {@code id} column. */
    public int id;

    /** Value bound to the {@code name} column. */
    public String name;

    /** Value bound to the {@code age} column. */
    public int age;
}
DorisSinkFunction.java
package com.daniel.util; import com.daniel.bean.User; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; /** * @Author Daniel * @Date: 2023/7/3 15:36 * @Description **/ public class DorisSinkFunction extends RichSinkFunction<User> { Connection conn = null; String sql; public DorisSinkFunction(String sql) { this.sql = sql; } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); conn = getConn("localhost", 9030, "demo"); } @Override public void close() throws Exception { super.close(); if (conn != null) { conn.close(); } } // 定义具体的操作 @Override public void invoke(User user, Context context) throws Exception { // 批量插入 PreparedStatement preparedStatement = conn.prepareStatement(sql); preparedStatement.setLong(1, user.id); preparedStatement.setString(2, user.name); preparedStatement.setLong(3, user.age); preparedStatement.addBatch(); long startTime = System.currentTimeMillis(); int[] batchResult = preparedStatement.executeBatch(); long endTime = System.currentTimeMillis(); System.out.println("批量插入用时:" + (endTime - startTime) + "ms -- 插入数据行数:" + batchResult.length); } public Connection getConn(String host, int port, String database) throws SQLException, ClassNotFoundException { Class.forName("com.mysql.cj.jdbc.Driver"); String address = "jdbc:mysql://" + host + ":" + port + "/" + database; conn = DriverManager.getConnection(address, "root", ""); return conn; } }
open():在Sink算子开始处理数据之前调用,用于初始化连接等资源;每个并行子任务只会调用一次。
invoke():定义了在每个元素到达Sink操作时所执行的逻辑。用户需要实现这个方法来定义如何将数据写入外部存储系统或执行其他操作。
close():在SinkFunction关闭之前调用,用于释放资源、关闭连接等操作。
DorisWriteTest.java
package com.daniel; import com.daniel.bean.User; import com.daniel.util.DorisSinkFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * @Author Daniel * @Date: 2023/7/3 15:37 * @Description **/ public class DorisWriteTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); // Source DataStream<String> ds = env.socketTextStream("localhost", 9999); // Transform SingleOutputStreamOperator<User> dataStream = ds.map((MapFunction<String, User>) data -> { String[] split = data.split(","); return User.builder() .id(Integer.parseInt(split[0])) .name(split[1]) .age(Integer.parseInt(split[2])) .build(); }); // Sink String sql = "INSERT INTO demo.user (id, name, age) VALUES (?,?,?)"; DorisSinkFunction jdbcSink = new DorisSinkFunction(sql); dataStream.addSink(jdbcSink); env.execute("flink-doris-write"); } }
使用nc或者任意工具向指定端口发送数据
例如(-L 是 Windows 版 netcat 的持续监听参数;Linux/macOS 下可改用 nc -lk 9999)
nc -L -p 9999
发送数据
1,Daniel,25
2,David,38
3,James,16
4,Robert,27
然后启动 DorisWriteTest.java 程序
查询数据
-- Verify the rows written by the Flink job.
-- Columns are listed explicitly so the output does not change if the table gains columns.
SELECT
    id,
    name,
    age
FROM demo.user;
由于这里是并行插入,所以没有顺序可言
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。