Comprehensive Guide to Spring Kafka: Integration, Advanced Features, and Usage

This article provides a detailed tutorial on integrating Kafka with Spring using Spring‑Kafka, covering simple setup, embedded Kafka testing, topic creation, message sending and receiving, transaction support, listener configurations, manual acknowledgment, error handling, retry and dead‑letter queues, and related code examples.

Architect's Tech Stack
Architect's Tech Stack
Architect's Tech Stack
Comprehensive Guide to Spring Kafka: Integration, Advanced Features, and Usage

Simple Integration

Adding Dependency

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.2.6.RELEASE</version>
</dependency>

Adding Configuration

spring.kafka.producer.bootstrap-servers=127.0.0.1:9092

Test Send and Receive

/**
 * @author: kl @kailing.pub
 * @date: 2019/5/30
 */
@SpringBootApplication
@RestController
public class Application {
    private final Logger logger = LoggerFactory.getLogger(Application.class);
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    @Autowired
    private KafkaTemplate<Object, Object> template;
    @GetMapping("/send/{input}")
    public void sendFoo(@PathVariable String input) {
        this.template.send("topic_input", input);
    }
    @KafkaListener(id = "webGroup", topics = "topic_input")
    public void listen(String input) {
        logger.info("input value: {}", input);
    }
}

After starting the application, accessing http://localhost:8080/send/kl will log input value: "kl" on the console, demonstrating the basic send/receive flow.

Spring‑Kafka‑Test Embedded Kafka Server

Running the above code requires a real Kafka broker, but Spring‑Kafka‑Test provides an annotation‑driven embedded broker for quick testing.

Adding Dependency

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka-test</artifactId>
  <version>2.2.6.RELEASE</version>
  <scope>test</scope>
</dependency>

Starting Service

A JUnit test can start an embedded broker with four broker nodes:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 4, ports = {9092,9093,9094,9095})
public class ApplicationTests {
    @Test
    public void contextLoads() throws IOException {
        System.in.read();
    }
}

The @EmbeddedKafka annotation supports parameters such as value / count (broker count), controlledShutdown, ports, and brokerProperties to fine‑tune the embedded cluster.

Creating New Topics

If a topic does not exist, Kafka will create it using default broker properties ( num.partitions=1, num.replica.fetchers=1).

Programmatic Creation at Startup

@Configuration
public class KafkaConfig {
    @Bean
    public KafkaAdmin admin(KafkaProperties properties) {
        KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
        admin.setFatalIfBrokerNotAvailable(true);
        return admin;
    }
    @Bean
    public NewTopic topic2() {
        return new NewTopic("topic-kl", 1, (short)1);
    }
}

When the broker supports version 1.0.0+, increasing partitions for an existing topic is also possible.

Using AdminClient Directly

@Autowired
private KafkaProperties properties;
@Test
public void testCreateTopic() {
    AdminClient client = AdminClient.create(properties.buildAdminProperties());
    try {
        Collection<NewTopic> newTopics = new ArrayList<>(1);
        newTopics.add(new NewTopic("topic-kl", 1, (short)1));
        client.createTopics(newTopics);
    } finally {
        client.close();
    }
}

API‑Based Creation (Kafka 0.8.2.2)

@Test
public void testCreateTopic() throws Exception {
    ZkClient zkClient = new ZkClient("127.0.0.1:2181", 3000, 3000, ZKStringSerializer$.MODULE$);
    String topicName = "topic-kl";
    int partitions = 1;
    int replication = 1;
    AdminUtils.createTopic(zkClient, topicName, partitions, replication, new Properties());
}

Command‑Line Creation

String[] options = {
    "--create",
    "--zookeeper", "127.0.0.1:2181",
    "--replication-factor", "3",
    "--partitions", "3",
    "--topic", "topic-kl"
};
TopicCommand.main(options);

KafkaTemplate Message Sending Exploration

Getting Send Result – Asynchronous

template.send("", "").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
    @Override
    public void onFailure(Throwable throwable) { /* ... */ }
    @Override
    public void onSuccess(SendResult<Object, Object> result) { /* ... */ }
});

Getting Send Result – Synchronous

ListenableFuture<SendResult<Object, Object>> future = template.send("topic-kl", "kl");
try {
    SendResult<Object, Object> result = future.get();
} catch (Throwable e) { e.printStackTrace(); }

Transactional Messages

Enable transactions with the property spring.kafka.producer.transaction-id-prefix=kafka_tx.. When a method is annotated with @Transactional, all sends must occur inside the transaction; otherwise a runtime exception is thrown.

@GetMapping("/send/{input}")
public void sendFoo(@PathVariable String input) {
    template.executeInTransaction(t -> {
        t.send("topic_input", "kl");
        if ("error".equals(input)) {
            throw new RuntimeException("failed");
        }
        t.send("topic_input", "ckl");
        return true;
    });
}

When the transaction is active, adding @Transactional(rollbackFor = RuntimeException.class) on the method also works.

ReplyingKafkaTemplate – Request/Reply Semantics

ReplyingKafkaTemplate extends KafkaTemplate and adds sendAndReceive to obtain a response from the consumer, enabling RPC‑like communication.

RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
ConsumerRecord<String, String> consumerRecord = replyFuture.get();
System.err.println("Return value: " + consumerRecord.value());

Spring‑Kafka Message Consumption Deep Dive

@KafkaListener Usage

Beyond basic topic listening, @KafkaListener can specify topic partitions, initial offsets, concurrency, and a custom error handler.

@KafkaListener(id = "webGroup", topicPartitions = {
    @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
    @TopicPartition(topic = "topic2", partitions = "0",
        partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
}, concurrency = "6", errorHandler = "myErrorHandler")
public String listen(String input) {
    logger.info("input value: {}", input);
    return "successful";
}

The custom error handler must implement KafkaListenerErrorHandler and be registered in the Spring context with the name referenced in errorHandler.

Manual Acknowledgment Mode

Disable auto‑commit and set spring.kafka.listener.ack-mode=manual. Then inject Acknowledgment into the listener method and call ack.acknowledge() when the message should be committed.

@KafkaListener(id = "webGroup", topics = "topic-kl")
public String listen(String input, Acknowledgment ack) {
    logger.info("input value: {}", input);
    if ("kl".equals(input)) {
        ack.acknowledge();
    }
    return "successful";
}

Listener Lifecycle Control

Set autoStartup = "false" on @KafkaListener and control start/stop/pause/resume via KafkaListenerEndpointRegistry.

@Autowired
private KafkaListenerEndpointRegistry registry;
@GetMapping("/stop/{listenerID}")
public void stop(@PathVariable String listenerID) {
    registry.getListenerContainer(listenerID).pause();
}
@GetMapping("/resume/{listenerID}")
public void resume(@PathVariable String listenerID) {
    registry.getListenerContainer(listenerID).resume();
}
@GetMapping("/start/{listenerID}")
public void start(@PathVariable String listenerID) {
    registry.getListenerContainer(listenerID).start();
}

SendTo – Message Forwarding

Use @SendTo("topic-ckl") to forward the processed payload to another topic.

@KafkaListener(id = "webGroup", topics = "topic-kl")
@SendTo("topic-ckl")
public String listen(String input) {
    logger.info("input value: {}", input);
    return input + "hello!";
}

Retry and Dead‑Letter Queue (DLQ)

Configure a SeekToCurrentErrorHandler with a DeadLetterPublishingRecoverer to retry a message up to three times; after that the message is sent to a DLQ topic named {originalTopic}.DLT.

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        KafkaTemplate<Object, Object> template) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3));
    return factory;
}

@GetMapping("/send/{input}")
public void sendFoo(@PathVariable String input) {
    template.send("topic-kl", input);
}

@KafkaListener(id = "webGroup", topics = "topic-kl")
public String listen(String input) {
    logger.info("input value: {}", input);
    throw new RuntimeException("dlt");
}

@KafkaListener(id = "dltGroup", topics = "topic-kl.DLT")
public void dltListen(String input) {
    logger.info("Received from DLT: " + input);
}

Conclusion

The author explored many powerful features of Spring‑Kafka, such as embedded brokers, request/reply semantics, transactional messaging, advanced listener configurations, manual acknowledgment, error handling, retries, and dead‑letter queues, aiming to help developers adopt Spring‑Kafka more efficiently and avoid common pitfalls.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

MicroservicesspringKafkaMessagingSpring Kafka
Architect's Tech Stack
Written by

Architect's Tech Stack

Java backend, microservices, distributed systems, containerized programming, and more.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.