Flink, as a popular real-time data processing framework, is widely used for streaming computation. In our production environment we use the Flink Kafka Connector to pull data out of Kafka for processing, and along the way I took the opportunity to read through and summarize how the connector's consumer side works.
Component | Version |
---|---|
Flink | 1.11.2 |
kafka-client that Flink depends on | 0.10.2.2 |
As shown below, using the Kafka Connector in Flink is straightforward; it is similar to using the plain Kafka Java API, if anything even simpler.
import java.util.Arrays;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

// objBrokerHosts / objTopics are configuration values supplied elsewhere
Properties kafkaDataProp = new Properties();
kafkaDataProp.setProperty("bootstrap.servers", objBrokerHosts);
// ....
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(
        Arrays.asList(objTopics.split(",")), new SimpleStringSchema(), kafkaDataProp);
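For completeness, a minimal sketch of how the consumer built above is usually wired into a job; the variable names and the job name here are illustrative rather than taken from the original code.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// the consumer is just a SourceFunction, so it plugs in via addSource
DataStream<String> stream = env.addSource(kafkaConsumer);
stream.print();
env.execute("kafka-consumer-demo");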
FlinkKafkaConsumer010 extends FlinkKafkaConsumerBase, which in turn extends the RichParallelSourceFunction class; the Flink source interfaces themselves are not covered here. The constructor in FlinkKafkaConsumer010 is as follows:
private FlinkKafkaConsumer010(
        @Nullable List<String> topics,
        @Nullable Pattern subscriptionPattern,
        KafkaDeserializationSchema<T> deserializer,
        Properties props) {
    super(
        topics,
        subscriptionPattern,
        deserializer,
        getLong(
            checkNotNull(props, "props"),
            KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
        !getBoolean(props, KEY_DISABLE_METRICS, false));

    this.properties = props;
    // replace key.deserializer and value.deserializer with ByteArrayDeserializer
    setDeserializer(this.properties);

    // configure the polling timeout
    try {
        if (properties.containsKey(KEY_POLL_TIMEOUT)) {
            this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
        } else {
            this.pollTimeout = DEFAULT_POLL_TIMEOUT;
        }
    } catch (Exception e) {
        throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
    }
}
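As the constructor shows, the poll timeout is read from the consumer Properties under KEY_POLL_TIMEOUT. A small hedged example, assuming the key string is "flink.poll-timeout" as defined in the 0.10 consumer class, with a 100 ms default:

// Tune how long a single KafkaConsumer.poll call may block (assumed key
// "flink.poll-timeout", default 100 ms); parsed in the constructor above.
kafkaDataProp.setProperty("flink.poll-timeout", "200");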
The parent class constructor:
public FlinkKafkaConsumerBase(
List<String> topics,
Pattern topicPattern,
KafkaDeserializationSchema<T> deserializer,
long discoveryIntervalMillis,
boolean useMetrics) {
this.topicsDescriptor = new KafkaTopicsDescriptor(topics, topicPattern);
this.deserializer = checkNotNull(deserializer, "valueDeserializer");
//validate the partition discovery configuration
checkArgument(
discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED || discoveryIntervalMillis >= 0,
"Cannot define a negative value for the topic / partition discovery interval.");
this.discoveryIntervalMillis = discoveryIntervalMillis;
this.useMetrics = useMetrics;
}
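The discoveryIntervalMillis validated here comes from the consumer Properties; discovery stays disabled unless the interval is configured. A brief sketch, assuming the public constant KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS ("flink.partition-discovery.interval-millis") defined on FlinkKafkaConsumerBase:

// enable periodic partition discovery, checking for new partitions every 30 s
kafkaDataProp.setProperty(
        FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "30000");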
The consumer is not fully initialized here; the remaining initialization happens when Flink calls the open method of FlinkKafkaConsumerBase. Key parts of that method:
@Override
public void open(Configuration configuration) throws Exception {
    // determine the offset commit mode
    this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
            getIsAutoCommitEnabled(),
            enableCommitOnCheckpoints,
            ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

    // create the partition discoverer, which finds new partitions and decides
    // which partitions this subtask is responsible for
    this.partitionDiscoverer = createPartitionDiscoverer(
            topicsDescriptor,
            getRuntimeContext().getIndexOfThisSubtask(),
            getRuntimeContext().getNumberOfParallelSubtasks());
    // open the partition discoverer's Kafka connection
    this.partitionDiscoverer.open();

    subscribedPartitionsToStartOffsets = new HashMap<>();
    // all partitions this subtask is responsible for
    final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();

    // restore from checkpointed state, if any
    if (restoredState != null) {
        // omitted
    } else {
        // initialize the partitions according to the configured startup mode
        // (inside a switch over startupMode); in the default case:
        default:
            for (KafkaTopicPartition seedPartition : allPartitions) {
                subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
            }
    }
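The startupMode that feeds subscribedPartitionsToStartOffsets can be chosen through setters on FlinkKafkaConsumerBase (GROUP_OFFSETS is the default); a brief sketch:

kafkaConsumer.setStartFromGroupOffsets();   // default: resume from committed group offsets
// kafkaConsumer.setStartFromEarliest();    // read every partition from the earliest offset
// kafkaConsumer.setStartFromLatest();      // read every partition from the latest offset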
In the Flink Kafka Connector, each subtask determines the partitions it is responsible for by hashing and taking a modulo. In practice the partition count of a topic may be changed, and Flink supports dynamic partition discovery; newly discovered partitions are assigned to subtasks with the same algorithm. The relevant code of discoverPartitions:
public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
    if (!closed && !wakeup) {
        try {
            List<KafkaTopicPartition> newDiscoveredPartitions;

            // (1) fixed list of topics
            if (topicsDescriptor.isFixedTopics()) {
                newDiscoveredPartitions = getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
            } else {
                // pattern-based subscription: keep only the topics matching the pattern
                List<String> matchedTopics = getAllTopics();
                Iterator<String> iter = matchedTopics.iterator();
                while (iter.hasNext()) {
                    if (!topicsDescriptor.isMatchingTopic(iter.next())) {
                        iter.remove();
                    }
                }

                if (matchedTopics.size() != 0) {
                    // get partitions only for matched topics
                    newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);
                } else {
                    newDiscoveredPartitions = null;
                }
            }

            // (2) eliminate partitions that are old partitions or should not be subscribed by this subtask
            if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
                throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
            } else {
                Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
                KafkaTopicPartition nextPartition;
                while (iter.hasNext()) {
                    nextPartition = iter.next();
                    if (!setAndCheckDiscoveredPartition(nextPartition)) {
                        iter.remove();
                    }
                }
            }
            // ...
}
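The pattern branch above corresponds to constructing the consumer with a topic regex instead of a fixed list; a minimal sketch (the regex itself is illustrative):

import java.util.regex.Pattern;

FlinkKafkaConsumer010<String> patternConsumer = new FlinkKafkaConsumer010<>(
        Pattern.compile("metrics-.*"), new SimpleStringSchema(), kafkaDataProp);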
The setAndCheckDiscoveredPartition method:
public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
if (isUndiscoveredPartition(partition)) {
discoveredPartitions.add(partition);
// hash + modulo, then compare against this subtask's index
return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
}
return false;
}
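For reference, the assignment used here is roughly the following (modeled on KafkaTopicPartitionAssigner; a simplified sketch, not a verbatim copy):

public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    // derive a non-negative start index from a hash of the topic name
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
    // partitions of the same topic are then spread round-robin from that start index
    return (startIndex + partition.getPartition()) % numParallelSubtasks;
}

The effect is that the partitions of one topic are distributed evenly across subtasks, and the mapping stays stable as long as the parallelism does not change.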
At this point the main initialization is complete; checkpoint restore and the other partition startup cases are not covered in depth here.
When Flink starts the source it calls the run method of the source implementation, which passes the fetched data downstream through the SourceContext. The run method is described in SourceFunction as follows:
/**
* Starts the source. Implementations can use the {@link SourceContext} to emit
* elements.
*
* <p>Sources that implement {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
* must lock on the checkpoint lock (using a synchronized block) before updating internal
* state and emitting elements, to make both an atomic operation:
* ...
*/
void run(SourceContext<T> ctx) throws Exception;
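As a hedged illustration of the locking contract described in the javadoc, a custom source would typically emit like this (the counter source is purely illustrative):

@Override
public void run(SourceContext<Long> ctx) throws Exception {
    long counter = 0L;
    while (running) {
        synchronized (ctx.getCheckpointLock()) {
            // state update and emission happen under the same lock, so a checkpoint
            // never captures the state and the emitted element out of sync
            counter++;
            ctx.collect(counter);
        }
    }
}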
The run method implementation in FlinkKafkaConsumerBase:
@Override
public void run(SourceContext<T> sourceContext) throws Exception {
    if (subscribedPartitionsToStartOffsets == null) {
        throw new Exception("The partitions were not set for the consumer");
    }

    // initialize commit metrics and default offset callback method
    this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
    this.failedCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
    final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();

    this.offsetCommitCallback = new KafkaCommitCallback() {
        @Override
        public void onSuccess() {
            successfulCommits.inc();
        }

        @Override
        public void onException(Throwable cause) {
            LOG.warn(String.format("Consumer subtask %d failed async Kafka commit.", subtaskIndex), cause);
            failedCommits.inc();
        }
    };

    // mark the subtask as temporarily idle if there are no initial seed partitions;
    // once this subtask discovers some partitions and starts collecting records, the subtask's
    // status will automatically be triggered back to be active.
    if (subscribedPartitionsToStartOffsets.isEmpty()) {
        sourceContext.markAsTemporarilyIdle();
    }

    LOG.info("Consumer subtask {} creating fetcher with offsets {}.",
        getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets);

    // from this point forward:
    //   - 'snapshotState' will draw offsets from the fetcher,
    //     instead of being built from `subscribedPartitionsToStartOffsets`
    //   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
    //     Kafka through the fetcher, if configured to do so)
    this.kafkaFetcher = createFetcher(
            sourceContext,
            subscribedPartitionsToStartOffsets,
            watermarkStrategy,
            (StreamingRuntimeContext) getRuntimeContext(),
            offsetCommitMode,
            getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
            useMetrics);

    if (!running) {
        return;
    }

    // depending on whether we were restored with the current state version (1.3),
    // remaining logic branches off into 2 paths:
    //   1) New state - partition discovery loop executed as separate thread, with this
    //                  thread running the main fetcher loop
    //   2) Old state - partition discovery is disabled and only the main fetcher loop is executed
    if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
        kafkaFetcher.runFetchLoop();
    } else {
        // start this subtask's partition discovery thread, which periodically checks for new
        // partitions, while kafkaFetcher.runFetchLoop() is likewise called to pull data from Kafka
        runWithPartitionDiscovery();
    }
}
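Conceptually, runWithPartitionDiscovery runs a discovery thread next to the main fetch loop. A simplified sketch of the idea (error propagation and shutdown handling are omitted, so this is not the verbatim implementation):

Thread discoveryLoop = new Thread(() -> {
    try {
        while (running) {
            // ask Kafka for partitions; only those assigned to this subtask are returned
            List<KafkaTopicPartition> discovered = partitionDiscoverer.discoverPartitions();
            if (running && !discovered.isEmpty()) {
                // hand new partitions to the fetcher via its unassignedPartitionsQueue
                kafkaFetcher.addDiscoveredPartitions(discovered);
            }
            Thread.sleep(discoveryIntervalMillis);
        }
    } catch (Exception e) {
        // the real implementation forwards this to the main thread and fails the task
    }
}, "Kafka Partition Discovery");
discoveryLoop.start();

kafkaFetcher.runFetchLoop();   // the main thread keeps pulling records meanwhile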
createFetcher has different implementations, to stay compatible with different Kafka versions and to handle the consumed records differently. In flink-connector-kafka_2.11 the following consumers exist:
Class | Description |
---|---|
FlinkKafkaConsumer | Starts the consumer thread; deserializes consumed records with the configured DeserializationSchema and collects them |
FlinkKafkaConsumer010 | Starts the consumer thread; deserializes consumed records with the configured DeserializationSchema and collects them |
FlinkKafkaShuffleConsumer | Starts the consumer thread; deserializes consumed records with the configured TypeSerializer and collects them |
New partitions found by the PartitionDiscoverer are registered with the fetcher's unassignedPartitionsQueue through addDiscoveredPartitions. While consuming, the consumer thread checks whether new partitions have been assigned and reassigns them if so; part of the code:
// main fetch loop
while (running) {
    // check if there is something to commit
    if (!commitInProgress) {
        // ...
    }

    try {
        if (hasAssignedPartitions) {
            newPartitions = unassignedPartitionsQueue.pollBatch();
        } else {
            // if no assigned partitions block until we get at least one
            // instead of hot spinning this loop. We rely on a fact that
            // unassignedPartitionsQueue will be closed on a shutdown, so
            // we don't block indefinitely
            newPartitions = unassignedPartitionsQueue.getBatchBlocking();
        }
        if (newPartitions != null) {
            reassignPartitions(newPartitions);
        }
    } catch (AbortedReassignmentException e) {
        continue;
    }

    if (!hasAssignedPartitions) {
        // Without assigned partitions KafkaConsumer.poll will throw an exception
        continue;
    }

    // get the next batch of records, unless we did not manage to hand the old batch over
    if (records == null) {
        try {
            records = getRecordsFromKafka();
        } catch (WakeupException we) {
            continue;
        }
    }

    try {
        handover.produce(records);
        records = null;
    }
    // ...
}
In the getRecordsFromKafka method, consumption can be throttled through a rateLimiter; the polled records are then transferred via handover.produce, and kafkaFetcher.runFetchLoop takes them from the handover, deserializes them and emits them through the SourceContext.
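The handover is essentially a one-element exchange point between the consumer thread and the fetch loop. A simplified sketch of the idea follows; the real Handover class in the connector additionally handles errors and wake-ups, so this is illustrative only:

// Illustrative, simplified version of the hand-over idea: one producer thread,
// one consumer thread, at most one batch in flight at a time.
final class SimpleHandover<T> {
    private final Object lock = new Object();
    private T element;

    // called by the Kafka consumer thread after each poll
    void produce(T records) throws InterruptedException {
        synchronized (lock) {
            while (element != null) {
                lock.wait();            // previous batch not yet taken by the fetch loop
            }
            element = records;
            lock.notifyAll();
        }
    }

    // called by the fetch loop, which then deserializes and emits the records
    T pollNext() throws InterruptedException {
        synchronized (lock) {
            while (element == null) {
                lock.wait();
            }
            T next = element;
            element = null;
            lock.notifyAll();
            return next;
        }
    }
}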
Back in the open method during initialization, the call to fromConfiguration has already determined how offsets will be committed. The fromConfiguration code:
public static OffsetCommitMode fromConfiguration(
boolean enableAutoCommit,
boolean enableCommitOnCheckpoint,
boolean enableCheckpointing) {
//if checkpointing is enabled
if (enableCheckpointing) {
// if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
} else {
// else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
// enableAutoCommit comes from the enable.auto.commit property (Flink treats it as true by default); auto.commit.interval.ms defaults to 5000
return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
}
}
If checkpointing is enabled, an asynchronous offset commit is performed every time a checkpoint completes. This is implemented in notifyCheckpointComplete of FlinkKafkaConsumerBase, which uses fetcher.commitInternalOffsetsToKafka to have the consumer thread commit the offsets asynchronously.
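Putting it together, the configuration that yields OffsetCommitMode.ON_CHECKPOINTS looks roughly like this (setCommitOffsetsOnCheckpoints defaults to true on FlinkKafkaConsumerBase; the interval value is illustrative):

env.enableCheckpointing(60_000);                     // checkpoint every 60 s
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);   // commit offsets when a checkpoint completes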
In practice, if checkpointing is disabled and auto commit is used, kafka-client 0.10.2.2 may fail an automatic commit and then never retry it, so without a restart the offsets are never committed again; newer kafka-client versions appear to have fixed this.