当前位置:   article > 正文

Doris:读取Doris数据的N种方法

arrowstreamreader close

先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7

深知大多数程序员想要提升技能时,往往是自己摸索成长,但不成体系的自学低效又漫长,而且极易碰到天花板、技术停滞不前!

因此收集整理了一份《2024年最新大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
img
img
img
img
img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!

由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新

如果你需要这些资料,可以添加V获取:vip204888 (备注大数据)
img

正文

import org.apache.arrow.vector.;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.Types;
import org.apache.commons.codec.binary.Base64;
import org.apache.doris.sdk.thrift.
;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.thrift.TConfiguration;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;

public class QueryPlanMain {
private static String queryPlanUrlFormat = “%s/api/%s/%s/_query_plan”;

public static void main(String[] args) throws Exception {
    String dorisUrl = "http://127.0.0.1:8030";
    String username = "root";
    String password = "1234564";
    String database = "article";
    String table = "dim_user";
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

// String[] columns = {“id”,“name”,“merger_name”,“area_code”,“level”};
String[] columns = {“*”};
String sql =“select “+String.join(”,”,columns)+" from “+database+”.“+table
// +” where id<= 10 "
// + " order by id " //不支持order by
// + " limit 10" //不支持limit
;
JSONObject queryPlanResult = getQueryPlan(dorisUrl,username,password,database,table,sql);
System.out.println(“查询计划执行结果:”+queryPlanResult);
//根据查询计划结果,查询数据
if (queryPlanResult != null){
readData(queryPlanResult,username,password,database,table);
}
}

private static JSONObject getQueryPlan(String dorisUrl, String username,String password,String database, String table, String sql) throws Exception {
    try (CloseableHttpClient client = HttpClients.custom().build()) {

        String queryPlanUrl = String.format(queryPlanUrlFormat, dorisUrl, database, table);


        HttpPost post = new HttpPost(queryPlanUrl);
        System.out.println("执行查询计划url: "+queryPlanUrl);
        post.setHeader(HttpHeaders.EXPECT, "100-continue");
        post.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(username, password));

        // The param is specific SQL, and the query plan is returned
        Map<String, String> params = new HashMap<>();
        params.put("sql", sql);
        System.out.println("执行查询计划参数: "+ params);
        StringEntity entity = new StringEntity(JSON.toJSONString(params));
        post.setEntity(entity);

        try (CloseableHttpResponse response = client.execute(post)) {

            if (response.getEntity() != null) {
                //解析查询计划
                JSONObject queryPlanJSONObject = JSONObject.parseObject(EntityUtils.toString(response.getEntity()));
                System.out.println("执行查询计划response: " + queryPlanJSONObject);
                JSONObject dataJSONObject = queryPlanJSONObject.getJSONObject("data");
                //查询计划异常返回exception信息
                if (dataJSONObject.containsKey("exception")) {
                    throw new RuntimeException(dataJSONObject.getString("exception"));
                }
                return dataJSONObject;
            }
        }
    }
    return null;
}

private static String basicAuthHeader(String username, String password) {
    final String info = username + ":" + password;
    byte[] encoded = Base64.encodeBase64(info.getBytes(StandardCharsets.UTF_8));
    return "Basic " + new String(encoded);
}
private static void readData(JSONObject data,String username,String password,String database,String table) throws Exception {

    String queryPlan = data.getString("opaqued_query_plan");
    JSONObject partitions = data.getJSONObject("partitions");
    long readTotal = 0L;
    long totalTime = 0L;
    for (Map.Entry<String, Object> tablet : partitions.entrySet()) {
        System.out.println("tablet信息:"+ tablet);
        long startTime = System.currentTimeMillis();
        Long tabletId = Long.parseLong(tablet.getKey());
        JSONObject value = JSONObject.parseObject(JSON.toJSONString(tablet.getValue()));
        //get first backend
        String routingsBackend = value.getJSONArray("routings").getString(0);
        String backendHost = routingsBackend.split(":")[0];
        String backendPort = routingsBackend.split(":")[1];

        //connect backend
        TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
        TTransport transport = new TSocket(new TConfiguration(), backendHost, Integer.parseInt(backendPort));
        TProtocol protocol = factory.getProtocol(transport);
        TDorisExternalService.Client client = new TDorisExternalService.Client(protocol);
        if (!transport.isOpen()) {
            transport.open();
        }

        //build params
        TScanOpenParams params = new TScanOpenParams();
        params.cluster = "default_cluster";
        params.database = database;
        params.table = table;
        params.tablet_ids = Collections.singletonList(tabletId);
        params.opaqued_query_plan = queryPlan;
        // max row number of one read batch
        params.setBatchSize(1000);
        params.setQueryTimeout(3600);
        params.setMemLimit(2147483648L);
        params.setUser(username);
        params.setPasswd(password);

        //open scanner
        TScanOpenResult tScanOpenResult = client.openScanner(params);
        if (!TStatusCode.OK.equals(tScanOpenResult.getStatus().getStatusCode())) {
            throw new RuntimeException(String.format("The status of open scanner result from %s is '%s', error message is: %s.",
                    routingsBackend, tScanOpenResult.getStatus().getStatusCode(), tScanOpenResult.getStatus().getErrorMsgs()));
        }

        TScanNextBatchParams nextBatchParams = new TScanNextBatchParams();
        nextBatchParams.setContextId(tScanOpenResult.getContextId());
        boolean eos = false;
        //read data
        int offset = 0;
        long nums = 0L;
        while (!eos) {
            nextBatchParams.setOffset(offset);
            TScanBatchResult next = client.getNext(nextBatchParams);
            if (!TStatusCode.OK.equals(next.getStatus().getStatusCode())) {
                throw new RuntimeException(String.format("The status of get next result from %s is '%s', error message is: %s.",
                        routingsBackend, next.getStatus().getStatusCode(), next.getStatus().getErrorMsgs()));
            }
            eos = next.isEos();
            if (!eos) {
                int i = convertArrow(next);
                offset += i;
                nums += i;
            }
        }
        readTotal += nums;
        long cost_time =(System.currentTimeMillis() - startTime);
        totalTime +=cost_time;
        System.out.println("tabletId["+tabletId+"]任务结束,总数据量:"+nums+",总花费:"+cost_time+"ms");
        //close
        TScanCloseParams closeParams = new TScanCloseParams();
        closeParams.setContextId(tScanOpenResult.getContextId());
        client.closeScanner(closeParams);
        if (transport.isOpen()) {
            transport.close();
        }
    }
    System.out.println("总任务结束,总数据量:"+readTotal+",总花费:"+totalTime+"ms");

}


private static int convertArrow(TScanBatchResult nextResult) throws Exception {
    long startTime = System.currentTimeMillis();
    int offset = 0;
    RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);
    ArrowStreamReader arrowStreamReader = new ArrowStreamReader(new ByteArrayInputStream(nextResult.getRows()), rootAllocator);
    VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot();
    while (arrowStreamReader.loadNextBatch()) {
        List<FieldVector> fieldVectors = root.getFieldVectors();
        //total data rows
        int rowCountInOneBatch = root.getRowCount();

        //按行获取数据
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136

// for (int row = 0; row < rowCountInOneBatch; row++) {
// List rowData = new ArrayList<>();
// for (FieldVector fieldVector : fieldVectors) {
// Types.MinorType minorType = fieldVector.getMinorType();
// Object v = convertValue(row, minorType, fieldVector);
// rowData.add(v);
// }
System.out.println(rowData);
// }
//按列获取数据
List rowData = new ArrayList<>();
for (FieldVector fieldVector : fieldVectors) {
Types.MinorType minorType = fieldVector.getMinorType();
List filedData = new ArrayList<>();
for (int row = 0; row < rowCountInOneBatch; row++) {
Object v = convertValue(row, minorType, fieldVector);
filedData.add(v);

            }
  • 1

// System.out.println(filedData);
}
offset += root.getRowCount();
}

    //处理完之后要关闭,否则容易内存溢出
    arrowStreamReader.close();

    return offset;
}

private static Object convertValue(int rowIndex,
                                   Types.MinorType minorType,
                                   FieldVector fieldVector) {
    Object fieldValue;
    switch (minorType) {
        case BIT:
            BitVector bitVector = (BitVector) fieldVector;
            fieldValue = bitVector.isNull(rowIndex) ? null : bitVector.get(rowIndex) != 0;
            break;
        case TINYINT:
            TinyIntVector tinyIntVector = (TinyIntVector) fieldVector;
            fieldValue = tinyIntVector.isNull(rowIndex) ? null : tinyIntVector.get(rowIndex);
            break;
        case SMALLINT:
            SmallIntVector smallIntVector = (SmallIntVector) fieldVector;
            fieldValue = smallIntVector.isNull(rowIndex) ? null : smallIntVector.get(rowIndex);
            break;
        case INT:
            IntVector intVector = (IntVector) fieldVector;
            fieldValue = intVector.isNull(rowIndex) ? null : intVector.get(rowIndex);
            break;
        case BIGINT:
            BigIntVector bigIntVector = (BigIntVector) fieldVector;
            fieldValue = bigIntVector.isNull(rowIndex) ? null : bigIntVector.get(rowIndex);
            break;
        case FLOAT4:
            Float4Vector float4Vector = (Float4Vector) fieldVector;
            fieldValue = float4Vector.isNull(rowIndex) ? null : float4Vector.get(rowIndex);
            break;
        case FLOAT8:
            Float8Vector float8Vector = (Float8Vector) fieldVector;
            fieldValue = float8Vector.isNull(rowIndex) ? null : float8Vector.get(rowIndex);
            break;
        case VARBINARY:
            VarBinaryVector varBinaryVector = (VarBinaryVector) fieldVector;
            fieldValue = varBinaryVector.isNull(rowIndex) ? null : varBinaryVector.get(rowIndex);
            break;
        case DECIMAL:
            DecimalVector decimalVector = (DecimalVector) fieldVector;
            fieldValue = decimalVector.getObject(rowIndex).stripTrailingZeros();
            break;
        case VARCHAR:
            VarCharVector date = (VarCharVector) fieldVector;
            fieldValue = new String(date.get(rowIndex));
            break;
        case LIST:
            ListVector listVector = (ListVector) fieldVector;
            fieldValue = listVector.isNull(rowIndex) ? null : listVector.getObject(rowIndex);
            break;
        default:
            fieldValue = fieldVector.isNull(rowIndex) ? null : fieldVector.getObject(rowIndex);
    }
    return fieldValue;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

}


## 4.Spark Doris Connector


        Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

// Read a Doris table into a Spark DataFrame through the Spark Doris Connector.
// Replace each $YOUR_... placeholder with the real value.
val dorisSparkDF = spark.read.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESTFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .load()

dorisSparkDF.show(5)


## 5.Flink Doris Connector


        Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。本文档介绍Flink如何通过Datastream和SQL操作Doris。



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

DorisOptions.Builder builder = DorisOptions.builder()
.setFenodes(“FE_IP:HTTP_PORT”)
.setTableIdentifier(“db.table”)
.setUsername(“root”)
.setPassword(“password”);

DorisSource<List<?>> dorisSource = DorisSourceBuilder.

env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), “doris source”).print();


## 6.Arrow Flight SQL


        Arrow Flight SQL 是一种使用 Arrow 内存格式和 Flight RPC 框架与 SQL 数据库交互的协议。Apache Doris 2.1 版本提供了基于 Arrow Flight SQL 构建的高速数据传输链路,支持主流语言通过 SQL 从 Doris 高速读取大规模数据,极大提升了其他系统与 Apache Doris 之间的数据传输效率。


1)maven 引入flight-sql-jdbc-driver包:



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherits groupId/version from the multi-module demo parent. -->
    <parent>
        <groupId>com.yichenkeji</groupId>
        <artifactId>yichen-demo</artifactId>
        <version>1.0</version>
    </parent>

    <artifactId>yichen-demo-doris</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <lombok.version>1.18.26</lombok.version>
        <arrow.version>15.0.2</arrow.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.arrow/flight-sql-jdbc-driver -->
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>flight-sql-jdbc-driver</artifactId>
            <version>${arrow.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-jdbc -->
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-jdbc</artifactId>
            <version>${arrow.version}</version>
        </dependency>

    </dependencies>
</project>


2)JDBC编码实现



  • 1
  • 2
  • 3
  • 4
  • 5

package com.yichenkeji.demo.doris;

import java.sql.*;
import java.util.Properties;

public class ArrowFlightSqlJdbc {

public static void main(String[] args) throws SQLException {
    String jdbc_url = "jdbc:arrow-flight-sql://192.168.179.134:9040";
    Properties prop = new Properties();
    prop.put("user", "root");
    prop.put("password", "");
    //需要配置该参数,否则报错
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。

需要这份系统化的资料的朋友,可以添加V获取:vip204888 (备注大数据)
img

一个人可以走得很快,但一群人才能走得更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!

.168.179.134:9040";
Properties prop = new Properties();
prop.put(“user”, “root”);
prop.put(“password”, “”);
//需要配置该参数,否则报错

网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。

需要这份系统化的资料的朋友,可以添加V获取:vip204888 (备注大数据)
[外链图片转存中…(img-uvv0pRTw-1713119837686)]

一个人可以走得很快,但一群人才能走得更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Li_阴宅/article/detail/892238
推荐阅读
相关标签
  

闽ICP备14008679号