Comprehensive Guide to Spring Kafka: Integration, Advanced Features, and Usage
This article provides a detailed tutorial on integrating Kafka with Spring using Spring‑Kafka, covering simple setup, embedded Kafka testing, topic creation, message sending and receiving, transaction support, listener configurations, manual acknowledgment, error handling, retry and dead‑letter queues, and related code examples.
Simple Integration
Adding Dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.6.RELEASE</version>
</dependency>
Adding Configuration
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
Test Send and Receive
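import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author: kl @kailing.pub
 * @date: 2019/5/30
 */
@SpringBootApplication
@RestController
public class Application {
    private final Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Autowired
    private KafkaTemplate<Object, Object> template;

    // Publishes the path variable to topic_input
    @GetMapping("/send/{input}")
    public void sendFoo(@PathVariable String input) {
        this.template.send("topic_input", input);
    }

    // Consumes from topic_input and logs each payload
    @KafkaListener(id = "webGroup", topics = "topic_input")
    public void listen(String input) {
        logger.info("input value: {}", input);
    }
}
After starting the application, requesting http://localhost:8080/send/kl logs input value: kl to the console, which demonstrates the basic send/receive flow.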
Spring‑Kafka‑Test Embedded Kafka Server
Running the above code requires a real Kafka broker, but Spring‑Kafka‑Test provides an annotation‑driven embedded broker for quick testing.
Adding Dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>2.2.6.RELEASE</version>
    <scope>test</scope>
</dependency>
Starting Service
A JUnit test can start an embedded cluster of four brokers:
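import java.io.IOException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 4, ports = {9092, 9093, 9094, 9095})
public class ApplicationTests {
    @Test
    public void contextLoads() throws IOException {
        // Block on stdin so the embedded brokers stay up for manual testing
        System.in.read();
    }
}
The @EmbeddedKafka annotation accepts attributes such as value / count (number of brokers), controlledShutdown, ports, and brokerProperties to fine‑tune the embedded cluster.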
Creating New Topics
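If a topic does not exist and the broker allows automatic topic creation (auto.create.topics.enable=true, the default), Kafka creates it on first use with the broker defaults (num.partitions=1, default.replication.factor=1).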
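For reference, the broker settings that govern automatic topic creation might look like this in the broker's server.properties. This is a sketch only: the values shown are the stock Kafka defaults, and the file location depends on the installation.

```properties
# Create a topic automatically the first time a client uses it
auto.create.topics.enable=true
# Partition count for automatically created topics
num.partitions=1
# Replication factor for automatically created topics
default.replication.factor=1
```

Setting auto.create.topics.enable=false makes the broker reject unknown topics, in which case one of the explicit creation approaches described next is required.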
Programmatic Creation at Startup
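import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class KafkaConfig {
    @Bean
    public KafkaAdmin admin(KafkaProperties properties) {
        KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
        // Fail application startup if the broker cannot be reached
        admin.setFatalIfBrokerNotAvailable(true);
        return admin;
    }

    // NewTopic beans are created by KafkaAdmin at startup: name, partitions, replication factor
    @Bean
    public NewTopic topic2() {
        return new NewTopic("topic-kl", 1, (short) 1);
    }
}
If the broker is version 1.0.0 or later, KafkaAdmin can also add partitions to an existing topic when the NewTopic bean declares more partitions than the topic currently has.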
Using AdminClient Directly
@Autowired
private KafkaProperties properties;

@Test
public void testCreateTopic() {
    AdminClient client = AdminClient.create(properties.buildAdminProperties());
    try {
        Collection<NewTopic> newTopics = new ArrayList<>(1);
        newTopics.add(new NewTopic("topic-kl", 1, (short) 1));
        client.createTopics(newTopics);
    } finally {
        client.close();
    }
}
API‑Based Creation (Kafka 0.8.2.2)
@Test
public void testCreateTopic() throws Exception {
    ZkClient zkClient = new ZkClient("127.0.0.1:2181", 3000, 3000, ZKStringSerializer$.MODULE$);
    String topicName = "topic-kl";
    int partitions = 1;
    int replication = 1;
    AdminUtils.createTopic(zkClient, topicName, partitions, replication, new Properties());
}
Command‑Line Creation
String[] options = {
    "--create",
    "--zookeeper", "127.0.0.1:2181",
    "--replication-factor", "3",
    "--partitions", "3",
    "--topic", "topic-kl"
};
TopicCommand.main(options);
KafkaTemplate Message Sending Exploration
Getting Send Result – Asynchronous
template.send("topic-kl", "kl").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
    @Override
    public void onFailure(Throwable throwable) { /* ... */ }

    @Override
    public void onSuccess(SendResult<Object, Object> result) { /* ... */ }
});
Getting Send Result – Synchronous
ListenableFuture<SendResult<Object, Object>> future = template.send("topic-kl", "kl");
try {
    // get() blocks until the send completes or fails
    SendResult<Object, Object> result = future.get();
} catch (Throwable e) {
    e.printStackTrace();
}
Transactional Messages
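Enable transactions by setting a transaction ID prefix, for example spring.kafka.producer.transaction-id-prefix=kafka_tx. (the trailing dot is part of the value). Once transactions are enabled, every send must run inside a transaction, either in a method annotated with @Transactional or through executeInTransaction; otherwise a runtime exception is thrown.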
@GetMapping("/send/{input}")
public void sendFoo(@PathVariable String input) {
    template.executeInTransaction(t -> {
        t.send("topic_input", "kl");
        // Throwing here aborts the transaction, including the send above
        if ("error".equals(input)) {
            throw new RuntimeException("failed");
        }
        t.send("topic_input", "ckl");
        return true;
    });
}
With transactions enabled, annotating the method with @Transactional(rollbackFor = RuntimeException.class) is an alternative to calling executeInTransaction.
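A caveat on the consuming side, sketched under assumptions: Kafka consumers read with isolation.level=read_uncommitted by default, so a listener may still receive records from a transaction that was rolled back. The fragment below assumes Spring Boot's pass-through map spring.kafka.consumer.properties.* is used to set the native consumer property; the prefix value is the one used in this section.

```properties
# Enables transactional producers (prefix taken from this section)
spring.kafka.producer.transaction-id-prefix=kafka_tx.
# Assumption: pass the native Kafka consumer setting through Spring Boot
spring.kafka.consumer.properties.isolation.level=read_committed
```

With read_committed, the listener should only see kl and ckl when the transaction commits, and neither when the input is error.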
ReplyingKafkaTemplate – Request/Reply Semantics
ReplyingKafkaTemplate extends KafkaTemplate and adds sendAndReceive to obtain a response from the consumer, enabling RPC‑like communication.
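The mechanism behind this is correlation: each request carries a correlation ID, the replying side copies it onto the reply, and the template completes the future that is waiting under that ID. The following is a minimal plain-Java model of that idea, not Spring Kafka code; RequestReplySketch, responder, and onReply are hypothetical names, and the "transport" is a direct method call rather than Kafka topics.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Plain-Java model of request/reply correlation; not Spring Kafka code. */
public class RequestReplySketch {

    // Requests that were sent and still wait for a reply, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Hypothetical stand-in for the remote listener that turns a request into a reply.
    private final Function<String, String> responder;

    public RequestReplySketch(Function<String, String> responder) {
        this.responder = responder;
    }

    /** Registers a pending future, "sends" the request, and returns the future. */
    public CompletableFuture<String> sendAndReceive(String request) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        // In a real setup request and reply travel over Kafka topics; here the
        // responder is called directly and the correlation id is echoed back.
        onReply(correlationId, responder.apply(request));
        return future;
    }

    /** Completes the future whose correlation id matches the reply. */
    void onReply(String correlationId, String reply) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future != null) {
            future.complete(reply);
        }
    }

    /** Number of requests still waiting for a reply. */
    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        RequestReplySketch template = new RequestReplySketch(request -> "reply to " + request);
        System.out.println(template.sendAndReceive("kl").join()); // prints "reply to kl"
    }
}
```

With ReplyingKafkaTemplate itself, the call site reduces to the snippet that follows.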
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
ConsumerRecord<String, String> consumerRecord = replyFuture.get();
System.err.println("Return value: " + consumerRecord.value());
Spring‑Kafka Message Consumption Deep Dive
@KafkaListener Usage
Beyond basic topic listening, @KafkaListener can specify topic partitions, initial offsets, concurrency, and a custom error handler.
@KafkaListener(id = "webGroup", topicPartitions = {
        @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
        @TopicPartition(topic = "topic2", partitions = "0",
                partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
}, concurrency = "6", errorHandler = "myErrorHandler")
public String listen(String input) {
    logger.info("input value: {}", input);
    return "successful";
}
The custom error handler must implement KafkaListenerErrorHandler and be registered in the Spring context under the bean name referenced by errorHandler.
Manual Acknowledgment Mode
Disable auto‑commit and set spring.kafka.listener.ack-mode=manual. Then inject Acknowledgment into the listener method and call ack.acknowledge() when the message should be committed.
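One consequence worth keeping in mind: Kafka commits are positional, meaning a committed offset says "everything before this has been consumed". Acknowledging one record therefore also covers earlier, unacknowledged records on the same partition. The following is a minimal plain-Java model of that behaviour, not Spring Kafka code; ManualAckOffsetSketch and committedOffset are names made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of positional offset commits; not Spring Kafka code. */
public class ManualAckOffsetSketch {

    /**
     * Processes the values of one partition in order (offsets 0..n-1) and
     * acknowledges only the values equal to ackOn.
     *
     * @return the offset that would be committed, or -1 if nothing was acknowledged
     */
    public static long committedOffset(List<String> values, String ackOn) {
        long committed = -1;
        for (int offset = 0; offset < values.size(); offset++) {
            if (ackOn.equals(values.get(offset))) {
                // A commit stores the NEXT offset to read, so it implicitly
                // covers every earlier record on the partition as well.
                committed = offset + 1;
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        // Offsets: a=0, b=1, kl=2, c=3; only "kl" is acknowledged.
        System.out.println(committedOffset(Arrays.asList("a", "b", "kl", "c"), "kl")); // prints 3
    }
}
```

In this model a and b are never acknowledged, yet the committed offset moves past them as soon as kl is acknowledged. The listener that follows acknowledges only the payload kl, so the same effect applies to it.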
@KafkaListener(id = "webGroup", topics = "topic-kl")
public String listen(String input, Acknowledgment ack) {
    logger.info("input value: {}", input);
    if ("kl".equals(input)) {
        ack.acknowledge();
    }
    return "successful";
}
Listener Lifecycle Control
Set autoStartup = "false" on @KafkaListener and control start/stop/pause/resume via KafkaListenerEndpointRegistry.
@Autowired
private KafkaListenerEndpointRegistry registry;

// pause() keeps the container running but stops it from receiving records
@GetMapping("/pause/{listenerID}")
public void pause(@PathVariable String listenerID) {
    registry.getListenerContainer(listenerID).pause();
}

@GetMapping("/resume/{listenerID}")
public void resume(@PathVariable String listenerID) {
    registry.getListenerContainer(listenerID).resume();
}

@GetMapping("/start/{listenerID}")
public void start(@PathVariable String listenerID) {
    registry.getListenerContainer(listenerID).start();
}
SendTo – Message Forwarding
Use @SendTo("topic-ckl") to forward the listener method's return value to another topic.
@KafkaListener(id = "webGroup", topics = "topic-kl")
@SendTo("topic-ckl")
public String listen(String input) {
    logger.info("input value: {}", input);
    return input + "hello!";
}
Retry and Dead‑Letter Queue (DLQ)
Configure a SeekToCurrentErrorHandler with a DeadLetterPublishingRecoverer and a maximum of three failures: a failing record is delivered up to three times (the initial attempt plus two retries), after which it is published to a dead‑letter topic named {originalTopic}.DLT.
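The counting behaviour can be modelled in a few lines of plain Java. This is a sketch of the idea only, not Spring Kafka's implementation; RetryThenDeadLetterSketch, Outcome, and handle are hypothetical names, and the loop stands in for the container seeking back and polling the same record again.

```java
import java.util.function.Consumer;

/** Plain-Java model of "retry, then dead-letter"; not Spring Kafka code. */
public class RetryThenDeadLetterSketch {

    /** Result of handling one record. */
    public static final class Outcome {
        public final int deliveries;         // how many times the listener was invoked
        public final String deadLetterTopic; // null if the listener eventually succeeded

        Outcome(int deliveries, String deadLetterTopic) {
            this.deliveries = deliveries;
            this.deadLetterTopic = deadLetterTopic;
        }
    }

    /** Delivers value to listener until it succeeds or has failed maxFailures times. */
    public static Outcome handle(String topic, String value, int maxFailures,
                                 Consumer<String> listener) {
        for (int delivery = 1; delivery <= maxFailures; delivery++) {
            try {
                listener.accept(value);
                return new Outcome(delivery, null);
            } catch (RuntimeException e) {
                // A real container would seek back to the failed offset here,
                // so the next poll redelivers the same record.
            }
        }
        // Failure limit reached: route the record to the dead-letter topic.
        return new Outcome(maxFailures, topic + ".DLT");
    }

    public static void main(String[] args) {
        Outcome outcome = handle("topic-kl", "kl", 3, v -> {
            throw new RuntimeException("dlt");
        });
        // prints "3 deliveries, then topic-kl.DLT"
        System.out.println(outcome.deliveries + " deliveries, then " + outcome.deadLetterTopic);
    }
}
```

The Spring configuration that wires up the real error handler and recoverer follows.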
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        KafkaTemplate<Object, Object> template) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    // After 3 failed deliveries, publish the record to <topic>.DLT
    factory.setErrorHandler(new SeekToCurrentErrorHandler(
            new DeadLetterPublishingRecoverer(template), 3));
    return factory;
}

@GetMapping("/send/{input}")
public void sendFoo(@PathVariable String input) {
    template.send("topic-kl", input);
}

// Always fails, so every record ends up in the dead-letter topic
@KafkaListener(id = "webGroup", topics = "topic-kl")
public String listen(String input) {
    logger.info("input value: {}", input);
    throw new RuntimeException("dlt");
}

@KafkaListener(id = "dltGroup", topics = "topic-kl.DLT")
public void dltListen(String input) {
    logger.info("Received from DLT: {}", input);
}
Conclusion
The author explored many powerful features of Spring‑Kafka, such as embedded brokers, request/reply semantics, transactional messaging, advanced listener configurations, manual acknowledgment, error handling, retries, and dead‑letter queues, aiming to help developers adopt Spring‑Kafka more efficiently and avoid common pitfalls.