Netty TCP Demo: Long‑Lived Socket Connection Architecture and Implementation
This article presents a complete Netty‑based TCP demo for IoT projects. It covers the project background, architecture, module layout, and business flow, then walks through the Java code: a local queue, multithreaded processing, client creation, Redis locking, and Spring Boot integration. Testing instructions and a source link follow.
Recently an IoT project required a persistent socket connection for message communication; after numerous bugs the author refined the solution into an open‑source demo that removes business‑specific code and focuses on the core technical components.
Project Background
The demo uses Netty, Redis, and Spring Boot 2.2.0 to build a long‑lived TCP client that processes messages from a queue.
Architecture
1. Project Structure
The repository contains three main modules: netty-tcp-core (utility classes), netty-tcp-server (test server, not used in production), and netty-tcp-client (the focus of this article).
2. Business Flow
Producer → Message Queue → Consumer (client) → TCP channel → Server → TCP channel → Client
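When a consumer receives a device message, it checks whether a TCP channel to the server already exists for that device. If one exists and is active, the message is sent over it; if not, the consumer creates a client, sends the message, and caches the client for reuse (NettyClientHolder in the code below). A Redis key acts as a lock so that the same device does not end up with two clients.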
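The get-or-create step in this flow can be sketched with plain JDK classes. This is a minimal illustration, not the demo's code: `DeviceConnection` and `ConnectionRegistry` are hypothetical names, the "connection" only records what was sent, and the cache is local to one JVM.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a per-device TCP connection; it only records messages. */
final class DeviceConnection {
    private final String imei;
    private final List<String> sent = new CopyOnWriteArrayList<>();
    private volatile boolean active = true;

    DeviceConnection(String imei) { this.imei = imei; }

    String imei() { return imei; }
    boolean isActive() { return active; }
    void close() { active = false; }
    void send(String msg) { sent.add(msg); }
    List<String> sent() { return sent; }
}

/** Caches one connection per IMEI and replaces it when it is no longer active. */
final class ConnectionRegistry {
    private final Map<String, DeviceConnection> connections = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    /** Sends msg over the cached connection for imei, creating or replacing it if needed. */
    DeviceConnection send(String imei, String msg) {
        // compute() runs atomically per key, so two threads cannot both create a connection.
        DeviceConnection conn = connections.compute(imei, (key, existing) -> {
            if (existing != null && existing.isActive()) {
                return existing; // reuse the live connection
            }
            if (existing != null) {
                existing.close(); // drop the dead connection before replacing it
            }
            created.incrementAndGet();
            return new DeviceConnection(key);
        });
        conn.send(msg);
        return conn;
    }

    int createdCount() { return created.get(); }
}
```

Calling `send("imei-1", ...)` twice on the same registry reuses one connection; closing it and sending again creates a second one. Because the map lives in a single process, this sketch does not cover several consumer instances sharing devices, which is where an external lock such as Redis comes in.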
3. Code Details
1) Message Queue
A local ArrayBlockingQueue simulates a message broker for the demo:
package org.example.client;

import org.example.client.model.NettyMsgModel;

import java.util.concurrent.ArrayBlockingQueue;

/**
 * This project uses a local queue for demonstration purposes. In production, use message middleware (RocketMQ or RabbitMQ) instead.
 *
 * @author ReWind00
 * @date 2023/2/15 11:20
 */
public class QueueHolder {

    private static final ArrayBlockingQueue<NettyMsgModel> queue = new ArrayBlockingQueue<>(100);

    public static ArrayBlockingQueue<NettyMsgModel> get() {
        return queue;
    }
}
A separate LoopThread starts a pool of workers, each of which continuously takes messages from the queue and processes them:
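public class LoopThread implements Runnable {
    // The declarations of executor, messageProcessor, log and MAIN_THREAD_POOL_SIZE are not part of this excerpt.

    @Override
    public void run() {
        // Start MAIN_THREAD_POOL_SIZE workers; each blocks on take() until a message arrives.
        for (int i = 0; i < MAIN_THREAD_POOL_SIZE; i++) {
            executor.execute(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        NettyMsgModel nettyMsgModel = QueueHolder.get().take();
                        messageProcessor.process(nettyMsgModel);
                    } catch (InterruptedException e) {
                        log.error(e.getMessage(), e);
                        // Restore the interrupt flag so the loop exits instead of spinning.
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
    }
}
2) Execution Class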
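The process method (from MessageProcessor) handles locking, channel reuse, and client creation. It uses a Redis key as a lock so that two messages for the same device (identified by IMEI) do not create duplicate clients; a message that arrives while the lock is held is put back on the queue after a 2-second delay.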
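The lock-or-requeue idea can be isolated in a small, dependency-free sketch. Everything here is an assumption made for illustration: `LockStore` is an in-memory stand-in for Redis, `LockingProcessor` and the `lock:` prefix are hypothetical, and messages are reduced to the IMEI string. The point it shows is that acquiring the lock with one atomic set-if-absent (in Redis terms, a SET with the NX option and an expiry) removes the need for a separate check and set, and that only the caller that acquired the lock releases it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** In-memory stand-in for a lock store with atomic set-if-absent (hypothetical). */
final class LockStore {
    private final ConcurrentMap<String, String> keys = new ConcurrentHashMap<>();

    /** Atomically creates the key; returns true only for the caller that created it. */
    boolean tryLock(String key) {
        return keys.putIfAbsent(key, "1") == null;
    }

    void unlock(String key) {
        keys.remove(key);
    }

    boolean isLocked(String key) {
        return keys.containsKey(key);
    }
}

/** Runs work under a per-device lock, or re-queues the message after a delay. */
final class LockingProcessor {
    static final String LOCK_PREFIX = "lock:"; // hypothetical key prefix

    private final LockStore locks;
    private final BlockingQueue<String> queue;
    private final ScheduledExecutorService scheduler;
    private final long requeueDelayMillis;

    LockingProcessor(LockStore locks, BlockingQueue<String> queue,
                     ScheduledExecutorService scheduler, long requeueDelayMillis) {
        this.locks = locks;
        this.queue = queue;
        this.scheduler = scheduler;
        this.requeueDelayMillis = requeueDelayMillis;
    }

    /** Returns true if work ran now, false if the message was scheduled for re-queue. */
    boolean process(String imei, Runnable work) {
        String key = LOCK_PREFIX + imei;
        if (!locks.tryLock(key)) {
            // Another message for this device is in flight: hand this one back later.
            scheduler.schedule(() -> { queue.offer(imei); }, requeueDelayMillis, TimeUnit.MILLISECONDS);
            return false;
        }
        try {
            work.run();
            return true;
        } finally {
            locks.unlock(key); // reached only by the caller that acquired the lock
        }
    }
}
```

A shared `ScheduledExecutorService` is used here instead of one `Timer` per message, which avoids starting a new thread for every re-queue. The demo's own process method reads as follows.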
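public void process(NettyMsgModel nettyMsgModel) {
    String imei = nettyMsgModel.getImei();
    // Only the call that set the lock may delete it in the finally block.
    boolean lockAcquired = false;
    try {
        // Serialize the check-and-set so two messages for one device cannot both create a client.
        synchronized (this) {
            if (redisCache.hasKey(NETTY_QUEUE_LOCK + imei)) {
                // The previous message for this device is still being processed: re-queue after 2 seconds.
                log.info("imei={} message in progress, re-queuing", imei);
                new Timer().schedule(new TimerTask() {
                    @Override
                    public void run() {
                        QueueHolder.get().offer(nettyMsgModel);
                    }
                }, 2000);
                log.info("imei={} message in progress, re-queue scheduled", imei);
                return;
            } else {
                redisCache.setCacheObject(NETTY_QUEUE_LOCK + imei, "1", 120, TimeUnit.SECONDS);
                lockAcquired = true;
            }
        }
        // A cached client exists: reuse it if its channel is still active.
        if (NettyClientHolder.get().containsKey(imei)) {
            NettyClient nettyClient = NettyClientHolder.get().get(imei);
            if (nettyClient.getChannelFuture() != null && nettyClient.getChannelFuture().channel().isActive()) {
                if (!nettyClient.getChannelFuture().channel().isWritable()) {
                    log.warn("Warning: channel not writable, imei={}, channelId={}", nettyClient.getImei(), nettyClient.getChannelFuture().channel().id());
                }
                nettyClient.send(nettyMsgModel.getMsg());
            } else {
                log.info("client imei={}, channel inactive, closing it", nettyClient.getImei());
                nettyClient.close();
                // Recreate the client and send.
                this.createClientAndSend(nettyMsgModel);
            }
        } else {
            // No cached client: create one and send.
            this.createClientAndSend(nettyMsgModel);
        }
    } catch (Exception e) {
        log.error(e.getMessage(), e);
    } finally {
        // Without this guard, the early return above would delete a lock held by another message.
        if (lockAcquired) {
            redisCache.deleteObject(NETTY_QUEUE_LOCK + imei);
        }
    }
}
3) Netty Client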
The client is a prototype‑scoped Spring bean that builds a Bootstrap with delimiters, string codecs, idle state handling, and a custom handler. It receives host/port via @Value, connects with a 2‑retry policy, and blocks the thread until the channel becomes active.
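The retry part of this description can be sketched independently of Netty. The helper below is hypothetical (`Retry.withRetries` is not a name from the demo), and reading "2‑retry policy" as one initial attempt plus up to two retries is an assumption; the source does not show the demo's connect() body.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: one initial attempt plus a bounded number of retries. */
final class Retry {
    private Retry() { }

    /**
     * Calls action once, then up to maxRetries more times if it throws.
     * Sleeps backoffMillis between attempts and rethrows the last failure.
     */
    static <T> T withRetries(int maxRetries, long backoffMillis, Callable<T> action) throws Exception {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Do not retry after an interrupt; restore the flag and give up.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxRetries) {
                    Thread.sleep(backoffMillis);
                }
            }
        }
        throw last;
    }
}
```

With this helper, a connect step would look like `Retry.withRetries(2, 1000L, () -> connectOnce())`, where `connectOnce` is a placeholder for whatever actually opens the channel. The demo's NettyClient skeleton follows.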
public class NettyClient implements Runnable {
    @Value("${netty.server.port}")
    private int port;
    @Value("${netty.server.host}")
    private String host;
    private String imei;
    private Map bizData;
    private EventLoopGroup workGroup;
    private Class clientHandlerClass;
    private ChannelFuture channelFuture;
    // constructor, run(), close(), send(), init(), connect() ...
}
Once the channel is ready, the handler's channelActive callback notifies the waiting thread, so the queued message is sent as soon as the connection is usable.
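The source does not show how the demo implements this wait. As one way to picture the hand-off between the I/O thread and the thread that wants to send, the sketch below uses a `CountDownLatch`; `ActivationGate` and its method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical one-shot gate: a sender waits on it, the I/O thread opens it. */
final class ActivationGate {
    private final CountDownLatch active = new CountDownLatch(1);

    /** Called from the I/O thread, for example inside a handler's channelActive callback. */
    void markActive() {
        active.countDown();
    }

    /** Blocks until markActive() has been called; returns false if the timeout elapses first. */
    boolean awaitActive(long timeout, TimeUnit unit) throws InterruptedException {
        return active.await(timeout, unit);
    }
}
```

The sending thread would call `awaitActive(...)` after starting the connection and send only when it returns true. Waiting with a timeout, rather than indefinitely, keeps a worker from hanging forever if the server never accepts the connection.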
4) Handler
DemoClientHandler extends BaseClientHandler and processes inbound messages, idle events, and exceptions. It also releases the lock once the channel is active.
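The idle handling can be pictured as a small counter that is separate from any Netty type. This is a sketch under assumptions: `IdleTracker` is a hypothetical name, and closing once the count reaches the limit (rather than exceeds it) and resetting on inbound traffic are guesses, since the handler bodies are not shown in the source.

```java
/** Hypothetical helper that counts consecutive idle events for one channel. */
final class IdleTracker {
    private final int maxIdleTimes;
    private int idleCount = 0;

    IdleTracker(int maxIdleTimes) {
        if (maxIdleTimes < 1) {
            throw new IllegalArgumentException("maxIdleTimes must be >= 1");
        }
        this.maxIdleTimes = maxIdleTimes;
    }

    /** Call on each idle event; returns true when the channel should be closed. */
    boolean onIdle() {
        idleCount++;
        return idleCount >= maxIdleTimes;
    }

    /** Call whenever traffic is seen; clears the idle streak. */
    void onActivity() {
        idleCount = 0;
    }

    int idleCount() {
        return idleCount;
    }
}
```

In a handler, `onIdle()` would be called from userEventTriggered when an idle event arrives and `onActivity()` from channelRead. No synchronization is used because a non-shared Netty handler is invoked on a single event-loop thread for its channel. The demo's handler skeleton follows.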
public class DemoClientHandler extends BaseClientHandler {
    private final String imei;
    private final Map bizData;
    private final NettyClient nettyClient;
    private int allIdleCounter = 0;
    private static final int MAX_IDLE_TIMES = 3;
    // channelActive, channelInactive, channelRead, userEventTriggered, exceptionCaught ...
}
Testing
A simple Spring MVC controller exposes three endpoints (/demo/testOne, /demo/testTwo, /demo/testThree) that push messages into the queue, demonstrating client creation, channel reuse, and lock‑based delayed consumption.
Log output shows the first message triggering client creation, the second message reusing the active channel, and the third test illustrating delayed re‑queue when the lock is held.
Source Code
https://gitee.com/jaster/netty-tcp-demo
Conclusion
The demo is intended for learning and exchange; production use would require replacing the local queue with a real message broker, adding authentication, and handling edge cases.