This article again uses Kafka 0.8.2.2 as the example.
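I. How to delete a topic
Deleting a topic involves two key steps:
1. Configure the deletion parameter
Set the broker parameter delete.topic.enable to true.
2. Run the delete command
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
If delete.topic.enable is not set to true, the topic is not actually removed; it is only marked for deletion. At that point most people would probably delete the topic's information in ZooKeeper and its logs by hand, but that does not clear the topic data held in the Kafka brokers' memory. The better strategy is to set the deletion parameter to true and then restart Kafka.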
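To make the role of the flag concrete, here is a minimal sketch of how a broker-side check could read it. The object and method names are hypothetical and are not Kafka's own config class; the only assumption carried over from Kafka 0.8.2.x is that the flag is treated as false when it is absent.

```scala
import java.util.Properties

object DeleteTopicConfig {
  // Hypothetical helper, not part of Kafka.
  // Assumption: a missing delete.topic.enable is treated as false.
  def isDeleteTopicEnabled(props: Properties): Boolean =
    java.lang.Boolean.parseBoolean(
      props.getProperty("delete.topic.enable", "false").trim)
}
```

With an empty Properties object the helper returns false, which corresponds to the "marked for deletion only" situation described above.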
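II. Key classes
1. PartitionStateMachine
This class is the state machine for partitions. It tracks each partition's current state and governs the transitions between states. There are four states:
NonExistentPartition
NewPartition
OnlinePartition
OfflinePartition
2. ReplicaManager
Manages all replicas hosted on the current broker and carries out the concrete operations on them: reads, writes, deletion, and so on.
Reads and writes: it obtains the Partition object, then the Replica object, then the Log object, and uses the Segment objects managed by the Log to write and read the data.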
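3. ReplicaStateMachine
The state machine for replicas. It tracks each replica's current state and governs the transitions between states. A replica is always in exactly one of the following states:
NewReplica: the controller can create a new replica during partition reassignment. In this state a replica can only receive a become-follower request. Valid previous state: NonExistentReplica
OnlineReplica: a replica that has been started is in this state and can receive become-leader or become-follower requests. Valid previous states: NewReplica, OnlineReplica, OfflineReplica
OfflineReplica: a replica that has died is in this state. Valid previous states: NewReplica, OnlineReplica
ReplicaDeletionStarted: a replica is in this state once its deletion starts. Valid previous state: OfflineReplica
ReplicaDeletionSuccessful: the replica was deleted successfully. Valid previous state: ReplicaDeletionStarted
ReplicaDeletionIneligible: a replica is in this state when its deletion fails. Valid previous state: ReplicaDeletionStarted
NonExistentReplica: a replica is in this state after it has been deleted successfully. Valid previous state: ReplicaDeletionSuccessful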
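The transition rules listed above can be written down as a small lookup, which makes it easy to check whether a given move is allowed. This is a sketch that mirrors the list in this article, not the ReplicaStateMachine implementation; the object name and the canMove helper are made up for illustration. The extra ReplicaDeletionIneligible entry for OfflineReplica is inferred from the retry path in markTopicForDeletionRetry, which moves failed replicas back to OfflineReplica.

```scala
object ReplicaStates {
  sealed trait ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState
  case object NonExistentReplica extends ReplicaState

  // For each target state, the states it may be entered from.
  def validPreviousStates(target: ReplicaState): Set[ReplicaState] = target match {
    case NewReplica                => Set(NonExistentReplica)
    case OnlineReplica             => Set(NewReplica, OnlineReplica, OfflineReplica)
    // ReplicaDeletionIneligible is inferred from the deletion retry path.
    case OfflineReplica            => Set(NewReplica, OnlineReplica, ReplicaDeletionIneligible)
    case ReplicaDeletionStarted    => Set(OfflineReplica)
    case ReplicaDeletionSuccessful => Set(ReplicaDeletionStarted)
    case ReplicaDeletionIneligible => Set(ReplicaDeletionStarted)
    case NonExistentReplica        => Set(ReplicaDeletionSuccessful)
  }

  // Hypothetical helper: is the move from -> to allowed by the table above?
  def canMove(from: ReplicaState, to: ReplicaState): Boolean =
    validPreviousStates(to).contains(from)
}
```

Read this way, a successful deletion is the chain OfflineReplica, ReplicaDeletionStarted, ReplicaDeletionSuccessful, NonExistentReplica; an online replica cannot jump straight to ReplicaDeletionStarted.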
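4. TopicDeletionManager
This class manages the state machine for topic deletion.
1) TopicCommand issues the topic deletion command by creating /admin/delete_topics/<topic>.
2) The controller listens for child changes under /admin/delete_topics and starts deleting the corresponding topics.
3) The controller has a background thread that performs the topic deletion.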
III. A full source-level walkthrough of topic deletion
This part is divided into four sections:
A) What the client-side delete command does
B) The source-level flow when delete.topic.enable is not set
C) The source-level flow when delete.topic.enable is set
D) Manually deleting the topic's information in ZooKeeper and its data on disk
1. The client runs the delete command
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
Inside kafka-topics.sh we see:
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand $@
In TopicCommand, the main method contains:
else if (opts.options.has(opts.deleteOpt))
  deleteTopic(zkClient, opts)
The body of deleteTopic is:
val topics = getTopics(zkClient, opts)
if (topics.length == 0) {
  println("Topic %s does not exist".format(opts.options.valueOf(opts.topicOpt)))
}
topics.foreach { topic =>
  try {
    ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
This creates a node named after the topic under the "/admin/delete_topics" path.
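The path construction is simple enough to sketch on its own. The code below is a stand-in written for this article, not the ZkUtils source: the parent path comes from the text above, while the object name and the topicFromPath helper are hypothetical.

```scala
object DeleteTopicPaths {
  // Parent node under which one child per topic-to-delete is created.
  val DeleteTopicsPath: String = "/admin/delete_topics"

  // Marker node for a single topic.
  def getDeleteTopicPath(topic: String): String =
    DeleteTopicsPath + "/" + topic

  // Hypothetical inverse: recover the topic name from a marker path.
  def topicFromPath(path: String): Option[String] = {
    val prefix = DeleteTopicsPath + "/"
    if (path.startsWith(prefix) && path.length > prefix.length)
      Some(path.substring(prefix.length))
    else None
  }
}
```

For my_topic_name the marker node is /admin/delete_topics/my_topic_name. The client command does nothing more than create this node; everything else happens in the controller.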
2. The flow when delete.topic.enable is not set
In total, two listeners can respond:
A) TopicChangeListener
B) DeleteTopicsListener
Deleting a topic with the delete command triggers only DeleteTopicsListener.
var topicsToBeDeleted = {
  import JavaConversions._
  (children: Buffer[String]).toSet
}
val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
topicsToBeDeleted --= nonExistentTopics
if (topicsToBeDeleted.size > 0) {
  info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
  // mark topic ineligible for deletion if other state changes are in progress
  topicsToBeDeleted.foreach { topic =>
    val preferredReplicaElectionInProgress =
      controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
    val partitionReassignmentInProgress =
      controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
    if (preferredReplicaElectionInProgress || partitionReassignmentInProgress)
      controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
  }
  // add topic to deletion list
  controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
}
Both of the following calls check whether delete.topic.enable is true. If it is not, they do nothing; if it is, their bodies run:
controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
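The decision the listener makes can be restated as a pure function over sets, which is easier to reason about than the controller code. This is only a sketch: the object, the Plan case class and the parameter names are invented for illustration, and the two "in progress" sets stand in for the lookups on controllerContext.

```scala
object DeleteTopicsListenerSketch {
  // What the listener hands to the deletion manager.
  final case class Plan(toEnqueue: Set[String], ineligible: Set[String])

  def plan(children: Set[String],               // children of /admin/delete_topics
           allTopics: Set[String],              // topics known to the controller
           electionInProgress: Set[String],     // topics with a preferred replica election running
           reassignmentInProgress: Set[String]  // topics with a partition reassignment running
          ): Plan = {
    // Drop topics the controller does not know about.
    val existing = children.filter(allTopics.contains)
    // Topics with other state changes in progress are marked ineligible for now.
    val ineligible = existing.filter { t =>
      electionInProgress.contains(t) || reassignmentInProgress.contains(t)
    }
    // Every existing topic is still enqueued; ineligible ones are simply halted.
    Plan(existing, ineligible)
  }
}
```

Note that a topic marked ineligible is still enqueued. It stays in the deletion set and is picked up again once the blocking operation finishes.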
3. The flow when delete.topic.enable is set to true
The only difference from step 2 is that these two handler functions now actually do their work:
controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
markTopicIneligibleForDeletion is implemented as:
if (isDeleteTopicEnabled) {
  val newTopicsToHaltDeletion = topicsToBeDeleted & topics
  topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
  if (newTopicsToHaltDeletion.size > 0)
    info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
}
Its main purpose is to halt topic deletion when any of the following three conditions holds:
* Halt delete topic if -
* 1. replicas being down
* 2. partition reassignment in progress for some partitions of the topic
* 3. preferred replica election in progress for some partitions of the topic
enqueueTopicsForDeletion updates the set of topics to be deleted and wakes up DeleteTopicsThread:
def enqueueTopicsForDeletion(topics: Set[String]) {
  if (isDeleteTopicEnabled) {
    topicsToBeDeleted ++= topics
    partitionsToBeDeleted ++= topics.flatMap(controllerContext.partitionsForTopic)
    resumeTopicDeletionThread()
  }
}
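The effect of the isDeleteTopicEnabled guard is easy to see in a stripped-down model. The class below is a sketch, not TopicDeletionManager: it keeps only the two sets and the guard, and leaves out the partition bookkeeping and the thread wake-up.

```scala
import scala.collection.mutable

// Simplified model of the two guarded methods; the class name is hypothetical.
class TopicDeletionSketch(isDeleteTopicEnabled: Boolean) {
  val topicsToBeDeleted = mutable.Set.empty[String]
  val topicsIneligibleForDeletion = mutable.Set.empty[String]

  def enqueueTopicsForDeletion(topics: Set[String]): Unit =
    if (isDeleteTopicEnabled) {
      topicsToBeDeleted ++= topics
    }

  def markTopicIneligibleForDeletion(topics: Set[String]): Unit =
    if (isDeleteTopicEnabled) {
      // Only topics already queued for deletion can be halted.
      topicsIneligibleForDeletion ++= (topicsToBeDeleted.toSet & topics)
    }
}
```

With the flag off, both methods return without touching any state, which matches the behaviour in step 2: the marker node exists in ZooKeeper, but the controller never queues the topic.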
In the doWork method of the deletion thread, DeleteTopicsThread:
topicsQueuedForDeletion.foreach { topic =>
  // if all replicas are marked as deleted successfully, then topic deletion is done
  if (controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
    // clear up all state for this topic from controller cache and zookeeper
    completeDeleteTopic(topic)
    info("Deletion of topic %s successfully completed".format(topic))
  }
Inside the completeDeleteTopic method:
// deregister partition change listener on the deleted topic. This is to prevent the partition change listener
// firing before the new topic listener when a deleted topic gets auto created
partitionStateMachine.deregisterPartitionChangeListener(topic)
val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
// controller will remove this replica from the state machine as well as its partition assignment cache
replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
// move respective partition to OfflinePartition and NonExistentPartition state
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
topicsToBeDeleted -= topic
partitionsToBeDeleted.retain(_.topic != topic)
controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic))
controllerContext.removeTopic(topic)
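This method deregisters the listener that watches for partition changes, moves the topic's replicas and partitions to their non-existent states (which removes them from the state machines), deletes the topic's ZooKeeper nodes, and updates the in-memory data structures. It runs only after every replica has been reported as deleted, so by this point the data on disk is already gone.
What matters most, though, is how the replica data on disk gets deleted, so that is the part we focus on.
On the first pass, in the doWork method of DeleteTopicsThread: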
{
  // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
  // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
  // or there is at least one failed replica (which means topic deletion should be retried).
  if (controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
    // mark topic for deletion retry
    markTopicForDeletionRetry(topic)
  }
Inside markTopicForDeletionRetry:
val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
  .format(topic, failedReplicas.mkString(",")))
controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
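Putting the two doWork excerpts together, the per-topic decision can be sketched as a function of the replica states. This is a reading of the excerpts and of the comment in the else branch, not the actual thread code: the "wait" branch is inferred from that comment, and the action names are made up for illustration.

```scala
object DeleteTopicsThreadSketch {
  sealed trait ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState

  sealed trait Action
  case object CompleteDeletion extends Action // call completeDeleteTopic
  case object WaitForReplicas extends Action  // some replica deletions still in flight
  case object RetryDeletion extends Action    // call markTopicForDeletionRetry
  case object NotStartedYet extends Action    // deletion has not been initiated

  // Decide what to do for one topic, given the states of all its replicas.
  def nextAction(states: Seq[ReplicaState]): Action =
    if (states.nonEmpty && states.forall(_ == ReplicaDeletionSuccessful)) CompleteDeletion
    else if (states.contains(ReplicaDeletionStarted)) WaitForReplicas
    else if (states.contains(ReplicaDeletionIneligible)) RetryDeletion
    else NotStartedYet
}
```

The retry branch is why a single failed replica does not abort the deletion: the failed replicas are moved back to OfflineReplica and the topic goes through the deletion steps again.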
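The handleStateChanges method of ReplicaStateMachine calls handleStateChange, which handles the OfflineReplica target state. At this step the request only tells the replica to stop fetching (deletePartition = false):
// send stop replica command to the replica so that it stops fetching from the leader
brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
Then, back in handleStateChanges:
brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
This sends StopReplica requests (StopReplicaKey) to the brokers that store the replica data. Requests that carry deletePartition = true make the broker start deleting the data: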
stopReplicaRequestMap foreach { case (broker, replicaInfoList) =>
  val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true).map(i => i.replica).toSet
  val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false).map(i => i.replica).toSet
  debug("The stop replica request (delete = true) sent to broker %d is %s"
    .format(broker, stopReplicaWithDelete.mkString(",")))
  debug("The stop replica request (delete = false) sent to broker %d is %s"
    .format(broker, stopReplicaWithoutDelete.mkString(",")))
  replicaInfoList.foreach { r =>
    val stopReplicaRequest = new StopReplicaRequest(r.deletePartition,
      Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId)
    controller.sendRequest(broker, stopReplicaRequest, r.callback)
  }
}
stopReplicaRequestMap.clear()
When the broker receives the request, the handle method of KafkaApis dispatches it:
case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
Next, inside the stopReplicas method:
{
  controllerEpoch = stopReplicaRequest.controllerEpoch
  // First stop fetchers for all partitions, then stop the corresponding replicas
  replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map(r => TopicAndPartition(r.topic, r.partition)))
  for (topicAndPartition <- stopReplicaRequest.partitions) {
    val errorCode = stopReplica(topicAndPartition.topic, topicAndPartition.partition, stopReplicaRequest.deletePartitions)
    responseMap.put(topicAndPartition, errorCode)
  }
  (responseMap, ErrorMapping.NoError)
}
Going one level deeper, the stopReplica method is where the log is actually deleted:
getPartition(topic, partitionId) match {
  case Some(partition) =>
    if (deletePartition) {
      val removedPartition = allPartitions.remove((topic, partitionId))
      if (removedPartition != null)
        removedPartition.delete() // this will delete the local log
    }
That is the complete log-deletion flow in Kafka.
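The broker-side behaviour boils down to "remove the partition from the in-memory map and delete its log only when deletePartition is true". Here is a minimal model of that rule. The class is a sketch rather than ReplicaManager: each partition is represented only by a log directory name, and deletedLogs is a hypothetical stand-in for the on-disk deletion.

```scala
import scala.collection.mutable

// Simplified model of stopReplica; all names here are for illustration only.
final class ReplicaManagerSketch(initial: Map[(String, Int), String]) {
  // (topic, partition) -> log directory name
  private val allPartitions = mutable.Map(initial.toSeq: _*)
  // Records which log directories would have been deleted from disk.
  val deletedLogs = mutable.ListBuffer.empty[String]

  def stopReplica(topic: String, partition: Int, deletePartition: Boolean): Unit =
    if (deletePartition) {
      // Remove from the in-memory map first, then delete the local log.
      allPartitions.remove((topic, partition)).foreach { logDir =>
        deletedLogs += logDir
      }
    }

  def hosts(topic: String, partition: Int): Boolean =
    allPartitions.contains((topic, partition))
}
```

A request with deletePartition = false leaves both the map entry and the log untouched, which is why the OfflineReplica step alone never removes any data.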
4. Manually deleting the topic's information in ZooKeeper and its data on disk
TopicChangeListener notices the change and handles it, but the handling is very simple. It only updates the following:
val deletedTopics = controllerContext.allTopics -- currentChildren
controllerContext.allTopics = currentChildren
val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
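What the listener computes is just a set difference between the topics the controller knew about and the children currently present in ZooKeeper. A sketch of that comparison follows; the object and the Diff case class are hypothetical names used for illustration.

```scala
object TopicChangeSketch {
  final case class Diff(newTopics: Set[String], deletedTopics: Set[String])

  // Compare the controller's cached topic set with the current ZooKeeper children.
  def compare(allTopics: Set[String], currentChildren: Set[String]): Diff =
    Diff(
      newTopics = currentChildren.diff(allTopics),
      deletedTopics = allTopics.diff(currentChildren)
    )
}
```

Only the controller's topic set and partition assignment cache are refreshed from this difference. Nothing in this path goes through the replica state transitions or the StopReplica requests shown in step 3.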
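IV. Summary
Kafka's topic deletion is essentially a publish/subscribe mechanism built on ZooKeeper. A ZooKeeper client creates the node /admin/delete_topics/<topic>; once the Kafka controller observes the event, it carries out the actual deletion: it deregisters the listener for partition changes, clears the in-memory data structures, deletes the replica data, and deletes the topic's ZooKeeper nodes.
If the delete command is run while delete.topic.enable is false, the topic is only marked for deletion and the controller takes no action. To remove the topic completely in that situation, we recommend changing the parameter to true and restarting Kafka; the topic's information is then removed completely. This has been tested.
A common practice is to manually delete the topic's information in ZooKeeper and its data on disk, but this leaves some in-memory data uncleared. Whether that causes hidden problems has not been tested.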