Comprehensive Guide to Spring‑Kafka Integration and Advanced Features

This article provides a systematic tutorial on using Spring‑Kafka, covering basic setup, embedded Kafka for testing, topic creation methods, message sending with KafkaTemplate, transactional messaging, request‑reply patterns, advanced @KafkaListener configurations, manual acknowledgment, listener lifecycle control, SendTo forwarding, and retry with dead‑letter queues, all illustrated with complete code examples.

Architect's Tech Stack

Kafka is a high‑performance messaging system based on topics and partitions; Spring‑Kafka wraps the Apache Kafka client to simplify integration in Spring applications.

Simple Integration

Add the Maven dependency:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.2.6.RELEASE</version>
</dependency>

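Configure the bootstrap server. The common property below applies to both the producer and the consumer behind @KafkaListener, whereas spring.kafka.producer.bootstrap-servers would configure the producer only:

spring.kafka.bootstrap-servers=127.0.0.1:9092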

Write a test controller that injects KafkaTemplate and sends a message:

@SpringBootApplication
@RestController
public class Application {
    private final Logger logger = LoggerFactory.getLogger(Application.class);
    @Autowired
    private KafkaTemplate<Object, Object> template;

    @GetMapping("/send/{input}")
    public void sendFoo(@PathVariable String input) {
        this.template.send("topic_input", input);
    }

    @KafkaListener(id = "webGroup", topics = "topic_input")
    public void listen(String input) {
        logger.info("input value: {}", input);
    }
}

Running the application and accessing http://localhost:8080/send/kl will log the received message.

Embedded Kafka for Tests

Include the test dependency:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka-test</artifactId>
  <version>2.2.6.RELEASE</version>
  <scope>test</scope>
</dependency>

Start an embedded broker with @EmbeddedKafka:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 4, ports = {9092,9093,9094,9095})
public class ApplicationTests {
    @Test
    public void contextLoads() throws IOException {
        System.in.read(); // block so the embedded brokers stay up for manual testing
    }
}

The annotation supports parameters such as count (broker nodes), controlledShutdown, brokerProperties, and brokerPropertiesLocation for fine‑grained configuration.

Topic Creation

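When the broker permits it (auto.create.topics.enable=true, the broker default), a topic is created automatically the first time a message is sent to it, using the broker defaults (num.partitions=1, default.replication.factor=1). You can also define topics programmatically: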

@Configuration
public class KafkaConfig {
    @Bean
    public KafkaAdmin admin(KafkaProperties properties) {
        KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
        admin.setFatalIfBrokerNotAvailable(true);
        return admin;
    }

    @Bean
    public NewTopic topic2() {
        return new NewTopic("topic-kl", 1, (short) 1);
    }
}

For older versions or custom needs, you can create topics with the native Kafka AdminClient or with ZooKeeper-based utilities.

KafkaTemplate Usage

Send messages asynchronously with callbacks:

template.send("topic-kl", "kl").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
    @Override public void onFailure(Throwable t) { /* ... */ }
    @Override public void onSuccess(SendResult<Object, Object> r) { /* ... */ }
});

Or synchronously:

ListenableFuture<SendResult<Object, Object>> future = template.send("topic-kl", "kl");
SendResult<Object, Object> result = future.get();
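
A bare future.get() blocks indefinitely if the broker never answers. Because the future returned by send(...) implements java.util.concurrent.Future, a bounded wait can be sketched with JDK types alone. The class and method names (BoundedSend, awaitResult) are hypothetical, and the CompletableFuture instances merely stand in for the future that template.send(...) would return.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedSend {

    /** Waits at most timeoutMillis for a send to complete; works with any Future. */
    static <T> T awaitResult(Future<T> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("send not confirmed within " + timeoutMillis + " ms", e);
        } catch (ExecutionException e) {
            // The send itself failed; the underlying error is the cause.
            throw new IllegalStateException("send failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting for send", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for template.send("topic-kl", "kl") that has already completed.
        Future<String> done = CompletableFuture.completedFuture("sent");
        System.out.println(awaitResult(done, 1000));

        // A future that never completes exercises the timeout branch.
        try {
            awaitResult(new CompletableFuture<String>(), 50);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the application, the call would look like awaitResult(template.send("topic-kl", "kl"), 10_000), with the timeout chosen to suit the producer's own delivery timeout settings.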

Transactional Messaging

Enable transactions via configuration:

spring.kafka.producer.transaction-id-prefix=kafka_tx.

Wrap sends in a transaction:

template.executeInTransaction(t -> {
    t.send("topic_input", "kl");
    if ("error".equals(input)) throw new RuntimeException("failed");
    t.send("topic_input", "ckl");
    return true;
});

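Adding @Transactional on the method also works, because Spring Boot registers a KafkaTransactionManager once transaction-id-prefix is set. Note that with transactions enabled, a plain template.send(...) outside any transaction fails by default, so every send must run inside executeInTransaction or a @Transactional method.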

Request‑Reply with ReplyingKafkaTemplate

RequestReplyFuture<String, String, String> replyFuture = replyingTemplate.sendAndReceive(record);
ConsumerRecord<String, String> response = replyFuture.get();
System.err.println("Return value: " + response.value());

Define a reply container and the template beans, then use @SendTo on the listener to specify the reply topic.
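
Request-reply over Kafka depends on matching each reply to its pending request through a correlation ID carried in a record header. The plain-Java sketch below models only that matching step. It is not the implementation of ReplyingKafkaTemplate, and all names in it (CorrelationDemo, Pending, send, onReply) are hypothetical.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class CorrelationDemo {

    /** A request that is still waiting for its reply. */
    static final class Pending {
        final String correlationId;
        final CompletableFuture<String> future = new CompletableFuture<>();

        Pending(String correlationId) {
            this.correlationId = correlationId;
        }
    }

    // Outstanding requests keyed by correlation ID.
    private final Map<String, Pending> pending = new ConcurrentHashMap<>();

    /** Registers a request; a real client would also publish the payload with the ID in a header. */
    Pending send(String payload) {
        Pending request = new Pending(UUID.randomUUID().toString());
        pending.put(request.correlationId, request);
        return request;
    }

    /** Completes the matching request; returns false for unknown or late replies. */
    boolean onReply(String correlationId, String reply) {
        Pending request = pending.remove(correlationId);
        if (request == null) {
            return false;
        }
        request.future.complete(reply);
        return true;
    }

    public static void main(String[] args) {
        CorrelationDemo demo = new CorrelationDemo();
        Pending first = demo.send("kl");
        Pending second = demo.send("ckl");

        // Replies may arrive in any order; the ID decides which future completes.
        demo.onReply(second.correlationId, "reply to ckl");
        System.out.println(second.future.join());
        System.out.println(first.future.isDone());
    }
}
```

The same idea explains why the replying listener must send its reply back with the correlation header intact: a reply whose ID matches no pending request cannot be delivered to any caller.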

Advanced @KafkaListener Features

Specify topics, partitions, initial offsets, concurrency, and error handlers:

@KafkaListener(id = "webGroup", topicPartitions = {
    @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
    @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
}, concurrency = "6", errorHandler = "myErrorHandler")
public String listen(String input) { /* ... */ }

Implement a custom KafkaListenerErrorHandler bean named myErrorHandler to handle processing errors.
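
A minimal sketch of such a bean follows, assuming a Spring Boot application with spring-kafka on the classpath. The configuration class name ListenerErrorConfig and the log message are hypothetical; only the bean name myErrorHandler comes from the listener above.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;

@Configuration
public class ListenerErrorConfig {

    private static final Logger logger = LoggerFactory.getLogger(ListenerErrorConfig.class);

    // The bean name must match errorHandler = "myErrorHandler" on the listener.
    @Bean
    public KafkaListenerErrorHandler myErrorHandler() {
        return (message, exception) -> {
            logger.error("Failed to process payload {}", message.getPayload(), exception);
            // Returning null treats the error as handled and sends no reply.
            // Rethrow the exception instead to pass it on to the container's error handler.
            return null;
        };
    }
}
```

Whether to swallow or rethrow is a design choice: swallowing keeps the consumer moving past a bad record, while rethrowing lets container-level retry or dead-letter handling take over.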

Manual Acknowledgment

Disable auto‑commit (spring.kafka.consumer.enable-auto-commit=false) and set spring.kafka.listener.ack-mode=manual. In the listener method, accept an Acknowledgment argument and call ack.acknowledge() once the record has been processed successfully.
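
A listener with manual acknowledgment might look like the following sketch, which assumes a Spring Boot application with spring-kafka on the classpath. The class name ManualAckListener, the listener id ackGroup, and the process method are hypothetical.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class ManualAckListener {

    // Requires spring.kafka.consumer.enable-auto-commit=false and
    // spring.kafka.listener.ack-mode=manual; otherwise no Acknowledgment is supplied.
    @KafkaListener(id = "ackGroup", topics = "topic_input")
    public void listen(String input, Acknowledgment ack) {
        process(input);
        // Acknowledge only after processing succeeded; if process() throws,
        // this line is never reached and the offset is not acknowledged.
        ack.acknowledge();
    }

    // Hypothetical placeholder for the application's business logic.
    private void process(String input) {
        if (input == null || input.isEmpty()) {
            throw new IllegalArgumentException("empty payload");
        }
    }
}
```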

Listener Lifecycle Control

Set autoStartup="false" on the listener and use KafkaListenerEndpointRegistry to start, pause, or resume containers via HTTP endpoints.
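
The controller below is one way to sketch this, assuming a Spring Boot web application and a listener declared with @KafkaListener(id = "webGroup", topics = "topic_input", autoStartup = "false"). The class name ListenerControlController and the /listener/... paths are hypothetical; GET mappings are used only to match the earlier example in this article.

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ListenerControlController {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @GetMapping("/listener/start")
    public void start() {
        container().start(); // begins polling; needed because autoStartup is false
    }

    @GetMapping("/listener/pause")
    public void pause() {
        container().pause(); // stops fetching records while the consumer stays in the group
    }

    @GetMapping("/listener/resume")
    public void resume() {
        container().resume(); // fetching continues from where it paused
    }

    // Looks up the container by the id given on @KafkaListener; null if the id is unknown.
    private MessageListenerContainer container() {
        return registry.getListenerContainer("webGroup");
    }
}
```

Pausing rather than stopping avoids a consumer group rebalance, which makes it the gentler option for short interruptions.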

SendTo Forwarding

Use @SendTo("topic-ckl") on a listener method to forward processed messages: the method's return value is published to that topic.

Retry and Dead‑Letter Queue

Configure a container factory with a SeekToCurrentErrorHandler and a DeadLetterPublishingRecoverer. With a maxFailures value of 3, a failing record is delivered up to three times in total and is then published to {originalTopic}.DLT:

factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3));

Consume from the dead‑letter topic with a separate @KafkaListener.
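
The retry-then-dead-letter flow can be modeled in plain Java to make the counting explicit. This is a simplified simulation under stated assumptions, not Spring-Kafka's implementation: the class RetryThenDeadLetter and its members are hypothetical, and an in-memory list stands in for the dead-letter topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class RetryThenDeadLetter {

    // Stand-in for the dead-letter topic: entries are "<topic>.DLT:<value>".
    final List<String> deadLetters = new ArrayList<>();
    private final int maxFailures;

    RetryThenDeadLetter(int maxFailures) {
        this.maxFailures = maxFailures;
    }

    /** Delivers value to the listener up to maxFailures times; returns the attempts made. */
    int deliver(String topic, String value, Consumer<String> listener) {
        for (int attempt = 1; attempt <= maxFailures; attempt++) {
            try {
                listener.accept(value);
                return attempt; // processed successfully, nothing to recover
            } catch (RuntimeException e) {
                // Failure: the loop redelivers the same record, as a seek would.
            }
        }
        // All attempts failed: hand the record to the dead-letter destination.
        deadLetters.add(topic + ".DLT:" + value);
        return maxFailures;
    }

    public static void main(String[] args) {
        RetryThenDeadLetter handler = new RetryThenDeadLetter(3);
        handler.deliver("topic_input", "bad", v -> {
            throw new IllegalStateException("cannot process " + v);
        });
        handler.deliver("topic_input", "good", v -> { });
        System.out.println(handler.deadLetters);
    }
}
```

Only the record that failed on every attempt reaches the dead-letter list; the one that succeeds on its first delivery never does.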

Conclusion

The guide demonstrates how Spring‑Kafka simplifies Kafka integration, provides powerful testing utilities, supports transactional and request‑reply patterns, and offers fine‑grained listener control, helping developers avoid common pitfalls when building robust messaging solutions.
