
StarRocks Distributed Metadata Source Code Analysis

1. Metadata table support

https://github.com/StarRocks/starrocks/pull/44276/files

Core class: LogicalIcebergMetadataTable, the Iceberg metadata table. It exposes the individual metadata fields as table columns, so the metadata can later be queried via SQL. The columns of this table correspond to the fields of a DataFile.

  public static LogicalIcebergMetadataTable create(String catalogName, String originDb, String originTable) {
      return new LogicalIcebergMetadataTable(catalogName,
              ConnectorTableId.CONNECTOR_ID_GENERATOR.getNextId().asInt(),
              ICEBERG_LOGICAL_METADATA_TABLE_NAME,
              Table.TableType.METADATA,
              builder()
                      .columns(PLACEHOLDER_COLUMNS)
                      .column("content", ScalarType.createType(PrimitiveType.INT))
                      .column("file_path", ScalarType.createVarcharType())
                      .column("file_format", ScalarType.createVarcharType())
                      .column("spec_id", ScalarType.createType(PrimitiveType.INT))
                      .column("partition_data", ScalarType.createType(PrimitiveType.VARBINARY))
                      .column("record_count", ScalarType.createType(PrimitiveType.BIGINT))
                      .column("file_size_in_bytes", ScalarType.createType(PrimitiveType.BIGINT))
                      .column("split_offsets", ARRAY_BIGINT)
                      .column("sort_id", ScalarType.createType(PrimitiveType.INT))
                      .column("equality_ids", ARRAY_INT)
                      .column("file_sequence_number", ScalarType.createType(PrimitiveType.BIGINT))
                      .column("data_sequence_number", ScalarType.createType(PrimitiveType.BIGINT))
                      .column("column_stats", ScalarType.createType(PrimitiveType.VARBINARY))
                      .column("key_metadata", ScalarType.createType(PrimitiveType.VARBINARY))
                      .build(),
              originDb,
              originTable,
              MetadataTableType.LOGICAL_ICEBERG_METADATA);
  }

2. Iceberg table scan

https://github.com/StarRocks/starrocks/pull/44313

Core class: StarRocksIcebergTableScan, the implementation class for scanning Iceberg tables. It is built on Iceberg's upper-level scan interfaces, similar to the DataTableScan that Iceberg provides by default; doPlanFiles defines the actual metadata-file scan logic.

This part belongs to the upper-level data scan logic.

  protected CloseableIterable<FileScanTask> doPlanFiles() {
      List<ManifestFile> dataManifests = findMatchingDataManifests(snapshot());
      List<ManifestFile> deleteManifests = findMatchingDeleteManifests(snapshot());
      boolean mayHaveEqualityDeletes = !deleteManifests.isEmpty() && mayHaveEqualityDeletes(snapshot());
      boolean loadColumnStats = mayHaveEqualityDeletes || shouldReturnColumnStats();
      if (shouldPlanLocally(dataManifests, loadColumnStats)) {
          return planFileTasksLocally(dataManifests, deleteManifests);
      } else {
          return planFileTasksRemotely(dataManifests, deleteManifests);
      }
  }

3. Iceberg metadata spec interface

https://github.com/StarRocks/starrocks/pull/44527

Core class: IcebergMetaSpec, the Iceberg metadata descriptor. Its core is a List of RemoteMetaSplit, representing the list of metadata files; distributed parsing is built on top of this.

This part belongs to the splitting logic for metadata files.

  public List<RemoteMetaSplit> getSplits() {
      return splits;
  }

4. Iceberg metadata scan node

https://github.com/StarRocks/starrocks/pull/44581

Core class: IcebergMetadataScanNode, the scan node for Iceberg metadata. It inherits from the PlanNode class and mainly places the RemoteMetaSplit from the previous section into StarRocks' execution structures.

This part is the intermediate conversion layer from Iceberg logic to StarRocks logic.

  private void addSplitScanRangeLocations(RemoteMetaSplit split) {
      TScanRangeLocations scanRangeLocations = new TScanRangeLocations();
      THdfsScanRange hdfsScanRange = new THdfsScanRange();
      hdfsScanRange.setUse_iceberg_jni_metadata_reader(true);
      hdfsScanRange.setSerialized_split(split.getSerializeSplit());
      hdfsScanRange.setFile_length(split.length());
      hdfsScanRange.setLength(split.length());
      // for distributed scheduler
      hdfsScanRange.setFull_path(split.path());
      hdfsScanRange.setOffset(0);
      TScanRange scanRange = new TScanRange();
      scanRange.setHdfs_scan_range(hdfsScanRange);
      scanRangeLocations.setScan_range(scanRange);
      TScanRangeLocation scanRangeLocation = new TScanRangeLocation(new TNetworkAddress("-1", -1));
      scanRangeLocations.addToLocations(scanRangeLocation);
      result.add(scanRangeLocations);
  }

5. Iceberg metadata reading

https://github.com/StarRocks/starrocks/pull/44632

Core class: IcebergMetadataScanner, the class that actually reads the Iceberg metadata; it implements StarRocks' ConnectorScanner.

ConnectorScanner is a JNI abstraction layer that StarRocks designed to sit between the C++-based BE and the Java-based big-data components. It allows Java SDKs to be reused directly, avoiding intrusive changes to the BE code and the many inconveniences of accessing big-data storage from C++.
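
To make that contract concrete, below is a minimal, self-contained sketch of the scanner shape that the C++ BE drives over JNI. It only mirrors what the IcebergMetadataScanner code later in this section implies (open/getNext/close plus column-wise appendData); the interface and class names here are illustrative stand-ins, not StarRocks' actual ConnectorScanner API.

  import java.io.IOException;
  import java.util.Iterator;
  import java.util.List;

  // Illustrative stand-in for the JNI scanner contract; not the real ConnectorScanner class.
  interface JniScannerContract {
      void open() throws IOException;
      int getNext() throws IOException; // rows filled into the current chunk, 0 = no more data
      void close() throws IOException;
  }

  // A mock implementation that "reads" file paths instead of real Iceberg metadata.
  class MockMetadataScanner implements JniScannerContract {
      private final int fetchSize;
      private final Iterator<String> files;
      private final List<String[]> chunk; // stands in for the off-heap table the BE would read

      MockMetadataScanner(int fetchSize, List<String> filePaths, List<String[]> chunk) {
          this.fetchSize = fetchSize;
          this.files = filePaths.iterator();
          this.chunk = chunk;
      }

      @Override
      public void open() { /* open the Java SDK readers here */ }

      @Override
      public int getNext() {
          int numRows = 0;
          while (numRows < fetchSize && files.hasNext()) {
              chunk.add(new String[] {files.next()}); // appendData(columnIndex, value) in the real scanner
              numRows++;
          }
          return numRows;
      }

      @Override
      public void close() { /* release resources */ }
  }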

This part is the Java-side code that actually reads the metadata files.


  public int getNext() throws IOException {
      try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
          int numRows = 0;
          for (; numRows < getTableSize(); numRows++) {
              if (!reader.hasNext()) {
                  break;
              }
              ContentFile<?> file = reader.next();
              for (int i = 0; i < requiredFields.length; i++) {
                  Object fieldData = get(requiredFields[i], file);
                  if (fieldData == null) {
                      appendData(i, null);
                  } else {
                      ColumnValue fieldValue = new IcebergMetadataColumnValue(fieldData);
                      appendData(i, fieldValue);
                  }
              }
          }
          return numRows;
      } catch (Exception e) {
          close();
          LOG.error("Failed to get the next off-heap table chunk of iceberg metadata.", e);
          throw new IOException("Failed to get the next off-heap table chunk of iceberg metadata.", e);
      }
  }

    No upper-level Java caller of this class has been found so far; it should be invoked from the C++ side, as shown below, where the scanner is constructed.

  // ---------------iceberg metadata jni scanner------------------
  std::unique_ptr<JniScanner> create_iceberg_metadata_jni_scanner(const JniScanner::CreateOptions& options) {
      const auto& scan_range = *(options.scan_range);

      const auto* hdfs_table = dynamic_cast<const IcebergMetadataTableDescriptor*>(options.hive_table);
      std::map<std::string, std::string> jni_scanner_params;
      jni_scanner_params["required_fields"] = hdfs_table->get_hive_column_names();
      jni_scanner_params["metadata_column_types"] = hdfs_table->get_hive_column_types();
      jni_scanner_params["serialized_predicate"] = options.scan_node->serialized_predicate;
      jni_scanner_params["serialized_table"] = options.scan_node->serialized_table;
      jni_scanner_params["split_info"] = scan_range.serialized_split;
      jni_scanner_params["load_column_stats"] = options.scan_node->load_column_stats ? "true" : "false";
      const std::string scanner_factory_class = "com/starrocks/connector/iceberg/IcebergMetadataScannerFactory";
      return std::make_unique<JniScanner>(scanner_factory_class, jni_scanner_params);
  }

6. Metadata collection job

https://github.com/StarRocks/starrocks/pull/44679/files

Core class: IcebergMetadataCollectJob, the collection class for Iceberg metadata, implementing MetadataCollectJob. From what can be seen so far, it simply executes a SQL statement to fetch data from the LogicalIcebergMetadataTable described earlier.

This part is the final metadata collection.

  private static final String ICEBERG_METADATA_TEMPLATE = "SELECT content" + // INTEGER
          ", file_path" + // VARCHAR
          ", file_format" + // VARCHAR
          ", spec_id" + // INTEGER
          ", partition_data" + // BINARY
          ", record_count" + // BIGINT
          ", file_size_in_bytes" + // BIGINT
          ", split_offsets" + // ARRAY<BIGINT>
          ", sort_id" + // INTEGER
          ", equality_ids" + // ARRAY<INTEGER>
          ", file_sequence_number" + // BIGINT
          ", data_sequence_number " + // BIGINT
          ", column_stats " + // BINARY
          ", key_metadata " + // BINARY
          "FROM `$catalogName`.`$dbName`.`$tableName$logical_iceberg_metadata` " +
          "FOR VERSION AS OF $snapshotId " +
          "WHERE $predicate'";
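
As a rough illustration of how this template becomes a concrete query, the snippet below substitutes the placeholders with made-up values (the catalog, database, table, snapshot id, and predicate are all hypothetical, and the real job builds its SQL from its own state rather than via String.replace):

  public class RenderMetadataSqlExample {
      public static void main(String[] args) {
          // Abbreviated copy of the template above, for demonstration only.
          String template = "SELECT content, file_path, record_count "
                  + "FROM `$catalogName`.`$dbName`.`$tableName$logical_iceberg_metadata` "
                  + "FOR VERSION AS OF $snapshotId WHERE $predicate";
          String sql = template
                  .replace("$catalogName", "iceberg_catalog")
                  .replace("$dbName", "sales_db")
                  .replace("$tableName", "orders")
                  .replace("$snapshotId", "8412351234567890123")
                  .replace("$predicate", "true");
          System.out.println(sql);
          // SELECT content, file_path, record_count
          // FROM `iceberg_catalog`.`sales_db`.`orders$logical_iceberg_metadata`
          // FOR VERSION AS OF 8412351234567890123 WHERE true
      }
  }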

7. Workflow overview


1. Invocation of IcebergMetadataCollectJob

    IcebergMetadataCollectJob -> StarRocksIcebergTableScan.planFileTasksRemotely -> StarRocksIcebergTableScan.doPlanFiles -> triggered by the TableScan flow defined by Iceberg

2. Construction of StarRocksIcebergTableScan

    StarRocksIcebergTableScan -> IcebergCatalog.getTableScan -> IcebergMetadata.collectTableStatisticsAndCacheIcebergSplit -> prepareMetadata() and triggerIcebergPlanFilesIfNeeded()

    The prepareMetadata() path is triggered by the PrepareCollectMetaTask task, whose execution logic calls the prepareMetadata() interface. PrepareCollectMetaTask is a subclass of OptimizerTask and is part of the StarRocks optimizer, scheduled when the Optimizer class runs optimization. This belongs to CBO optimization; the switch defaults to false and no place that sets it to true has been found, so it presumably is not enabled at present.

    The triggerIcebergPlanFilesIfNeeded() path has several call sites; the main one should be getRemoteFileInfos(), while the other two appear to collect statistics-related information.

    IcebergMetadata.getRemoteFileInfos -> IcebergScanNode.setupScanRangeLocations -> PlanFragmentBuilder.visitPhysicalIcebergScan -> PhysicalIcebergScanOperator

    This call chain ultimately traces back to PhysicalIcebergScanOperator, which should be the actual execution node class corresponding to IcebergScanNode after SQL plan conversion.

3. Metadata scan

    IcebergMetaSpec -> IcebergMetadata.getSerializedMetaSpec -> MetadataMgr.getSerializedMetaSpec -> IcebergMetadataScanNode.setupScanRangeLocations -> PlanFragmentBuilder.visitPhysicalIcebergMetadataScan -> PhysicalIcebergMetadataScanOperator

    The metadata scan side ultimately traces back to PhysicalIcebergMetadataScanOperator, i.e. the execution class corresponding to IcebergMetadataScanNode.

4. Logical relationship between the metadata scan and the data scan

    At the top level, the only remaining piece of the overall flow is the logical relationship between PhysicalIcebergMetadataScanOperator and PhysicalIcebergScanOperator; this logic lives in StarRocks' conversion from SQL to the execution plan.

    Tracing upward leads to BackendSelectorFactory. Note that there are two assignment strategies for scan nodes: LocalFragmentAssignmentStrategy and RemoteFragmentAssignmentStrategy. According to the class documentation, when the leftmost node is a scanNode, LocalFragmentAssignmentStrategy is used: it first assigns scan ranges to workers, then dispatches each worker's scan ranges to fragment instances.

    In LocalFragmentAssignmentStrategy.assignFragmentToWorker the parameters include many scanNodes; tracing up to CoordinatorPreprocessor shows that the scanNodes come from StarRocks' execution DAG. Beyond that, the source involves statement parsing and the ordered construction of the DAG, which should be built so that metadata is scanned before data.

  for (ExecutionFragment execFragment : executionDAG.getFragmentsInPostorder()) {
      fragmentAssignmentStrategyFactory.create(execFragment, workerProvider).assignFragmentToWorker(execFragment);
  }
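
    As a rough, assumed illustration of the two-phase idea described above (this is not the real LocalFragmentAssignmentStrategy code): scan ranges are first mapped to workers, and each worker's share then becomes a fragment instance on that worker.

  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  class TwoPhaseAssignmentSketch {
      // Phase 1: distribute scan ranges across workers (round-robin, purely for illustration).
      static Map<Integer, List<String>> assignRangesToWorkers(List<String> scanRanges, int numWorkers) {
          Map<Integer, List<String>> byWorker = new HashMap<>();
          for (int i = 0; i < scanRanges.size(); i++) {
              byWorker.computeIfAbsent(i % numWorkers, w -> new ArrayList<>()).add(scanRanges.get(i));
          }
          return byWorker;
      }

      // Phase 2: each worker's scan ranges are dispatched as one fragment instance on that worker.
      static List<String> buildFragmentInstances(Map<Integer, List<String>> byWorker) {
          List<String> instances = new ArrayList<>();
          byWorker.forEach((worker, ranges) -> instances.add("instance@worker-" + worker + " scans " + ranges));
          return instances;
      }
  }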

8. Code walkthrough

1. Metadata scan

  • LogicalIcebergMetadataTable

    Starting from PhysicalIcebergMetadataScanOperator, the visitor pattern calls the accept interface and lands in PlanFragmentBuilder.visitPhysicalIcebergMetadataScan.

    This is where the link to LogicalIcebergMetadataTable is established: the table contained in PhysicalIcebergMetadataScanOperator is the LogicalIcebergMetadataTable.

    Based on call-chain tracing, the initial creation of LogicalIcebergMetadataTable should be triggered by CatalogMgr.createCatalog.

  PhysicalIcebergMetadataScanOperator node = (PhysicalIcebergMetadataScanOperator) optExpression.getOp();
  LogicalIcebergMetadataTable table = (LogicalIcebergMetadataTable) node.getTable();
  • IcebergMetadataScanNode

    After a series of setup steps in between, the IcebergMetadataScanNode is constructed.

  IcebergMetadataScanNode metadataScanNode =
          new IcebergMetadataScanNode(context.getNextNodeId(), tupleDescriptor,
                  "IcebergMetadataScanNode", node.getTemporalClause());

    After construction, setupScanRangeLocations is called, entering the class logic of IcebergMetadataScanNode, which first obtains the split information for the metadata files.

  IcebergMetaSpec serializedMetaSpec = GlobalStateMgr.getCurrentState().getMetadataMgr()
          .getSerializedMetaSpec(catalogName, originDbName, originTableName, snapshotId, icebergPredicate).cast();
  • IcebergMetadata

    This logic connects to IcebergMetadata by calling its getSerializedMetaSpec interface, which fetches the Iceberg metadata files, with some filtering applied along the way.

  List<ManifestFile> dataManifests = snapshot.dataManifests(nativeTable.io());
  List<ManifestFile> matchingDataManifests = filterManifests(dataManifests, nativeTable, predicate);
  for (ManifestFile file : matchingDataManifests) {
      remoteMetaSplits.add(IcebergMetaSplit.from(file));
  }

    After the splits are obtained, they are assembled into TScanRangeLocations following StarRocks' scan structure, and are finally distributed for parsing at actual execution time.

  private void addSplitScanRangeLocations(RemoteMetaSplit split) {
      TScanRangeLocations scanRangeLocations = new TScanRangeLocations();
      THdfsScanRange hdfsScanRange = new THdfsScanRange();
      hdfsScanRange.setUse_iceberg_jni_metadata_reader(true);
      hdfsScanRange.setSerialized_split(split.getSerializeSplit());
      hdfsScanRange.setFile_length(split.length());
      hdfsScanRange.setLength(split.length());
      // for distributed scheduler
      hdfsScanRange.setFull_path(split.path());
      hdfsScanRange.setOffset(0);
      TScanRange scanRange = new TScanRange();
      scanRange.setHdfs_scan_range(hdfsScanRange);
      scanRangeLocations.setScan_range(scanRange);
      TScanRangeLocation scanRangeLocation = new TScanRangeLocation(new TNetworkAddress("-1", -1));
      scanRangeLocations.addToLocations(scanRangeLocation);
      result.add(scanRangeLocations);
  }
  • PlanFragment 

    What the visitPhysicalIcebergMetadataScan interface ultimately assembles is a PlanFragment, roughly analogous to a Spark stage: a building block of the physical execution plan.

  PlanFragment fragment =
          new PlanFragment(context.getNextFragmentId(), metadataScanNode, DataPartition.RANDOM);
  context.getFragments().add(fragment);
  return fragment;
  • IcebergMetadataScanner

    Since IcebergMetadataScanner is invoked from C++ code, its call logic has not been traced yet; assuming it does execute, we can look at what it does, which is mainly reading data in the getNext() interface.

    The data structure it reads is ContentFile, the parent class of DataFile in Iceberg.

  ContentFile<?> file = reader.next();
  for (int i = 0; i < requiredFields.length; i++) {
      Object fieldData = get(requiredFields[i], file);
      if (fieldData == null) {
          appendData(i, null);
      } else {
          ColumnValue fieldValue = new IcebergMetadataColumnValue(fieldData);
          appendData(i, fieldValue);
      }
  }

    Data is added to the table mainly through the appendData interface; note that an offHeapTable is set up here.

    offHeapTable is a special table type in StarRocks. Simply put, it builds a table structure in off-heap memory, stores the data there accordingly, and can then be accessed in table form.

  protected void appendData(int index, ColumnValue value) {
      offHeapTable.appendData(index, value);
  }
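
    As a toy illustration of the off-heap idea (this is not StarRocks' actual OffHeapTable implementation): column values are written into direct memory outside the JVM heap, so the native side can consume them without going through heap objects.

  import java.nio.ByteBuffer;

  // Hypothetical sketch: a single BIGINT column backed by off-heap (direct) memory.
  class OffHeapLongColumn {
      private final ByteBuffer buffer; // allocated outside the JVM heap
      private int rows = 0;

      OffHeapLongColumn(int capacity) {
          this.buffer = ByteBuffer.allocateDirect(capacity * Long.BYTES);
      }

      void append(long value) {
          buffer.putLong(value);
          rows++;
      }

      int rowCount() {
          return rows; // the native reader would consume this many values from the buffer
      }
  }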

2. Metadata resolution within the data scan

    Likewise, we first arrive at PlanFragmentBuilder.visitPhysicalIcebergScan, whose flow is similar to visitPhysicalIcebergMetadataScan.

    First, the table here is the data table.

  Table referenceTable = node.getTable();
  context.getDescTbl().addReferencedTable(referenceTable);
  TupleDescriptor tupleDescriptor = context.getDescTbl().createTupleDescriptor();
  tupleDescriptor.setTable(referenceTable);
  // set slot
  prepareContextSlots(node, context, tupleDescriptor);

    Next comes the IcebergScanNode.

  IcebergScanNode icebergScanNode =
          new IcebergScanNode(context.getNextNodeId(), tupleDescriptor, "IcebergScanNode",
                  equalityDeleteTupleDesc);

    The core step here for IcebergScanNode is the call to setupScanRangeLocations.

icebergScanNode.setupScanRangeLocations(context.getDescTbl());

    Finally, it is likewise wrapped into a PlanFragment.

  PlanFragment fragment =
          new PlanFragment(context.getNextFragmentId(), icebergScanNode, DataPartition.RANDOM);
  context.getFragments().add(fragment);
  return fragment;
  • IcebergScanNode

    Inside setupScanRangeLocations, one operation is getRemoteFileInfos, which fetches the data file information and therefore internally contains the metadata-resolution part.

  List<RemoteFileInfo> splits = GlobalStateMgr.getCurrentState().getMetadataMgr().getRemoteFileInfos(
          catalogName, icebergTable, null, snapshotId, predicate, null, -1);
  • IcebergMetadata

    getRemoteFileInfos lives in IcebergMetadata and calls triggerIcebergPlanFilesIfNeeded; as the name makes clear, this is used to trigger Iceberg metadata resolution, and it eventually reaches collectTableStatisticsAndCacheIcebergSplit.

  private void triggerIcebergPlanFilesIfNeeded(IcebergFilter key, IcebergTable table, ScalarOperator predicate,
                                               long limit, Tracers tracers, ConnectContext connectContext) {
      if (!scannedTables.contains(key)) {
          tracers = tracers == null ? Tracers.get() : tracers;
          try (Timer ignored = Tracers.watchScope(tracers, EXTERNAL, "ICEBERG.processSplit." + key)) {
              collectTableStatisticsAndCacheIcebergSplit(table, predicate, limit, tracers, connectContext);
          }
      }
  }

    collectTableStatisticsAndCacheIcebergSplit obtains a TableScan, and the scan here is exactly StarRocksIcebergTableScan.

  TableScan scan = icebergCatalog.getTableScan(nativeTbl, new StarRocksIcebergTableScanContext(
          catalogName, dbName, tableName, planMode(connectContext), connectContext))
          .useSnapshot(snapshotId)
          .metricsReporter(metricsReporter)
          .planWith(jobPlanningExecutor);
  • StarRocksIcebergTableScan

    After that, scan.planFiles() runs, and the calls in between go through Iceberg's own logic.

  CloseableIterable<FileScanTask> fileScanTaskIterable = TableScanUtil.splitFiles(
          scan.planFiles(), scan.targetSplitSize());

    In Iceberg's logic, planFiles eventually calls the TableScan's doPlanFiles, which here is the implementation in StarRocksIcebergTableScan; depending on the scenario, it plans either locally or remotely.

  if (shouldPlanLocally(dataManifests, loadColumnStats)) {
      return planFileTasksLocally(dataManifests, deleteManifests);
  } else {
      return planFileTasksRemotely(dataManifests, deleteManifests);
  }

    The path used here should be planFileTasksRemotely, which internally builds an IcebergMetadataCollectJob.

  MetadataCollectJob metadataCollectJob = new IcebergMetadataCollectJob(
          catalogName, dbName, tableName, TResultSinkType.METADATA_ICEBERG, snapshotId(), icebergSerializedPredicate);
  metadataCollectJob.init(connectContext.getSessionVariable());
  long currentTimestamp = System.currentTimeMillis();
  String threadNamePrefix = String.format("%s-%s-%s-%d", catalogName, dbName, tableName, currentTimestamp);
  executeInNewThread(threadNamePrefix + "-fetch_result", metadataCollectJob::asyncCollectMetadata);
  • MetadataExecutor execution

    IcebergMetadataCollectJob is executed inside MetadataExecutor; it is basic SQL execution, done asynchronously here.

  public void asyncExecuteSQL(MetadataCollectJob job) {
      ConnectContext context = job.getContext();
      context.setThreadLocalInfo();
      String sql = job.getSql();
      ExecPlan execPlan;
      StatementBase parsedStmt;
      try {
          parsedStmt = SqlParser.parseOneWithStarRocksDialect(sql, context.getSessionVariable());
          execPlan = StatementPlanner.plan(parsedStmt, context, job.getSinkType());
      } catch (Exception e) {
          context.getState().setError(e.getMessage());
          return;
      }
      this.executor = new StmtExecutor(context, parsedStmt);
      context.setExecutor(executor);
      context.setQueryId(UUIDUtil.genUUID());
      context.getSessionVariable().setEnableMaterializedViewRewrite(false);
      LOG.info("Start to execute metadata collect job on {}.{}.{}", job.getCatalogName(), job.getDbName(), job.getTableName());
      executor.executeStmtWithResultQueue(context, execPlan, job.getResultQueue());
  }
