当前位置:   article > 正文

Flink写入数据到Doris

flink 写入doris 分区

1.Doris建表

Doris中建表

-- Target table for the Flink job; DorisWriteTest inserts (id, name, age) rows into it.
-- No key model (DUPLICATE / UNIQUE / AGGREGATE KEY) is declared, so Doris applies its default.
-- NOTE(review): with the default model, re-sending the same id presumably adds a second row
-- rather than overwriting the first -- confirm for the Doris version in use.
-- No BUCKETS clause is given either, so the bucket count is the engine default (version-dependent).
CREATE TABLE IF NOT EXISTS demo.user
(
 `id`   INT NOT NULL,
 `name` VARCHAR(255),
 `age`  INT
) DISTRIBUTED BY HASH(`id`)
PROPERTIES (
 -- single replica; presumably chosen for a one-node test cluster
 "replication_num" = "1"
);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2.项目依赖(pom.xml)

Flink开发相关依赖。Doris兼容MySQL协议,因此这里直接通过MySQL JDBC驱动(mysql-connector-java)写入Doris,不需要额外的Doris专用依赖。

    <properties>
        <flink.version>1.12.1</flink.version>
        <!-- NOTE(review): not referenced by any dependency below -->
        <scala.version>2.12.13</scala.version>
        <mysql.version>8.0.25</mysql.version>
        <lombok.version>1.18.12</lombok.version>
    </properties>

    <dependencies>
        <!-- MySQL JDBC driver: the sink reaches Doris through a jdbc:mysql:// URL -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>
        <!-- Flink core APIs (Scala 2.12 builds) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Lombok runs as an annotation processor at compile time only; keep it off the runtime classpath -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

3.Bean实体类

User.java

package com.daniel.bean;

import lombok.Builder;
import lombok.Data;

/**
 * One row of the Doris table {@code demo.user}: id, name and age.
 *
 * <p>Fields are public so the sink can read them directly. Lombok's {@code @Data} adds
 * getters/setters, {@code equals}/{@code hashCode} and {@code toString}; {@code @Builder} adds a
 * fluent builder.
 *
 * <p>Both constructors are declared explicitly. Flink only treats a class as a POJO (and uses its
 * POJO serializer instead of falling back to Kryo) when it has a public no-argument constructor,
 * which {@code @Builder} alone does not generate. Once any explicit constructor exists, Lombok no
 * longer generates the all-args constructor the builder calls, so that one is declared here too.
 *
 * @Author Daniel
 * @Date: 2023/7/3 15:35
 **/
@Data
@Builder
public class User {
    public int id;
    public String name;
    public int age;

    /** No-argument constructor required for Flink POJO serialization. */
    public User() {
    }

    /** All-arguments constructor; also used by the Lombok-generated builder. */
    public User(int id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

4.Doris业务写入逻辑

DorisSinkFunction.java

package com.daniel.util;

import com.daniel.bean.User;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * Flink sink that writes {@link User} records into Doris over JDBC, using the MySQL driver
 * ({@code jdbc:mysql://host:port/database}).
 *
 * <p>Each parallel subtask:
 * <ul>
 *   <li>opens one connection and prepares the INSERT statement once in {@code open()};</li>
 *   <li>reuses that statement for every record in {@code invoke()};</li>
 *   <li>releases both in {@code close()}.</li>
 * </ul>
 *
 * <p>NOTE(review): every record is sent as its own single-row INSERT, which is slow for Doris
 * under real load; consider buffering rows or a Stream Load based connector for production.
 *
 * @Author Daniel
 * @Date: 2023/7/3 15:36
 **/
public class DorisSinkFunction extends RichSinkFunction<User> {
    private static final String DEFAULT_HOST = "localhost";
    private static final int DEFAULT_PORT = 9030;
    private static final String DEFAULT_DATABASE = "demo";
    private static final String DEFAULT_USERNAME = "root";
    private static final String DEFAULT_PASSWORD = "";

    /** JDBC connection; created per subtask in open(), so excluded from function serialization. */
    transient Connection conn = null;
    /** INSERT statement whose three placeholders are bound to id, name and age, in that order. */
    String sql;

    private final String host;
    private final int port;
    private final String database;
    private final String username;
    private final String password;

    /** Statement prepared from {@link #sql} once per subtask and reused for every record. */
    private transient PreparedStatement statement;

    /**
     * Creates a sink connecting to {@code localhost:9030/demo} as {@code root} with an empty
     * password.
     *
     * @param sql INSERT statement with placeholders for id, name and age
     */
    public DorisSinkFunction(String sql) {
        this(sql, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_DATABASE, DEFAULT_USERNAME, DEFAULT_PASSWORD);
    }

    /**
     * Creates a sink for an arbitrary MySQL-protocol endpoint.
     *
     * @param sql      INSERT statement with placeholders for id, name and age
     * @param host     host to connect to
     * @param port     port to connect to
     * @param database database name used in the JDBC URL
     * @param username JDBC user name
     * @param password JDBC password
     */
    public DorisSinkFunction(String sql, String host, int port, String database,
                             String username, String password) {
        this.sql = sql;
        this.host = host;
        this.port = port;
        this.database = database;
        this.username = username;
        this.password = password;
    }

    /** Opens the connection and prepares the INSERT statement once for this subtask. */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = getConn(host, port, database);
        // Prepared once here and reused in invoke(), so no statement is created (and leaked) per record.
        statement = conn.prepareStatement(sql);
    }

    /**
     * Releases the statement, then the connection. The nested finally blocks ensure the
     * connection and the superclass are closed even if an earlier step throws.
     */
    @Override
    public void close() throws Exception {
        try {
            if (statement != null) {
                statement.close();
            }
        } finally {
            try {
                if (conn != null) {
                    conn.close();
                }
            } finally {
                super.close();
            }
        }
    }

    /** Binds one record to the prepared statement and executes it as a single-row batch. */
    @Override
    public void invoke(User user, Context context) throws Exception {
        // id and age are int fields mapped to INT columns, so bind them as ints.
        statement.setInt(1, user.id);
        statement.setString(2, user.name);
        statement.setInt(3, user.age);
        // Single-row batch: executeBatch() sends it and resets the batch for the next record.
        statement.addBatch();

        long startTime = System.currentTimeMillis();
        int[] batchResult = statement.executeBatch();
        long endTime = System.currentTimeMillis();
        System.out.println("批量插入用时:" + (endTime - startTime) + "ms -- 插入数据行数:" + batchResult.length);
    }

    /**
     * Loads the MySQL driver and opens a connection with this sink's configured credentials.
     * Also stores the connection in {@link #conn}.
     *
     * @param host     host to connect to
     * @param port     port to connect to
     * @param database database name used in the JDBC URL
     * @return the opened connection
     * @throws SQLException           if the connection cannot be established
     * @throws ClassNotFoundException if the MySQL driver is not on the classpath
     */
    public Connection getConn(String host, int port, String database) throws SQLException, ClassNotFoundException {
        Class.forName("com.mysql.cj.jdbc.Driver");
        String address = "jdbc:mysql://" + host + ":" + port + "/" + database;
        conn = DriverManager.getConnection(address, username, password);
        return conn;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • open():在SinkFunction实例化之后、每个并行子任务(subtask)开始处理数据之前调用一次,用于初始化连接、预编译语句等资源。

  • invoke():定义了在每个元素到达Sink操作时所执行的逻辑。用户需要实现这个方法来定义如何将数据写入外部存储系统或执行其他操作。

  • close():在SinkFunction关闭之前调用,用于释放资源、关闭连接等操作。

5.测试写入类

DorisWriteTest.java

package com.daniel;

import com.daniel.bean.User;
import com.daniel.util.DorisSinkFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**
 * Local demo job: reads comma-separated "id,name,age" lines from a socket on localhost:9999,
 * turns each line into a {@link User} and writes it to Doris through {@link DorisSinkFunction}.
 *
 * @Author Daniel
 * @Date: 2023/7/3 15:37
 **/
public class DorisWriteTest {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment();

        // Source: every text line received on the socket is one record
        final DataStream<String> lines = environment.socketTextStream("localhost", 9999);

        // Transform: "id,name,age" -> User
        final SingleOutputStreamOperator<User> users =
                lines.map((MapFunction<String, User>) line -> parseUser(line));

        // Sink: parameterised INSERT executed by the JDBC-based Doris sink
        final String insertSql = "INSERT INTO demo.user (id, name, age) VALUES (?,?,?)";
        users.addSink(new DorisSinkFunction(insertSql));

        environment.execute("flink-doris-write");
    }

    /**
     * Parses one "id,name,age" line into a {@link User}.
     * Malformed input throws NumberFormatException or ArrayIndexOutOfBoundsException, failing the job.
     */
    private static User parseUser(String line) {
        final String[] fields = line.split(",");
        final int id = Integer.parseInt(fields[0]);
        final String name = fields[1];
        final int age = Integer.parseInt(fields[2]);
        return User.builder().id(id).name(name).age(age).build();
    }
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

6.发送数据

使用nc或者其他任意工具在指定端口(9999)上开启监听,用来向程序发送数据,
例如(Windows版netcat):

# Windows netcat: -p sets the listening port; -L keeps listening after the client disconnects.
# NOTE(review): on Linux/macOS the equivalent is typically `nc -lk 9999`.
nc -L -p 9999
  • 1

发送数据

1,Daniel,25
2,David,38
3,James,16
4,Robert,27
  • 1
  • 2
  • 3
  • 4

先用nc在9999端口开启监听,再启动DorisWriteTest.java程序(程序会作为客户端连接该端口),然后在nc窗口中输入上面的数据。

在这里插入图片描述
查询数据

 SELECT `id`, `name`, `age`
 FROM demo.user;  -- no ORDER BY: row order is not guaranteed
  • 1
  • 2

由于Sink是并行写入的,且查询语句没有指定ORDER BY,返回结果的顺序并不固定。

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/765704
推荐阅读
相关标签
  

闽ICP备14008679号