赞
踩
先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7
深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年最新大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!
由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新
如果你需要这些资料,可以添加V获取:vip204888 (备注大数据)
import org.apache.arrow.vector.;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.Types;
import org.apache.commons.codec.binary.Base64;
import org.apache.doris.sdk.thrift.;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.thrift.TConfiguration;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;
public class QueryPlanMain {
private static String queryPlanUrlFormat = “%s/api/%s/%s/_query_plan”;
public static void main(String[] args) throws Exception {
String dorisUrl = "http://127.0.0.1:8030";
String username = "root";
String password = "1234564";
String database = "article";
String table = "dim_user";
// String[] columns = {“id”,“name”,“merger_name”,“area_code”,“level”};
String[] columns = {“*”};
String sql =“select “+String.join(”,”,columns)+" from “+database+”.“+table
// +” where id<= 10 "
// + " order by id " //不支持order by
// + " limit 10" //不支持limit
;
JSONObject queryPlanResult = getQueryPlan(dorisUrl,username,password,database,table,sql);
System.out.println(“查询计划执行结果:”+queryPlanResult);
//根据查询计划结果,查询数据
if (queryPlanResult != null){
readData(queryPlanResult,username,password,database,table);
}
}
private static JSONObject getQueryPlan(String dorisUrl, String username,String password,String database, String table, String sql) throws Exception { try (CloseableHttpClient client = HttpClients.custom().build()) { String queryPlanUrl = String.format(queryPlanUrlFormat, dorisUrl, database, table); HttpPost post = new HttpPost(queryPlanUrl); System.out.println("执行查询计划url: "+queryPlanUrl); post.setHeader(HttpHeaders.EXPECT, "100-continue"); post.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(username, password)); // The param is specific SQL, and the query plan is returned Map<String, String> params = new HashMap<>(); params.put("sql", sql); System.out.println("执行查询计划参数: "+ params); StringEntity entity = new StringEntity(JSON.toJSONString(params)); post.setEntity(entity); try (CloseableHttpResponse response = client.execute(post)) { if (response.getEntity() != null) { //解析查询计划 JSONObject queryPlanJSONObject = JSONObject.parseObject(EntityUtils.toString(response.getEntity())); System.out.println("执行查询计划response: " + queryPlanJSONObject); JSONObject dataJSONObject = queryPlanJSONObject.getJSONObject("data"); //查询计划异常返回exception信息 if (dataJSONObject.containsKey("exception")) { throw new RuntimeException(dataJSONObject.getString("exception")); } return dataJSONObject; } } } return null; } private static String basicAuthHeader(String username, String password) { final String info = username + ":" + password; byte[] encoded = Base64.encodeBase64(info.getBytes(StandardCharsets.UTF_8)); return "Basic " + new String(encoded); } private static void readData(JSONObject data,String username,String password,String database,String table) throws Exception { String queryPlan = data.getString("opaqued_query_plan"); JSONObject partitions = data.getJSONObject("partitions"); long readTotal = 0L; long totalTime = 0L; for (Map.Entry<String, Object> tablet : partitions.entrySet()) { System.out.println("tablet信息:"+ tablet); long startTime = System.currentTimeMillis(); Long tabletId = 
Long.parseLong(tablet.getKey()); JSONObject value = JSONObject.parseObject(JSON.toJSONString(tablet.getValue())); //get first backend String routingsBackend = value.getJSONArray("routings").getString(0); String backendHost = routingsBackend.split(":")[0]; String backendPort = routingsBackend.split(":")[1]; //connect backend TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory(); TTransport transport = new TSocket(new TConfiguration(), backendHost, Integer.parseInt(backendPort)); TProtocol protocol = factory.getProtocol(transport); TDorisExternalService.Client client = new TDorisExternalService.Client(protocol); if (!transport.isOpen()) { transport.open(); } //build params TScanOpenParams params = new TScanOpenParams(); params.cluster = "default_cluster"; params.database = database; params.table = table; params.tablet_ids = Collections.singletonList(tabletId); params.opaqued_query_plan = queryPlan; // max row number of one read batch params.setBatchSize(1000); params.setQueryTimeout(3600); params.setMemLimit(2147483648L); params.setUser(username); params.setPasswd(password); //open scanner TScanOpenResult tScanOpenResult = client.openScanner(params); if (!TStatusCode.OK.equals(tScanOpenResult.getStatus().getStatusCode())) { throw new RuntimeException(String.format("The status of open scanner result from %s is '%s', error message is: %s.", routingsBackend, tScanOpenResult.getStatus().getStatusCode(), tScanOpenResult.getStatus().getErrorMsgs())); } TScanNextBatchParams nextBatchParams = new TScanNextBatchParams(); nextBatchParams.setContextId(tScanOpenResult.getContextId()); boolean eos = false; //read data int offset = 0; long nums = 0L; while (!eos) { nextBatchParams.setOffset(offset); TScanBatchResult next = client.getNext(nextBatchParams); if (!TStatusCode.OK.equals(next.getStatus().getStatusCode())) { throw new RuntimeException(String.format("The status of get next result from %s is '%s', error message is: %s.", routingsBackend, 
next.getStatus().getStatusCode(), next.getStatus().getErrorMsgs())); } eos = next.isEos(); if (!eos) { int i = convertArrow(next); offset += i; nums += i; } } readTotal += nums; long cost_time =(System.currentTimeMillis() - startTime); totalTime +=cost_time; System.out.println("tabletId["+tabletId+"]任务结束,总数据量:"+nums+",总花费:"+cost_time+"ms"); //close TScanCloseParams closeParams = new TScanCloseParams(); closeParams.setContextId(tScanOpenResult.getContextId()); client.closeScanner(closeParams); if (transport.isOpen()) { transport.close(); } } System.out.println("总任务结束,总数据量:"+readTotal+",总花费:"+totalTime+"ms"); } private static int convertArrow(TScanBatchResult nextResult) throws Exception { long startTime = System.currentTimeMillis(); int offset = 0; RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE); ArrowStreamReader arrowStreamReader = new ArrowStreamReader(new ByteArrayInputStream(nextResult.getRows()), rootAllocator); VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot(); while (arrowStreamReader.loadNextBatch()) { List<FieldVector> fieldVectors = root.getFieldVectors(); //total data rows int rowCountInOneBatch = root.getRowCount(); //按行获取数据
// for (int row = 0; row < rowCountInOneBatch; row++) {
// List rowData = new ArrayList<>();
// for (FieldVector fieldVector : fieldVectors) {
// Types.MinorType minorType = fieldVector.getMinorType();
// Object v = convertValue(row, minorType, fieldVector);
// rowData.add(v);
// }
System.out.println(rowData);
// }
//按列获取数据
List rowData = new ArrayList<>();
for (FieldVector fieldVector : fieldVectors) {
Types.MinorType minorType = fieldVector.getMinorType();
List filedData = new ArrayList<>();
for (int row = 0; row < rowCountInOneBatch; row++) {
Object v = convertValue(row, minorType, fieldVector);
filedData.add(v);
}
// System.out.println(filedData);
}
offset += root.getRowCount();
}
//处理完之后要关闭,否则容易内存溢出 arrowStreamReader.close(); return offset; } private static Object convertValue(int rowIndex, Types.MinorType minorType, FieldVector fieldVector) { Object fieldValue; switch (minorType) { case BIT: BitVector bitVector = (BitVector) fieldVector; fieldValue = bitVector.isNull(rowIndex) ? null : bitVector.get(rowIndex) != 0; break; case TINYINT: TinyIntVector tinyIntVector = (TinyIntVector) fieldVector; fieldValue = tinyIntVector.isNull(rowIndex) ? null : tinyIntVector.get(rowIndex); break; case SMALLINT: SmallIntVector smallIntVector = (SmallIntVector) fieldVector; fieldValue = smallIntVector.isNull(rowIndex) ? null : smallIntVector.get(rowIndex); break; case INT: IntVector intVector = (IntVector) fieldVector; fieldValue = intVector.isNull(rowIndex) ? null : intVector.get(rowIndex); break; case BIGINT: BigIntVector bigIntVector = (BigIntVector) fieldVector; fieldValue = bigIntVector.isNull(rowIndex) ? null : bigIntVector.get(rowIndex); break; case FLOAT4: Float4Vector float4Vector = (Float4Vector) fieldVector; fieldValue = float4Vector.isNull(rowIndex) ? null : float4Vector.get(rowIndex); break; case FLOAT8: Float8Vector float8Vector = (Float8Vector) fieldVector; fieldValue = float8Vector.isNull(rowIndex) ? null : float8Vector.get(rowIndex); break; case VARBINARY: VarBinaryVector varBinaryVector = (VarBinaryVector) fieldVector; fieldValue = varBinaryVector.isNull(rowIndex) ? null : varBinaryVector.get(rowIndex); break; case DECIMAL: DecimalVector decimalVector = (DecimalVector) fieldVector; fieldValue = decimalVector.getObject(rowIndex).stripTrailingZeros(); break; case VARCHAR: VarCharVector date = (VarCharVector) fieldVector; fieldValue = new String(date.get(rowIndex)); break; case LIST: ListVector listVector = (ListVector) fieldVector; fieldValue = listVector.isNull(rowIndex) ? null : listVector.getObject(rowIndex); break; default: fieldValue = fieldVector.isNull(rowIndex) ? null : fieldVector.getObject(rowIndex); } return fieldValue; }
}
## 4. Spark Doris Connector
Spark Doris Connector 支持通过 Spark 读取 Doris 中存储的数据,也支持通过 Spark 向 Doris 写入数据。
// Read a Doris table into a Spark DataFrame through the Spark Doris Connector.
// Replace each $YOUR_... placeholder with a real value before running.
val dorisSparkDF = spark.read.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
  .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .load()

dorisSparkDF.show(5)
## 5. Flink Doris Connector
Flink Doris Connector 支持通过 Flink 操作(读取、插入、修改、删除)Doris 中存储的数据。Flink 可以通过 DataStream 和 SQL 两种方式操作 Doris,下面给出 DataStream 方式读取 Doris 的示例。
DorisOptions.Builder builder = DorisOptions.builder()
.setFenodes(“FE_IP:HTTP_PORT”)
.setTableIdentifier(“db.table”)
.setUsername(“root”)
.setPassword(“password”);
DorisSource<List<?>> dorisSource = DorisSourceBuilder.
env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), “doris source”).print();
## 6. Arrow Flight SQL
Arrow Flight SQL 是一种使用 Arrow 内存格式和 Flight RPC 框架与 SQL 数据库交互的协议。Apache Doris 2.1 版本提供了基于 Arrow Flight SQL 构建的高速数据传输链路,支持主流语言通过 SQL 从 Doris 高速读取大规模数据,极大提升了其他系统与 Apache Doris 之间的数据传输效率。
1)通过 Maven 引入 flight-sql-jdbc-driver 依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- NOTE(review): tags reconstructed from the stripped original; com.yichenkeji:yichen-demo:1.0
         is assumed to be the parent module, since this module declares its own artifactId below. -->
    <parent>
        <groupId>com.yichenkeji</groupId>
        <artifactId>yichen-demo</artifactId>
        <version>1.0</version>
    </parent>

    <artifactId>yichen-demo-doris</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <lombok.version>1.18.26</lombok.version>
        <arrow.version>15.0.2</arrow.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.arrow/flight-sql-jdbc-driver -->
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>flight-sql-jdbc-driver</artifactId>
            <version>${arrow.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-jdbc -->
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-jdbc</artifactId>
            <version>${arrow.version}</version>
        </dependency>
    </dependencies>
</project>
2)JDBC编码实现
package com.yichenkeji.demo.doris;
import java.sql.*;
import java.util.Properties;
public class ArrowFlightSqlJdbc {
public static void main(String[] args) throws SQLException {
String jdbc_url = "jdbc:arrow-flight-sql://192.168.179.134:9040";
Properties prop = new Properties();
prop.put("user", "root");
prop.put("password", "");
//需要配置该参数,否则报错
网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。
需要这份系统化的资料的朋友,可以添加V获取:vip204888 (备注大数据)
一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
.168.179.134:9040";
Properties prop = new Properties();
prop.put(“user”, “root”);
prop.put(“password”, “”);
//需要配置该参数,否则报错
网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。
需要这份系统化的资料的朋友,可以添加V获取:vip204888 (备注大数据)
[外链图片转存中…(img-uvv0pRTw-1713119837686)]
一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。