
Design and Implementation of a Netty‑Based Microservice Communication Module

The article walks Java developers through constructing a lightweight Netty‑based RPC framework—using RocketMQ’s NettyRemotingServer and NettyRemotingClient to handle synchronous, asynchronous and one‑way calls, routing request codes to dedicated processors and thread pools, exposing services via dynamic proxies, and outlining extensions such as service‑registry integration for a full microservice communication solution.

DaTaobao Tech

This article provides a deep dive into building a communication module on top of Netty, illustrating how to create a lightweight microservice‑style RPC framework for Java developers. It starts with an overview of network communication patterns, contrasting short‑lived HTTP connections with long‑lived TCP connections, and explains why Netty is preferred over raw sockets for high‑performance, reliable messaging.

Server side: The server is built using RocketMQ's NettyRemotingServer. Configuration includes the listen port, selector threads, and worker threads. Processors are registered per request code, each backed by its own thread pool to isolate business logic.

import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Server {
    public static void main(String[] args) throws Exception {
        NettyServerConfig cfg = new NettyServerConfig();
        cfg.setListenPort(8888);
        cfg.setServerSelectorThreads(2);
        cfg.setServerWorkerThreads(8);
        NettyRemotingServer server = new NettyRemotingServer(cfg, null);
        // one pool per request code keeps slow handlers from starving each other
        ExecutorService poolA = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));
        ExecutorService poolB = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024));
        NettyRequestProcessor processA = new NettyRequestProcessor(){
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand req) throws Exception {
                RemotingCommand resp = RemotingCommand.createResponseCommand(0,"server");
                switch(req.getCode()){
                    case 0: resp.setBody("hello sync 0".getBytes()); break;
                    case 1: resp.setBody("hello sync 1".getBytes()); break;
                }
                return resp;
            }
            @Override public boolean rejectRequest(){return false;}
        };
        server.registerProcessor(0, processA, poolA);
        server.registerProcessor(1, processA, poolB);
        server.start();
        System.out.println("start ok " + JSON.toJSONString(cfg));
        System.in.read();
    }
}

Client side: The client uses NettyRemotingClient to perform synchronous, asynchronous, and one-way calls. Each call is assigned a unique opaque ID and recorded in a response table; the calling thread then blocks on a latch (sync) or is notified via a callback (async).

import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Client {
    public static void main(String[] args) throws Exception {
        NettyClientConfig cfg = new NettyClientConfig();
        cfg.setClientWorkerThreads(8);
        NettyRemotingClient client = new NettyRemotingClient(cfg, null);
        ExecutorService poolA = new ThreadPoolExecutor(4,4,0,TimeUnit.SECONDS,new ArrayBlockingQueue<>(1024));
        // processor for requests the server may push back to the client (code 5)
        client.registerProcessor(5, new NettyRequestProcessor(){
            @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand req) throws Exception {return null;}
            @Override public boolean rejectRequest(){return false;}
        }, poolA);
        client.start();
        // sync call
        RemotingCommand req = RemotingCommand.createRequestCommand(0,null);
        req.setRemark("sync");
        RemotingCommand resp = client.invokeSync("127.0.0.1:8888", req, 30000);
        System.out.println("sync result: " + new String(resp.getBody()));
        // async call
        RemotingCommand asyncReq = RemotingCommand.createRequestCommand(1,null);
        asyncReq.setRemark("async");
        client.invokeAsync("127.0.0.1:8888", asyncReq, 30000, new InvokeCallback(){
            @Override public void operationComplete(org.apache.rocketmq.remoting.ResponseFuture f){
                System.out.println("async result: " + new String(f.getResponseCommand().getBody()));
            }
        });
        // oneway call
        RemotingCommand oneWay = RemotingCommand.createRequestCommand(9,null);
        oneWay.setRemark("oneway");
        client.invokeOneway("127.0.0.1:8888", oneWay, 30000);
        System.in.read();
    }
}
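The sync-call plumbing described above (opaque ID, response table, latch) can be sketched independently of RocketMQ. The class and field names below are illustrative, not the library's actual internals:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal model of how invokeSync correlates a request with its response.
public class ResponseTableSketch {
    // Each in-flight request parks a latch under its opaque ID.
    static class ResponseEntry {
        final CountDownLatch latch = new CountDownLatch(1);
        volatile String response;
    }

    private final AtomicInteger opaqueGen = new AtomicInteger();
    private final Map<Integer, ResponseEntry> responseTable = new ConcurrentHashMap<>();

    // Caller side: register the entry, "send", then block until the
    // network thread delivers the matching response or the timeout fires.
    public String invokeSync(long timeoutMillis) throws InterruptedException {
        int opaque = opaqueGen.getAndIncrement();
        ResponseEntry entry = new ResponseEntry();
        responseTable.put(opaque, entry);
        // simulate the I/O thread completing the call from another thread
        new Thread(() -> onResponse(opaque, "pong-" + opaque)).start();
        try {
            if (!entry.latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new InterruptedException("timeout waiting for " + opaque);
            }
            return entry.response;
        } finally {
            responseTable.remove(opaque); // always clean up the table
        }
    }

    // Network side: look up the opaque ID and release the waiting caller.
    void onResponse(int opaque, String body) {
        ResponseEntry entry = responseTable.get(opaque);
        if (entry != null) {
            entry.response = body;
            entry.latch.countDown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(new ResponseTableSketch().invokeSync(3000));
    }
}
```

The async path reuses the same table: instead of a latch, the entry holds the callback, which the I/O thread fires when the matching opaque ID arrives.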

The article also discusses the internal routing mechanism: request codes map to NettyRequestProcessor instances and their dedicated thread pools, enabling fine‑grained resource isolation for different business scenarios.
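That routing table can be modeled as a map from request code to a (processor, executor) pair. The names below are illustrative, not RocketMQ's actual fields:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Sketch of per-request-code dispatch: each code owns a handler and a pool.
public class DispatchSketch {
    static final class Entry {
        final Function<String, String> processor;
        final ExecutorService pool;
        Entry(Function<String, String> processor, ExecutorService pool) {
            this.processor = processor;
            this.pool = pool;
        }
    }

    private final Map<Integer, Entry> table = new ConcurrentHashMap<>();

    void register(int code, Function<String, String> processor, ExecutorService pool) {
        table.put(code, new Entry(processor, pool));
    }

    // The I/O thread only looks up the entry and hands off; business logic
    // runs on that code's own pool, so a slow handler cannot stall others.
    Future<String> dispatch(int code, String request) {
        Entry e = table.get(code);
        if (e == null) throw new IllegalArgumentException("no processor for code " + code);
        return e.pool.submit(() -> e.processor.apply(request));
    }

    public static void main(String[] args) throws Exception {
        DispatchSketch d = new DispatchSketch();
        ExecutorService orderPool = Executors.newFixedThreadPool(2);
        ExecutorService queryPool = Executors.newFixedThreadPool(2);
        d.register(0, req -> "order:" + req, orderPool);
        d.register(1, req -> "query:" + req, queryPool);
        System.out.println(d.dispatch(0, "A").get());
        System.out.println(d.dispatch(1, "B").get());
        orderPool.shutdown();
        queryPool.shutdown();
    }
}
```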

Microservice extension: By registering Java interfaces and their implementations in ApiProviderBean, the framework can expose services via dynamic proxy. The client side (ApiConsumerBean) encodes method signatures and arguments into a custom header, sends a request, and decodes the JSON response, achieving an HSF-like RPC experience without a heavyweight framework.

import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class ApiProviderBean {
    private NettyRemotingServer server;
    // interface name -> method signature -> (instance, method) to invoke
    private Map<String, Map<String, Call>> index = new HashMap<>();

    public void init() throws Exception {
        NettyServerConfig cfg = new NettyServerConfig();
        cfg.setListenPort(8888);
        server = new NettyRemotingServer(cfg, null);
        server.registerProcessor(0, new NettyRequestProcessor() {
            @Override
            public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand req) throws Exception {
                CommonHeader hdr = (CommonHeader) req.decodeCommandCustomHeader(CommonHeader.class);
                Call call = index.get(hdr.getInterfaceName()).get(hdr.getMethodName());
                // decodeArgs (not shown in the article) deserializes the JSON
                // argument list against the method's declared parameter types
                Object[] args = decodeArgs(hdr.getArgsJsonJson(), call.method.getParameters());
                Object result = call.method.invoke(call.instance, args);
                RemotingCommand resp = RemotingCommand.createResponseCommand(0, null);
                if (result != null) resp.setBody(JSON.toJSONBytes(result));
                return resp;
            }
            @Override public boolean rejectRequest() { return false; }
        }, null);
        server.start();
    }

    public <T> void register(Class<T> iface, T impl) {
        index.computeIfAbsent(iface.getName(), k -> new LinkedHashMap<>())
             .putAll(Arrays.stream(iface.getDeclaredMethods())
                 .collect(Collectors.toMap(Method::toString, m -> {
                     Call c = new Call(); c.instance = impl; c.method = m; return c; })));
    }

    static class Call { Object instance; Method method; }
}
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ApiConsumerBean implements InvocationHandler {
    private NettyRemotingClient client;
    private String addr;
    private Class<?> iface;
    private long timeout = 3000L;

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // pass Object methods (toString, hashCode, ...) through locally
        if (method.getDeclaringClass() == Object.class) return method.invoke(this, args);
        CommonHeader hdr = new CommonHeader();
        hdr.setInterfaceName(iface.getName());
        hdr.setMethodName(method.toString());
        List<String> argJson = Arrays.stream(args).map(JSON::toJSONString).collect(Collectors.toList());
        hdr.setArgsJsonJson(JSON.toJSONString(argJson));
        RemotingCommand req = RemotingCommand.createRequestCommand(0, hdr);
        if (method.getReturnType() != void.class) {
            RemotingCommand resp = client.invokeSync(addr, req, timeout);
            return JSON.parseObject(new String(resp.getBody(), StandardCharsets.UTF_8), method.getReturnType());
        } else {
            // no return value expected, so fire-and-forget is enough
            client.invokeOneway(addr, req, timeout);
            return null;
        }
    }

    @SuppressWarnings("unchecked")
    public <T> T getProxy() {
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[]{iface}, this);
    }
}
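The proxy mechanics ApiConsumerBean relies on can be seen in a self-contained sketch. The Greeter interface and the fake "remote" answer below are illustrative stand-ins for the real remoting call:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Stand-alone demo of the dynamic-proxy pattern: every interface call is
// funneled into invoke(), where it can be serialized and shipped remotely.
public class ProxyDemo {
    interface Greeter {            // illustrative service interface
        String greet(String name);
    }

    public static void main(String[] args) {
        InvocationHandler handler = (proxy, method, callArgs) -> {
            // In ApiConsumerBean this is where the header is built and
            // invokeSync is called; here we fabricate the "remote" answer.
            String encoded = method.getName() + Arrays.toString(callArgs);
            return "remote(" + encoded + ")";
        };
        Greeter g = (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class[]{Greeter.class}, handler);
        System.out.println(g.greet("netty"));
    }
}
```

Because the caller only ever sees the interface, swapping the fake handler for one that performs a Netty round trip changes nothing on the consumer side.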

Finally, the article suggests further enhancements such as integrating a service registry (e.g., ZooKeeper) for automatic provider discovery and load‑balancing, turning the prototype into a full‑featured, low‑dependency microservice communication framework.
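As a hint of what that registry extension could look like, here is an in-memory sketch of provider registration with round-robin selection. In a real deployment the address map would live in ZooKeeper and be refreshed by watches; everything below is an illustrative stand-in:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// In-memory stand-in for a service registry: service name -> provider
// addresses, with round-robin pick on the consumer side.
public class RegistrySketch {
    private final Map<String, List<String>> providers = new ConcurrentHashMap<>();
    private final Map<String, AtomicInteger> cursors = new ConcurrentHashMap<>();

    // Provider side: announce an address under a service name.
    public void register(String service, String addr) {
        providers.computeIfAbsent(service, k -> new CopyOnWriteArrayList<>()).add(addr);
    }

    // Consumer side: round-robin load balancing across known providers.
    public String pick(String service) {
        List<String> addrs = providers.get(service);
        if (addrs == null || addrs.isEmpty()) {
            throw new IllegalStateException("no provider for " + service);
        }
        int i = cursors.computeIfAbsent(service, k -> new AtomicInteger()).getAndIncrement();
        return addrs.get(Math.floorMod(i, addrs.size()));
    }

    public static void main(String[] args) {
        RegistrySketch r = new RegistrySketch();
        r.register("GreetService", "127.0.0.1:8888");
        r.register("GreetService", "127.0.0.1:8889");
        System.out.println(r.pick("GreetService"));
        System.out.println(r.pick("GreetService"));
        System.out.println(r.pick("GreetService"));
    }
}
```

The address returned by pick() would feed straight into invokeSync/invokeAsync in place of the hard-coded "127.0.0.1:8888" used earlier.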

Tags: distributed systems, Java, microservices, RPC, Netty, thread pools
Written by DaTaobao Tech, the official account of DaTaobao Technology.