Guide to Using Apache Pulsar Java Client with Spring Boot
This tutorial explains how to deploy a single‑node Pulsar cluster with Docker, configure the Java client in a Spring Boot application, and create producers and consumers with detailed code examples and parameter explanations.
Apache Pulsar is a popular message‑streaming platform. The sections below start with the Docker deployment and then move on to the client, producer, and consumer setup in a Spring Boot application.
Deploy Pulsar
Pulsar can be installed locally, via Docker, or on Kubernetes; the article uses Docker to run a standalone instance on a machine with 2 CPU cores and 4 GB RAM.
docker run -it -p 6650:6650 -p 8080:8080 \
  --mount source=pulsardata,target=/pulsar/data \
  --mount source=pulsarconf,target=/pulsar/conf \
  apachepulsar/pulsar:2.9.1 bin/pulsar standalone
If the Docker version is too old, the --mount flag may cause an "unknown flag" error; upgrading Docker to version 17.06 or later resolves this.
Successful startup is indicated by log messages such as:
2022-01-08T22:27:58,726+0000 [main] INFO org.apache.pulsar.broker.PulsarService - messaging service is ready, bootstrap service port = 8080, broker url= pulsar://localhost:6650, cluster=standalone
After the local single‑node cluster starts, a namespace named public/default is created.
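Besides reading the log, the namespace can be checked over the admin REST interface on port 8080. The following is a minimal sketch using only the JDK HTTP client (Java 11 or later); the class name PulsarAdminCheck and the host localhost are assumptions for illustration, so adjust the base URL to wherever the container runs.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical helper, not part of the original article.
class PulsarAdminCheck {

    // Builds the admin REST URL that lists the namespaces of one tenant.
    static URI namespacesUri(String adminBaseUrl, String tenant) {
        return URI.create(adminBaseUrl + "/admin/v2/namespaces/" + tenant);
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the standalone container runs locally with port 8080 published.
        URI uri = namespacesUri("http://localhost:8080", "public");

        HttpClient http = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(5))
                .build();
        HttpRequest request = HttpRequest.newBuilder(uri)
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();

        // Print the status code and the raw JSON body returned by the broker.
        HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

If the broker is up, the response body should be a JSON list that includes public/default; a connection error usually means the container is not running or the port is not published.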
Pulsar Clients
Pulsar provides client libraries for Java, Go, Python, C++, Node.js, WebSocket, and C#.
Spring Boot Configuration
Add the Pulsar client dependency to the Maven pom.xml:
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.9.1</version>
</dependency>
Define connection properties in application.properties:
# Pulsar address
pulsar.url=pulsar://192.168.59.155:6650
# topic
pulsar.topic=testTopic
# consumer group
pulsar.subscription=topicGroup
Create Client
Instantiate the client using the URL from the properties file:
client = PulsarClient.builder()
        .serviceUrl(url)
        .build();
Create Producer
producer = client.newProducer()
        .topic(topic)
        .compressionType(CompressionType.LZ4)
        .sendTimeout(0, TimeUnit.SECONDS)
        .enableBatching(true)
        .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
        .batchingMaxMessages(1000)
        .maxPendingMessages(1000)
        .blockIfQueueFull(true)
        .roundRobinRouterBatchingPartitionSwitchFrequency(10)
        .batcherBuilder(BatcherBuilder.DEFAULT)
        .create();
The article explains each producer parameter, such as topic, compressionType, sendTimeout, enableBatching, and queue‑related settings.
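To make the queue‑related settings more concrete, here is a toy model of how maxPendingMessages and blockIfQueueFull interact. It is a sketch under stated assumptions and does not use the Pulsar client at all: the class PendingQueueModel is hypothetical, a semaphore stands in for the producer's pending‑message queue, and IllegalStateException stands in for the client's own "queue full" error.

```java
import java.util.concurrent.Semaphore;

// Toy model only: mimics the documented behaviour, not the client's internals.
class PendingQueueModel {
    private final Semaphore slots;
    private final boolean blockIfQueueFull;

    PendingQueueModel(int maxPendingMessages, boolean blockIfQueueFull) {
        this.slots = new Semaphore(maxPendingMessages);
        this.blockIfQueueFull = blockIfQueueFull;
    }

    // Called when the application hands a message to the producer.
    void send() {
        if (blockIfQueueFull) {
            // Wait until the broker has acknowledged an earlier message.
            slots.acquireUninterruptibly();
        } else if (!slots.tryAcquire()) {
            // Fail fast instead of waiting.
            throw new IllegalStateException("pending queue is full");
        }
    }

    // Called when the broker acknowledges a message, freeing one slot.
    void brokerAck() {
        slots.release();
    }

    int freeSlots() {
        return slots.availablePermits();
    }

    public static void main(String[] args) {
        PendingQueueModel queue = new PendingQueueModel(2, false);
        queue.send();
        queue.send();
        try {
            queue.send(); // no slot left and blocking is disabled
        } catch (IllegalStateException e) {
            System.out.println("third send rejected: " + e.getMessage());
        }
        queue.brokerAck(); // one message confirmed by the broker
        queue.send();      // succeeds again
        System.out.println("free slots: " + queue.freeSlots());
    }
}
```

With blockIfQueueFull(true), as in the producer configuration above, the third send would wait for an acknowledgement instead of failing, which applies back pressure to the caller.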
Create Consumer
Consumer creation includes subscription configuration and supports four subscription types (Exclusive, Failover, Shared, Key_Shared):
consumer = client.newConsumer()
        .topic(topic)
        .subscriptionName(subscription)
        .subscriptionType(SubscriptionType.Shared)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .negativeAckRedeliveryDelay(60, TimeUnit.SECONDS)
        .receiverQueueSize(1000)
        .subscribe();
Parameters such as subscriptionInitialPosition (Latest/Earliest) and receiverQueueSize are described.
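The practical difference between the Shared and Key_Shared subscription types is how messages are spread over several consumers on the same subscription. The toy model below illustrates this without a broker. It is a simplification under stated assumptions: the class DispatchModel is hypothetical, round‑robin ignores consumer availability, and a plain hash modulo stands in for the broker's key‑hash assignment.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: returns, for each message, the index of the consumer that would get it.
class DispatchModel {

    // Shared: messages are spread over the consumers in round-robin order,
    // regardless of their key.
    static List<Integer> shared(List<String> keys, int consumers) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            targets.add(i % consumers);
        }
        return targets;
    }

    // Key_Shared: all messages with the same key go to the same consumer.
    // Simplification: hash modulo instead of the broker's hash ranges.
    static List<Integer> keyShared(List<String> keys, int consumers) {
        List<Integer> targets = new ArrayList<>();
        for (String key : keys) {
            targets.add(Math.floorMod(key.hashCode(), consumers));
        }
        return targets;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("k1", "k1", "k2", "k1", "k2");
        System.out.println("Shared:     " + shared(keys, 2));
        System.out.println("Key_Shared: " + keyShared(keys, 2));
    }
}
```

In the Shared case the two k1 messages at the start land on different consumers, so per‑key ordering is not preserved; Key_Shared keeps each key on one consumer. Exclusive and Failover are not modelled because only one consumer is active at a time in those types.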
Message Receiving
Four receive methods are shown: synchronous single, synchronous batch, asynchronous single, and asynchronous batch, with an example of batch receive policy configuration.
consumer = client.newConsumer()
        .topic(topic)
        .subscriptionName(subscription)
        .batchReceivePolicy(BatchReceivePolicy.builder()
                .maxNumMessages(100)
                .maxNumBytes(1024 * 1024)
                .timeout(200, TimeUnit.MILLISECONDS)
                .build())
        .subscribe();
Testing
A producer method sends messages asynchronously, logging success or failure, and a consumer loop receives, logs, and acknowledges messages. The controller exposes an HTTP endpoint to trigger message sending:
@RequestMapping("/send")
@ResponseBody
public String send(@RequestParam String key, @RequestParam String data) {
    logger.info("Received message send request, key:{}, value:{}", key, data);
    pulsarProducer.sendMsg(key, data);
    return "success";
}
Calling http://192.168.157.1:8083/pulsar/send?key=key1&data=data1 results in logs showing successful send and receive of the message.
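The same request can be issued from Java, which is handy when keys or payloads contain characters that need URL encoding. This is a minimal sketch using the JDK HTTP client (Java 11 or later); the class name SendEndpointCall is hypothetical, and the host, port, and /pulsar/send path are taken from the URL above.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical test client for the /pulsar/send endpoint shown above.
class SendEndpointCall {

    // Builds the request URL and URL-encodes both query parameters.
    static URI sendUri(String baseUrl, String key, String data) {
        String query = "key=" + URLEncoder.encode(key, StandardCharsets.UTF_8)
                + "&data=" + URLEncoder.encode(data, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/pulsar/send?" + query);
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the Spring Boot application listens on this address.
        URI uri = sendUri("http://192.168.157.1:8083", "key1", "data1");

        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // Print the status code and the body returned by the controller.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

The response only confirms that the controller accepted the request; whether the message was actually published and consumed still has to be read from the application logs, because sending is asynchronous.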
Summary
The Pulsar Java API is straightforward to use within Spring Boot; however, consumer configuration requires careful consideration of batching, asynchronous processing, and subscription types.
Code Ape Tech Column
Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn