A Brief Look at Flink's Kafka Connector (flink-connector-kafka)

Flink, as a popular real-time data processing framework, is already widely used for real-time computation. In our production scenarios we use the Flink Kafka connector to pull data out of Kafka for processing, and along the way we did a simple walkthrough of how the connector works, summarized below.

Component versions
Flink: 1.11.2
kafka-client (the version the connector depends on): 0.10.2.2

1. Kafka Consumer Initialization

As shown below, using the Kafka connector in Flink is straightforward; it is similar to using the Kafka Java API, if anything simpler.

Properties kafkaDataProp = new Properties();
kafkaDataProp.setProperty("bootstrap.servers", objBrokerHosts);
// ....
FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>(
				Arrays.asList(objTopics.split(",")), new SimpleStringSchema(), kafkaDataProp);
FlinkKafkaConsumer010 extends FlinkKafkaConsumerBase, which in turn extends RichParallelSourceFunction; the Flink source interfaces themselves are not covered here. The constructor that the FlinkKafkaConsumer010 constructors delegate to looks like this:

	private FlinkKafkaConsumer010(@Nullable List<String> topics, @Nullable Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer,	Properties props) {
		super(
				topics,
				subscriptionPattern,
				deserializer,
				getLong(
					checkNotNull(props, "props"),
					KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
				!getBoolean(props, KEY_DISABLE_METRICS, false));

		this.properties = props;
		// replace key.deserializer and value.deserializer with ByteArrayDeserializer
		setDeserializer(this.properties);
		// configure the polling timeout
		try {
			if (properties.containsKey(KEY_POLL_TIMEOUT)) {
				this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
			} else {
				this.pollTimeout = DEFAULT_POLL_TIMEOUT;
			}
		}
		catch (Exception e) {
			throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
		}
	}
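
As the constructor shows, partition discovery and the poll timeout are read from connector-specific keys in the same Properties object. A sketch of setting both (the 30 s and 100 ms values are arbitrary examples, and "flink.poll-timeout" is the string that KEY_POLL_TIMEOUT refers to in this connector version):

// enable periodic partition discovery, checking for new partitions every 30 seconds;
// by default the interval is PARTITION_DISCOVERY_DISABLED and no discovery thread runs
kafkaDataProp.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "30000");
// how long each KafkaConsumer.poll() call may block inside the consumer thread
kafkaDataProp.setProperty("flink.poll-timeout", "100");
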

The parent-class constructor:

public FlinkKafkaConsumerBase(
			List<String> topics,
			Pattern topicPattern,
			KafkaDeserializationSchema<T> deserializer,
			long discoveryIntervalMillis,
			boolean useMetrics) {
		this.topicsDescriptor = new KafkaTopicsDescriptor(topics, topicPattern);
		this.deserializer = checkNotNull(deserializer, "valueDeserializer");
		// validate the partition discovery configuration
		checkArgument(
			discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED || discoveryIntervalMillis >= 0,
			"Cannot define a negative value for the topic / partition discovery interval.");
		this.discoveryIntervalMillis = discoveryIntervalMillis;
		this.useMetrics = useMetrics;
	}
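
Note that KafkaTopicsDescriptor holds either a fixed topic list or a subscription Pattern, depending on which public constructor was used. A brief sketch of the pattern-based variant (the regex itself is just an example):

import java.util.regex.Pattern;

// subscribe to every topic matching the pattern; topics created later are only
// picked up if partition discovery is enabled
FlinkKafkaConsumer010<String> patternConsumer = new FlinkKafkaConsumer010<>(
		Pattern.compile("metrics-.*"), new SimpleStringSchema(), kafkaDataProp);
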

The constructors do not fully initialize the consumer; the remaining initialization happens when Flink calls the open method of FlinkKafkaConsumerBase. The key parts:

	@Override
	public void open(Configuration configuration) throws Exception {
		// determine the offset commit mode
		this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
				getIsAutoCommitEnabled(),
				enableCommitOnCheckpoints,
				((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

		// create the partition discoverer, which finds new partitions and decides
		// which partitions this subtask is responsible for
		this.partitionDiscoverer = createPartitionDiscoverer(
				topicsDescriptor,
				getRuntimeContext().getIndexOfThisSubtask(),
				getRuntimeContext().getNumberOfParallelSubtasks());
		// open the discoverer's own Kafka connection
		this.partitionDiscoverer.open();

		subscribedPartitionsToStartOffsets = new HashMap<>();
		// fetch all partitions this subtask is responsible for
		final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
		// restore from checkpoint state if there is any
		if (restoredState != null) {
			// omitted
		} else {
			// otherwise initialize the partitions according to the configured startup mode;
			// in the default case of the switch on startupMode this boils down to:
			for (KafkaTopicPartition seedPartition : allPartitions) {
				subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
			}
		}
		// ... (remainder of open() omitted)
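
The startup mode referred to above is whatever was configured on the consumer before the job started; the default is the consumer group's committed offsets. A usage sketch (pick exactly one; the timestamp is just an example value):

kafkaConsumer.setStartFromGroupOffsets();               // default: resume from offsets committed for group.id
// kafkaConsumer.setStartFromEarliest();                // read every partition from the beginning
// kafkaConsumer.setStartFromLatest();                  // read only records produced after startup
// kafkaConsumer.setStartFromTimestamp(1600000000000L); // start from the given epoch-millis timestamp
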

In the Flink Kafka connector, a subtask determines which partitions it is responsible for by hashing and taking a modulo. In practice a topic's partition count may be changed, so Flink also supports dynamic partition discovery, and newly discovered partitions are run through the same algorithm to decide whether they belong to the current subtask. The relevant part of discoverPartitions:

public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
		if (!closed && !wakeup) {
			try {
				List<KafkaTopicPartition> newDiscoveredPartitions;

				// a fixed list of topics was specified
				if (topicsDescriptor.isFixedTopics()) {
					newDiscoveredPartitions = getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
				} else {
					List<String> matchedTopics = getAllTopics();

					// pattern-based subscription: keep only the matching topics
					Iterator<String> iter = matchedTopics.iterator();
					while (iter.hasNext()) {
						if (!topicsDescriptor.isMatchingTopic(iter.next())) {
							iter.remove();
						}
					}
					if (matchedTopics.size() != 0) {
						// get partitions only for matched topics
						newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);
					} else {
						newDiscoveredPartitions = null;
					}
				}

				// (2) eliminate partition that are old partitions or should not be subscribed by this subtask
				if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
					throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
				} else {
					Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
					KafkaTopicPartition nextPartition;
					while (iter.hasNext()) {
						nextPartition = iter.next();
						if (!setAndCheckDiscoveredPartition(nextPartition)) {
							iter.remove();
						}
					}
				}
				// ... 
}

The setAndCheckDiscoveredPartition method:

public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
		if (isUndiscoveredPartition(partition)) {
			discoveredPartitions.add(partition);
			// hash plus modulo, then check whether the result equals this subtask's index
			return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
		}
		return false;
	}
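
For reference, KafkaTopicPartitionAssigner.assign boils down to roughly the following (a paraphrased sketch of this connector version, not a verbatim copy): a start index is derived from the topic-name hash, and partitions are then distributed round-robin from that index, so partitions of one topic spread evenly across subtasks:

public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
	// start index derived from the topic name, forced non-negative
	int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
	// partition ids are assumed to ascend from 0, so the id can serve as a
	// round-robin offset from the start index
	return (startIndex + partition.getPartition()) % numParallelSubtasks;
}
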

At this point the main initialization is done; restoring from a checkpoint and the other partition-initialization cases are not covered in depth here.

2. Data Fetching

When Flink starts this source it calls the run method of the source implementation class, which can push the records it obtains downstream through the SourceContext. The run method is declared in SourceFunction as follows.

/**
	 * Starts the source. Implementations can use the {@link SourceContext} emit
	 * elements.
	 *
	 * <p>Sources that implement {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
	 * must lock on the checkpoint lock (using a synchronized block) before updating internal
	 * state and emitting elements, to make both an atomic operation:
	 * ...
	 */
	 void run(SourceContext<T> ctx) throws Exception;
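
To make the javadoc's point about the checkpoint lock concrete, here is a minimal, hypothetical source (not part of the Kafka connector) that updates its state and emits under the lock, so both happen atomically with respect to checkpoints:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CountingSource implements SourceFunction<Long> {
	private volatile boolean running = true;
	private long counter = 0L;

	@Override
	public void run(SourceContext<Long> ctx) throws Exception {
		while (running) {
			// state update and emission happen under the checkpoint lock
			synchronized (ctx.getCheckpointLock()) {
				ctx.collect(counter);
				counter++;
			}
			Thread.sleep(100);
		}
	}

	@Override
	public void cancel() {
		running = false;
	}
}

A real implementation would additionally implement CheckpointedFunction so that counter is actually snapshotted; the sketch only shows the locking pattern.
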

The run method of the implementation class FlinkKafkaConsumerBase:

	@Override
	public void run(SourceContext<T> sourceContext) throws Exception {
		if (subscribedPartitionsToStartOffsets == null) {
			throw new Exception("The partitions were not set for the consumer");
		}

		// initialize commit metrics and default offset callback method
		this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
		this.failedCommits =  this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
		final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();

		this.offsetCommitCallback = new KafkaCommitCallback() {
			@Override
			public void onSuccess() {
				successfulCommits.inc();
			}

			@Override
			public void onException(Throwable cause) {
				LOG.warn(String.format("Consumer subtask %d failed async Kafka commit.", subtaskIndex), cause);
				failedCommits.inc();
			}
		};

		// mark the subtask as temporarily idle if there are no initial seed partitions;
		// once this subtask discovers some partitions and starts collecting records, the subtask's
		// status will automatically be triggered back to be active.
		if (subscribedPartitionsToStartOffsets.isEmpty()) {
			sourceContext.markAsTemporarilyIdle();
		}

		LOG.info("Consumer subtask {} creating fetcher with offsets {}.",
			getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets);
		// from this point forward:
		//   - 'snapshotState' will draw offsets from the fetcher,
		//     instead of being built from `subscribedPartitionsToStartOffsets`
		//   - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
		//     Kafka through the fetcher, if configured to do so)
		this.kafkaFetcher = createFetcher(
				sourceContext,
				subscribedPartitionsToStartOffsets,
				watermarkStrategy,
				(StreamingRuntimeContext) getRuntimeContext(),
				offsetCommitMode,
				getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
				useMetrics);

		if (!running) {
			return;
		}

		// depending on whether we were restored with the current state version (1.3),
		// remaining logic branches off into 2 paths:
		//  1) New state - partition discovery loop executed as separate thread, with this
		//                 thread running the main fetcher loop
		//  2) Old state - partition discovery is disabled and only the main fetcher loop is executed
		if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {
			kafkaFetcher.runFetchLoop();
		} else {
			// start this subtask's partition-discovery thread, which periodically checks for
			// new partitions, and likewise call kafkaFetcher.runFetchLoop() to pull data from Kafka
			runWithPartitionDiscovery();
		}
	}

createFetcher creates a different fetcher implementation depending on which consumer class is used, to cope with version compatibility and with the different ways consumed records are handled. In flink-connector-kafka_2.11 the implementations are listed below; a sketch of a custom DeserializationSchema follows the table.

Implementation              Description
FlinkKafkaConsumer          starts the consumer thread, deserializes the consumed records with the configured DeserializationSchema, and collects them
FlinkKafkaConsumer010       starts the consumer thread, deserializes the consumed records with the configured DeserializationSchema, and collects them
FlinkKafkaShuffleConsumer   starts the consumer thread, deserializes the consumed records with the configured TypeSerializer, and collects them
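
As an example of the DeserializationSchema that the first two rely on, a minimal custom schema (the class name Utf8StringSchema is illustrative, not part of the connector) that turns each record's value bytes into a String could look like this:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public class Utf8StringSchema implements DeserializationSchema<String> {
	@Override
	public String deserialize(byte[] message) throws IOException {
		// each Kafka record value arrives as a raw byte array
		return new String(message, StandardCharsets.UTF_8);
	}

	@Override
	public boolean isEndOfStream(String nextElement) {
		return false;  // unbounded stream, never signal end
	}

	@Override
	public TypeInformation<String> getProducedType() {
		return Types.STRING;
	}
}
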

Partitions newly found by the PartitionDiscoverer are registered into the fetcher's unassignedPartitionsQueue via addDiscoveredPartitions; during consumption the consumer thread checks whether new partitions have been assigned to it and reassigns accordingly. Part of the code:

	// main fetch loop
			while (running) {

				// check if there is something to commit
				if (!commitInProgress) {
					// ... 
				}

				try {
					if (hasAssignedPartitions) {
						newPartitions = unassignedPartitionsQueue.pollBatch();
					}
					else {
						// if no assigned partitions block until we get at least one
						// instead of hot spinning this loop. We rely on a fact that
						// unassignedPartitionsQueue will be closed on a shutdown, so
						// we don't block indefinitely
						newPartitions = unassignedPartitionsQueue.getBatchBlocking();
					}
					if (newPartitions != null) {
						reassignPartitions(newPartitions);
					}
				} catch (AbortedReassignmentException e) {
					continue;
				}

				if (!hasAssignedPartitions) {
					// Without assigned partitions KafkaConsumer.poll will throw an exception
					continue;
				}

				// get the next batch of records, unless we did not manage to hand the old batch over
				if (records == null) {
					try {
						records = getRecordsFromKafka();
					}
					catch (WakeupException we) {
						continue;
					}
				}

				try {
					handover.produce(records);
					records = null;
				}
				// ...  
		}

In the getRecordsFromKafka method, consumption is throttled by a rateLimiter if one is configured; the consumed batch is then handed across threads by handover.produce, and on the other side kafkaFetcher.runFetchLoop() takes each batch from the Handover, deserializes the records, and emits them through the SourceContext.
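
The Handover used here is an internal single-slot exchange between the consumer thread and the fetcher. Conceptually it behaves roughly like the simplified class below (a sketch only; the real Handover in the connector also propagates exceptions and supports wakeups):

import org.apache.kafka.clients.consumer.ConsumerRecords;

// simplified single-element hand-off: produce() blocks while the slot is full,
// pollNext() blocks while it is empty
public class SimpleHandover {
	private ConsumerRecords<byte[], byte[]> next;

	public synchronized void produce(ConsumerRecords<byte[], byte[]> records) throws InterruptedException {
		while (next != null) {
			wait();    // wait until the fetcher has taken the previous batch
		}
		next = records;
		notifyAll();
	}

	public synchronized ConsumerRecords<byte[], byte[]> pollNext() throws InterruptedException {
		while (next == null) {
			wait();    // wait until the consumer thread delivers a batch
		}
		ConsumerRecords<byte[], byte[]> records = next;
		next = null;
		notifyAll();
		return records;
	}
}
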

3. Committing Consumer Offsets

The call to fromConfiguration in the open method seen earlier already determines how offsets will be committed. fromConfiguration:

public static OffsetCommitMode fromConfiguration(
			boolean enableAutoCommit,
			boolean enableCommitOnCheckpoint,
			boolean enableCheckpointing) {
		// if checkpointing is enabled
		if (enableCheckpointing) {
			// if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
			return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
		} else {
			// else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
			// enableAutoCommit comes from the enable.auto.commit property, which Flink leaves at true by default; auto.commit.interval.ms defaults to 5000
			return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
		}
	}
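
In practice, then, the commit mode is decided by two pieces of job-level configuration. A minimal sketch (the checkpoint interval is just an example):

// enabling checkpointing makes isCheckpointingEnabled() true, so the mode becomes
// ON_CHECKPOINTS as long as commit-on-checkpoint stays enabled (it defaults to true)
env.enableCheckpointing(60_000);
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);

// without checkpointing, offsets are only committed when enable.auto.commit stays true
// in the Kafka properties (KAFKA_PERIODIC); setting it to false yields DISABLED
// kafkaDataProp.setProperty("enable.auto.commit", "false");
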

When checkpointing is enabled, an asynchronous commit is made each time a checkpoint completes; the code lives in notifyCheckpointComplete of FlinkKafkaConsumerBase, which uses fetcher.commitInternalOffsetsToKafka to have the consumer thread perform the asynchronous commit.

In real deployments, if checkpointing is disabled and auto-commit is used, kafka-client 0.10.2.2 can end up in a state where, after an auto-commit failure, it never retries the commit, so the offsets are never committed until the job is restarted; newer kafka-client versions appear to have addressed this.
