Master Kafka Topic Management and Message Production on CentOS
Learn how to set up Kafka on CentOS, create and manage topics, adjust partitions, produce and consume messages using command‑line tools and Java code, and explore related utilities such as consumer groups, log inspection, and configuration tips for remote access.
Environment: CentOS 8.1 with Zookeeper 3.6.2, Kafka 2.7.0 (Scala 2.13) and JDK 8.
Create a topic
<code>bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test --partitions 4 --replication-factor 2</code>
This creates a topic named test with 4 partitions and a replication factor of 2, i.e. two replicas per partition (one leader and one follower).
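Which of the 4 partitions a keyed record lands on is determined by hashing the key. As a rough illustration only (Kafka's real default partitioner applies a murmur2 hash to the serialized key bytes, not String.hashCode), the idea looks like this:

<code>public class PartitionSketch {
    // Illustration only: String.hashCode() stands in for Kafka's murmur2 hash.
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit so the result is non-negative, then take the modulo.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition, which is what
        // preserves per-key ordering within a topic.
        System.out.println(partitionFor("user-42", 4) == partitionFor("user-42", 4)); // prints true
    }
}
</code>
Because the mapping depends on the partition count, adding partitions later (see below) redistributes keys.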
View topic information
<code>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test</code>
The command displays details for each partition: partition count, replication factor, leader, replicas, and ISR (the set of in-sync replicas).
List all topics
<code>bin/kafka-topics.sh --list --zookeeper localhost:2181</code>
Increase partition count
<code>bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic test --partitions 6</code>
Note that the partition count can only be increased, never decreased, and increasing it changes which partition a given key maps to.
Check partition offsets
<code>bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1</code>
--time -1 returns the latest offset of each partition; --time -2 returns the earliest.
Delete a topic
<code>bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test</code>
If the broker is configured with delete.topic.enable=false, this only marks the topic for deletion instead of removing it.
Produce messages (CLI)
<code>bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test</code>
Consume messages (CLI)
<code>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --group g1</code>
Options:
--from-beginning starts consumption from the earliest offset;
--group specifies the consumer group.
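A toy model of what --from-beginning changes, with a partition represented as a plain in-memory list (no Kafka involved; offsets here are just list indices):

<code>import java.util.List;

public class OffsetStartSketch {
    // With no retention-driven truncation, the earliest offset is 0.
    static int earliestOffset(List<String> log) {
        return 0;
    }

    // The "latest" offset is where the next message will be appended.
    static int latestOffset(List<String> log) {
        return log.size();
    }

    public static void main(String[] args) {
        // A partition behaves like an append-only list of messages.
        List<String> partitionLog = List.of("m0", "m1", "m2");
        System.out.println("--from-beginning starts at offset " + earliestOffset(partitionLog));
        System.out.println("default start (latest) is offset " + latestOffset(partitionLog));
    }
}
</code>
Without --from-beginning, a newly started console consumer only sees messages produced after it joins.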
List consumer groups
<code>bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list</code>
Describe a consumer group
<code>bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g1 --members</code>
Inspect log directories
<code>bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe --topic-list test</code>
Dump a log file
<code>bin/kafka-dump-log.sh --files /root/sf/datas/kafka/9094/test-0/00000000000000000000.log --print-data-log</code>
Java producer example
<code>import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.100.101.105:9092,10.100.101.105:9093,10.100.101.105:9094");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        int i = 0;
        for (int n = 0; n < 10; n++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("test", ++i + "", "msg " + i);
            // The callback fires once the broker acknowledges (or rejects) the send.
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        exception.printStackTrace();
                    }
                }
            });
            TimeUnit.SECONDS.sleep(1);
        }
        // Flush buffered records and release network resources.
        producer.close();
    }
}
</code>
Java consumer example
<code>import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.100.101.105:9092,10.100.101.105:9093,10.100.101.105:9094");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "g1");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Start from the earliest offset when the group has no committed position.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                TimeUnit.MILLISECONDS.sleep(500);
            }
        }
    }
}
</code>
Remote access configuration
When Kafka runs on a different machine from your client, edit config/server.properties so the broker advertises an address the client can actually reach (via the listeners and advertised.listeners settings), then restart the broker.
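A minimal sketch of the relevant config/server.properties settings; the IP below is a placeholder matching the examples above, so substitute your broker host's routable address:

<code># Bind on all interfaces so remote clients can connect.
listeners=PLAINTEXT://0.0.0.0:9092
# Address that the broker hands back to clients; must be reachable from them.
advertised.listeners=PLAINTEXT://10.100.101.105:9092
</code>
Clients then put the advertised address in bootstrap.servers.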
Future work will cover Spring Boot integration.