
Guide to Using Apache Pulsar Java Client with Spring Boot

This tutorial explains how to deploy a single‑node Pulsar cluster with Docker, configure the Java client in a Spring Boot application, and create producers and consumers with detailed code examples and parameter explanations.

Code Ape Tech Column

Apache Pulsar is a popular message‑streaming platform, and this article walks through deploying a single‑node Pulsar cluster using Docker and integrating the Pulsar Java client with a Spring Boot application.

Deploy Pulsar

Pulsar can be installed locally, via Docker, or on Kubernetes; the article uses Docker to run a standalone instance on a machine with 2 CPU cores and 4 GB RAM.

docker run -it -p 6650:6650 -p 8080:8080 \
  --mount source=pulsardata,target=/pulsar/data \
  --mount source=pulsarconf,target=/pulsar/conf \
  apachepulsar/pulsar:2.9.1 bin/pulsar standalone

If the Docker version is too old, the --mount flag may cause an "unknown flag" error; upgrading Docker to version 17.06 or later resolves this.

Successful startup is indicated by log messages such as:

2022-01-08T22:27:58,726+0000 [main] INFO  org.apache.pulsar.broker.PulsarService - messaging service is ready, bootstrap service port = 8080, broker url= pulsar://localhost:6650, cluster=standalone

After the local single‑node cluster starts, a namespace named public/default is created.
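One way to confirm that the namespace exists is to query the broker's admin REST endpoint on port 8080. The sketch below is not from the article: it assumes Java 11 or later, assumes the container's port 8080 is reachable at the base URL you pass in, and uses the hypothetical class name NamespaceCheck.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class NamespaceCheck {

    // Builds the admin REST URI that lists the namespaces of one tenant.
    static URI namespacesUri(String adminBaseUrl, String tenant) {
        return URI.create(adminBaseUrl + "/admin/v2/namespaces/" + tenant);
    }

    // Performs the GET request and returns the raw JSON response body.
    static String fetchNamespaces(String adminBaseUrl, String tenant)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(namespacesUri(adminBaseUrl, tenant))
                .GET()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

Against the standalone container started above, calling NamespaceCheck.fetchNamespaces("http://localhost:8080", "public") should return a JSON array that includes public/default.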

Pulsar Clients

Pulsar provides client libraries for Java, Go, Python, C++, Node.js, WebSocket, and C#.

Spring Boot Configuration

Add the Pulsar client dependency to the Maven pom.xml:

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.9.1</version>
</dependency>

Define connection properties in application.properties:

# Pulsar address
pulsar.url=pulsar://192.168.59.155:6650
# topic
pulsar.topic=testTopic
# subscription name (plays the role of a consumer group)
pulsar.subscription=topicGroup

Create Client

Instantiate the client using the URL from the properties file:

client = PulsarClient.builder()
        .serviceUrl(url)
        .build();
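In a Spring Boot application, one way to wire this up is to expose the client as a bean built from the pulsar.url property defined above. This is a minimal sketch rather than the article's own code; the class name PulsarConfig and the bean method name are assumptions made for illustration.

```java
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PulsarConfig {

    // destroyMethod = "close" asks Spring to close the client when the
    // application context shuts down, releasing its connections and threads.
    @Bean(destroyMethod = "close")
    public PulsarClient pulsarClient(@Value("${pulsar.url}") String url)
            throws PulsarClientException {
        return PulsarClient.builder()
                .serviceUrl(url)
                .build();
    }
}
```

A single client instance can then be injected wherever producers and consumers are created, which avoids opening a separate set of broker connections per component.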

Create Producer

producer = client.newProducer()
        .topic(topic)
        .compressionType(CompressionType.LZ4)
        .sendTimeout(0, TimeUnit.SECONDS)
        .enableBatching(true)
        .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
        .batchingMaxMessages(1000)
        .maxPendingMessages(1000)
        .blockIfQueueFull(true)
        .roundRobinRouterBatchingPartitionSwitchFrequency(10)
        .batcherBuilder(BatcherBuilder.DEFAULT)
        .create();

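The key producer parameters are as follows. topic is the topic to publish to. compressionType sets the payload compression codec (LZ4 here). sendTimeout is how long the producer waits for a broker acknowledgment before failing a send; a value of 0 disables the timeout. enableBatching, batchingMaxPublishDelay, and batchingMaxMessages control batching: with the values above, a batch is flushed after 10 ms or 1000 messages, whichever comes first. maxPendingMessages caps the queue of messages awaiting broker acknowledgment, and blockIfQueueFull(true) makes send calls block rather than fail when that queue is full.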

Create Consumer

Consumer creation includes subscription configuration and supports four subscription types (Exclusive, Failover, Shared, Key_Shared):

consumer = client.newConsumer()
        .topic(topic)
        .subscriptionName(subscription)
        .subscriptionType(SubscriptionType.Shared)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .negativeAckRedeliveryDelay(60, TimeUnit.SECONDS)
        .receiverQueueSize(1000)
        .subscribe();

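subscriptionInitialPosition decides where a newly created subscription starts reading: Latest (the default) begins with messages published after the subscription is created, while Earliest begins with the oldest message available in the topic. receiverQueueSize sets how many messages the consumer prefetches into its local queue; 1000 is the default. negativeAckRedeliveryDelay sets how long to wait before a negatively acknowledged message is redelivered.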

Message Receiving

The consumer can receive messages in four ways: synchronous single (receive), synchronous batch (batchReceive), asynchronous single (receiveAsync), and asynchronous batch (batchReceiveAsync). Batch receives are governed by a batch receive policy, configured as follows:

consumer = client.newConsumer()
    .topic(topic)
    .subscriptionName(subscription)
    .batchReceivePolicy(BatchReceivePolicy.builder()
        .maxNumMessages(100)
        .maxNumBytes(1024 * 1024)
        .timeout(200, TimeUnit.MILLISECONDS)
        .build())
    .subscribe();
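The four receive styles map onto four Consumer methods. The sketch below shows each call next to its acknowledgment, assuming a consumer created with the default byte[] schema as in the snippets above. The class and method names (ReceiveModes, receiveOne, and so on) are illustrative and do not come from the article.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;

public class ReceiveModes {

    // Same limits as the policy above: a batch is returned as soon as
    // any one of the three thresholds is reached.
    static BatchReceivePolicy policy() {
        return BatchReceivePolicy.builder()
                .maxNumMessages(100)
                .maxNumBytes(1024 * 1024)
                .timeout(200, TimeUnit.MILLISECONDS)
                .build();
    }

    // 1. Synchronous single: blocks until one message is available.
    static void receiveOne(Consumer<byte[]> consumer) throws PulsarClientException {
        Message<byte[]> msg = consumer.receive();
        consumer.acknowledge(msg);
    }

    // 2. Synchronous batch: blocks until the batch receive policy is satisfied.
    static void receiveBatch(Consumer<byte[]> consumer) throws PulsarClientException {
        Messages<byte[]> batch = consumer.batchReceive();
        consumer.acknowledge(batch);
    }

    // 3. Asynchronous single: the future completes when a message arrives,
    //    then the acknowledgment is chained onto it.
    static CompletableFuture<Void> receiveOneAsync(Consumer<byte[]> consumer) {
        return consumer.receiveAsync()
                .thenCompose(msg -> consumer.acknowledgeAsync(msg));
    }

    // 4. Asynchronous batch: same idea, for a whole batch.
    static CompletableFuture<Void> receiveBatchAsync(Consumer<byte[]> consumer) {
        return consumer.batchReceiveAsync()
                .thenCompose(batch -> consumer.acknowledgeAsync(batch));
    }
}
```

In real code the message would be processed between the receive and the acknowledgment; a message that cannot be processed can be passed to consumer.negativeAcknowledge so that it is redelivered later.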

Testing

A producer method sends messages asynchronously, logging success or failure, and a consumer loop receives, logs, and acknowledges messages. The controller exposes an HTTP endpoint to trigger message sending:

@RequestMapping("/send")
@ResponseBody
public String send(@RequestParam String key, @RequestParam String data) {
    logger.info("Received message send request, key:{}, value:{}", key, data);
    pulsarProducer.sendMsg(key, data);
    return "success";
}

Calling http://192.168.157.1:8083/pulsar/send?key=key1&data=data1 results in logs showing successful send and receive of the message.
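The producer method and consumer loop described in this section could look roughly like the following. This is a sketch under stated assumptions, not the article's original code: it assumes the producer and consumer use the default byte[] schema, it logs with java.util.logging to stay dependency-free, and the names PulsarMessaging, payload, and consumeLoop are hypothetical. sendMsg mirrors the name used by the controller but takes the producer as an explicit argument.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;

public class PulsarMessaging {

    private static final Logger LOG = Logger.getLogger(PulsarMessaging.class.getName());

    // Encodes the text as UTF-8 bytes for a byte[] producer.
    static byte[] payload(String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }

    // Sends one keyed message without blocking and logs the outcome.
    static CompletableFuture<MessageId> sendMsg(Producer<byte[]> producer,
                                                String key, String data) {
        return producer.newMessage()
                .key(key)
                .value(payload(data))
                .sendAsync()
                .whenComplete((id, ex) -> {
                    if (ex != null) {
                        LOG.log(Level.WARNING, "send failed, key: " + key, ex);
                    } else {
                        LOG.info("send succeeded, key: " + key + ", id: " + id);
                    }
                });
    }

    // Receives, logs, and acknowledges messages until the thread is
    // interrupted or receiving fails (for example because the consumer closed).
    static void consumeLoop(Consumer<byte[]> consumer) {
        while (!Thread.currentThread().isInterrupted()) {
            Message<byte[]> msg;
            try {
                msg = consumer.receive();
            } catch (PulsarClientException e) {
                LOG.log(Level.WARNING, "receive failed, stopping loop", e);
                return;
            }
            try {
                String value = new String(msg.getValue(), StandardCharsets.UTF_8);
                LOG.info("received, key: " + msg.getKey() + ", value: " + value);
                consumer.acknowledge(msg);
            } catch (PulsarClientException e) {
                // Request redelivery after the configured negative-ack delay.
                consumer.negativeAcknowledge(msg);
            }
        }
    }
}
```

Running consumeLoop on its own thread keeps the blocking receive call off the request-handling threads of the Spring Boot application.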

Summary

The Pulsar Java API is straightforward to use within Spring Boot; however, consumer configuration requires careful consideration of batching, asynchronous processing, and subscription types.

Written by

Code Ape Tech Column

Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn
