Introduction to Apache Kafka: Architecture, APIs, Code Examples, and Optimization
This article provides a comprehensive overview of Apache Kafka, covering its definition, architecture, command‑line API usage, Java producer and consumer examples, core principles such as high availability and message ordering, configuration tuning, and a summary of its advantages as a high‑throughput, fault‑tolerant streaming platform.
What is Kafka
Apache Kafka is an open-source stream-processing platform from the Apache Software Foundation that provides publish-subscribe messaging, commonly used for system decoupling, asynchronous communication, and traffic shaping. It also offers Kafka Streams, a client-side library for real-time processing that is simple to deploy.
Kafka Architecture
Kafka clusters organize records into topics, each backed by a set of partition logs. Every partition has one broker acting as the leader, handling all reads and writes, while other brokers serve as followers replicating its data. Leader election and cluster metadata are managed via ZooKeeper.
Kafka Command‑Line API
1. Create a topic with three partitions and three replicas:
[root@node01 bin]# kafka-topics.sh \
    --zookeeper node02:2181,node03:2181/kafka \
    --create \
    --topic test \
    --partitions 3 \
    --replication-factor 3

2. List topics:
[root@node01 bin]# kafka-topics.sh \
    --zookeeper node02:2181,node03:2181/kafka \
    --list

3. Describe a topic:
[root@node01 bin]# kafka-topics.sh \
    --zookeeper node02:2181,node03:2181/kafka \
    --describe \
    --topic test

4. Alter a topic, e.g. change its partition count (the count can only be increased, never decreased):
[root@node01 kafka_2.11-2.2.0]# ./bin/kafka-topics.sh \
    --zookeeper node02:2181,node03:2181/kafka \
    --alter \
    --topic test \
    --partitions 6

5. Delete a topic:
[root@node01 bin]# kafka-topics.sh \
    --zookeeper node02:2181,node03:2181/kafka \
    --delete \
    --topic test

6. Produce messages to a topic:
[root@node01 bin]# kafka-console-producer.sh \
    --broker-list node01:9092,node02:9092,node03:9092 \
    --topic test

7. Consume messages from a topic:
[root@node01 bin]# kafka-console-consumer.sh \
    --bootstrap-server node01:9092,node02:9092,node03:9092 \
    --topic test \
    --group opentest

8. List consumer groups (note: this uses the kafka-consumer-groups.sh tool, not the console consumer):
[root@node01 bin]# kafka-consumer-groups.sh \
    --bootstrap-server node01:9092,node02:9092,node03:9092 \
    --list

9. Describe a consumer group (shows per-partition offsets and lag):
[root@node01 bin]# kafka-consumer-groups.sh \
    --bootstrap-server node01:9092,node02:9092,node03:9092 \
    --describe \
    --group opentest

Using Kafka in Java Programs
1. Maven dependency (the full kafka_2.11 artifact pulls in the client classes; for pure producer/consumer code, the lighter kafka-clients artifact also suffices):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>2.1.0</version>
</dependency>

2. Producer example (simplified):
@Test
public void producer() throws ExecutionException, InterruptedException {
    String topic = "items";
    Properties p = new Properties();
    p.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node02:9092,node03:9092,node01:9092");
    p.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    p.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // acks=-1 (same as "all"): wait until all in-sync replicas have the record
    p.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
    KafkaProducer<String, String> producer = new KafkaProducer<>(p);
    while (true) { // send continuously; stop the test manually
        for (int i = 0; i < 3; i++) {
            for (int j = 0; j < 3; j++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, "item" + j, "val" + i);
                Future<RecordMetadata> send = producer.send(record);
                RecordMetadata rm = send.get(); // blocks until the broker acknowledges
                System.out.println("key: " + record.key() + " val: " + record.value()
                        + " partition: " + rm.partition() + " offset: " + rm.offset());
            }
        }
    }
}

3. Consumer example:
@Test
public void consumer() {
    Properties p = new Properties();
    p.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node02:9092,node03:9092,node01:9092");
    p.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    p.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    p.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "opentest");
    // start from the earliest offset when the group has no committed offset yet
    p.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    p.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(p);
    // subscribing is required before poll(); without it poll() throws IllegalStateException
    consumer.subscribe(Collections.singletonList("items"));
    while (true) {
        // wait up to 100 ms for records instead of busy-spinning with a 0 ms timeout
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("key: " + record.key() + " val: " + record.value()
                    + " partition: " + record.partition() + " offset: " + record.offset());
        }
    }
}

Deep Principles
High availability (HA) comes from leader-follower replication: the leader handles reads and writes, followers replicate its log, and if the leader fails a new leader is elected from the in-sync replicas.
Kafka guarantees ordering only within a single partition; producers that need related messages consumed in order should send them with the same key, so they are routed to the same partition and read back in send order.
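The keyed-routing rule above can be sketched in a few lines. Note this is a simplified illustration: Kafka's default partitioner actually murmur2-hashes the serialized key bytes, not String.hashCode(); the class and method names here are made up for the example.

```java
import java.util.List;

public class KeyPartitioningSketch {
    // Non-negative hash modulo partition count; Kafka really murmur2-hashes
    // the serialized key, so this is an illustration, not the real algorithm.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : List.of("item0", "item1", "item2", "item0")) {
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
        // Both "item0" records map to the same partition, so they are appended
        // to the same log and consumed in the order they were sent.
    }
}
```

This is why the producer example earlier uses "item" + j as the key: all records for one item land in one partition.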
Consumer groups allow multiple consumers to share partitions; a partition is consumed by only one consumer in a group to preserve order.
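To make the one-consumer-per-partition rule concrete, here is a toy model of how a range-style assignor could divide partitions among group members. It is a hypothetical re-implementation for illustration, not Kafka's actual assignor code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeAssignSketch {
    // Split numPartitions contiguously across consumers; every partition is
    // owned by exactly one consumer, which is what preserves per-partition order.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        consumers.forEach(c -> assignment.put(c, new ArrayList<>()));
        int per = numPartitions / consumers.size();   // base partitions per consumer
        int extra = numPartitions % consumers.size(); // first `extra` consumers get one more
        int p = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = per + (i < extra ? 1 : 0);
            for (int k = 0; k < count; k++) {
                assignment.get(consumers.get(i)).add(p++);
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        // With 3 partitions and 2 consumers, c0 owns two partitions and c1 one;
        // no partition is shared, so ordering within each partition is preserved.
        System.out.println(assign(List.of("c0", "c1"), 3));
    }
}
```

A consequence worth remembering: more consumers than partitions leaves the surplus consumers idle.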
Acknowledgment (acks) settings control durability: acks=1 (leader only), acks=0 (no wait), acks=all/-1 (all in‑sync replicas).
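The three acks levels can be summarized as producer configuration. The property names are standard Kafka producer configs; the helper class and method are made up for this sketch.

```java
import java.util.Properties;

public class AcksConfigSketch {
    // Producer config for the strongest durability setting. "acks" values:
    //   "0"        - fire and forget, no broker acknowledgement
    //   "1"        - the leader writes the record, then acks (followers may lag)
    //   "all"/"-1" - ack only after every in-sync replica has the record
    static Properties durableProducerConfig() {
        Properties p = new Properties();
        p.setProperty("acks", "all");
        // acks=all is only as strong as the current ISR; operators usually pair
        // it with the broker/topic setting min.insync.replicas >= 2 so a write
        // fails instead of being acknowledged by a lone leader.
        return p;
    }

    public static void main(String[] args) {
        System.out.println("acks=" + durableProducerConfig().getProperty("acks")); // prints "acks=all"
    }
}
```

The producer example earlier uses "-1", which is equivalent to "all".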
ISR (in-sync replicas), OSR (out-of-sync replicas), and AR (assigned replicas, i.e. ISR plus OSR) describe replica states and determine exactly which replicas acks=all waits for.
Kafka maintains two index files per log segment: an offset index (.index) and a time index (.timeindex). Both are sparse, enabling fast look-ups without scanning the whole log.
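The offset-index lookup can be modeled with a sorted map. This is a hypothetical in-memory sketch; Kafka's .index file is a sparse on-disk array that the broker binary-searches, and TreeMap.floorEntry performs the equivalent lookup here. The entry values are invented for illustration.

```java
import java.util.TreeMap;

public class OffsetIndexSketch {
    // Model of one segment's offset index: relative offset -> byte position.
    static TreeMap<Long, Long> demoIndex() {
        TreeMap<Long, Long> index = new TreeMap<>();
        index.put(0L, 0L);      // relative offset 0 starts at byte 0
        index.put(40L, 4096L);  // sparse: roughly one entry per index.interval.bytes
        index.put(80L, 8192L);
        return index;
    }

    // Byte position of the largest indexed offset <= targetOffset; the broker
    // then scans forward in the log file from that position.
    static long lookup(TreeMap<Long, Long> index, long targetOffset) {
        return index.floorEntry(targetOffset).getValue();
    }

    public static void main(String[] args) {
        System.out.println("offset 55: scan from byte " + lookup(demoIndex(), 55L)); // byte 4096
    }
}
```

Sparseness is the design trade-off: the index stays small enough to memory-map, at the cost of a short forward scan after each lookup.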
Optimization Parameters
Broker, producer, and consumer configurations can all be tuned to trade latency for throughput: for example batch.size, linger.ms, and compression.type on the producer, and fetch sizes and commit behavior on the consumer.
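As one concrete example, a throughput-oriented producer configuration might look like the sketch below. The property names are standard producer configs, but the values are illustrative assumptions, not universal recommendations; the right numbers depend on message size and latency budget.

```java
import java.util.Properties;

public class ProducerTuningSketch {
    // Larger batches plus a small linger let the producer fill batches before
    // sending, and compression shrinks network and disk I/O at some CPU cost.
    static Properties throughputTunedConfig() {
        Properties p = new Properties();
        p.setProperty("batch.size", "65536");       // 64 KB batches (default is 16 KB)
        p.setProperty("linger.ms", "20");           // wait up to 20 ms to fill a batch
        p.setProperty("compression.type", "lz4");   // lz4/snappy/zstd/gzip
        p.setProperty("buffer.memory", "67108864"); // 64 MB of send-buffer memory
        return p;
    }

    public static void main(String[] args) {
        throughputTunedConfig().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The flip side: linger.ms adds up to that many milliseconds of latency per record, so latency-sensitive producers keep it at or near 0.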
Conclusion
Kafka is favored as a mainstream messaging middleware because it offers decoupling, high throughput with low latency, durability on commodity disks, easy horizontal scaling, fault tolerance through replication, and multi‑language client support.
360 Quality & Efficiency
360 Quality & Efficiency focuses on seamlessly integrating quality and efficiency in R&D, sharing 360’s internal best practices with industry peers to foster collaboration among Chinese enterprises and drive greater efficiency value.