Boost Your IoT Apps with mica-mqtt: High‑Performance MQTT Server & Client Guide
This article introduces mica-mqtt, a lightweight, low‑latency, high‑performance MQTT component built on t‑io. It covers the component's features, configuration for Spring Boot and plain Java projects, the interfaces you can customize, clustering, and monitoring with Prometheus and Grafana, with code examples for both server and client integration.
1. Introduction
mica-mqtt is a simple, low‑latency, high‑performance open‑source MQTT component built on t‑io. See the mica-mqtt-example module for usage.
2. Features
Supports MQTT v3.1, v3.1.1 and v5.0.
Provides MQTT client.
Provides MQTT server.
Supports will messages.
Supports retained messages.
Supports custom message processing (for example through an MQ) and cluster forwarding.
Includes a demo for connecting to Alibaba Cloud MQTT.
Supports GraalVM native compilation.
Provides a Spring Boot starter, mica-mqtt-spring-boot-starter, for quick integration.
Starter integrates with Prometheus + Grafana.
3. TODO
Add WebSocket support (research completed).
Optimize MQTT session handling and add full v5.0 support.
4. Changelog
Documentation: added cluster processing steps and usage scenarios for will and retained messages.
Removed QoS2 parameter from demo to avoid performance loss.
Abstracted internal forwarding of will and retained messages.
Added mica-mqtt-spring-boot-example.
Starter now supports client access and server optimizations.
Starter supports metric collection for Prometheus + Grafana.
Server now disconnects any previous connection that uses the same clientId.
Upgraded mica-auto to 2.1.3 to fix IDE incremental compilation issue.
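The changelog entry about disconnecting earlier connections that share a clientId reflects the MQTT rule that a client identifier maps to at most one live connection. A minimal sketch of that takeover logic follows. `SessionTakeoverSketch` and its `Connection` class are hypothetical stand-ins for illustration, not mica-mqtt or t‑io types, and the library's real implementation may differ.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SessionTakeoverSketch {

    /** Hypothetical stand-in for a network connection (not a t-io type). */
    static final class Connection {
        final String remoteAddress;
        private volatile boolean open = true;

        Connection(String remoteAddress) {
            this.remoteAddress = remoteAddress;
        }

        void close() {
            open = false;
        }

        boolean isOpen() {
            return open;
        }
    }

    private final Map<String, Connection> byClientId = new ConcurrentHashMap<>();

    /** Binds clientId to the incoming connection and closes the one it replaces, if any. */
    Connection register(String clientId, Connection incoming) {
        Connection previous = byClientId.put(clientId, incoming);
        if (previous != null && previous != incoming) {
            previous.close();
        }
        return previous;
    }

    /** Removes the binding only if it still points at conn, so a late
     *  cleanup of the old connection cannot evict the new one. */
    void unregister(String clientId, Connection conn) {
        byClientId.remove(clientId, conn);
    }

    Connection current(String clientId) {
        return byClientId.get(clientId);
    }

    public static void main(String[] args) {
        SessionTakeoverSketch registry = new SessionTakeoverSketch();
        Connection first = new Connection("10.0.0.1:50000");
        Connection second = new Connection("10.0.0.2:50000");

        registry.register("device-1", first);
        registry.register("device-1", second);   // same clientId: first is closed
        registry.unregister("device-1", first);  // no effect, binding belongs to second

        System.out.println(first.isOpen());                          // false
        System.out.println(registry.current("device-1") == second);  // true
    }
}
```

The conditional `remove(key, value)` matters in practice: the close callback of the replaced connection usually fires after the new connection has registered, and an unconditional remove would drop the new binding.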
5. Spring Boot Quick Start
5.1 Add Dependency
<code><dependency>
  <groupId>net.dreamlu</groupId>
  <artifactId>mica-mqtt-spring-boot-starter</artifactId>
  <version>1.0.2</version>
</dependency>
</code>
5.2 Server YML Configuration
<code>mqtt:
  server:
    enabled: true              # default true
    ip: 127.0.0.1              # default 127.0.0.1
    port: 5883                 # default 1883
    name: Mica-Mqtt-Server
    buffer-allocator: HEAP
    heartbeat-timeout: 120000  # milliseconds
    read-buffer-size: 8092
    max-bytes-in-message: 8092
    debug: true                # consider disabling when Prometheus metric collection is enabled
</code>
5.3 Server Interfaces (register as Spring beans)
IMqttServerAuthHandler – client authentication (required).
IMqttMessageListener – message listening (required).
IMqttConnectStatusListener – connection status listening (required).
IMqttSessionManager – session management (optional).
IMqttMessageStore – stores will and retained messages (cluster mode).
AbstractMqttMessageDispatcher – message forwarding for will/retained messages (cluster mode).
IpStatListener – t‑io IP status listening (optional).
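To make the role of IMqttMessageStore more concrete, here is a minimal in-memory sketch of retained-message semantics as the MQTT specification defines them: the broker keeps the latest retained payload per topic, an empty retained payload clears it, and a new subscriber receives every retained message whose topic matches its filter. `RetainedStoreSketch` and its method names are hypothetical and do not mirror the actual IMqttMessageStore signatures. The matcher also ignores the special handling of topics that start with `$`, and a cluster deployment would use shared storage rather than a local map.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

class RetainedStoreSketch {

    private final Map<String, byte[]> retained = new ConcurrentHashMap<>();

    /** Keeps the latest retained payload per topic; an empty payload clears it. */
    void onPublish(String topic, byte[] payload, boolean retain) {
        if (!retain) {
            return;
        }
        if (payload.length == 0) {
            retained.remove(topic);
        } else {
            retained.put(topic, payload.clone());
        }
    }

    /** Returns the retained messages a new subscriber should receive, sorted by topic. */
    Map<String, byte[]> onSubscribe(String topicFilter) {
        Map<String, byte[]> result = new TreeMap<>();
        retained.forEach((topic, payload) -> {
            if (topicMatches(topicFilter, topic)) {
                result.put(topic, payload);
            }
        });
        return result;
    }

    /** MQTT filter matching: '+' matches one level, '#' matches all remaining levels. */
    static boolean topicMatches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true; // also matches the parent level, e.g. "a/#" matches "a"
            }
            if (i >= t.length) {
                return false;
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false;
            }
        }
        return f.length == t.length;
    }

    public static void main(String[] args) {
        RetainedStoreSketch store = new RetainedStoreSketch();
        store.onPublish("/test/a", "on".getBytes(StandardCharsets.UTF_8), true);
        store.onPublish("/test/b/c", "42".getBytes(StandardCharsets.UTF_8), true);
        store.onPublish("/other", "x".getBytes(StandardCharsets.UTF_8), true);

        System.out.println(store.onSubscribe("/test/#").keySet()); // [/test/a, /test/b/c]
        System.out.println(store.onSubscribe("/test/+").keySet()); // [/test/a]

        store.onPublish("/test/a", new byte[0], true);             // clears the retained message
        System.out.println(store.onSubscribe("/test/#").keySet()); // [/test/b/c]
    }
}
```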
5.4 Optional Server Customizer
<code>@Configuration(proxyBeanMethods = false)
public class MqttServerCustomizerConfiguration {

    @Bean
    public MqttServerCustomizer activeRecordPluginCustomizer() {
        return new MqttServerCustomizer() {
            @Override
            public void customize(MqttServerCreator creator) {
                // Settings applied here override the YML configuration
                System.out.println("----------------MqttServerCustomizer-----------------");
            }
        };
    }
}
</code>
5.5 Using MqttServerTemplate
<code>import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

@Service
public class ServerService {

    @Autowired
    private MqttServerTemplate server;

    public boolean publish(String body) {
        // Encode explicitly as UTF-8 instead of relying on the platform default charset
        server.publishAll("/test/123", ByteBuffer.wrap(body.getBytes(StandardCharsets.UTF_8)));
        return true;
    }
}
</code>
5.6 Cluster Message Broadcasting via MQ
Implement IMqttConnectStatusListener to store device status.
Implement IMqttMessageListener to forward messages to MQ for business processing.
Implement IMqttMessageStore to store will and retained messages.
Implement AbstractMqttMessageDispatcher to forward messages to MQ, which then broadcasts them back to the MQTT cluster.
Business messages are sent to MQ, broadcast to the MQTT cluster, and finally delivered to devices.
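The flow above can be simulated in a few lines. This sketch assumes a fan-out MQ in which every MQTT node receives every message. `Broadcast`, `Node`, and their methods are hypothetical in-memory stand-ins rather than mica-mqtt classes, and topic matching is reduced to exact equality to keep the example short.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

class ClusterBroadcastSketch {

    /** Hypothetical fan-out channel standing in for the MQ. */
    static final class Broadcast {
        private final List<BiConsumer<String, String>> consumers = new ArrayList<>();

        void subscribe(BiConsumer<String, String> consumer) {
            consumers.add(consumer);
        }

        void publish(String topic, String payload) {
            for (BiConsumer<String, String> consumer : consumers) {
                consumer.accept(topic, payload);
            }
        }
    }

    /** One MQTT server node; it only knows the devices connected to itself. */
    static final class Node {
        private final Broadcast mq;
        private final Map<String, String> topicByClientId = new LinkedHashMap<>();
        final List<String> delivered = new ArrayList<>();

        Node(Broadcast mq) {
            this.mq = mq;
            mq.subscribe(this::deliverLocally);
        }

        void connect(String clientId, String subscribedTopic) {
            topicByClientId.put(clientId, subscribedTopic);
        }

        /** Dispatcher role: hand the message to the MQ instead of delivering it directly. */
        void dispatch(String topic, String payload) {
            mq.publish(topic, payload);
        }

        /** Called for every MQ message; delivers to matching local subscribers only. */
        private void deliverLocally(String topic, String payload) {
            topicByClientId.forEach((clientId, subscribed) -> {
                if (subscribed.equals(topic)) {
                    delivered.add(clientId + " <- " + payload);
                }
            });
        }
    }

    public static void main(String[] args) {
        Broadcast mq = new Broadcast();
        Node nodeA = new Node(mq);
        Node nodeB = new Node(mq);
        nodeA.connect("dev-1", "/test/123");
        nodeB.connect("dev-2", "/test/123");
        nodeB.connect("dev-3", "/other");

        // A message enters at node A but reaches subscribers on both nodes
        nodeA.dispatch("/test/123", "hello");

        System.out.println(nodeA.delivered); // [dev-1 <- hello]
        System.out.println(nodeB.delivered); // [dev-2 <- hello]
    }
}
```

Routing every message through the MQ, including those whose subscribers sit on the originating node, keeps the delivery path uniform at the cost of one extra hop.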
5.7 Prometheus + Grafana Monitoring
Thanks to t‑io's design, monitoring metrics are exposed via t‑io‑stat. The supported metrics cover connection counts, message packets, and byte volumes for both inbound and outbound traffic:
mqtt_connections_accepted – total connections accepted
mqtt_connections_closed – total connections closed
mqtt_connections_size – current active connections
mqtt_messages_handled_packets – processed message packets
mqtt_messages_handled_bytes – processed message bytes
mqtt_messages_received_packets – received message packets
mqtt_messages_received_bytes – received message bytes
mqtt_messages_send_packets – sent message packets
mqtt_messages_send_bytes – sent message bytes
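Once these metrics are scraped, derived values are easy to compute. The sketch below parses a snippet in the Prometheus text exposition format and derives the average size of a received packet. The sample values are invented for illustration, `MqttMetricsSketch` is a hypothetical helper, and it is an assumption that the metric names appear in the scrape output exactly as listed above; check the actual output of your deployment.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class MqttMetricsSketch {

    /**
     * Parses "name value" and "name{labels} value" sample lines.
     * Comment lines starting with '#' are skipped, and samples that share
     * a metric name (different label sets) are summed.
     */
    static Map<String, Double> parse(String body) {
        Map<String, Double> samples = new LinkedHashMap<>();
        for (String raw : body.split("\n")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int brace = line.indexOf('{');
            int close = line.lastIndexOf('}');
            int nameEnd = brace >= 0 ? brace : line.indexOf(' ');
            int valueStart = brace >= 0 ? close + 1 : nameEnd;
            if (nameEnd <= 0 || valueStart < nameEnd) {
                continue; // malformed line
            }
            // The first token after the name (and labels) is the value;
            // an optional timestamp may follow and is ignored.
            String[] rest = line.substring(valueStart).trim().split("\\s+");
            if (rest[0].isEmpty()) {
                continue;
            }
            samples.merge(line.substring(0, nameEnd), Double.parseDouble(rest[0]), Double::sum);
        }
        return samples;
    }

    /** Average bytes per received packet, or 0 when nothing has been received yet. */
    static double averageReceivedPacketSize(Map<String, Double> samples) {
        double packets = samples.getOrDefault("mqtt_messages_received_packets", 0.0);
        double bytes = samples.getOrDefault("mqtt_messages_received_bytes", 0.0);
        return packets == 0.0 ? 0.0 : bytes / packets;
    }

    public static void main(String[] args) {
        // Invented sample values, for illustration only
        String scrape =
            "# TYPE mqtt_messages_received_packets counter\n"
            + "mqtt_messages_received_packets 200\n"
            + "mqtt_messages_received_bytes 51200\n"
            + "mqtt_connections_size 3\n";

        System.out.println(averageReceivedPacketSize(parse(scrape))); // 256.0
    }
}
```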
6. Plain Java Project Integration
6.1 Maven Dependency
<code><dependency>
  <groupId>net.dreamlu</groupId>
  <artifactId>mica-mqtt-core</artifactId>
  <version>1.0.2</version>
</dependency>
</code>
6.2 mica-mqtt Client Example
<code>// Initialize MQTT client
MqttClient client = MqttClient.create()
    .ip("127.0.0.1")
    .port(1883)                    // default 1883
    .username("admin")
    .password("123456")
    .version(MqttVersion.MQTT_5)   // default MQTT_3_1_1
    .clientId("xxxxxx")            // default: prefix + nanosecond value
    .connect();

// Subscribe
client.subQos0("/test/#", (topic, payload) -> {
    logger.info(topic + '\t' + ByteBufferUtil.toString(payload));
});

// Unsubscribe
client.unSubscribe("/test/#");

// Publish
client.publish("/test/client", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));

// Disconnect, reconnect, stop
client.disconnect();
client.reconnect();
client.stop();
</code>
6.3 mica-mqtt Server Example
<code>// To accept more connections with less memory, consider the JVM option -Xss129k (a smaller thread stack)
MqttServer mqttServer = MqttServer.create()
    .ip("127.0.0.1")
    .port(1883)
    .readBufferSize(512)
    .authHandler((clientId, userName, password) -> true)   // accepts every client; demo only
    .messageListener((clientId, topic, mqttQoS, payload) -> {
        logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS,
            ByteBufferUtil.toString(payload));
    })
    .useSsl("", "", "")   // placeholder values; supply real SSL settings or remove this call
    .connectStatusListener(new IMqttConnectStatusListener() {
        @Override
        public void online(String clientId) { }

        @Override
        public void offline(String clientId) { }
    })
    .messageDispatcher(new IMqttMessageDispatcher() {
        @Override
        public void config(MqttServer mqttServer) { }

        @Override
        public boolean send(Message message) { return false; }

        @Override
        public boolean send(String clientId, Message message) { return false; }
    })
    .debug()
    .start();

// Publish to a specific client (encode explicitly as UTF-8 because the payload is non-ASCII)
mqttServer.publish("clientId", "/test/123", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));

// Publish to all subscribers of a topic
mqttServer.publishAll("/test/123", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));

// Stop server
mqttServer.stop();
</code>
7. Demo
See the mica-mqtt-example project for a live demonstration.
Java Architecture Diary