def deleteTopics(timeout: Int,
                 topics: Set[String],
                 responseCallback: Map[String, Errors] => Unit): Unit = {
  // 1. map over topics calling the asynchronous delete
  val metadata = topics.map { topic =>
    try {
      // Write to ZooKeeper: mark the topic for deletion by creating /admin/delete_topics/<topic name>
      adminZkClient.deleteTopic(topic)
      DeleteTopicMetadata(topic, Errors.NONE)
    } catch {
      case _: TopicAlreadyMarkedForDeletionException =>
        // swallow the exception, and still track deletion allowing multiple calls to wait for deletion
        DeleteTopicMetadata(topic, Errors.NONE)
      case e: Throwable =>
        error(s"Error processing delete topic request for topic $topic", e)
        DeleteTopicMetadata(topic, Errors.forException(e))
    }
  }
  // 2. if the client-supplied timeout is <= 0, or no topic was successfully marked in ZooKeeper,
  //    respond immediately with the per-topic errors
  if (timeout <= 0 || !metadata.exists(_.error == Errors.NONE)) {
    val results = metadata.map { deleteTopicMetadata =>
      // ignore topics that already have errors
      if (deleteTopicMetadata.error == Errors.NONE) {
        (deleteTopicMetadata.topic, Errors.REQUEST_TIMED_OUT)
      } else {
        (deleteTopicMetadata.topic, deleteTopicMetadata.error)
      }
    }.toMap
    responseCallback(results)
  } else {
    // 3. else pass the topics and errors to the delayed operation and set the keys
    val delayedDelete = new DelayedDeleteTopics(timeout, metadata.toSeq, this, responseCallback)
    val delayedDeleteKeys = topics.map(new TopicKey(_)).toSeq
    // try to complete the request immediately, otherwise put it into the purgatory
    topicPurgatory.tryCompleteElseWatch(delayedDelete, delayedDeleteKeys)
  }
}
The broker writes a marker node to ZooKeeper at /admin/delete_topics/<topic name> to flag the topic for deletion. If the client-supplied timeout is <= 0, or the ZooKeeper write failed for every topic, the callback returns the errors immediately; otherwise the request waits in the purgatory for the deletion to complete.
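The ZooKeeper marking itself is small. A sketch of what adminZkClient.deleteTopic amounts to (simplified; exception messages abbreviated):

// Sketch of AdminZkClient.deleteTopic: create the persistent marker znode
// /admin/delete_topics/<topic>; the controller watches this path.
def deleteTopic(topic: String): Unit = {
  if (zkClient.topicExists(topic)) {
    try {
      zkClient.createDeleteTopicPath(topic) // creates /admin/delete_topics/<topic>
    } catch {
      case _: NodeExistsException =>
        // a previous delete request for this topic is still in flight
        throw new TopicAlreadyMarkedForDeletionException(
          s"topic $topic is already marked for deletion")
    }
  } else {
    throw new UnknownTopicOrPartitionException(s"Topic `$topic` to delete does not exist")
  }
}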
KafkaController.processTopicDeletion
private def processTopicDeletion(): Unit = {
  if (!isActive) return
  var topicsToBeDeleted = zkClient.getTopicDeletions.toSet
  val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
  if (nonExistentTopics.nonEmpty) {
    warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
    zkClient.deleteTopicDeletions(nonExistentTopics.toSeq, controllerContext.epochZkVersion)
  }
  topicsToBeDeleted --= nonExistentTopics
  if (config.deleteTopicEnable) {
    if (topicsToBeDeleted.nonEmpty) {
      info(s"Starting topic deletion for topics ${topicsToBeDeleted.mkString(",")}")
      // mark topics that are temporarily ineligible for deletion
      topicsToBeDeleted.foreach { topic =>
        val partitionReassignmentInProgress =
          controllerContext.partitionsBeingReassigned.map(_.topic).contains(topic)
        if (partitionReassignmentInProgress)
          topicDeletionManager.markTopicIneligibleForDeletion(Set(topic),
            reason = "topic reassignment in progress")
      }
      // add topics to the deletion list
      topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
    }
  } else {
    // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
    info(s"Removing $topicsToBeDeleted since delete topic is disabled")
    zkClient.deleteTopicDeletions(topicsToBeDeleted.toSeq, controllerContext.epochZkVersion)
  }
}
If there are nodes under /admin/delete_topics/ for topics that no longer exist, they are cleaned up.
If delete.topic.enable=false is configured, i.e. topics cannot be deleted, all nodes under /admin/delete_topics/ are removed and the flow ends.
If delete.topic.enable=true, topics that are temporarily not deletable are marked ineligible and put into topicsIneligibleForDeletion; a topic is ineligible while a partition reassignment is in progress for it (see the sketch after this paragraph).
The topics are then added to the deletion list topicsToBeDeleted, and TopicDeletionManager.resumeDeletions() is called to carry out the deletion.
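The ineligibility bookkeeping mentioned above is small; a sketch, simplified from TopicDeletionManager:

// Sketch: remember which queued topics must not be deleted yet.
def markTopicIneligibleForDeletion(topics: Set[String], reason: => String): Unit = {
  val newTopicsToHaltDeletion = controllerContext.topicsToBeDeleted & topics
  controllerContext.topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
  if (newTopicsToHaltDeletion.nonEmpty)
    info(s"Halted deletion of topics ${newTopicsToHaltDeletion.mkString(",")}. Reason: $reason")
}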
TopicDeletionManager.resumeDeletions()
private def resumeDeletions(): Unit = {
  val topicsQueuedForDeletion = Set.empty[String] ++ controllerContext.topicsToBeDeleted
  val topicsEligibleForRetry = mutable.Set.empty[String]
  val topicsEligibleForDeletion = mutable.Set.empty[String]
  if (topicsQueuedForDeletion.nonEmpty)
    topicsQueuedForDeletion.foreach { topic =>
      // if all replicas are marked as deleted successfully, then topic deletion is done
      if (controllerContext.areAllReplicasInState(topic, ReplicaDeletionSuccessful)) {
        // clear up all state for this topic from controller cache and zookeeper
        completeDeleteTopic(topic)
        info(s"Deletion of topic $topic successfully completed")
      } else if (!controllerContext.isAnyReplicaInState(topic, ReplicaDeletionStarted)) {
        // if you come here, then no replica is in ReplicaDeletionStarted and not all replicas are in
        // ReplicaDeletionSuccessful. That means either the given topic hasn't initiated deletion,
        // or there is at least one failed replica (which means topic deletion should be retried).
        if (controllerContext.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
          topicsEligibleForRetry += topic
        }
      }
      // Add topic to the eligible set if it is eligible for deletion.
      if (isTopicEligibleForDeletion(topic)) {
        info(s"Deletion of topic $topic (re)started")
        topicsEligibleForDeletion += topic
      }
    }
  // topic deletion retry will be kicked off
  if (topicsEligibleForRetry.nonEmpty) {
    retryDeletionForIneligibleReplicas(topicsEligibleForRetry)
  }
  // topic deletion will be kicked off: delete the topics and send UpdateMetadata requests
  if (topicsEligibleForDeletion.nonEmpty) {
    onTopicDeletion(topicsEligibleForDeletion)
  }
}
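For reference, the eligibility check used in the loop above is roughly the following (a sketch; the helper names follow the Kafka source but details are abbreviated):

// Sketch: a topic may be deleted only if it is queued for deletion, no deletion
// round is already in flight for it, and it has not been marked ineligible.
private def isTopicEligibleForDeletion(topic: String): Boolean = {
  controllerContext.isTopicQueuedUpForDeletion(topic) &&
    !isTopicDeletionInProgress(topic) &&
    !isTopicIneligibleForDeletion(topic)
}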
Let's look more closely at the onTopicDeletion method. It marks all partitions of the topics as pending deletion and sends an UpdateMetadataRequest to the brokers, telling them the topic is being deleted and setting the partition leader to LeaderAndIsrLeaderDuringDelete. It also moves every partition of the to-be-deleted topics through the partition state machine: current state -> OfflinePartition -> NonExistentPartition. These two transitions only update state held in the current Controller's memory; for details on the state machine see 【kafka源码】Controller中的状态机TODO….
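A simplified sketch of what onTopicDeletion does (the bookkeeping for topics whose deletion has already started is omitted):

// Sketch of onTopicDeletion: state-machine transitions happen only in the
// controller's memory, then every broker is notified.
private def onTopicDeletion(topics: Set[String]): Unit = {
  val partitions = topics.flatMap(controllerContext.partitionsForTopic)
  partitionStateMachine.handleStateChanges(partitions.toSeq, OfflinePartition)
  partitionStateMachine.handleStateChanges(partitions.toSeq, NonExistentPartition)
  // brokers learn the topic is being deleted: leader = LeaderAndIsrLeaderDuringDelete (-2)
  client.sendMetadataUpdate(partitions)
  onPartitionDeletion(topics)
}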
client.sendMetadataUpdate(topics.flatMap(controllerContext.partitionsForTopic))
This sends the UpdateMetadata request for the partitions of the topics being deleted. What data gets updated here? When the UpdateMetadata request is sent, the partition leader is set to -2, marking the partition as being deleted; this is how every broker identifies all partitions currently being deleted. Once a broker has these to-be-deleted partitions, two things happen (see the sketch below):
1. Throttling-related information is updated.
2. groupCoordinator.handleDeletedPartitions(deletedPartitions) is called: it removes the group offsets of the given deletedPartitions and runs the offset-deletion callbacks. From this point on, the partitions can no longer serve requests or be consumed.
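On the broker side this roughly corresponds to the following handling in KafkaApis (a sketch; quota bookkeeping and the response are omitted):

// Sketch: updating the metadata cache returns the partitions whose leader is
// now LeaderAndIsrLeaderDuringDelete (-2), i.e. the partitions being deleted;
// those are handed to the group coordinator for offset cleanup.
def handleUpdateMetadataRequest(request: RequestChannel.Request): Unit = {
  val correlationId = request.header.correlationId
  val updateMetadataRequest = request.body[UpdateMetadataRequest]
  val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
  if (deletedPartitions.nonEmpty)
    groupCoordinator.handleDeletedPartitions(deletedPartitions)
  // (throttle updates and sending the response are elided in this sketch)
}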
For details, see Kafka的元数据更新UpdateMetadata.
TopicDeletionManager.onPartitionDeletion
This works as follows: all dead replicas are moved directly to the ReplicaDeletionIneligible state, and if some replicas are dead the corresponding topic is also marked ineligible for deletion, since the deletion could not complete successfully anyway. The live replicas then go through two state transitions (sketched after this list):
1. Replicas transition to OfflineReplica. At this point a StopReplicaRequest with deletePartitions = false (i.e. the delete is not performed yet) is sent to every broker hosting a replica of the topic, so that they stop sending fetch requests to the leader. For the state machine see 【kafka源码】Controller中的状态机TODO….
2. Replicas transition to ReplicaDeletionStarted. This sends a StopReplicaRequest with deletePartition = true to every broker hosting a replica, which deletes all persistent data from all replicas of the affected partitions.
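In code, the two rounds correspond to replica state machine calls along these lines (a simplified sketch; the real onPartitionDeletion also splits replicas into dead and live sets first):

// Sketch: first take replicas offline (StopReplica, deletePartition = false),
// then start deletion (StopReplica, deletePartition = true).
val replicas = controllerContext.replicasForTopic(topic).toSeq
replicaStateMachine.handleStateChanges(replicas, OfflineReplica)
replicaStateMachine.handleStateChanges(replicas, ReplicaDeletionStarted)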
Ultimately this lands in
ReplicaManager.stopReplica
==> LogManager.asyncDelete
which renames the directory of the given topic partition's log from "logdir" to "logdir.uuid.delete" and adds it to the deletion queue:
def asyncDelete(topicPartition: TopicPartition, isFuture: Boolean = false): Log = {
  val removedLog: Log = logCreationOrDeletionLock synchronized {
    // remove the partition to be deleted from the logs map
    if (isFuture)
      futureLogs.remove(topicPartition)
    else
      currentLogs.remove(topicPartition)
  }
  if (removedLog != null) {
    // We need to wait until there are no more cleaning tasks on the log to be deleted
    // before actually deleting it.
    if (cleaner != null && !isFuture) {
      cleaner.abortCleaning(topicPartition)
      cleaner.updateCheckpoints(removedLog.dir.getParentFile)
    }
    // rename the topic's replica directory; naming rule: topic-partition.uuid-delete
    removedLog.renameDir(Log.logDeleteDirName(topicPartition))
    checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.dir.getParentFile, ArrayBuffer.empty)
    checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
    // add the log to the to-be-deleted queue, where it waits for deletion
    addLogToBeDeleted(removedLog)
  } else if (offlineLogDirs.nonEmpty) {
    throw new KafkaStorageException(s"Failed to delete log for ${if (isFuture) "future" else ""} $topicPartition because it may be in one of the offline directories ${offlineLogDirs.mkString(",")}")
  }
  removedLog
}
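The naming rule used by renameDir above comes from Log.logDeleteDirName, which looks roughly like this (a sketch; the Kafka source factors the "-delete" suffix into a constant):

// Sketch of Log.logDeleteDirName: "<topic>-<partition>.<uuid>-delete",
// e.g. "my-topic-0.3fae9f9cbf5a4a1d8a7cbe3de5fca103-delete"
def logDeleteDirName(topicPartition: TopicPartition): String = {
  val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
  s"${topicPartition.topic}-${topicPartition.partition}.$uniqueId-delete"
}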
From the above we can see that the log to be deleted ends up in the logsToBeDeleted queue. This queue holds the logs awaiting deletion and is drained by a dedicated thread, kafka-delete-logs. Let's look at how that thread works.
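Enqueueing records the time of enqueue together with the log, so the delayed delete can be computed later; a sketch (assuming logsToBeDeleted is a LinkedBlockingQueue[(Log, Long)]):

// Sketch: pair the log with its enqueue timestamp; deleteLogs() later uses the
// timestamp to honor file.delete.delay.ms before physically deleting the files.
private def addLogToBeDeleted(log: Log): Unit = {
  this.logsToBeDeleted.add((log, time.milliseconds()))
}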
LogManager.startup
When it starts up, it schedules a recurring task:

scheduler.schedule("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
  deleteLogs _,
  delay = InitialTaskDelayMs,
  unit = TimeUnit.MILLISECONDS)
The log-deletion thread:

/**
 * Delete logs marked for deletion. Delete all logs for which currentDefaultConfig.fileDeleteDelayMs
 * has elapsed after the delete was scheduled. Logs for which this interval has not yet elapsed will be
 * considered for deletion in the next iteration of deleteLogs. The next iteration will be executed
 * after the remaining time for the first log that is not deleted. If there are no more logsToBeDeleted,
 * deleteLogs will be executed after currentDefaultConfig.fileDeleteDelayMs.
 *
 * file.delete.delay.ms: how long file deletion is delayed; default 60000 ms.
 */
private def deleteLogs(): Unit = {
  var nextDelayMs = 0L
  try {
    def nextDeleteDelayMs: Long = {
      if (!logsToBeDeleted.isEmpty) {
        val (_, scheduleTimeMs) = logsToBeDeleted.peek()
        scheduleTimeMs + currentDefaultConfig.fileDeleteDelayMs - time.milliseconds()
      } else
        currentDefaultConfig.fileDeleteDelayMs
    }

    while ({nextDelayMs = nextDeleteDelayMs; nextDelayMs <= 0}) {
      val (removedLog, _) = logsToBeDeleted.take()
      if (removedLog != null) {
        try {
          // immediately and completely delete this log directory and everything in it from the file system
          removedLog.delete()
          info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
        } catch {
          case e: KafkaStorageException =>
            error(s"Exception while deleting $removedLog in dir ${removedLog.dir.getParent}.", e)
        }
      }
    }
  } catch {
    case e: Throwable =>
      error(s"Exception in kafka-delete-logs thread.", e)
  } finally {
    // the snippet was truncated here in the original post; in the Kafka source the
    // method reschedules itself with the dynamically computed delay, roughly:
    try {
      scheduler.schedule("kafka-delete-logs",
        deleteLogs _,
        delay = nextDelayMs,
        unit = TimeUnit.MILLISECONDS)
    } catch {
      case e: Throwable =>
        error(s"Failed to schedule next delete in kafka-delete-logs thread", e)
    }
  }
}
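As a worked example: if a partition directory is enqueued at time T and file.delete.delay.ms is 60000 (the default), the physical delete happens no earlier than T + 60s; when the queue is empty, the task simply re-runs after another 60s.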