Master Kafka Streams in Spring Boot: Real‑Time Data Processing with Code Samples
This guide walks through setting up Kafka Streams with Spring Boot 2.3, covering environment configuration, core concepts, topology design, and multiple practical examples—including message sending, listening, transformations, aggregations, filtering, branching, and multi‑field grouping—complete with full code snippets and execution results.
Environment
Spring Boot 2.3.12.RELEASE, kafka_2.13-2.7.0, Zookeeper 3.6.2.
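Assuming a local installation of the Kafka distribution above, the brokers can be started and the two topics used throughout the examples created with the standard Kafka CLI scripts. Paths, partition counts, and replication factors here are illustrative, not from the original article:

```shell
# Start Zookeeper and a Kafka broker in the background (run from the Kafka install directory)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties

# Create the "test" and "demo" topics used by the examples below
bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 \
  --partitions 3 --replication-factor 1
bin/kafka-topics.sh --create --topic demo --bootstrap-server localhost:9092 \
  --partitions 3 --replication-factor 1
```

For the three-broker setup implied by the configuration below, repeat the broker start with separate `server.properties` files and raise the replication factor accordingly.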
Kafka Streams Overview
Kafka introduced the Streams API in version 0.10, enabling real‑time processing and analysis of data stored in Kafka.
Stream processing works with unbounded data, continuously ingesting input and producing incremental results.
Kafka Streams is a lightweight client library that distinguishes event time from processing time and supports windowing, state management, and interactive queries.
Key features include:
Simple, lightweight Java client library.
No extra dependencies beyond Kafka; leverages Kafka partitioning for scalability and ordering.
Fault‑tolerant state stores for efficient windowed joins and aggregations.
Exactly‑once semantics.
Record‑level processing with millisecond latency.
High‑level DSL and low‑level Processor API.
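As a quick taste of the high-level DSL and the windowing support mentioned above, here is a self-contained windowed word-count sketch. The topic name `text-input` and the class name are placeholders, not part of the original article's code:

```java
import java.time.Duration;
import java.util.Arrays;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class WordCountTopology {

    // Builds (but does not start) a topology that splits each line from
    // "text-input" into words and counts occurrences per word over
    // one-minute tumbling windows, printing the running counts.
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("text-input")
                .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
                .groupBy((key, word) -> word)
                .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
                .count()
                .toStream()
                .print(Printed.toSysOut());
        return builder.build();
    }
}
```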
Stream Processing Topology
A stream represents an infinite, ordered, replayable and fault‑tolerant sequence of immutable records (key‑value pairs).
A Stream Processing Application uses Kafka Streams to define processor topologies, where each processor node transforms incoming records and may emit one or more output records.
Special processors:
Source Processor reads from one or more Kafka topics and creates the input stream.
Sink Processor writes records received from upstream processors to a specified Kafka topic.
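The source → processor → sink wiring can also be expressed directly with the low-level Processor API. This is a minimal sketch (node names and the uppercase transform are illustrative), wiring one node of each role:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;

public class UppercaseTopology {

    // Wires the three processor roles explicitly: a source node reading
    // the "test" topic, a custom processor node transforming each record,
    // and a sink node writing the result to the "demo" topic.
    public static Topology build() {
        Topology topology = new Topology();
        topology.addSource("Source",
                Serdes.String().deserializer(), Serdes.String().deserializer(), "test");
        topology.addProcessor("Uppercase", () -> new AbstractProcessor<String, String>() {
            @Override
            public void process(String key, String value) {
                // forward the transformed record to downstream nodes
                context().forward(key, value == null ? null : value.toUpperCase());
            }
        }, "Source");
        topology.addSink("Sink", "demo",
                Serdes.String().serializer(), Serdes.String().serializer(), "Uppercase");
        return topology;
    }
}
```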
Dependencies
<code><dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
</code>
Configuration
<code>server:
  port: 9090
spring:
  application:
    name: kafka-demo
  kafka:
    streams:
      application-id: ${spring.application.name}
      properties:
        spring.json.trusted.packages: '*'
    bootstrap-servers:
      - localhost:9092
      - localhost:9093
      - localhost:9094
    producer:
      acks: 1
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        spring.json.trusted.packages: '*'
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      enable-auto-commit: false
      group-id: ConsumerTest
      auto-offset-reset: latest
      properties:
        session.timeout.ms: 12000
        heartbeat.interval.ms: 3000
        max.poll.records: 100
        spring.json.trusted.packages: '*'
    listener:
      ack-mode: manual-immediate
      type: batch
      concurrency: 8
      properties:
        max.poll.interval.ms: 300000
</code>
Message Sending
<code>@Service
public class MessageSend {

    @Resource
    private KafkaTemplate<String, Message> kafkaTemplate;

    public void sendMessage2(Message message) {
        kafkaTemplate.send(new ProducerRecord<String, Message>("test", message))
                .addCallback(
                        result -> System.out.println("Send succeeded... " + Thread.currentThread().getName()),
                        ex -> {
                            System.out.println("Send failed");
                            ex.printStackTrace();
                        });
    }
}
</code>
Message Listening
<code>@KafkaListener(topics = {"test"})
public void listener2(List<ConsumerRecord<String, Message>> records, Acknowledgment ack) {
    for (ConsumerRecord<String, Message> record : records) {
        System.out.println(this.getClass().hashCode() + ", Thread: " + Thread.currentThread().getName()
                + ", key: " + record.key() + ", received message: " + record.value()
                + ", partition: " + record.partition() + ", offset: " + record.offset());
    }
    try {
        TimeUnit.SECONDS.sleep(0); // placeholder for a simulated processing delay
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    ack.acknowledge();
}

@KafkaListener(topics = {"demo"})
public void listenerDemo(List<ConsumerRecord<String, Message>> records, Acknowledgment ack) {
    for (ConsumerRecord<String, Message> record : records) {
        System.out.println("Demo Topic: " + this.getClass().hashCode() + ", Thread: " + Thread.currentThread().getName()
                + ", key: " + record.key() + ", received message: " + record.value()
                + ", partition: " + record.partition() + ", offset: " + record.offset());
    }
    ack.acknowledge();
}
</code>
Kafka Streams Processing Examples
Message transformation and forwarding
<code>@Bean
public KStream<Object, Object> kStream(StreamsBuilder streamsBuilder) {
    KStream<Object, Object> stream = streamsBuilder.stream("test");
    stream.map((key, value) -> {
        System.out.println("Original message content: " + new String((byte[]) value, StandardCharsets.UTF_8));
        return new KeyValue<>(key, "{\"title\": \"123123\", \"message\": \"redefined content\"}"
                .getBytes(StandardCharsets.UTF_8));
    }).to("demo");
    return stream;
}
</code>
Stream object processing
<code>@Bean
public KStream<String, Message> kStream4(StreamsBuilder streamsBuilder) {
    JsonSerde<Message> jsonSerde = new JsonSerde<>();
    JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer();
    descri.addTrustedPackages("*");
    KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));
    stream.map((key, value) -> {
        value.setTitle("XXXXXXX");
        return new KeyValue<>(key, value);
    }).to("demo", Produced.with(Serdes.String(), jsonSerde));
    return stream;
}
</code>
Grouped processing
<code>@Bean
public KStream<String, Message> kStream5(StreamsBuilder streamsBuilder) {
    JsonSerde<Message> jsonSerde = new JsonSerde<>();
    JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer();
    descri.addTrustedPackages("*");
    KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));
    stream.selectKey((key, value) -> value.getOrgCode())
            .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
            .count()
            .toStream()
            .print(Printed.toSysOut());
    return stream;
}
</code>
Aggregation
<code>@Bean
public KStream<String, Message> kStream6(StreamsBuilder streamsBuilder) {
    JsonSerde<Message> jsonSerde = new JsonSerde<>();
    JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer();
    descri.addTrustedPackages("*");
    KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));
    stream.selectKey((key, value) -> value.getOrgCode())
            .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
            .aggregate(() -> 0L,
                    (key, value, agg) -> {
                        System.out.println("key = " + key + ", value = " + value + ", agg = " + agg);
                        return agg + 1;
                    },
                    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("kvs")
                            .withValueSerde(Serdes.Long()))
            .toStream()
            .print(Printed.toSysOut());
    return stream;
}
</code>
Filter
<code>@Bean
public KStream<String, Message> kStream7(StreamsBuilder streamsBuilder) {
    JsonSerde<Message> jsonSerde = new JsonSerde<>();
    JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer();
    descri.addTrustedPackages("*");
    KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));
    stream.selectKey((key, value) -> value.getOrgCode())
            .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
            .aggregate(() -> 0L,
                    (key, value, agg) -> {
                        System.out.println("key = " + key + ", value = " + value + ", agg = " + agg);
                        return agg + 1;
                    },
                    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("kvs")
                            .withValueSerde(Serdes.Long()))
            .toStream()
            .filter((key, value) -> !"2".equals(key))
            .print(Printed.toSysOut());
    return stream;
}
</code>
Branching (multiple streams)
<code>@Bean
public KStream<String, Message> kStream8(StreamsBuilder streamsBuilder) {
    JsonSerde<Message> jsonSerde = new JsonSerde<>();
    JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer();
    descri.addTrustedPackages("*");
    KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));
    KStream<String, Message>[] arrStream = stream.branch(
            (key, value) -> "男".equals(value.getSex()),   // "男" = male
            (key, value) -> "女".equals(value.getSex()));  // "女" = female
    Stream.of(arrStream).forEach(as -> as.foreach((key, message) ->
            System.out.println(Thread.currentThread().getName() + ", key = " + key + ", message = " + message)));
    return stream;
}
</code>
Multi‑field grouping (single selectKey limitation)
<code>@Bean
public KStream<String, Message> kStreamM2(StreamsBuilder streamsBuilder) {
    JsonSerde<Message> jsonSerde = new JsonSerde<>();
    JsonDeserializer<Message> descri = (JsonDeserializer<Message>) jsonSerde.deserializer();
    descri.addTrustedPackages("*");
    KStream<String, Message> stream = streamsBuilder.stream("test", Consumed.with(Serdes.String(), jsonSerde));
    stream.selectKey((key, value) -> value.getTime() + " | " + value.getOrgCode())
            .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
            .count()
            .toStream()
            .print(Printed.toSysOut());
    return stream;
}
</code>
The tutorial concludes with a complete demonstration of Kafka Streams capabilities in a Spring Boot project.
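As a closing note on the multi-field grouping example: an alternative to concatenating fields with a separator is a small composite key class with proper equals/hashCode, which a `JsonSerde` can serialize as the grouping key. `GroupKey` is a hypothetical class for illustration, not part of the original code:

```java
import java.util.Objects;

// Hypothetical composite key for grouping by time and orgCode together,
// avoiding fragile string concatenation with a separator character.
public final class GroupKey {
    private final String time;
    private final String orgCode;

    public GroupKey(String time, String orgCode) {
        this.time = time;
        this.orgCode = orgCode;
    }

    public String getTime() { return time; }
    public String getOrgCode() { return orgCode; }

    // equals/hashCode make the key usable for grouping and state-store lookups
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof GroupKey)) return false;
        GroupKey other = (GroupKey) o;
        return Objects.equals(time, other.time) && Objects.equals(orgCode, other.orgCode);
    }

    @Override
    public int hashCode() {
        return Objects.hash(time, orgCode);
    }
}
```

With a `JsonSerde<GroupKey>` in `selectKey`/`groupByKey`, this scales naturally to additional fields without worrying about separator collisions.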