
Iceberg Source Code Study: How Flink Writes to Iceberg


Getting started: an example

Flink supports writing both DataStream&lt;Row&gt; and DataStream&lt;RowData&gt; streams into an Iceberg table:

StreamExecutionEnvironment env = ...;

DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
FlinkSink.forRowData(input)   // for a DataStream<Row>, use FlinkSink.forRow(rowInput, FLINK_SCHEMA)
    .tableLoader(tableLoader)
    .writeParallelism(1)
    .build();
env.execute("Test Iceberg DataStream");

Here input is the input stream, either a DataStream&lt;RowData&gt; (via forRowData) or a DataStream&lt;Row&gt; (via forRow); FLINK_SCHEMA is the Flink TableSchema that forRow takes alongside the Row stream.
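
For reference, FLINK_SCHEMA is just a TableSchema describing the row layout; a minimal sketch (the column names and types here are illustrative, not from the original article):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

// Hypothetical columns -- match these to your iceberg table's schema.
TableSchema FLINK_SCHEMA = TableSchema.builder()
        .field("id", DataTypes.INT())
        .field("data", DataTypes.STRING())
        .build();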

Let's look at the build() method first:

public DataStreamSink<RowData> build() {
    Preconditions.checkArgument(this.rowDataInput != null, "Please use forRowData() to initialize the input DataStream.");
    Preconditions.checkNotNull(this.tableLoader, "Table loader shouldn't be null");

    if (this.table == null) {
        this.tableLoader.open();
        // Decompiled try/finally restructured back into the original try-with-resources.
        try (TableLoader loader = this.tableLoader) {
            this.table = loader.loadTable();
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to load iceberg table from table loader: " + this.tableLoader, e);
        }
    }

    // Resolve equality field names to field ids against the table schema.
    List<Integer> equalityFieldIds = Lists.newArrayList();
    if (this.equalityFieldColumns != null && this.equalityFieldColumns.size() > 0) {
        for (String column : this.equalityFieldColumns) {
            NestedField field = this.table.schema().findField(column);
            Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s", column, this.table.schema());
            equalityFieldIds.add(field.fieldId());
        }
    }

    RowType flinkRowType = FlinkSink.toFlinkRowType(this.table.schema(), this.tableSchema);
    this.rowDataInput = this.distributeDataStream(this.rowDataInput, this.table.properties(), this.table.spec(), this.table.schema(), flinkRowType);

    IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(this.table, flinkRowType, equalityFieldIds);
    IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(this.tableLoader, this.overwrite);

    this.writeParallelism = this.writeParallelism == null ? this.rowDataInput.getParallelism() : this.writeParallelism;
    // The writer runs at writeParallelism; the committer is forced to a single parallel instance.
    DataStream<Void> returnStream = this.rowDataInput
            .transform(FlinkSink.ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter)
            .setParallelism(this.writeParallelism)
            .transform(FlinkSink.ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
            .setParallelism(1)
            .setMaxParallelism(1);

    return returnStream
            .addSink(new DiscardingSink())
            .name(String.format("IcebergSink %s", this.table.name()))
            .setParallelism(1);
}

This is where the two core iceberg write operators are created: IcebergStreamWriter and IcebergFilesCommitter.

IcebergStreamWriter

IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(this.table, flinkRowType, equalityFieldIds);

In build(), createStreamWriter() is called to create the IcebergStreamWriter:

static IcebergStreamWriter<RowData> createStreamWriter(Table table, RowType flinkRowType, List<Integer> equalityFieldIds) {
    Map<String, String> props = table.properties();
    long targetFileSize = getTargetFileSizeBytes(props);
    FileFormat fileFormat = getFileFormat(props);
    TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
            table.schema(), flinkRowType, table.spec(), table.locationProvider(), table.io(), table.encryption(),
            targetFileSize, fileFormat, props, equalityFieldIds);
    return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
}

A TaskWriterFactory is built from the table metadata and passed into the IcebergStreamWriter:

class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult> implements OneInputStreamOperator<T, WriteResult>, BoundedOneInput {
    private static final long serialVersionUID = 1L;
    private final String fullTableName;
    private final TaskWriterFactory<T> taskWriterFactory;
    private transient TaskWriter<T> writer;
    private transient int subTaskId;
    private transient int attemptId;

    IcebergStreamWriter(String fullTableName, TaskWriterFactory<T> taskWriterFactory) {
        this.fullTableName = fullTableName;
        this.taskWriterFactory = taskWriterFactory;
        this.setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    public void open() {
        this.subTaskId = this.getRuntimeContext().getIndexOfThisSubtask();
        this.attemptId = this.getRuntimeContext().getAttemptNumber();
        this.taskWriterFactory.initialize(this.subTaskId, this.attemptId);
        this.writer = this.taskWriterFactory.create();
    }
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        this.emit(this.writer.complete());
        this.writer = this.taskWriterFactory.create();
    }

    public void processElement(StreamRecord<T> element) throws Exception {
        this.writer.write(element.getValue());
    }
}

In open(), the TaskWriter is built through the taskWriterFactory that was passed in:

public TaskWriter<RowData> create() {
    Preconditions.checkNotNull(this.outputFileFactory, "The outputFileFactory shouldn't be null if we have invoked the initialize().");
    if (this.equalityFieldIds != null && !this.equalityFieldIds.isEmpty()) {
        // Equality fields configured: use delta writers, which can also emit delete files.
        return this.spec.isUnpartitioned()
                ? new UnpartitionedDeltaWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory,
                        this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema, this.equalityFieldIds)
                : new PartitionedDeltaWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory,
                        this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema, this.equalityFieldIds);
    } else {
        // No equality fields: plain append-only writers.
        return this.spec.isUnpartitioned()
                ? new UnpartitionedWriter<>(this.spec, this.format, this.appenderFactory, this.outputFileFactory,
                        this.io, this.targetFileSizeBytes)
                : new RowDataTaskWriterFactory.RowDataPartitionedFanoutWriter(this.spec, this.format, this.appenderFactory,
                        this.outputFileFactory, this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema);
    }
}

In this method, one of four TaskWriter implementations is chosen along two axes: whether equality field ids were specified (delta writers, which can also emit delete files, vs. append-only writers) and whether the table is partitioned.
The call chains of the four classes:

With equality fields specified:

UnpartitionedDeltaWriter -> BaseEqualityDeltaWriter.write() -> RollingFileWriter.write() -> appender.add()
PartitionedDeltaWriter -> BaseDeltaTaskWriter.write() -> RollingFileWriter.write() -> appender.add()

Without equality fields:

UnpartitionedWriter -> RollingFileWriter.write() -> appender.add()
RowDataPartitionedFanoutWriter -> BaseRollingWriter.write() -> RollingFileWriter.write() -> appender.add()

The appender at the bottom of these chains is created by the FlinkAppenderFactory that was passed in when the TaskWriter was constructed.

processElement() calls write(element.getValue()) to write each incoming record; the resulting files are then committed at checkpoint time. A minimal sketch of the rolling-file pattern follows.
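
To make the RollingFileWriter -> appender.add() step concrete, here is a minimal, self-contained sketch (toy classes, not Iceberg's) of the rolling pattern: records are appended to the current file, and once the appender reaches the target file size, the file is closed and a fresh one is started.

// Illustrative only: mirrors the roll-on-size behavior in the call chains above,
// with a toy Appender standing in for the one built by FlinkAppenderFactory.
interface Appender<T> {
    void add(T record);
    long length();        // bytes written so far
    void close();         // finish the current data file
}

class ToyRollingWriter<T> {
    private final long targetFileSizeBytes;
    private final java.util.function.Supplier<Appender<T>> appenderFactory;
    private Appender<T> current;

    ToyRollingWriter(long targetFileSizeBytes, java.util.function.Supplier<Appender<T>> appenderFactory) {
        this.targetFileSizeBytes = targetFileSizeBytes;
        this.appenderFactory = appenderFactory;
        this.current = appenderFactory.get();
    }

    void write(T record) {
        current.add(record);                        // "appender.add()" in the chains above
        if (current.length() >= targetFileSizeBytes) {
            current.close();                        // seal the completed data file
            current = appenderFactory.get();        // roll to a new file
        }
    }
}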

Tip: a task executes in three steps: beforeInvoke() -> runMailboxLoop() -> afterInvoke().
beforeInvoke() calls open() and initializeState(); runMailboxLoop() calls processElement() to process the data (see the appendix at the end of this article).

IcebergFilesCommitter

IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(this.tableLoader, this.overwrite);

In build(), the IcebergFilesCommitter is created directly from the tableLoader and the overwrite flag.
The checkpoint initialization work happens in IcebergFilesCommitter.initializeState():

public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    this.flinkJobId = getContainingTask().getEnvironment().getJobID().toString();
    this.tableLoader.open();
    this.table = this.tableLoader.loadTable();

    int subTaskId = getRuntimeContext().getIndexOfThisSubtask();
    int attemptId = getRuntimeContext().getAttemptNumber();
    this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(this.table, this.flinkJobId, subTaskId, attemptId);
    this.maxCommittedCheckpointId = -1L;

    this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
    this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);

    if (context.isRestored()) {
        String restoredFlinkJobId = this.jobIdState.get().iterator().next();
        Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId), "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
        // Replay any checkpoints that were snapshotted but never committed to the iceberg table.
        this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(this.table, restoredFlinkJobId);
        NavigableMap<Long, byte[]> uncommittedDataFiles = Maps
                .newTreeMap(this.checkpointsState.get().iterator().next())
                .tailMap(this.maxCommittedCheckpointId, false);
        if (!uncommittedDataFiles.isEmpty()) {
            long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
            commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
        }
    }
}

The checkpoint snapshot flow lives in IcebergFilesCommitter.snapshotState():

public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        long checkpointId = context.getCheckpointId();
        LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", this.table, checkpointId);
        this.dataFilesPerCheckpoint.put(checkpointId, this.writeToManifest(checkpointId));
        this.checkpointsState.clear();
        this.checkpointsState.add(this.dataFilesPerCheckpoint);
        this.jobIdState.clear();
        this.jobIdState.add(this.flinkJobId);
        this.writeResultsOfCurrentCkpt.clear();
    }

The call

this.dataFilesPerCheckpoint.put(checkpointId, this.writeToManifest(checkpointId));

records the serialized manifest metadata for the current checkpointId. dataFilesPerCheckpoint and its call relationships were shown as a debugger screenshot in the original:
[figure omitted: dataFilesPerCheckpoint contents and call relationships]
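
A toy illustration (checkpoint ids and bytes are made up) of how this NavigableMap state is sliced: tailMap(maxCommitted, false) in initializeState() selects the checkpoints that were snapshotted but not yet committed, while headMap(checkpointId, true) in commitUpToCheckpoint() selects everything up to the completed checkpoint.

import java.util.NavigableMap;
import java.util.TreeMap;

public class CheckpointMapDemo {
    public static void main(String[] args) {
        byte[] manifests = new byte[0];  // stands in for serialized DeltaManifests bytes
        NavigableMap<Long, byte[]> dataFilesPerCheckpoint = new TreeMap<>();
        dataFilesPerCheckpoint.put(7L, manifests);
        dataFilesPerCheckpoint.put(8L, manifests);
        dataFilesPerCheckpoint.put(9L, manifests);

        long maxCommittedCheckpointId = 7L;
        // Restore path (initializeState): strictly after the last committed id -> [8, 9]
        System.out.println(dataFilesPerCheckpoint.tailMap(maxCommittedCheckpointId, false).keySet());
        // Commit path (commitUpToCheckpoint for checkpoint 8): up to and including 8 -> [7, 8]
        System.out.println(dataFilesPerCheckpoint.headMap(8L, true).keySet());
    }
}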

private byte[] writeToManifest(long checkpointId) throws IOException {
        if (this.writeResultsOfCurrentCkpt.isEmpty()) {
            return EMPTY_MANIFEST_DATA;
        } else {
            WriteResult result = WriteResult.builder().addAll(this.writeResultsOfCurrentCkpt).build();
            DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(result, () -> {
                return this.manifestOutputFileFactory.create(checkpointId);
            }, this.table.spec());
            return SimpleVersionedSerialization.writeVersionAndSerialize(DeltaManifestsSerializer.INSTANCE, deltaManifests);
        }
    }

writeResultsOfCurrentCkpt holds the data files, delete files, and referenced data files collected since the last checkpoint. A WriteResult is built from them, the deltaManifests is created from that result, and the serialized manifest information is returned.

The deltaManifests value was shown as a debugger screenshot in the original:
[figure omitted: deltaManifests value]
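
As an aside, the (de)serialization of this state uses Flink's SimpleVersionedSerialization; here is a toy round trip with a hypothetical string serializer (DeltaManifestsSerializer.INSTANCE plays this role in the real code):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;

// Toy serializer standing in for DeltaManifestsSerializer.INSTANCE.
class StringSerializer implements SimpleVersionedSerializer<String> {
    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public byte[] serialize(String obj) {
        return obj.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(int version, byte[] serialized) {
        return new String(serialized, StandardCharsets.UTF_8);
    }
}

// Round trip, as done for DeltaManifests in writeToManifest()/commitUpToCheckpoint():
byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(new StringSerializer(), "delta-manifests");
String restored = SimpleVersionedSerialization.readVersionAndDeSerialize(new StringSerializer(), bytes);

The manifests themselves are produced by FlinkManifestUtil.writeCompletedFiles():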

static DeltaManifests writeCompletedFiles(WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec) throws IOException {
    ManifestFile dataManifest = null;
    ManifestFile deleteManifest = null;

    // Write a manifest for the data files, if any.
    if (result.dataFiles() != null && result.dataFiles().length > 0) {
        dataManifest = writeDataFiles(outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles()));
    }

    // Write a separate delete manifest for the delete files, if any.
    if (result.deleteFiles() != null && result.deleteFiles().length > 0) {
        OutputFile deleteManifestFile = outputFileSupplier.get();
        ManifestWriter<DeleteFile> deleteManifestWriter = ManifestFiles.writeDeleteManifest(2, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID);
        try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
            for (DeleteFile deleteFile : result.deleteFiles()) {
                writer.add(deleteFile);
            }
        }
        deleteManifest = deleteManifestWriter.toManifestFile();
    }

    return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles());
}

As the write path above shows, the data files and delete files each get their own manifest file, and a DeltaManifests combining the two is returned.
Finally, when the checkpoint-complete notification arrives, the checkpoint is committed:

public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        if (checkpointId > this.maxCommittedCheckpointId) {
            this.commitUpToCheckpoint(this.dataFilesPerCheckpoint, this.flinkJobId, checkpointId);
            this.maxCommittedCheckpointId = checkpointId;
        }

    }

notifyCheckpointComplete() delegates to commitUpToCheckpoint():

private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap, String newFlinkJobId, long checkpointId) throws IOException {
    NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
    List<ManifestFile> manifests = Lists.newArrayList();
    NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();

    // Deserialize the DeltaManifests stored per checkpoint and collect the pending write results.
    for (Entry<Long, byte[]> e : pendingMap.entrySet()) {
        if (!Arrays.equals(EMPTY_MANIFEST_DATA, e.getValue())) {
            DeltaManifests deltaManifests = SimpleVersionedSerialization.readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, e.getValue());
            pendingResults.put(e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, this.table.io()));
            manifests.addAll(deltaManifests.manifests());
        }
    }

    if (this.replacePartitions) {
        replacePartitions(pendingResults, newFlinkJobId, checkpointId);
    } else {
        commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
    }

    pendingMap.clear();

    // Once the iceberg transaction has committed, the temporary flink manifests can be deleted.
    for (ManifestFile manifest : manifests) {
        try {
            this.table.io().deleteFile(manifest.path());
        } catch (Exception ex) {
            String details = MoreObjects.toStringHelper(this)
                    .add("flinkJobId", newFlinkJobId)
                    .add("checkpointId", checkpointId)
                    .add("manifestPath", manifest.path())
                    .toString();
            LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink manifests: {}", details, ex);
        }
    }
}

Here the previously serialized bytes are deserialized back into DeltaManifests and added to the manifests list. The manifests value was shown as a debugger screenshot in the original:
[figure omitted: manifests value]

Then the transaction is committed according to replacePartitions (the overwrite flag passed at construction time, false by default); in the default case commitDeltaTxn() is called:

private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId, long checkpointId) {
    int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();

    if (deleteFilesNum == 0) {
        // Pure appends: all pending data files go into a single AppendFiles operation.
        AppendFiles appendFiles = this.table.newAppend();
        int numFiles = 0;
        for (WriteResult result : pendingResults.values()) {
            Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files.");
            numFiles += result.dataFiles().length;
            Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
        }
        commitOperation(appendFiles, numFiles, 0, "append", newFlinkJobId, checkpointId);
    } else {
        // With delete files present, each checkpoint is committed as its own RowDelta.
        for (Entry<Long, WriteResult> e : pendingResults.entrySet()) {
            WriteResult result = e.getValue();
            RowDelta rowDelta = this.table.newRowDelta()
                    .validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles()))
                    .validateDeletedFiles();
            int numDataFiles = result.dataFiles().length;
            Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
            int numDeleteFiles = result.deleteFiles().length;
            Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
            commitOperation(rowDelta, numDataFiles, numDeleteFiles, "rowDelta", newFlinkJobId, e.getKey());
        }
    }
}

This creates either a RowDelta object (rowDelta) or an AppendFiles (appendFiles). RowDelta's implementation class BaseRowDelta extends MergingSnapshotProducer, so the changes are committed as a new snapshot; AppendFiles' implementation class MergeAppend likewise extends MergingSnapshotProducer. A sketch of both commit paths through the public Table API follows.
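
For orientation, a minimal sketch of the same two commit paths driven through the public Table API (the table variable and the dataFile/deleteFile/referencedDataFiles values are assumed to exist, normally coming from completed TaskWriters):

// Append-only path: one atomic snapshot containing the new data files.
AppendFiles append = table.newAppend();
append.appendFile(dataFile);          // a DataFile produced by a TaskWriter
append.commit();                      // MergeAppend -> MergingSnapshotProducer commit

// Row-level delta path: data files plus equality/position delete files.
RowDelta rowDelta = table.newRowDelta()
        .validateDataFilesExist(ImmutableList.copyOf(referencedDataFiles))
        .validateDeletedFiles();
rowDelta.addRows(dataFile);
rowDelta.addDeletes(deleteFile);
rowDelta.commit();                    // BaseRowDelta -> MergingSnapshotProducer commit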

    private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles, int numDeleteFiles, String description, String newFlinkJobId, long checkpointId) {
        LOG.info("Committing {} with {} data files and {} delete files to table {}", new Object[]{description, numDataFiles, numDeleteFiles, this.table});
        operation.set("flink.max-committed-checkpoint-id", Long.toString(checkpointId));
        operation.set("flink.job-id", newFlinkJobId);
        long start = System.currentTimeMillis();
        operation.commit();
        long duration = System.currentTimeMillis() - start;
        LOG.info("Committed in {} ms", duration);
    }

operation.commit() invokes the commit() method of SnapshotProducer:

public void commit() {
    AtomicLong newSnapshotId = new AtomicLong(-1L);
    try {
        // Retry the commit with exponential backoff, as configured by the table's commit.retry.* properties.
        Tasks.foreach(this.ops)
                .retry(this.base.propertyAsInt("commit.retry.num-retries", 4))
                .exponentialBackoff(
                        this.base.propertyAsInt("commit.retry.min-wait-ms", 100),
                        this.base.propertyAsInt("commit.retry.max-wait-ms", 60000),
                        this.base.propertyAsInt("commit.retry.total-timeout-ms", 1800000),
                        2.0)
                .onlyRetryOn(CommitFailedException.class)
                .run(taskOps -> {
                    Snapshot newSnapshot = apply();
                    newSnapshotId.set(newSnapshot.snapshotId());
                    TableMetadata updated = this.stageOnly
                            ? this.base.addStagedSnapshot(newSnapshot)
                            : this.base.replaceCurrentSnapshot(newSnapshot);
                    if (updated != this.base) {
                        taskOps.commit(this.base, updated.withUUID());
                    }
                });
    } catch (RuntimeException e) {
        Exceptions.suppressAndThrow(e, this::cleanAll);
    }

    LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), getClass().getSimpleName());

    try {
        // Clean up manifest lists that were written but never became the committed snapshot.
        Snapshot saved = this.ops.refresh().snapshot(newSnapshotId.get());
        if (saved != null) {
            cleanUncommitted(Sets.newHashSet(saved.allManifests()));
            for (String manifestList : this.manifestLists) {
                if (!saved.manifestListLocation().equals(manifestList)) {
                    deleteFile(manifestList);
                }
            }
        } else {
            LOG.warn("Failed to load committed snapshot, skipping manifest clean-up");
        }
    } catch (RuntimeException e) {
        LOG.warn("Failed to load committed table metadata, skipping manifest clean-up", e);
    }

    notifyListeners();
}

SnapshotProducer.apply() writes out the manifest list data and returns the new snapshot:

public Snapshot apply() {
    this.base = refresh();
    Long parentSnapshotId = this.base.currentSnapshot() != null ? this.base.currentSnapshot().snapshotId() : null;
    long sequenceNumber = this.base.nextSequenceNumber();
    validate(this.base);
    List<ManifestFile> manifests = apply(this.base);

    if (this.base.formatVersion() <= 1 && !this.base.propertyAsBoolean("write.manifest-lists.enabled", true)) {
        // Legacy v1 path: embed the manifests directly in the snapshot instead of writing a manifest list file.
        return new BaseSnapshot(this.ops.io(), snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(this.base), manifests);
    }

    OutputFile manifestList = manifestListPath();
    try (ManifestListWriter writer = ManifestLists.write(this.ops.current().formatVersion(), manifestList, snapshotId(), parentSnapshotId, sequenceNumber)) {
        this.manifestLists.add(manifestList.location());
        // Resolve manifest metadata in parallel on the shared worker pool.
        ManifestFile[] manifestFiles = new ManifestFile[manifests.size()];
        Tasks.range(manifestFiles.length)
                .stopOnFailure().throwFailureWhenFinished()
                .executeWith(ThreadPools.getWorkerPool())
                .run(index -> manifestFiles[index] = this.manifestsWithMetadata.get(manifests.get(index)));
        writer.addAll(Arrays.asList(manifestFiles));
    } catch (IOException e) {
        throw new RuntimeIOException(e, "Failed to write manifest list file");
    }

    return new BaseSnapshot(this.ops.io(), sequenceNumber, snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(this.base), manifestList.location());
}

Then the updated table metadata (updated) is generated:

public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
    if (this.snapshotsById.containsKey(snapshot.snapshotId())) {
        return setCurrentSnapshotTo(snapshot);
    }

    ValidationException.check(this.formatVersion == 1 || snapshot.sequenceNumber() > this.lastSequenceNumber,
            "Cannot add snapshot with sequence number %s older than last sequence number %s",
            snapshot.sequenceNumber(), this.lastSequenceNumber);

    List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder().addAll(this.snapshots).add(snapshot).build();
    List<HistoryEntry> newSnapshotLog = ImmutableList.<HistoryEntry>builder()
            .addAll(this.snapshotLog)
            .add(new TableMetadata.SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
            .build();

    return new TableMetadata((InputFile) null, this.formatVersion, this.uuid, this.location,
            snapshot.sequenceNumber(), snapshot.timestampMillis(), this.lastColumnId,
            this.schema, this.defaultSpecId, this.specs, this.defaultSortOrderId, this.sortOrders,
            this.properties, snapshot.snapshotId(), newSnapshots, newSnapshotLog,
            addPreviousFile(this.file, this.lastUpdatedMillis));
}

Next, the commit() method of BaseMetastoreTableOperations is called:

public void commit(TableMetadata base, TableMetadata metadata) {
        if (base != this.current()) {
            throw new CommitFailedException("Cannot commit: stale table metadata", new Object[0]);
        } else if (base == metadata) {
            LOG.info("Nothing to commit.");
        } else {
            long start = System.currentTimeMillis();
            this.doCommit(base, metadata);
            this.deleteRemovedMetadataFiles(base, metadata);
            this.requestRefresh();
            LOG.info("Successfully committed to table {} in {} ms", this.tableName(), System.currentTimeMillis() - start);
        }
    }

Finally, HiveTableOperations.doCommit() executes the actual commit:

protected void doCommit(TableMetadata base, TableMetadata metadata) {
    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
    boolean hiveEngineEnabled = hiveEngineEnabled(metadata, this.conf);
    boolean threw = true;
    boolean updateHiveTable = false;
    Optional<Long> lockId = Optional.empty();

    try {
        // Take a metastore lock so concurrent committers cannot both swap the metadata pointer.
        lockId = Optional.of(acquireLock());
        Table tbl = loadHmsTable();
        if (tbl != null) {
            if (base == null && tbl.getParameters().get("metadata_location") != null) {
                throw new AlreadyExistsException("Table already exists: %s.%s", this.database, this.tableName);
            }
            updateHiveTable = true;
            LOG.debug("Committing existing table: {}", this.fullName);
        } else {
            tbl = newHmsTable();
            LOG.debug("Committing new table: {}", this.fullName);
        }

        tbl.setSd(storageDescriptor(metadata, hiveEngineEnabled));

        // Optimistic concurrency check: the pointer in HMS must still be what this commit started from.
        String metadataLocation = tbl.getParameters().get("metadata_location");
        String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
        if (!Objects.equals(baseMetadataLocation, metadataLocation)) {
            throw new CommitFailedException(
                    "Base metadata location '%s' is not same as the current table metadata location '%s' for %s.%s",
                    baseMetadataLocation, metadataLocation, this.database, this.tableName);
        }

        setParameters(newMetadataLocation, tbl, hiveEngineEnabled);
        persistTable(tbl, updateHiveTable);
        threw = false;
    } catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException e) {
        throw new AlreadyExistsException("Table already exists: %s.%s", this.database, this.tableName);
    } catch (UnknownHostException | TException e) {
        if (e.getMessage() != null && e.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) {
            throw new RuntimeException("Failed to acquire locks from metastore because 'HIVE_LOCKS' doesn't exist, "
                    + "this probably happened when using embedded metastore or doesn't create a transactional meta table. "
                    + "To fix this, use an alternative metastore", e);
        }
        throw new RuntimeException(String.format("Metastore operation failed for %s.%s", this.database, this.tableName), e);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted during commit", e);
    } finally {
        cleanupMetadataAndUnlock(threw, newMetadataLocation, lockId);
    }
}

Appendix: the Flink task execution flow

The task lifecycle:
StreamTask is the base class for all stream tasks. A task runs one or more StreamOperators (when they are chained); chained operators run synchronously in the same thread.
Execution flow:

@Override
	public final void invoke() throws Exception {
		try {
			beforeInvoke();

			// final check to exit early before starting to run
			if (canceled) {
				throw new CancelTaskException();
			}

			// let the task do its work
			runMailboxLoop();

			// if this left the run() method cleanly despite the fact that this was canceled,
			// make sure the "clean shutdown" is not attempted
			if (canceled) {
				throw new CancelTaskException();
			}

			afterInvoke();
		}
		catch (Exception invokeException) {
			try {
				cleanUpInvoke();
			}
			catch (Throwable cleanUpException) {
				throw (Exception) ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
			}
			throw invokeException;
		}
		cleanUpInvoke();
	}

beforeInvoke() does some initialization work, including extracting all the operators.
runMailboxLoop() drives the task's data processing.
afterInvoke() finishes the task.
Call relationships:

-- invoke()
*        |
*        +----> Create basic utils (config, etc) and load the chain of operators
*        +----> operators.setup()
*        +----> task specific init()
*        +----> initialize-operator-states()
*        +----> open-operators()
*        +----> run()
* --------------> mailboxProcessor.runMailboxLoop();
* --------------> StreamTask.processInput()
* --------------> StreamTask.inputProcessor.processInput()
* --------------> indirectly invokes the operator's processElement() and processWatermark() methods
*        +----> close-operators()
*        +----> dispose-operators()
*        +----> common cleanup
*        +----> task specific cleanup()
  1. Create the state backend, which provides state for all operators in the OperatorChain
  2. Load all operators of the OperatorChain
  3. Call setup on every operator
  4. Task-specific init()
  5. Call initializeState on every operator to initialize state
  6. Call open on every operator
  7. run() loops over and processes the data
  8. Call close on every operator
  9. Call dispose on every operator
  10. Common cleanup
  11. Task-specific cleanup

References

Iceberg-related study material:

  1. https://iceberg.apache.org/#flink/
  2. https://www.dremio.com/apache-iceberg-an-architectural-look-under-the-covers/