Comprehensive Guide to Spring‑Kafka Integration and Advanced Features
This article provides a systematic tutorial on using Spring‑Kafka, covering basic setup, embedded Kafka for testing, topic creation methods, message sending with KafkaTemplate, transactional messaging, request‑reply patterns, advanced @KafkaListener configurations, manual acknowledgment, listener lifecycle control, SendTo forwarding, and retry with dead‑letter queues, all illustrated with complete code examples.
Kafka is a high‑performance messaging system based on topics and partitions; Spring‑Kafka wraps the Apache Kafka client to simplify integration in Spring applications.
Simple Integration
Add the Maven dependency:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>Configure the bootstrap server:
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092Write a test controller that injects KafkaTemplate and sends a message:
@SpringBootApplication
@RestController
public class Application {
private final Logger logger = LoggerFactory.getLogger(Application.class);
@Autowired
private KafkaTemplate<Object, Object> template;
@GetMapping("/send/{input}")
public void sendFoo(@PathVariable String input) {
this.template.send("topic_input", input);
}
@KafkaListener(id = "webGroup", topics = "topic_input")
public void listen(String input) {
logger.info("input value: {}", input);
}
}Running the application and accessing http://localhost:8080/send/kl will log the received message.
Embedded Kafka for Tests
Include the test dependency:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.2.6.RELEASE</version>
<scope>test</scope>
</dependency>Start an embedded broker with @EmbeddedKafka:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 4, ports = {9092,9093,9094,9095})
public class ApplicationTests {
@Test
public void contextLoads() throws IOException {
System.in.read();
}
}The annotation supports parameters such as count (broker nodes), controlledShutdown, brokerProperties, and brokerPropertiesLocation for fine‑grained configuration.
Topic Creation
Spring‑Kafka can auto‑create topics when a message is sent, using broker defaults ( num.partitions=1, num.replica.fetchers=1). You can also define topics programmatically:
@Configuration
public class KafkaConfig {
@Bean
public KafkaAdmin admin(KafkaProperties properties) {
KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
admin.setFatalIfBrokerNotAvailable(true);
return admin;
}
@Bean
public NewTopic topic2() {
return new NewTopic("topic-kl", 1, (short) 1);
}
}For older versions or custom needs, you can use the native AdminClient or ZK utilities to create topics.
KafkaTemplate Usage
Send messages asynchronously with callbacks:
template.send("", "").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
@Override public void onFailure(Throwable t) { /* ... */ }
@Override public void onSuccess(SendResult<Object, Object> r) { /* ... */ }
});Or synchronously:
ListenableFuture<SendResult<Object, Object>> future = template.send("topic-kl", "kl");
SendResult<Object, Object> result = future.get();Transactional Messaging
Enable transactions via configuration:
spring.kafka.producer.transaction-id-prefix=kafka_tx.Wrap sends in a transaction:
template.executeInTransaction(t -> {
t.send("topic_input", "kl");
if ("error".equals(input)) throw new RuntimeException("failed");
t.send("topic_input", "ckl");
return true;
});Adding @Transactional on the method also works.
Request‑Reply with ReplyingKafkaTemplate
RequestReplyFuture<String, String, String> replyFuture = replyingTemplate.sendAndReceive(record);
ConsumerRecord<String, String> response = replyFuture.get();
System.err.println("Return value: " + response.value());Define a reply container and the template beans, then use @SendTo on the listener to specify the reply topic.
Advanced @KafkaListener Features
Specify topics, partitions, initial offsets, concurrency, and error handlers:
@KafkaListener(id = "webGroup", topicPartitions = {
@TopicPartition(topic = "topic1", partitions = {"0", "1"}),
@TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
}, concurrency = "6", errorHandler = "myErrorHandler")
public String listen(String input) { /* ... */ }Implement a custom KafkaListenerErrorHandler bean named myErrorHandler to handle processing errors.
Manual Acknowledgment
Disable auto‑commit and set spring.kafka.listener.ack-mode=manual. In the listener method receive an Acknowledgment argument and call ack.acknowledge() when appropriate.
Listener Lifecycle Control
Set autoStartup="false" on the listener and use KafkaListenerEndpointRegistry to start, pause, or resume containers via HTTP endpoints.
SendTo Forwarding
Use @SendTo("topic-ckl") on a listener to forward processed messages to another topic.
Retry and Dead‑Letter Queue
Configure a container factory with a SeekToCurrentErrorHandler and a DeadLetterPublishingRecoverer to retry up to three times and then publish failed messages to {originalTopic}.DLT:
factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3));Consume from the dead‑letter topic with a separate @KafkaListener.
Conclusion
The guide demonstrates how Spring‑Kafka simplifies Kafka integration, provides powerful testing utilities, supports transactional and request‑reply patterns, and offers fine‑grained listener control, helping developers avoid common pitfalls when building robust messaging solutions.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Architect's Tech Stack
Java backend, microservices, distributed systems, containerized programming, and more.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
