Kafka Client API Performance Testing with Producer and Consumer Examples
This article introduces Kafka, a high‑performance distributed messaging system, and provides step‑by‑step Java/Groovy examples for configuring a producer and consumer, demonstrating how to benchmark their throughput using the FunQpsConcurrent framework, along with necessary Gradle dependencies and server setup instructions.
Kafka is a high‑performance distributed messaging system originally developed by LinkedIn for processing massive real‑time data streams. It follows a publish/subscribe model, offering high reliability, high throughput, and low latency, making it suitable for log collection, event stream processing, real‑time monitoring, and many other scenarios.
Dependencies
The project uses Gradle; the required Kafka client library is added with the following dependency (note that recent Gradle versions replace the deprecated compile configuration with implementation):
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '3.4.0'
Kafka Server
For local testing the latest Kafka distribution (e.g., kafka_2.12-3.4.0) can be used. Recent Kafka versions can run in KRaft mode without a separate ZooKeeper instance, providing an out-of-the-box experience for quick functional verification.
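The ZooKeeper-free mode mentioned above is KRaft. Assuming the distribution archive has been unpacked, a typical single-node KRaft startup looks like this (paths are relative to the Kafka directory; exact config file names can vary between versions):

```shell
# One-time setup: generate a cluster ID and format the log directory
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format -t "$KAFKA_CLUSTER_ID" -c config/kraft/server.properties

# Start the broker in KRaft mode (no ZooKeeper required)
bin/kafka-server-start.sh config/kraft/server.properties
```

After this, the broker listens on localhost:9092, matching the bootstrap address used in the examples below.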
Producer Performance Test Demo
The producer requires several configuration properties. The example below uses the default settings and demonstrates how to send messages in a performance‑testing loop using the FunQpsConcurrent utility.
package com.funtest.kafka

import com.funtester.frame.SourceCode
import com.funtester.frame.execute.FunQpsConcurrent
import com.funtester.utils.StringUtil
import groovy.util.logging.Log4j2
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.record.CompressionType
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer

@Log4j2
class Produce extends SourceCode {

    static void main(String[] args) {
        Properties properties = new Properties()
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName())
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, "3") // retry failed sends up to 3 times
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all") // wait for all in-sync replicas to acknowledge
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384") // 16 KB batch size
        properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432") // 32 MB send buffer
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10") // wait up to 10 ms to fill a batch
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.LZ4.name)
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(properties)
        def topic = "testkafka"
        def test = {
            // The value serializer expects byte[], so convert the random string
            producer.send(new ProducerRecord<>(topic, StringUtil.getString(10).getBytes()))
        }
        new FunQpsConcurrent(test, "Kafka test").start()
        producer.close()
    }
}
A note on producer.close(): although the JVM exits when main finishes, close() should not be skipped here. The producer batches records in memory (per batch.size and linger.ms), and close() flushes anything still buffered; omitting it can silently drop the last messages of a test run.
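FunQpsConcurrent drives the closure from worker threads and reports throughput. As a rough illustration of what such a harness measures (a simplified, hypothetical sketch, not the actual FunQpsConcurrent implementation), throughput is simply completed calls divided by elapsed time:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

public class QpsSketch {

    // Run `task` on `threads` threads for `seconds`, then return calls per second.
    static double measureQps(Runnable task, int threads, int seconds) throws InterruptedException {
        LongAdder calls = new LongAdder();
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(seconds);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                // Each worker hammers the task until the deadline passes
                while (System.nanoTime() < deadline) {
                    task.run();
                    calls.increment();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(seconds + 5L, TimeUnit.SECONDS);
        return calls.sum() / (double) seconds;
    }

    public static void main(String[] args) throws InterruptedException {
        // A no-op task stands in for producer.send(...)
        double qps = measureQps(() -> { }, 2, 1);
        System.out.println(qps > 0);
    }
}
```

In the real test, the task body is the producer.send(...) closure shown above, so the reported QPS is the send-call rate rather than broker-acknowledged throughput.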
Consumer
The consumer side also requires a set of configuration properties. Kafka supports two subscription models: the automatic subscription mode (where the consumer group coordinator assigns partitions) and the manual assignment mode (where the application explicitly selects partitions). This example uses the simpler subscription mode.
package com.funtest.kafka

import com.funtester.frame.SourceCode
import com.funtester.frame.execute.FunQpsConcurrent
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer

import java.time.Duration

class Consumer extends SourceCode {

    static void main(String[] args) {
        Properties properties = new Properties()
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "FunTester32")
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000") // auto-commit offsets every second
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10") // at most 10 records per poll
        properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000")
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") // start from the beginning when no committed offset exists
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)
        String topic = "testkafka"
        consumer.subscribe([topic]) // subscription mode: the group coordinator assigns partitions
        // Manual assignment alternative (no group management):
        // consumer.assign([new org.apache.kafka.common.TopicPartition(topic, 0)])
        def plainDemo = false // set to true to print records instead of benchmarking
        if (plainDemo) {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000))
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value())
                }
                sleep(1.0)
            }
        }
        def test = {
            consumer.poll(Duration.ofMillis(1000))
        }
        // Note: KafkaConsumer is not thread-safe, so the benchmark closure must run single-threaded
        new FunQpsConcurrent(test, "Kafka consume").start()
        consumer.close()
    }
}
Running these examples requires a Kafka broker to be available; the author notes that a remote server may be needed for extensive performance testing.
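As an independent cross-check of the numbers reported by FunQpsConcurrent, the Kafka distribution ships its own benchmarking scripts. A typical invocation against the topic used above might look like this (flag names per the 3.x distribution; adjust paths to your install):

```shell
# Producer benchmark: 100k records of 10 bytes each, unthrottled
bin/kafka-producer-perf-test.sh --topic testkafka \
  --num-records 100000 --record-size 10 --throughput -1 \
  --producer-props bootstrap.servers=localhost:9092

# Consumer benchmark: read 100k messages back
bin/kafka-consumer-perf-test.sh --bootstrap-server localhost:9092 \
  --topic testkafka --messages 100000
```

If the QPS reported by the FunQpsConcurrent harness diverges wildly from these tools, the bottleneck is likely in the test client rather than the broker.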
-- By FunTester