Boost Kafka Throughput in Spring Boot: Batch Consumption Guide
This article demonstrates how to integrate Kafka with Spring Boot, add necessary dependencies and configuration, implement both single‑message and batch consumers, and tune batch settings to dramatically improve processing speed for millions of records in a microservice environment.
Introduction
In a previous article we introduced Kafka's architecture and how partitioning can accelerate data consumption in a cluster. Theory alone is insufficient; practical implementation is required.
Using a real production scenario, we will show how to employ Spring Boot to consume Kafka data with high throughput.
Practical Implementation
The data source generates over 10 million order records nightly. We need to ingest this data efficiently.
2.1 Add Kafka Dependency
The project uses Spring Boot 2.1.5.RELEASE and spring-kafka 2.2.6.RELEASE.
<code><dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.6.RELEASE</version>
</dependency></code>
2.2 Add Kafka Configuration Properties
Insert the following properties into application.properties:
<code># Kafka server address (comma‑separated for multiple brokers)
spring.kafka.bootstrap-servers=197.168.25.196:9092
# Retry count
spring.kafka.producer.retries=3
# Batch size
spring.kafka.producer.batch-size=1000
# Buffer memory (32 MB)
spring.kafka.producer.buffer-memory=33554432
# Consumer group
spring.kafka.consumer.group-id=crm-microservice-newperformance
# Offset reset
spring.kafka.consumer.auto-offset-reset=earliest
# Max poll records
spring.kafka.consumer.max-poll-records=4000
# Auto commit
spring.kafka.consumer.enable-auto-commit=true
# Auto commit interval (ms)
spring.kafka.consumer.auto-commit-interval=1000</code>
2.3 Create a Consumer
<code>import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class BigDataTopicListener {
private static final Logger log = LoggerFactory.getLogger(BigDataTopicListener.class);
@KafkaListener(topics = {"big_data_topic"})
public void consumer(ConsumerRecord<?, ?> consumerRecord) {
log.info("Received bigData message: {}", consumerRecord.toString());
// db.save(consumerRecord);
}
}</code>
2.4 Simulate Data Push
<code>import java.util.LinkedHashMap;
import java.util.Map;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import com.alibaba.fastjson.JSONObject;

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerTest {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Test
public void testSend() {
for (int i = 0; i < 5000; i++) {
Map<String, Object> map = new LinkedHashMap<>();
map.put("datekey", 20210610);
map.put("userid", i);
map.put("salaryAmount", i);
kafkaTemplate.send("big_data_topic", JSONObject.toJSONString(map));
}
}
}</code>
Initial single‑message consumption works, but processing 10 million records takes about three hours, which is unacceptable.
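To see the size of the gap, a rough back-of-the-envelope calculation (using the three-hour figure above and a 30-minute target) compares the rate the single-message consumer achieves with the rate we need:

```java
public class ThroughputEstimate {
    public static void main(String[] args) {
        long records = 10_000_000L;
        long observedSec = 3 * 3600;  // single-message runtime: ~3 hours
        long targetSec = 30 * 60;     // desired runtime: 30 minutes

        long observedRate = records / observedSec; // ≈ 925 records/s
        long targetRate = records / targetSec;     // ≈ 5555 records/s

        System.out.println("single-message rate: " + observedRate + " records/s");
        System.out.println("required rate:       " + targetRate + " records/s");
    }
}
```

In other words, we need roughly a six-fold speedup, which per-record database writes cannot deliver; this motivates the batch consumer below.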
2.5 Switch to Batch Consumption
Create a KafkaConfiguration class to define producer and consumer factories and a batch listener container:
<code>import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class KafkaConfiguration {
@Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers;
@Value("${spring.kafka.producer.retries}") private Integer retries;
@Value("${spring.kafka.producer.batch-size}") private Integer batchSize;
@Value("${spring.kafka.producer.buffer-memory}") private Integer bufferMemory;
@Value("${spring.kafka.consumer.group-id}") private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}") private String autoOffsetReset;
@Value("${spring.kafka.consumer.max-poll-records}") private Integer maxPollRecords;
@Value("${spring.kafka.consumer.batch.concurrency}") private Integer batchConcurrency;
@Value("${spring.kafka.consumer.enable-auto-commit}") private Boolean autoCommit;
@Value("${spring.kafka.consumer.auto-commit-interval}") private Integer autoCommitInterval;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.ACKS_CONFIG, "0"); // fire-and-forget: maximizes throughput at the cost of delivery guarantees
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.setConcurrency(batchConcurrency);
factory.getContainerProperties().setPollTimeout(1500);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setBatchListener(true);
return factory;
}
}
</code>
Add the new property spring.kafka.consumer.batch.concurrency to control the number of consumer threads:
<code># Batch consumer concurrency (≤ topic partitions)
spring.kafka.consumer.batch.concurrency=3
# Max records per poll
spring.kafka.consumer.max-poll-records=4000
# Disable auto‑commit for manual control
spring.kafka.consumer.enable-auto-commit=false</code>
Update the listener to use batch mode:
<code>import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class BigDataTopicListener {
private static final Logger log = LoggerFactory.getLogger(BigDataTopicListener.class);
@KafkaListener(topics = {"big_data_topic"}, containerFactory = "batchFactory")
public void batchConsumer(List<ConsumerRecord<?, ?>> consumerRecords, Acknowledgment ack) {
long start = System.currentTimeMillis();
// db.batchSave(consumerRecords);
ack.acknowledge();
log.info("Received bigData batch: {}, consumption time: {}ms", consumerRecords.size(), System.currentTimeMillis() - start);
}
}
</code>
With three consumer instances and three topic partitions, processing 5 million records finishes within 30 minutes, a dramatic improvement over the original three‑hour runtime.
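The db.batchSave(consumerRecords) placeholder above hides one practical detail: with max-poll-records=4000, writing an entire poll in a single database statement can be as problematic as writing one record at a time. A minimal sketch (the saveInChunks helper and its chunkSize parameter are illustrative, not part of the original code) splits each polled batch into fixed-size chunks before persisting:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ChunkedWriter {
    // Split a polled batch into fixed-size chunks and hand each chunk to a writer
    // callback (e.g. one multi-row INSERT per chunk).
    public static <T> void saveInChunks(List<T> records, int chunkSize, Consumer<List<T>> saveChunk) {
        for (int start = 0; start < records.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, records.size());
            saveChunk.accept(records.subList(start, end));
        }
    }

    public static void main(String[] args) {
        // Simulate one poll of 4000 records, written in chunks of 500.
        List<Integer> batch = new ArrayList<>();
        for (int i = 0; i < 4000; i++) batch.add(i);

        List<Integer> chunkSizes = new ArrayList<>();
        saveInChunks(batch, 500, chunk -> chunkSizes.add(chunk.size()));
        System.out.println(chunkSizes.size() + " chunks"); // 8 chunks
    }
}
```

Acknowledging only after every chunk has been written keeps the manual-commit semantics of the batch listener intact.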
In production, adjust max-poll-records and the number of partitions, or scale out the microservice cluster, to meet higher throughput demands, while avoiding excessively large batch sizes that could trigger frequent GC pauses.
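As a starting point for that tuning (the values below are illustrative defaults to adjust against your own measurements, not prescriptions), the poll size and the underlying Kafka fetch settings can be adjusted together via application.properties:

```properties
# Cap records per poll; larger polls raise throughput but also heap pressure
spring.kafka.consumer.max-poll-records=2000
# Allow enough processing time per poll before the broker considers the consumer dead
spring.kafka.consumer.properties.max.poll.interval.ms=300000
# Let the broker accumulate at least 1 MB before answering a fetch (trades latency for throughput)
spring.kafka.consumer.properties.fetch.min.bytes=1048576
```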
Conclusion
This tutorial showed how to use Spring Boot and Kafka to achieve high‑throughput data consumption in a microservice, laying the groundwork for further topics such as failure handling.
macrozheng
Dedicated to Java tech sharing and dissecting top open-source projects. Topics include Spring Boot, Spring Cloud, Docker, Kubernetes and more. Author’s GitHub project “mall” has 50K+ stars.