Inside Kafka's Topic Deletion: Code Walkthrough & Process Explained
This article explains the complete Kafka topic deletion workflow, from the client’s deleteTopics request through ZooKeeper node creation, controller coordination, broker StopReplica handling, log renaming, delayed file removal, and final cleanup, with code excerpts and a practical Q&A on common pitfalls.
Pre‑reading Questions
When is a node written under /admin/delete_topics?
When is the topic's disk log actually deleted?
Does the Controller notify all Brokers or only those that host the deleted topic when sending StopReplica?
What happens if a Broker is offline or the delete operation fails?
What occurs if a deletion is attempted during a partition reassignment?
What if a partition‑replica reassignment happens while deletion is in progress?
What happens if you manually delete /brokers/topics/{topicName} in ZK?
01 Delete Topic Command
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test
You can delete several topics at once with a regular expression by wrapping the pattern in double quotes. For example, to delete all topics starting with create_topic_byhand_zk:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic "create_topic_byhand_zk.*"
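Because a pattern can select more topics than intended, it can help to preview the match set before running the delete. The following is a minimal sketch using java.util.regex, not the tool's own code. It assumes the pattern has to match the whole topic name; the TopicRegexPreview object and any topic list passed to it are hypothetical.

```scala
import java.util.regex.Pattern

// Hypothetical helper: previews which topic names a deletion pattern would select.
// Assumption: the pattern must match the entire topic name, not just a substring.
object TopicRegexPreview {
  def matching(pattern: String, topics: Seq[String]): Seq[String] = {
    val compiled = Pattern.compile(pattern)
    topics.filter(topic => compiled.matcher(topic).matches())
  }
}
```

Under that assumption, create_topic_byhand_zk.* selects create_topic_byhand_zk and create_topic_byhand_zk_2 but not my_create_topic_byhand_zk, while .*? selects every name in the list.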
02 Delete Any Topic (Use With Caution)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic ".*?"
For more pattern syntax, see the regular-expression documentation.
02 Related Configuration
| Config | Description | Default |
| --- | --- | --- |
| file.delete.delay.ms | Delay, in milliseconds, before the log file is actually removed after the topic is marked for deletion | 60000 |
| delete.topic.enable | Whether topic deletion is allowed | true |
Source Code Overview
Client initiates a deleteTopics request
<code>KafkaApis.handle
AdminManager.deleteTopics</code>
The controller processes the request:
<code>def deleteTopics(timeout: Int, topics: Set[String], responseCallback: Map[String, Errors] => Unit): Unit = {
// 1. Write delete marker to ZK under /admin/delete_topics/TopicName
// 2. If timeout <= 0 or ZK write fails, return error immediately
// 3. Otherwise register a delayed operation that responds once deletion completes or the timeout expires
}
</code>
The controller then watches ZK for changes and starts the deletion flow:
<code>private def processTopicDeletion(): Unit = {
if (!isActive) return
var topicsToBeDeleted = zkClient.getTopicDeletions.toSet
// Clean up non‑existent topics
// If delete.topic.enable is true, enqueue topics for deletion
// Otherwise remove ZK nodes under /admin/delete_topics
}
</code>
03 Controller Handles deleteTopics Request
The controller writes the ZK node /admin/delete_topics/TopicName to mark the topic for deletion.
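The three steps commented in the deleteTopics excerpt can be modeled in memory as follows. This is a simplified sketch under stated assumptions, not the AdminManager implementation: DeleteTopicsSketch, FakeZk, and Outcome are hypothetical names, the error values are plain illustrative strings, and the sketch assumes that a topic whose marker was written is reported as timed out when the caller does not wait.

```scala
import scala.collection.mutable

// Simplified, in-memory model of the three steps in the deleteTopics excerpt.
// All names here (DeleteTopicsSketch, FakeZk, Outcome) are hypothetical, not Kafka classes.
object DeleteTopicsSketch {
  sealed trait Outcome
  // The caller is answered right away (timeout <= 0, or no marker could be written).
  final case class Immediate(results: Map[String, String]) extends Outcome
  // At least one marker was written; the response waits for deletion or the timeout.
  final case class Delayed(timeoutMs: Int, pending: Set[String]) extends Outcome

  // Stand-in for ZooKeeper: stores the children of /admin/delete_topics.
  final class FakeZk(existingTopics: Set[String]) {
    val deleteMarkers: mutable.Set[String] = mutable.Set.empty
    // Returns "NONE" on success or an error label when the topic is unknown.
    def markForDeletion(topic: String): String =
      if (existingTopics.contains(topic)) { deleteMarkers += topic; "NONE" }
      else "UNKNOWN_TOPIC_OR_PARTITION"
  }

  def deleteTopics(timeoutMs: Int, topics: Set[String], zk: FakeZk): Outcome = {
    // Step 1: try to write a marker for every requested topic.
    val written = topics.map(t => t -> zk.markForDeletion(t)).toMap
    val marked = written.collect { case (t, "NONE") => t }.toSet
    // Step 2: answer immediately when the caller will not wait or nothing was marked.
    // Assumption: marked topics are reported as timed out, since deletion is not yet confirmed.
    if (timeoutMs <= 0 || marked.isEmpty)
      Immediate(written.map { case (t, err) => t -> (if (err == "NONE") "REQUEST_TIMED_OUT" else err) })
    // Step 3: otherwise defer the response until deletion completes or the timeout expires.
    else Delayed(timeoutMs, marked)
  }
}
```

Note that in this model the marker is written even when the call returns immediately, so the deletion itself still proceeds in the background.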
04 Controller Listens for Changes and Executes Deletion
<code>private def processTopicDeletion(): Unit = { … }
</code>
Key steps include:
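Read the pending topics from /admin/delete_topics and drop the markers of topics that no longer exist.
If delete.topic.enable is true, enqueue the remaining topics for deletion.
Otherwise, remove their nodes under /admin/delete_topics.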
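The decisions commented in the processTopicDeletion excerpt (clean up non-existent topics, then either enqueue or remove the markers depending on delete.topic.enable) can be expressed as a small pure function. This is only a sketch: TopicDeletionPlan and Plan are hypothetical names, and real controller state is considerably richer.

```scala
// Simplified model of the decisions outlined in the processTopicDeletion excerpt.
// TopicDeletionPlan and Plan are hypothetical names, not Kafka classes.
object TopicDeletionPlan {
  final case class Plan(
    markersToRemove: Set[String], // entries to delete from /admin/delete_topics
    toEnqueue: Set[String]        // topics handed to the deletion flow
  )

  def plan(pendingMarkers: Set[String], knownTopics: Set[String], deleteTopicEnable: Boolean): Plan = {
    // Markers that point at topics the controller does not know are stale.
    val nonExistent = pendingMarkers -- knownTopics
    val candidates = pendingMarkers -- nonExistent
    if (deleteTopicEnable) Plan(nonExistent, candidates)
    // With deletion disabled, every marker is removed and nothing is enqueued.
    else Plan(pendingMarkers, Set.empty)
  }
}
```

Keeping the decision separate from the ZooKeeper calls makes the two branches easy to check in isolation.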
05 Delete Topic Flow
The controller notifies the relevant brokers via StopReplica requests. Each broker renames the partition's log directory to the form topic-partition.uniqueId-delete and adds it to a deletion queue.
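The deletion queue and its delay can be illustrated with a small in-memory model. This is a sketch under assumptions, not the broker's log manager: DelayedLogDeletion is a hypothetical class, it never touches the file system, and it only tracks when a renamed directory becomes due given a delay such as file.delete.delay.ms.

```scala
import scala.collection.mutable

// Minimal model of a delayed-deletion queue. DelayedLogDeletion is a hypothetical
// name; it does not touch the file system and only tracks when a directory is due.
final class DelayedLogDeletion(fileDeleteDelayMs: Long) {
  // (renamed directory, time in ms at which it was queued), oldest first
  private val queue = mutable.Queue.empty[(String, Long)]

  def schedule(renamedDir: String, nowMs: Long): Unit =
    queue += ((renamedDir, nowMs))

  // Removes and returns every directory whose delay has elapsed at nowMs.
  def pollDue(nowMs: Long): List[String] = {
    val due = List.newBuilder[String]
    while (queue.nonEmpty && queue.head._2 + fileDeleteDelayMs <= nowMs)
      due += queue.dequeue()._1
    due.result()
  }

  // Time until the oldest entry is due, or the full delay when the queue is empty.
  def nextDelayMs(nowMs: Long): Long =
    if (queue.isEmpty) fileDeleteDelayMs
    else math.max(queue.head._2 + fileDeleteDelayMs - nowMs, 0L)
}
```

With the default delay of 60000 ms, a directory queued at time 0 is not returned by pollDue until the clock reaches 60000, which is why disk space is not freed the moment the delete command returns.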
<code>def asyncDelete(topicPartition: TopicPartition, isFuture: Boolean = false): Log = {
// Rename log directory and schedule actual file deletion
}
</code>
A dedicated thread, kafka-delete-logs, periodically deletes logs whose file.delete.delay.ms interval has elapsed:
<code>private def deleteLogs(): Unit = {
var nextDelayMs = 0L
// Iterate over logsToBeDeleted and delete them when delay expires
}
</code>
06 StopReplica Request Success Callback
When a broker successfully processes StopReplica, the controller receives a TopicDeletionStopReplicaResponseReceived event and updates the replica states. Once all replicas are marked ReplicaDeletionSuccessful, the controller completes the topic deletion:
<code>private def completeDeleteTopic(topic: String): Unit = {
client.mutePartitionModifications(topic)
// Clean up ZK nodes: /brokers/topics, /config/topics, /admin/delete_topics
// Remove topic from controller metadata
}
</code>
Q&A Summary
When is a node written under /admin/delete_topics? When the client calls deleteTopics, the controller writes the node.
When is the disk log actually deleted? After the controller sends StopReplica to live brokers, brokers rename logs and a background thread deletes them after file.delete.delay.ms (default 60 s).
Does the controller notify all brokers? Only brokers that host replicas of the topic are notified.
What if a broker is offline or deletion fails? The controller keeps retrying until the broker comes back online and the deletion succeeds.
What if deletion is attempted during a partition reassignment? Deletion waits until the reassignment finishes before proceeding.
What if you manually delete /brokers/topics/{topicName} in ZK? If the topic does not exist, the node is simply removed; if it exists, the normal deletion flow is triggered.
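The completion rule from the StopReplica callback section and the retry behavior described in the Q&A can be summarized in one decision function. This is a hedged sketch: only ReplicaDeletionSuccessful is named in the article, the other two states and the NextStep type are assumptions introduced for illustration, and the replica keys are hypothetical.

```scala
// Simplified model of when a topic deletion completes, waits, or is retried.
// TopicDeletionCompletion and NextStep are hypothetical; the state set is an assumption.
object TopicDeletionCompletion {
  sealed trait ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState

  sealed trait NextStep
  case object Complete extends NextStep // clean up ZK nodes and controller metadata
  case object Retry extends NextStep    // some replica failed or its broker is offline
  case object Wait extends NextStep     // StopReplica responses are still outstanding

  // replicaStates maps a replica key such as "test-0@broker1" to its current state.
  def nextStep(replicaStates: Map[String, ReplicaState]): NextStep =
    if (replicaStates.nonEmpty && replicaStates.values.forall(_ == ReplicaDeletionSuccessful)) Complete
    else if (replicaStates.values.exists(_ == ReplicaDeletionIneligible)) Retry
    else Wait
}
```

A single replica on an offline broker is enough to keep the topic out of the Complete branch, which matches the observation that deletion finishes only after that broker returns.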
Ops Development Stories
Maintained by a like‑minded team, covering both operations and development. Topics span Linux ops, DevOps toolchain, Kubernetes containerization, monitoring, log collection, network security, and Python or Go development. Team members: Qiao Ke, wanger, Dong Ge, Su Xin, Hua Zai, Zheng Ge, Teacher Xia.