
Inside Kafka's Topic Deletion: Code Walkthrough & Process Explained

This article explains the complete Kafka topic deletion workflow, from the client’s deleteTopics request through ZooKeeper node creation, controller coordination, broker StopReplica handling, log renaming, delayed file removal, and final cleanup, while providing code excerpts and practical Q&A for common pitfalls.

Ops Development Stories

Pre‑reading Questions

When is a node written under /admin/delete_topics?

When is the topic's disk log actually deleted?

Does the Controller notify all Brokers or only those that host the deleted topic when sending StopReplica?

What happens if a Broker is offline or the delete operation fails?

What occurs if a deletion is attempted during a partition reassignment?

What if a partition‑replica reassignment happens while deletion is in progress?

What happens if you manually delete /brokers/topics/{topicName} in ZK?

01 Delete Topic Command

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test

You can delete topics using regular‑expression matching by wrapping the topic name in double quotes, e.g., to delete all topics starting with create_topic_byhand_zk:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic "create_topic_byhand_zk.*"
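To check which topics a pattern would select before running the command, the matching can be previewed offline. The sketch below is only an illustration: `TopicRegexSketch` and `matchingTopics` are hypothetical names, and it assumes the tool requires the pattern to match the whole topic name, which is worth confirming against your Kafka version.

```scala
object TopicRegexSketch {
  // Hypothetical helper: keep the topic names that the pattern matches in full.
  // Assumption: whole-name matching, which is how String.matches behaves.
  def matchingTopics(pattern: String, topics: Seq[String]): Seq[String] =
    topics.filter(_.matches(pattern))

  def main(args: Array[String]): Unit = {
    val topics = Seq(
      "create_topic_byhand_zk",
      "create_topic_byhand_zk2",
      "test",
      "my_create_topic_byhand_zk"
    )
    // Under whole-name matching, only names that start with the prefix are selected.
    matchingTopics("create_topic_byhand_zk.*", topics).foreach(println)
  }
}
```

Under this assumption a name that merely contains the prefix, such as my_create_topic_byhand_zk, is not selected.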

Delete Any Topic (Use With Caution)

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic ".*?"

More usage details are available in regular‑expression documentation.

02 Related Configuration

| Config | Description | Default |
| --- | --- | --- |
| file.delete.delay.ms | Delay before the log file is actually removed after the topic is marked for deletion | 60000 (ms) |
| delete.topic.enable | Whether topic deletion is allowed | true |

Source Code Overview

Client initiates a deleteTopics request

<code>KafkaApis.handle
AdminManager.deleteTopics</code>

The controller processes the request:

<code>def deleteTopics(timeout: Int, topics: Set[String], responseCallback: Map[String, Errors] => Unit): Unit = {
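  // 1. Write a delete marker to ZK under /admin/delete_topics/TopicName for each topic
  // 2. If timeout <= 0 or no marker could be written, invoke responseCallback immediately
  // 3. Otherwise register a delayed operation that responds once deletion completes or the timeout expires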
}
</code>
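The three steps in the comments above can be modelled without a running cluster. The following is a minimal sketch, not Kafka's implementation: the mutable set stands in for the children of /admin/delete_topics, and `writeMarker`, `Outcome`, and the rule that an empty name fails are all hypothetical simplifications.

```scala
import scala.collection.mutable

object DeleteTopicsSketch {
  sealed trait Outcome
  case object TimedOut extends Outcome                    // marker written, caller did not wait
  final case class Failed(reason: String) extends Outcome // marker could not be written

  // In-memory stand-in for the children of /admin/delete_topics.
  val deleteMarkers: mutable.Set[String] = mutable.Set.empty

  // Hypothetical stand-in for the ZK write; it fails for an empty topic name.
  def writeMarker(topic: String): Either[String, Unit] =
    if (topic.isEmpty) Left("invalid topic name")
    else { deleteMarkers += topic; Right(()) }

  /** Left: answer immediately with these results. Right: topics the caller now waits on. */
  def deleteTopics(timeoutMs: Int, topics: Set[String]): Either[Map[String, Outcome], Set[String]] = {
    // 1. Try to write a marker for every requested topic.
    val written = topics.map(t => t -> writeMarker(t)).toMap
    val anyMarked = written.values.exists(_.isRight)
    if (timeoutMs <= 0 || !anyMarked) {
      // 2. Nothing to wait for, or the caller will not wait: respond now.
      Left(written.map {
        case (t, Right(_))  => t -> (TimedOut: Outcome)
        case (t, Left(err)) => t -> (Failed(err): Outcome)
      })
    } else {
      // 3. Otherwise wait on the successfully marked topics.
      Right(written.collect { case (t, Right(_)) => t }.toSet)
    }
  }

  def main(args: Array[String]): Unit = {
    println(deleteTopics(0, Set("test")))
    println(deleteTopics(5000, Set("test")))
  }
}
```

Note that in this model the marker is written even when the request returns immediately, so the deletion still proceeds in the background.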

Controller then watches ZK changes and starts the deletion flow:

<code>private def processTopicDeletion(): Unit = {
  if (!isActive) return
  var topicsToBeDeleted = zkClient.getTopicDeletions.toSet
  // Clean up non‑existent topics
  // If delete.topic.enable is true, enqueue topics for deletion
  // Otherwise remove ZK nodes under /admin/delete_topics
}
</code>
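The decision made in this method reduces to a small pure function. The sketch below is an illustration under assumptions: `Plan` and `plan` are hypothetical names, and the sets stand in for the ZK markers and the controller's topic list.

```scala
object ProcessTopicDeletionSketch {
  /** What the controller should do with the markers it found. */
  final case class Plan(markersToRemove: Set[String], topicsToEnqueue: Set[String])

  def plan(markers: Set[String], existingTopics: Set[String], deleteTopicEnable: Boolean): Plan = {
    // Markers that point at topics the controller does not know are stale.
    val nonExistent = markers -- existingTopics
    val candidates = markers -- nonExistent
    if (deleteTopicEnable) {
      // Stale markers are cleaned up; real topics go to the deletion queue.
      Plan(nonExistent, candidates)
    } else {
      // With deletion disabled, every marker is removed and nothing is enqueued.
      Plan(markers, Set.empty)
    }
  }

  def main(args: Array[String]): Unit = {
    println(plan(Set("test", "ghost"), Set("test", "other"), deleteTopicEnable = true))
    println(plan(Set("test", "ghost"), Set("test", "other"), deleteTopicEnable = false))
  }
}
```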

03 Controller Handles deleteTopics Request

The controller writes a ZK node /admin/delete_topics/TopicName to mark the topic for deletion.

04 Controller Listens for Changes and Executes Deletion

<code>private def processTopicDeletion(): Unit = { … }
</code>

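Key steps include:

Read the pending delete markers from /admin/delete_topics in ZK.

Remove the markers of topics that no longer exist.

If delete.topic.enable is true, enqueue the remaining topics for deletion; otherwise remove their nodes under /admin/delete_topics.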

05 Delete Topic Flow

The controller notifies the relevant brokers via StopReplica requests. Each broker renames the partition's log directory to the form {topic}-{partition}.{uniqueId}-delete and adds it to a deletion queue.

<code>def asyncDelete(topicPartition: TopicPartition, isFuture: Boolean = false): Log = {
  // Rename log directory and schedule actual file deletion
}
</code>
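The rename target can be illustrated with a short helper. This is a sketch under assumptions: it takes the layout to be {topic}-{partition}.{uniqueId}-delete with a dash-free UUID as the unique id, and `DeleteDirNameSketch` and its members are hypothetical names rather than Kafka's API.

```scala
import java.util.UUID

object DeleteDirNameSketch {
  // Assumed suffix marking a directory that is waiting to be removed.
  val DeleteDirSuffix = "-delete"

  // Builds the name a partition directory is renamed to before deletion.
  def deleteDirName(topic: String, partition: Int, uniqueId: String): String =
    s"${topic}-${partition}.${uniqueId}${DeleteDirSuffix}"

  // A random id with the dashes stripped, so the name stays easy to parse.
  def newUniqueId(): String = UUID.randomUUID().toString.replace("-", "")

  def main(args: Array[String]): Unit =
    println(deleteDirName("test", 0, newUniqueId()))
}
```

The unique id keeps the renamed directory from colliding with a topic of the same name that is recreated before the old files are removed.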

A dedicated thread, kafka-delete-logs, periodically deletes logs whose file.delete.delay.ms interval has elapsed:

<code>private def deleteLogs(): Unit = {
  var nextDelayMs = 0L
  // Iterate over logsToBeDeleted and delete them when delay expires
}
</code>
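The delay logic can be sketched as a queue drained in scheduling order. The code below is a simplified model, not the broker's code: `Pending` and `drainExpired` are hypothetical names, and timestamps are passed in explicitly so the behaviour is easy to reason about.

```scala
import scala.collection.mutable

object DeleteLogsSketch {
  /** A renamed log directory together with the time it was queued. */
  final case class Pending(dir: String, scheduledAtMs: Long)

  /**
   * Removes every entry whose delay has elapsed and returns
   * (directories to delete now, delay in ms until the next run).
   */
  def drainExpired(queue: mutable.Queue[Pending], nowMs: Long, fileDeleteDelayMs: Long): (List[String], Long) = {
    val expired = mutable.ListBuffer.empty[String]
    // Entries are queued in scheduling order, so only the head needs checking.
    while (queue.nonEmpty && queue.head.scheduledAtMs + fileDeleteDelayMs <= nowMs) {
      expired += queue.dequeue().dir
    }
    // Sleep until the next entry is due, or a full delay if nothing is queued.
    val nextDelayMs =
      if (queue.isEmpty) fileDeleteDelayMs
      else queue.head.scheduledAtMs + fileDeleteDelayMs - nowMs
    (expired.toList, nextDelayMs)
  }

  def main(args: Array[String]): Unit = {
    val queue = mutable.Queue(Pending("test-0.abc-delete", 0L), Pending("test-1.def-delete", 50000L))
    println(drainExpired(queue, nowMs = 60000L, fileDeleteDelayMs = 60000L))
  }
}
```

Returning the next delay lets the caller reschedule itself for exactly when the oldest remaining entry becomes eligible, instead of polling on a fixed interval.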

06 StopReplica Request Success Callback

When a broker successfully processes StopReplica, the controller receives TopicDeletionStopReplicaResponseReceived and updates replica states. If all replicas are marked ReplicaDeletionSuccessful, the controller completes the topic deletion:

<code>private def completeDeleteTopic(topic: String): Unit = {
  client.mutePartitionModifications(topic)
  // Clean up ZK nodes: /brokers/topics, /config/topics, /admin/delete_topics
  // Remove topic from controller metadata
}
</code>
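The completion condition, that every replica has reached ReplicaDeletionSuccessful, can be modelled as a small state map. This is a sketch under assumptions: only ReplicaDeletionSuccessful comes from the text above, while the other two state names, `Replica`, `onStopReplicaResponse`, and `canComplete` are illustrative.

```scala
object TopicDeletionProgressSketch {
  sealed trait ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState    // StopReplica sent, no answer yet
  case object ReplicaDeletionSuccessful extends ReplicaState // broker confirmed the delete
  case object ReplicaDeletionIneligible extends ReplicaState // broker reported an error

  /** A replica identified by partition number and broker id. */
  final case class Replica(partition: Int, brokerId: Int)

  // Records the outcome of one StopReplica response.
  def onStopReplicaResponse(states: Map[Replica, ReplicaState],
                            replica: Replica,
                            succeeded: Boolean): Map[Replica, ReplicaState] = {
    val next: ReplicaState =
      if (succeeded) ReplicaDeletionSuccessful else ReplicaDeletionIneligible
    states.updated(replica, next)
  }

  // The topic can be cleaned up only when every replica has been deleted.
  def canComplete(states: Map[Replica, ReplicaState]): Boolean =
    states.nonEmpty && states.values.forall(_ == ReplicaDeletionSuccessful)

  def main(args: Array[String]): Unit = {
    val start: Map[Replica, ReplicaState] =
      Map(Replica(0, 1) -> ReplicaDeletionStarted, Replica(0, 2) -> ReplicaDeletionStarted)
    val afterFirst = onStopReplicaResponse(start, Replica(0, 1), succeeded = true)
    println(canComplete(afterFirst))
    println(canComplete(onStopReplicaResponse(afterFirst, Replica(0, 2), succeeded = true)))
  }
}
```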

Q&A Summary

When is a node written under /admin/delete_topics? When the client calls deleteTopics, the controller writes the node.

When is the disk log actually deleted? After the controller sends StopReplica to live brokers, brokers rename logs and a background thread deletes them after file.delete.delay.ms (default 60 s).

Does the controller notify all brokers? Only brokers that host replicas of the topic are notified.

What if a broker is offline or deletion fails? The controller keeps retrying until the broker comes back online and the deletion succeeds.

What if deletion is attempted during a partition reassignment? Deletion waits until the reassignment finishes before proceeding.

What if you manually delete /brokers/topics/{topicName} in ZK? If the topic does not exist, the node is simply removed; if it exists, the normal deletion flow is triggered.
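The answers about which brokers are notified and what happens when one of them is offline can be combined into one small model. The sketch below is illustrative only: `Targets` and `targets` are hypothetical names, the assignment maps a partition number to the broker ids holding its replicas, and the retry is represented simply as a set of brokers still owed a StopReplica request.

```scala
object StopReplicaTargetsSketch {
  /** Brokers to contact now, and brokers to retry once they are back online. */
  final case class Targets(notifyNow: Set[Int], retryWhenOnline: Set[Int])

  def targets(assignment: Map[Int, Seq[Int]], liveBrokers: Set[Int]): Targets = {
    // Only brokers that host a replica of the topic are candidates.
    val hosting = assignment.values.flatten.toSet
    Targets(hosting.intersect(liveBrokers), hosting.diff(liveBrokers))
  }

  def main(args: Array[String]): Unit = {
    // Broker 4 is alive but hosts nothing; broker 3 hosts a replica but is offline.
    println(targets(Map(0 -> Seq(1, 2), 1 -> Seq(2, 3)), liveBrokers = Set(1, 2, 4)))
  }
}
```

In this model the deletion cannot finish while retryWhenOnline is non-empty, which matches the retry behaviour described in the answers.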

Written by

Ops Development Stories

Maintained by a like‑minded team, covering both operations and development. Topics span Linux ops, DevOps toolchain, Kubernetes containerization, monitoring, log collection, network security, and Python or Go development. Team members: Qiao Ke, wanger, Dong Ge, Su Xin, Hua Zai, Zheng Ge, Teacher Xia.
