Design and Implementation of the xxl-job Communication Layer Using Netty and Dynamic Proxy
This article explains how xxl-job implements its communication layer with Netty HTTP, outlines the overall workflow, and highlights key design choices such as dynamic proxy abstraction, full asynchronous processing, and request‑ID based thread synchronization, accompanied by detailed Java code examples.
xxl-job uses Netty HTTP for communication; although the framework nominally supports Mina and Jetty as well, the current implementation hard-codes Netty HTTP.
The activity diagram below illustrates the overall processing flow: the scheduler notifies the executor to execute tasks.
[Figure: activity diagram of the scheduler-to-executor trigger flow]
Key design highlights:
Using dynamic proxy to hide communication details
xxl-job defines two interfaces, ExecutorBiz and AdminBiz, which encapsulate operations such as heartbeat, pause, trigger execution, registration, and cancellation. Their implementations contain no communication logic; instead, XxlRpcReferenceBean.getObject() generates a dynamic proxy that performs the remote call.
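As a minimal sketch of this pattern (not xxl-job's actual code; BizService, RpcInvoker, and ProxyDemo are hypothetical names), a JDK dynamic proxy can route every interface method call through a single handler that hides the transport:

```java
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical business interface; the caller sees only plain method calls.
interface BizService {
    String heartbeat(String executorAddress);
}

// Hypothetical transport stub standing in for the real RPC client.
class RpcInvoker {
    // A real client would serialize the call and send it over Netty HTTP.
    static Object sendAndReceive(String methodName, Object[] args) {
        return "OK:" + methodName + ":" + args[0];
    }
}

public class ProxyDemo {
    // Generates a proxy whose handler hides all communication details,
    // analogous in spirit to XxlRpcReferenceBean.getObject().
    @SuppressWarnings("unchecked")
    static <T> T getProxy(Class<T> iface) {
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(),
                new Class<?>[]{iface},
                (Object proxy, Method method, Object[] args) ->
                        RpcInvoker.sendAndReceive(method.getName(), args));
    }

    public static void main(String[] args) {
        BizService biz = getProxy(BizService.class);
        // Looks like a local call; actually routed through the invocation handler.
        System.out.println(biz.heartbeat("127.0.0.1:9999"));  // OK:heartbeat:127.0.0.1:9999
    }
}
```

The caller never touches serialization or sockets; swapping the transport only requires changing the handler.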
Full asynchronous processing
The executor deserializes incoming messages and stores task information in a LinkedBlockingQueue. Worker threads retrieve tasks from this queue for execution, while results are placed into a callback queue, reducing the load on Netty worker threads and increasing throughput.
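A minimal sketch of this queue-based hand-off, using illustrative names (TriggerTask, AsyncQueueDemo) rather than xxl-job's actual classes:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class AsyncQueueDemo {

    // Illustrative task payload; the real executor enqueues deserialized trigger parameters.
    static final class TriggerTask {
        final long jobId;
        final String handlerName;
        TriggerTask(long jobId, String handlerName) {
            this.jobId = jobId;
            this.handlerName = handlerName;
        }
    }

    // An I/O (Netty) handler thread only offer()s into this queue and returns immediately.
    static final LinkedBlockingQueue<TriggerTask> taskQueue = new LinkedBlockingQueue<>();
    // Results go to a separate callback queue, mirroring xxl-job's callback path.
    static final LinkedBlockingQueue<String> callbackQueue = new LinkedBlockingQueue<>();

    static {
        // Worker thread drains the task queue so I/O threads never block on job logic.
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    TriggerTask task = taskQueue.take();          // blocks until work arrives
                    callbackQueue.offer("done:" + task.jobId);    // stand-in for real execution
                }
            } catch (InterruptedException ignored) {
                // daemon thread; exit quietly
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Enqueue one task and wait briefly for its result (for demonstration only).
    static String runOnce(TriggerTask task) throws InterruptedException {
        taskQueue.offer(task);
        return callbackQueue.poll(2, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOnce(new TriggerTask(42L, "demoHandler")));  // done:42
    }
}
```

Because the I/O thread only enqueues, its time per request is roughly constant regardless of how long the job itself runs.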
Asynchronous handling wrapped as synchronous calls
Although the code appears synchronous, the underlying implementation uses futures and callbacks to manage async execution.
The scheduler-side code for triggering a task (XxlJobTrigger.runExecutor) is shown below:

public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address) {
    ReturnT<String> runResult = null;
    try {
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        // many async operations, finally yielding a synchronous result
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("\n address:").append(address);
    runResultSB.append("\n code:").append(runResult.getCode());
    runResultSB.append("\n msg:").append(runResult.getMsg());
    runResult.setMsg(runResultSB.toString());

    return runResult;
}

The dynamic proxy implementation for synchronous calls:
// proxy invocation
if (CallType.SYNC == callType) {
    XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
    try {
        client.asyncSend(finalAddress, xxlRpcRequest);
        XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
        if (xxlRpcResponse.getErrorMsg() != null) {
            throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
        }
        return xxlRpcResponse.getResult();
    } catch (Exception e) {
        logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
        throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
    } finally {
        futureResponse.removeInvokerFuture();
    }
}

The XxlRpcFutureResponse class handles thread waiting and notification:
public void setResponse(XxlRpcResponse response) {
    this.response = response;
    synchronized (lock) {
        done = true;
        lock.notifyAll();
    }
}

@Override
public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (!done) {
        synchronized (lock) {
            try {
                if (timeout < 0) {
                    lock.wait();
                } else {
                    long timeoutMillis = (TimeUnit.MILLISECONDS == unit) ? timeout : TimeUnit.MILLISECONDS.convert(timeout, unit);
                    lock.wait(timeoutMillis);
                }
            } catch (InterruptedException e) {
                throw e;
            }
        }
    }
    if (!done) {
        throw new XxlRpcException("xxl-rpc, request timeout at:" + System.currentTimeMillis() + ", request:" + request.toString());
    }
    return response;
}

Each remote call carries a UUID request ID, which is used to locate the corresponding XxlRpcFutureResponse and wake the waiting thread:
public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse) {
    final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
    if (futureResponse == null) {
        return;
    }
    if (futureResponse.getInvokeCallback() != null) {
        // callback type: hand the response to the callback thread pool
        try {
            executeResponseCallback(new Runnable() {
                @Override
                public void run() {
                    if (xxlRpcResponse.getErrorMsg() != null) {
                        futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
                    } else {
                        futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
                    }
                }
            });
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    } else {
        // sync type: set the response and wake the waiting caller
        futureResponse.setResponse(xxlRpcResponse);
    }
    futureResponsePool.remove(requestId);
}

These design choices provide a clean abstraction of the communication layer, improve throughput by offloading work from Netty worker threads, and simplify the developer experience by presenting asynchronous operations as synchronous method calls.
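Putting the pieces together, the request-ID correlation pattern can be sketched end-to-end with a simulated transport. All names here are illustrative, and CompletableFuture stands in for the hand-rolled wait/notify in XxlRpcFutureResponse; the correlation mechanism, a concurrent map keyed by a UUID, is the same:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class SyncOverAsyncDemo {
    // Pending futures keyed by request ID, like futureResponsePool in xxl-rpc.
    static final Map<String, CompletableFuture<String>> pool = new ConcurrentHashMap<>();

    // "Sends" a request asynchronously; the simulated peer replies on another thread.
    static String call(String payload, long timeoutMs) throws Exception {
        String requestId = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pool.put(requestId, future);
        try {
            // Simulated network: the response arrives later, tagged with the same ID.
            new Thread(() -> onResponse(requestId, "echo:" + payload)).start();
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);  // caller blocks here
        } finally {
            pool.remove(requestId);   // always clean up, as in removeInvokerFuture()
        }
    }

    // Invoked by the receiver thread; wakes the waiting caller via the request ID.
    static void onResponse(String requestId, String result) {
        CompletableFuture<String> future = pool.get(requestId);
        if (future != null) {
            future.complete(result);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(call("ping", 2000));  // echo:ping
    }
}
```

If no response arrives within the timeout, get() throws TimeoutException, playing the role of the "request timeout" branch in XxlRpcFutureResponse.get().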
IT Architects Alliance