赞
踩
Flink 支持将 DataStream<RowData> 和 DataStream<Row> 写入 Iceberg，以 DataStream<RowData> 为例:
// Minimal DataStream<RowData> -> Iceberg example; "..." marks code the reader must supply.
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
// A Hadoop-catalog table addressed directly by its warehouse path.
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
// forRowData() takes only the stream; the Flink TableSchema is passed via tableSchema().
// (For a DataStream<Row>, use FlinkSink.forRow(input, FLINK_SCHEMA) instead.)
FlinkSink.forRowData(input)
    .tableSchema(FLINK_SCHEMA)
    .tableLoader(tableLoader)
    .writeParallelism(1)
    .build();
env.execute("Test Iceberg DataStream");
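input 为 DataStream<RowData>（或 DataStream<Row>）形式的输入流，FLINK_SCHEMA 为 Flink 的 TableSchema；若输入为 DataStream<Row>，则使用 FlinkSink.forRow(input, FLINK_SCHEMA) 构建 sink。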
首先看build()方法:
// Decompiled excerpt of FlinkSink.Builder#build(): wires the sink topology
// rowDataInput -> IcebergStreamWriter (writeParallelism) -> IcebergFilesCommitter (parallelism 1) -> DiscardingSink.
public DataStreamSink<RowData> build() {
// Fail fast when forRowData() was never called or no table loader was configured.
Preconditions.checkArgument(this.rowDataInput != null, "Please use forRowData() to initialize the input DataStream.");
Preconditions.checkNotNull(this.tableLoader, "Table loader shouldn't be null");
// Load the table only if none was supplied to the builder.
if (this.table == null) {
this.tableLoader.open();
try {
// Decompiled try-with-resources over the loader: it is closed right after loadTable(), and a
// failure in close() is attached as a suppressed exception to any loading failure.
// The same tableLoader is handed to IcebergFilesCommitter below, which opens it again in initializeState().
TableLoader loader = this.tableLoader;
Throwable var2 = null;
try {
this.table = loader.loadTable();
} catch (Throwable var12) {
var2 = var12;
throw var12;
} finally {
if (loader != null) {
if (var2 != null) {
try {
loader.close();
} catch (Throwable var11) {
var2.addSuppressed(var11);
}
} else {
loader.close();
}
}
}
} catch (IOException var14) {
// Checked IOExceptions (presumably from close()) are rethrown unchecked.
throw new UncheckedIOException("Failed to load iceberg table from table loader: " + this.tableLoader, var14);
}
}
// Resolve equality column names to Iceberg field ids (decompiled for-each); an unknown column fails fast.
List<Integer> equalityFieldIds = Lists.newArrayList();
if (this.equalityFieldColumns != null && this.equalityFieldColumns.size() > 0) {
Iterator var16 = this.equalityFieldColumns.iterator();
while(var16.hasNext()) {
String column = (String)var16.next();
NestedField field = this.table.schema().findField(column);
Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s", column, this.table.schema());
equalityFieldIds.add(field.fieldId());
}
}
// Flink row type derived from the Iceberg schema and the optional Flink TableSchema.
RowType flinkRowType = FlinkSink.toFlinkRowType(this.table.schema(), this.tableSchema);
// NOTE(review): distributeDataStream is not shown here; presumably it re-distributes the input
// according to table properties / partition spec — confirm.
this.rowDataInput = this.distributeDataStream(this.rowDataInput, this.table.properties(), this.table.spec(), this.table.schema(), flinkRowType);
IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(this.table, flinkRowType, equalityFieldIds);
IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(this.tableLoader, this.overwrite);
// Default the writer parallelism to the input's parallelism when not set explicitly.
this.writeParallelism = this.writeParallelism == null ? this.rowDataInput.getParallelism() : this.writeParallelism;
// Writers run with writeParallelism; the committer is pinned to a single instance (parallelism and max parallelism 1).
DataStream<Void> returnStream = this.rowDataInput.transform(FlinkSink.ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter).setParallelism(this.writeParallelism).transform(FlinkSink.ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter).setParallelism(1).setMaxParallelism(1);
// The committer emits Void, so a DiscardingSink terminates the pipeline.
return returnStream.addSink(new DiscardingSink()).name(String.format("IcebergSink %s", this.table.name())).setParallelism(1);
}
build() 中创建了写入 Iceberg 的两个核心算子：IcebergStreamWriter（以 writeParallelism 并行写数据文件）和 IcebergFilesCommitter（并行度固定为 1，负责提交）。
// Excerpt from build(): creates the writer operator for the loaded table.
IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(this.table, flinkRowType, equalityFieldIds);
build()方法中,调用createStreamWriter()创建IcebergStreamWriter
/**
 * Creates the writer operator for {@code table}.
 *
 * <p>Reads the target file size and file format from the table properties and wraps them,
 * together with the table's schema, spec, location provider, FileIO and encryption, in a
 * {@link RowDataTaskWriterFactory}.
 *
 * @param table            the loaded Iceberg table
 * @param flinkRowType     Flink row type of the incoming records
 * @param equalityFieldIds Iceberg field ids of the equality columns; empty selects the plain
 *                         (non-delta) writers
 * @return a new {@link IcebergStreamWriter} named after the table
 */
static IcebergStreamWriter<RowData> createStreamWriter(Table table, RowType flinkRowType, List<Integer> equalityFieldIds) {
  Map<String, String> props = table.properties();
  long targetFileSize = getTargetFileSizeBytes(props);
  FileFormat fileFormat = getFileFormat(props);
  TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
      table.schema(), flinkRowType, table.spec(), table.locationProvider(), table.io(),
      table.encryption(), targetFileSize, fileFormat, props, equalityFieldIds);
  // Diamond instead of the raw type left by decompilation, so the result stays type-checked.
  return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
}
根据表信息构建TaskWriterFactory,并传入到IcebergStreamWriter
class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult> implements OneInputStreamOperator<T, WriteResult>, BoundedOneInput {
private static final long serialVersionUID = 1L;
private final String fullTableName;
private final TaskWriterFactory<T> taskWriterFactory;
private transient TaskWriter<T> writer;
private transient int subTaskId;
private transient int attemptId;
IcebergStreamWriter(String fullTableName, TaskWriterFactory<T> taskWriterFactory) {
this.fullTableName = fullTableName;
this.taskWriterFactory = taskWriterFactory;
this.setChainingStrategy(ChainingStrategy.ALWAYS);
}
public void open() {
this.subTaskId = this.getRuntimeContext().getIndexOfThisSubtask();
this.attemptId = this.getRuntimeContext().getAttemptNumber();
this.taskWriterFactory.initialize(this.subTaskId, this.attemptId);
this.writer = this.taskWriterFactory.create();
}
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
this.emit(this.writer.complete());
this.writer = this.taskWriterFactory.create();
}
public void processElement(StreamRecord<T> element) throws Exception {
this.writer.write(element.getValue());
}
}
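在 open() 中先调用 taskWriterFactory.initialize(subTaskId, attemptId)，再通过 taskWriterFactory.create() 构建 TaskWriter；每次 checkpoint barrier 下发前（prepareSnapshotPreBarrier()）也会重新 create 一个新的 TaskWriter。create() 的实现如下: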
/**
 * Creates a new {@link TaskWriter} for the current subtask.
 *
 * <p>Chooses one of four writers. Delta writers are used when equality field ids are configured,
 * and plain writers otherwise. Each kind comes in an unpartitioned or a partitioned flavor,
 * depending on the partition spec.
 *
 * @return a fresh task writer
 * @throws NullPointerException if {@code initialize()} has not been called yet
 */
public TaskWriter<RowData> create() {
  Preconditions.checkNotNull(this.outputFileFactory, "The outputFileFactory shouldn't be null if we have invoked the initialize().");
  boolean hasEqualityFields = this.equalityFieldIds != null && !this.equalityFieldIds.isEmpty();
  boolean unpartitioned = this.spec.isUnpartitioned();
  // Separate returns instead of the decompiled ternaries, which needed raw (TaskWriter) casts.
  if (hasEqualityFields) {
    if (unpartitioned) {
      return new UnpartitionedDeltaWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory, this.io,
          this.targetFileSizeBytes, this.schema, this.flinkSchema, this.equalityFieldIds);
    }
    return new PartitionedDeltaWriter(this.spec, this.format, this.appenderFactory, this.outputFileFactory, this.io,
        this.targetFileSizeBytes, this.schema, this.flinkSchema, this.equalityFieldIds);
  }
  if (unpartitioned) {
    return new UnpartitionedWriter<>(this.spec, this.format, this.appenderFactory, this.outputFileFactory, this.io,
        this.targetFileSizeBytes);
  }
  return new RowDataTaskWriterFactory.RowDataPartitionedFanoutWriter(this.spec, this.format, this.appenderFactory,
      this.outputFileFactory, this.io, this.targetFileSizeBytes, this.schema, this.flinkSchema);
}
此方法根据是否指定了 equality 字段（equalityFieldIds 是否为空）以及表是否分区（spec.isUnpartitioned()）选择 writer：指定 equality 字段时使用 UnpartitionedDeltaWriter / PartitionedDeltaWriter，否则使用 UnpartitionedWriter / RowDataPartitionedFanoutWriter。
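四个 writer 的写入调用链:
指定 equality 字段:
UnpartitionedDeltaWriter -> BaseDeltaTaskWriter.write() -> BaseEqualityDeltaWriter.write() -> RollingFileWriter.write() -> appender.add()
PartitionedDeltaWriter -> BaseDeltaTaskWriter.write() -> BaseEqualityDeltaWriter.write() -> RollingFileWriter.write() -> appender.add()
（两者都继承 BaseDeltaTaskWriter.write()：先把数据路由到对应分区的 delta writer，再根据 RowKind 执行写入或删除。）
未指定 equality 字段:
UnpartitionedWriter -> RollingFileWriter.write() -> appender.add()
RowDataPartitionedFanoutWriter -> PartitionedFanoutWriter.write() -> RollingFileWriter.write()（实现在父类 BaseRollingWriter 中） -> appender.add()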
底层使用的 appender 由创建 TaskWriter 时传入的 appenderFactory（FlinkAppenderFactory）创建。
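processElement() 中调用 writer.write(element.getValue()) 写入数据；checkpoint barrier 下发前（prepareSnapshotPreBarrier()）会调用 writer.complete() 完成当前 writer，并把得到的 WriteResult 发送给下游的 IcebergFilesCommitter，真正的提交由 IcebergFilesCommitter 完成。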
提示: task 执行分三步: beforeInvoke() -> runMailboxLoop() -> afterInvoke()
beforeInvoke() 中会先调用算子的 initializeState()，再调用 open()；runMailboxLoop() 中调用 processElement() 处理数据。
// Excerpt from build(): the committer receives the table loader and the overwrite flag.
IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(this.tableLoader, this.overwrite);
build()方法中,传入tableLoader和overwrite直接创建IcebergFilesCommitter。
状态初始化（以及从 checkpoint 恢复时补提交尚未提交的数据）在 IcebergFilesCommitter 的 initializeState() 中完成:
// IcebergFilesCommitter#initializeState(): loads the table, registers operator state and, on restore,
// commits any checkpoint data that had not yet been committed to the table.
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
// Job id of the current run; snapshotState() stores it and commitOperation() writes it into snapshot summaries.
this.flinkJobId = this.getContainingTask().getEnvironment().getJobID().toString();
this.tableLoader.open();
this.table = this.tableLoader.loadTable();
int subTaskId = this.getRuntimeContext().getIndexOfThisSubtask();
int attemptId = this.getRuntimeContext().getAttemptNumber();
// Factory for the temporary Flink manifest files, parameterized by job id, subtask and attempt.
this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(this.table, this.flinkJobId, subTaskId, (long)attemptId);
// -1 means "nothing committed yet"; overwritten below when restoring.
this.maxCommittedCheckpointId = -1L;
this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
if (context.isRestored()) {
// snapshotState() stores exactly one job id, so the first element is the restored one.
String restoredFlinkJobId = (String)((Iterable)this.jobIdState.get()).iterator().next();
Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId), "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
// NOTE(review): getMaxCommittedCheckpointId is not shown; presumably it scans table snapshots for the
// flink.job-id / flink.max-committed-checkpoint-id summary keys set in commitOperation() — confirm.
this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(this.table, restoredFlinkJobId);
// Keep only checkpoints strictly newer than the last committed one (tailMap with inclusive=false).
NavigableMap<Long, byte[]> uncommittedDataFiles = Maps.newTreeMap((SortedMap)((Iterable)this.checkpointsState.get()).iterator().next()).tailMap(this.maxCommittedCheckpointId, false);
if (!uncommittedDataFiles.isEmpty()) {
// Re-commit the leftover data under the restored job id, up to the newest restored checkpoint.
long maxUncommittedCheckpointId = (Long)uncommittedDataFiles.lastKey();
this.commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
}
}
}
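每次 checkpoint 的快照流程在 IcebergFilesCommitter 的 snapshotState() 中：这里只把本次 checkpoint 收到的数据文件写成临时 manifest 并保存到 Flink 状态中，并不提交 Iceberg 事务（提交在 notifyCheckpointComplete() 中进行）。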
// IcebergFilesCommitter#snapshotState(): persists this checkpoint's files as a temporary manifest in
// operator state. Nothing is committed to the table here; notifyCheckpointComplete() does that.
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
long checkpointId = context.getCheckpointId();
LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", this.table, checkpointId);
// Record the serialized manifest for this checkpoint alongside any still-uncommitted earlier ones.
this.dataFilesPerCheckpoint.put(checkpointId, this.writeToManifest(checkpointId));
// Replace the state with the whole pending map (a single list element, as initializeState() expects).
this.checkpointsState.clear();
this.checkpointsState.add(this.dataFilesPerCheckpoint);
this.jobIdState.clear();
this.jobIdState.add(this.flinkJobId);
// Results are now captured in the manifest; start collecting for the next checkpoint.
this.writeResultsOfCurrentCkpt.clear();
}
// Excerpt from snapshotState(): maps the checkpoint id to its serialized manifest data.
this.dataFilesPerCheckpoint.put(checkpointId, this.writeToManifest(checkpointId));
这一行把当前 checkpointId 与 writeToManifest() 返回的（序列化后的）manifest 信息放入 dataFilesPerCheckpoint。
writeToManifest() 的实现如下:
/**
 * Serializes the write results collected since the last checkpoint into temporary manifests.
 *
 * @param checkpointId id of the checkpoint being snapshotted, passed to the manifest file factory
 * @return {@code EMPTY_MANIFEST_DATA} when nothing was written, otherwise the versioned,
 *     serialized {@code DeltaManifests}
 * @throws IOException if writing or serializing the manifests fails
 */
private byte[] writeToManifest(long checkpointId) throws IOException {
  // Nothing arrived for this checkpoint: store the shared empty marker instead of writing a manifest.
  if (this.writeResultsOfCurrentCkpt.isEmpty()) {
    return EMPTY_MANIFEST_DATA;
  }
  WriteResult merged = WriteResult.builder()
      .addAll(this.writeResultsOfCurrentCkpt)
      .build();
  DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(
      merged, () -> this.manifestOutputFileFactory.create(checkpointId), this.table.spec());
  return SimpleVersionedSerialization.writeVersionAndSerialize(DeltaManifestsSerializer.INSTANCE, deltaManifests);
}
writeResultsOfCurrentCkpt中包含了datafile文件、deletefile文件和referenced数据文件。然后,根据result创建deltaManifests ,并且返回序列化后的manifest信息。
其中 deltaManifests 由 FlinkManifestUtil.writeCompletedFiles() 生成，实现如下:
/**
 * Writes the data files and the delete files of {@code result} into separate temporary manifests.
 *
 * <p>A manifest is created only for a file kind that is present. Each created manifest takes a new
 * output file from {@code outputFileSupplier}.
 *
 * @param result             files produced by the writers for one checkpoint
 * @param outputFileSupplier supplies the location of each manifest to write
 * @param spec               partition spec of the table
 * @return the data manifest (or {@code null}), the delete manifest (or {@code null}) and the
 *     referenced data files of {@code result}
 * @throws IOException if a manifest cannot be written
 */
static DeltaManifests writeCompletedFiles(WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec) throws IOException {
  ManifestFile dataManifest = null;
  ManifestFile deleteManifest = null;

  if (result.dataFiles() != null && result.dataFiles().length > 0) {
    dataManifest = writeDataFiles(outputFileSupplier.get(), spec, Lists.newArrayList(result.dataFiles()));
  }

  if (result.deleteFiles() != null && result.deleteFiles().length > 0) {
    OutputFile deleteManifestFile = outputFileSupplier.get();
    // Format version 2 is passed explicitly for the delete manifest.
    ManifestWriter<DeleteFile> deleteManifestWriter = ManifestFiles.writeDeleteManifest(2, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID);
    // try-with-resources replaces the decompiler's synthetic $closeResource(...) call, which is not valid source.
    try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
      for (DeleteFile deleteFile : result.deleteFiles()) {
        writer.add(deleteFile);
      }
    }
    // toManifestFile() is read only after the writer has been closed.
    deleteManifest = deleteManifestWriter.toManifestFile();
  }

  return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles());
}
从上面写入过程可以看出,datafile和deletefile写入后,分别生成各自的Manifest文件,最后创建DeltaManifests返回。
checkpoint 完成后，Flink 回调 notifyCheckpointComplete()，在这里才真正提交 Iceberg 事务:
/**
 * Commits every pending checkpoint up to {@code checkpointId} once Flink reports it complete.
 *
 * <p>Notifications for checkpoints at or below the last committed id are ignored, so they never
 * trigger a second commit.
 */
public void notifyCheckpointComplete(long checkpointId) throws Exception {
  super.notifyCheckpointComplete(checkpointId);
  if (checkpointId <= this.maxCommittedCheckpointId) {
    return;
  }
  this.commitUpToCheckpoint(this.dataFilesPerCheckpoint, this.flinkJobId, checkpointId);
  this.maxCommittedCheckpointId = checkpointId;
}
/**
 * Commits all entries of {@code deltaManifestsMap} with checkpoint id {@code <= checkpointId}.
 *
 * <p>Non-empty entries are deserialized back into {@code DeltaManifests}, and their files are read
 * into per-checkpoint {@code WriteResult}s. These are committed either as a partition replacement
 * or as an append/row-delta transaction. The committed entries are then removed from
 * {@code deltaManifestsMap`, and the temporary Flink manifests are deleted on a best-effort basis.
 *
 * @param deltaManifestsMap serialized manifests keyed by checkpoint id; committed entries are removed
 * @param newFlinkJobId     Flink job id recorded in the snapshot summary
 * @param checkpointId      highest checkpoint id (inclusive) to commit
 * @throws IOException if the manifests cannot be deserialized or read
 */
private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap, String newFlinkJobId, long checkpointId) throws IOException {
  // headMap is a live view: clearing it below removes the committed entries from deltaManifestsMap.
  NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
  List<ManifestFile> manifests = Lists.newArrayList();
  NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();

  for (Entry<Long, byte[]> e : pendingMap.entrySet()) {
    byte[] serialized = e.getValue();
    if (Arrays.equals(EMPTY_MANIFEST_DATA, serialized)) {
      // Checkpoint without new files: nothing to read or commit.
      continue;
    }
    DeltaManifests deltaManifests = SimpleVersionedSerialization.readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, serialized);
    pendingResults.put(e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, this.table.io()));
    manifests.addAll(deltaManifests.manifests());
  }

  if (this.replacePartitions) {
    this.replacePartitions(pendingResults, newFlinkJobId, checkpointId);
  } else {
    this.commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
  }
  pendingMap.clear();

  // The transaction is already committed; a leftover temporary manifest is only garbage, so log and continue.
  for (ManifestFile manifest : manifests) {
    try {
      this.table.io().deleteFile(manifest.path());
    } catch (Exception ex) {
      String details = MoreObjects.toStringHelper(this)
          .add("flinkJobId", newFlinkJobId)
          .add("checkpointId", checkpointId)
          .add("manifestPath", manifest.path())
          .toString();
      LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink manifests: {}", details, ex);
    }
  }
}
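这里会反序列化 snapshotState() 中保存的值得到 deltaManifests，通过 FlinkManifestUtil.readCompletedFiles() 读出每个 checkpoint 的 WriteResult 放入 pendingResults，并把这些临时 manifest 加入 manifests 列表；事务提交后会删除这些临时 manifest 文件（删除失败只打印警告）。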
然后根据replacePartitions(创建时传入的overwrite值,默认为false)值提交事务,默认情况下调用commitDeltaTxn()
/**
 * Commits the pending per-checkpoint write results as Iceberg snapshot(s).
 *
 * <p>If there are no delete files, all data files of all pending checkpoints are appended in a
 * single {@code AppendFiles} commit tagged with {@code checkpointId}. Otherwise each checkpoint is
 * committed as its own {@code RowDelta}, tagged with that checkpoint's id, in ascending checkpoint
 * order.
 *
 * @param pendingResults write results keyed by checkpoint id
 * @param newFlinkJobId  Flink job id recorded in the snapshot summary
 * @param checkpointId   checkpoint id recorded for the single append commit
 * @throws IllegalStateException if an append-only result references existing data files
 */
private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId, long checkpointId) {
  int deleteFilesNum = pendingResults.values().stream().mapToInt(r -> r.deleteFiles().length).sum();

  if (deleteFilesNum == 0) {
    AppendFiles appendFiles = this.table.newAppend();
    int numFiles = 0;
    for (WriteResult result : pendingResults.values()) {
      Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files.");
      numFiles += result.dataFiles().length;
      Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
    }
    this.commitOperation(appendFiles, numFiles, 0, "append", newFlinkJobId, checkpointId);
  } else {
    // One RowDelta per checkpoint rather than one merged commit.
    // NOTE(review): presumably so that deletes of a later checkpoint apply to rows of an earlier one — confirm upstream.
    for (Entry<Long, WriteResult> e : pendingResults.entrySet()) {
      WriteResult result = e.getValue();
      RowDelta rowDelta = this.table.newRowDelta()
          .validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles()))
          .validateDeletedFiles();
      int numDataFiles = result.dataFiles().length;
      Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
      int numDeleteFiles = result.deleteFiles().length;
      Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
      this.commitOperation(rowDelta, numDataFiles, numDeleteFiles, "rowDelta", newFlinkJobId, e.getKey());
    }
  }
}
若所有待提交结果中都没有 delete 文件，则用 table.newAppend()（实现类 MergeAppend）把所有 checkpoint 的数据文件合并为一次提交；否则对每个 checkpoint 分别用 table.newRowDelta()（实现类 BaseRowDelta）提交。BaseRowDelta 和 MergeAppend 都继承自 MergingSnapshotProducer，每次提交都会生成一个新的 snapshot。
/**
 * Tags {@code operation} with the Flink checkpoint id and job id, commits it and logs the time taken.
 *
 * @param operation      snapshot update to commit (append or row delta)
 * @param numDataFiles   number of data files, for logging only
 * @param numDeleteFiles number of delete files, for logging only
 * @param description    operation label used in the log message
 * @param newFlinkJobId  stored under {@code flink.job-id} in the snapshot summary
 * @param checkpointId   stored under {@code flink.max-committed-checkpoint-id} in the snapshot summary
 */
private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles, int numDeleteFiles, String description, String newFlinkJobId, long checkpointId) {
  LOG.info("Committing {} with {} data files and {} delete files to table {}", description, numDataFiles, numDeleteFiles, this.table);
  // Summary properties that identify the last committed checkpoint of this job.
  operation.set("flink.max-committed-checkpoint-id", Long.toString(checkpointId));
  operation.set("flink.job-id", newFlinkJobId);
  long startMillis = System.currentTimeMillis();
  operation.commit();
  long elapsedMillis = System.currentTimeMillis() - startMillis;
  LOG.info("Committed in {} ms", elapsedMillis);
}
operation.commit()会调用SnapshotProducer中的commit()方法
// SnapshotProducer#commit(): builds and commits a new snapshot, retrying on CommitFailedException, then
// deletes manifests and manifest lists left over from discarded attempts.
public void commit() {
AtomicLong newSnapshotId = new AtomicLong(-1L);
try {
// Retry policy comes from table properties (fallbacks here: 4 retries, 100 ms to 60 s exponential backoff,
// 30 min total timeout); only CommitFailedException triggers a retry.
Tasks.foreach(new TableOperations[]{this.ops}).retry(this.base.propertyAsInt("commit.retry.num-retries", 4)).exponentialBackoff((long)this.base.propertyAsInt("commit.retry.min-wait-ms", 100), (long)this.base.propertyAsInt("commit.retry.max-wait-ms", 60000), (long)this.base.propertyAsInt("commit.retry.total-timeout-ms", 1800000), 2.0D).onlyRetryOn(CommitFailedException.class).run((taskOps) -> {
// Every attempt rebuilds the snapshot; apply() refreshes this.base first.
Snapshot newSnapshot = this.apply();
newSnapshotId.set(newSnapshot.snapshotId());
TableMetadata updated;
// NOTE(review): presumably a staged snapshot is added without becoming current — confirm addStagedSnapshot.
if (this.stageOnly) {
updated = this.base.addStagedSnapshot(newSnapshot);
} else {
updated = this.base.replaceCurrentSnapshot(newSnapshot);
}
if (updated != this.base) {
taskOps.commit(this.base, updated.withUUID());
}
});
} catch (RuntimeException var5) {
// NOTE(review): presumably runs cleanAll() and rethrows var5 with cleanup failures suppressed — confirm.
Exceptions.suppressAndThrow(var5, this::cleanAll);
}
LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), this.getClass().getSimpleName());
// Post-commit cleanup is best effort: failures below are only logged.
try {
Snapshot saved = this.ops.refresh().snapshot(newSnapshotId.get());
if (saved != null) {
// Remove manifests this producer wrote that the committed snapshot does not reference.
this.cleanUncommitted(Sets.newHashSet(saved.allManifests()));
// Every apply() call records a manifest list; delete all except the committed one.
Iterator var3 = this.manifestLists.iterator();
while(var3.hasNext()) {
String manifestList = (String)var3.next();
if (!saved.manifestListLocation().equals(manifestList)) {
this.deleteFile(manifestList);
}
}
} else {
LOG.warn("Failed to load committed snapshot, skipping manifest clean-up");
}
} catch (RuntimeException var6) {
LOG.warn("Failed to load committed table metadata, skipping manifest clean-up", var6);
}
this.notifyListeners();
}
SnapshotProducer.apply() 会刷新表元数据并校验，计算新 snapshot 的 manifest 列表，写出 manifest list 文件，最后返回新的 Snapshot（此时尚未提交）:
/**
 * Builds, but does not commit, the new snapshot on top of freshly refreshed table metadata.
 *
 * <p>Refreshes {@code base}, validates it and computes the manifests of the new snapshot. Unless a
 * format-v1 table has {@code write.manifest-lists.enabled=false}, it then writes those manifests
 * into a new manifest-list file. The file's location is recorded in {@code manifestLists}, so that
 * {@code commit()} can delete lists from discarded attempts.
 *
 * @return the new snapshot
 * @throws RuntimeIOException if the manifest list file cannot be written
 */
public Snapshot apply() {
  this.base = this.refresh();
  Long parentSnapshotId = this.base.currentSnapshot() != null ? this.base.currentSnapshot().snapshotId() : null;
  long sequenceNumber = this.base.nextSequenceNumber();
  this.validate(this.base);
  List<ManifestFile> manifests = this.apply(this.base);

  if (this.base.formatVersion() <= 1 && !this.base.propertyAsBoolean("write.manifest-lists.enabled", true)) {
    // Manifest lists disabled: the manifests are passed to the snapshot directly.
    return new BaseSnapshot(this.ops.io(), this.snapshotId(), parentSnapshotId, System.currentTimeMillis(), this.operation(), this.summary(this.base), manifests);
  }

  OutputFile manifestList = this.manifestListPath();
  // try-with-resources replaces the decompiler's synthetic $closeResource(...) call, which is not valid
  // source; IOExceptions from close() still reach the handler below.
  try (ManifestListWriter writer = ManifestLists.write(this.ops.current().formatVersion(), manifestList, this.snapshotId(), parentSnapshotId, sequenceNumber)) {
    this.manifestLists.add(manifestList.location());
    ManifestFile[] manifestFiles = new ManifestFile[manifests.size()];
    // Resolve every manifest through manifestsWithMetadata in parallel on the shared worker pool.
    Tasks.range(manifestFiles.length)
        .stopOnFailure()
        .throwFailureWhenFinished()
        .executeWith(ThreadPools.getWorkerPool())
        .run(index -> {
          manifestFiles[index] = (ManifestFile) this.manifestsWithMetadata.get(manifests.get(index));
        });
    writer.addAll(Arrays.asList(manifestFiles));
  } catch (IOException e) {
    throw new RuntimeIOException(e, "Failed to write manifest list file");
  }

  return new BaseSnapshot(this.ops.io(), sequenceNumber, this.snapshotId(), parentSnapshotId, System.currentTimeMillis(), this.operation(), this.summary(this.base), manifestList.location());
}
然后（非 stageOnly 时）调用 TableMetadata.replaceCurrentSnapshot() 生成新的表元数据 updated:
/**
 * Returns new table metadata whose current snapshot is {@code snapshot}.
 *
 * <p>If the snapshot is already known to this metadata, it is simply made current. Otherwise it is
 * appended to the snapshot list and the snapshot log. The last sequence number and the last-updated
 * time are then taken from the snapshot, and the current metadata file is recorded as a previous
 * file.
 *
 * @param snapshot the snapshot to make current
 * @return the updated metadata
 * @throws ValidationException when the format version is not 1 and the snapshot's sequence number
 *     is not greater than the last sequence number
 */
public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
  if (this.snapshotsById.containsKey(snapshot.snapshotId())) {
    return this.setCurrentSnapshotTo(snapshot);
  }
  ValidationException.check(this.formatVersion == 1 || snapshot.sequenceNumber() > this.lastSequenceNumber,
      "Cannot add snapshot with sequence number %s older than last sequence number %s",
      new Object[]{snapshot.sequenceNumber(), this.lastSequenceNumber});
  // Explicit type witnesses: a bare builder() yields ImmutableList<Object>, which is not
  // assignable to List<Snapshot> / List<HistoryEntry>.
  List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
      .addAll(this.snapshots)
      .add(snapshot)
      .build();
  List<HistoryEntry> newSnapshotLog = ImmutableList.<HistoryEntry>builder()
      .addAll(this.snapshotLog)
      .add(new TableMetadata.SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
      .build();
  return new TableMetadata((InputFile) null, this.formatVersion, this.uuid, this.location, snapshot.sequenceNumber(),
      snapshot.timestampMillis(), this.lastColumnId, this.schema, this.defaultSpecId, this.specs, this.defaultSortOrderId,
      this.sortOrders, this.properties, snapshot.snapshotId(), newSnapshots, newSnapshotLog,
      this.addPreviousFile(this.file, this.lastUpdatedMillis));
}
接着调用 BaseMetastoreTableOperations 类的 commit() 方法（即 taskOps.commit(base, updated)）:
/**
 * Commits {@code metadata} as the successor of {@code base}.
 *
 * <p>After {@code doCommit}, metadata files removed between {@code base} and {@code metadata} are
 * deleted, and a refresh is requested.
 *
 * @param base     metadata the change was computed from; must be the currently loaded metadata
 * @param metadata metadata to commit; if identical to {@code base} nothing is committed
 * @throws CommitFailedException if {@code base} is not the current metadata
 */
public void commit(TableMetadata base, TableMetadata metadata) {
  // Reject changes computed from outdated metadata.
  if (base != this.current()) {
    throw new CommitFailedException("Cannot commit: stale table metadata");
  }
  if (base == metadata) {
    LOG.info("Nothing to commit.");
    return;
  }
  long startMillis = System.currentTimeMillis();
  this.doCommit(base, metadata);
  this.deleteRemovedMetadataFiles(base, metadata);
  this.requestRefresh();
  LOG.info("Successfully committed to table {} in {} ms", this.tableName(), System.currentTimeMillis() - startMillis);
}
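最后由具体的 TableOperations 实现 doCommit() 完成提交。以 Hive catalog 为例，HiveTableOperations.doCommit() 先写出新的 metadata 文件，再在持有 Hive 锁的情况下校验并更新 HMS 表的 metadata_location。（注意：文首示例使用的 TableLoader.fromHadoopTable() 对应 HadoopTableOperations，其提交方式与此不同。）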
protected void doCommit(TableMetadata base, TableMetadata metadata) {
String newMetadataLocation = this.writeNewMetadata(metadata, this.currentVersion() + 1);
boolean hiveEngineEnabled = hiveEngineEnabled(metadata, this.conf);
boolean threw = true;
boolean updateHiveTable = false;
Optional lockId = Optional.empty();
try {
lockId = Optional.of(this.acquireLock());
Table tbl = this.loadHmsTable();
if (tbl != null) {
if (base == null && tbl.getParameters().get("metadata_location") != null) {
throw new AlreadyExistsException("Table already exists: %s.%s", new Object[]{this.database, this.tableName});
}
updateHiveTable = true;
LOG.debug("Committing existing table: {}", this.fullName);
} else {
tbl = this.newHmsTable();
LOG.debug("Committing new table: {}", this.fullName);
}
tbl.setSd(this.storageDescriptor(metadata, hiveEngineEnabled));
String metadataLocation = (String)tbl.getParameters().get("metadata_location");
String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
if (!Objects.equals(baseMetadataLocation, metadataLocation)) {
throw new CommitFailedException("Base metadata location '%s' is not same as the current table metadata location '%s' for %s.%s", new Object[]{baseMetadataLocation, metadataLocation, this.database, this.tableName});
}
this.setParameters(newMetadataLocation, tbl, hiveEngineEnabled);
this.persistTable(tbl, updateHiveTable);
threw = false;
} catch (org.apache.hadoop.hive.metastore.api.AlreadyExistsException var16) {
throw new AlreadyExistsException("Table already exists: %s.%s", new Object[]{this.database, this.tableName});
} catch (UnknownHostException | TException var17) {
if (var17.getMessage() != null && var17.getMessage().contains("Table/View 'HIVE_LOCKS' does not exist")) {
throw new RuntimeException("Failed to acquire locks from metastore because 'HIVE_LOCKS' doesn't exist, this probably happened when using embedded metastore or doesn't create a transactional meta table. To fix this, use an alternative metastore", var17);
}
throw new RuntimeException(String.format("Metastore operation failed for %s.%s", this.database, this.tableName), var17);
} catch (InterruptedException var18) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted during commit", var18);
} finally {
this.cleanupMetadataAndUnlock(threw, newMetadataLocation, lockId);
}
task的生命周期:
StreamTask 是所有 stream task 的基类。一个 task 运行一个或多个 StreamOperator（多个算子形成 operator chain 时），同一个 chain 中的算子在同一个线程内同步运行。
执行过程:
// StreamTask#invoke() (Flink): drives a task through its three phases and always runs cleanUpInvoke().
@Override
public final void invoke() throws Exception {
try {
// Phase 1: initialization (operator setup, state initialization, opening operators).
beforeInvoke();
// final check to exit early before starting to run
if (canceled) {
throw new CancelTaskException();
}
// let the task do its work
// Phase 2: the mailbox loop processes input records (eventually calling processElement()).
runMailboxLoop();
// if this left the run() method cleanly despite the fact that this was canceled,
// make sure the "clean shutdown" is not attempted
if (canceled) {
throw new CancelTaskException();
}
// Phase 3: orderly shutdown after a clean exit from the loop.
afterInvoke();
}
catch (Exception invokeException) {
// Clean up on failure; a cleanup error is combined with the original exception
// (presumably one suppressed into the other — confirm ExceptionUtils.firstOrSuppressed).
try {
cleanUpInvoke();
}
catch (Throwable cleanUpException) {
throw (Exception) ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
}
throw invokeException;
}
// Normal completion also runs the cleanup.
cleanUpInvoke();
}
在beforeInvoke中会做一些初始化工作,包括提取出所有的operator等。
在runMailboxLoop中调用task运行
在afterInvoke中结束
调用关系:
-- invoke()
* |
* +----> Create basic utils (config, etc) and load the chain of operators
* +----> operators.setup()
* +----> task specific init()
* +----> initialize-operator-states()
* +----> open-operators()
* +----> run()
* --------------> mailboxProcessor.runMailboxLoop();
* --------------> StreamTask.processInput()
* --------------> StreamTask.inputProcessor.processInput()
* --------------> 间接调用 operator的processElement()和processWatermark()方法
* +----> close-operators()
* +----> dispose-operators()
* +----> common cleanup
* +----> task specific cleanup()
iceberg相关学习文档
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。