
Design and Implementation of a Custom Distributed Job Scheduling Framework (k‑job)

This article introduces the motivation, architecture, technology choices, and key implementation details of a lightweight, highly extensible distributed job scheduling framework built on gRPC, Protobuf, a custom name‑server, and a bespoke message‑queue, addressing limitations of existing solutions like Quartz, XXL‑Job, and PowerJob.

Selected Java Interview Questions

Existing scheduled-task frameworks such as Quartz, XXL-Job, and PowerJob are feature-rich, but they are heavyweight, inflexible, and offer limited dynamic configuration. That motivated a lightweight rewrite aimed at frequent task creation, dynamic parameter changes, and high-concurrency workloads.

Positioning: The new framework extends PowerJob with a lightweight API, a built-in asynchronous message queue, and a custom registration center (NameServer) that provides fine-grained load balancing and automatic app-group splitting.

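Technical Stack:

Communication: gRPC (NIO on top of Netty)
Serialization: Protobuf encoding and decoding
Load balancing: a custom registration center (NameServer)
    |___ Strategy: server-side minimum-schedule-times strategy
    |___ Interaction: pull + push
Message queue: a custom, simple message queue
    |___ Message sending: asynchronous + timeout retry
    |___ Persistence: mmap + synchronous flush-to-disk strategy
    |___ Message retry: multi-level delay queues + dead-letter queue
Scheduling: time-wheel algorithm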
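The time-wheel idea named in the stack can be illustrated with a minimal single-level wheel. This is a sketch under stated assumptions, not k-job's scheduler: `SimpleTimeWheel`, `schedule`, and `tick` are hypothetical names, and the wheel is advanced by hand rather than by a timer thread so the behavior stays easy to follow.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical single-level timing wheel; names are illustrative, not taken from k-job. */
final class SimpleTimeWheel {
    private static final class Entry {
        final Runnable task;
        int remainingRounds; // full wheel rotations left before the task is due

        Entry(Runnable task, int remainingRounds) {
            this.task = task;
            this.remainingRounds = remainingRounds;
        }
    }

    private final List<List<Entry>> slots = new ArrayList<>();
    private int currentSlot = 0;

    SimpleTimeWheel(int slotCount) {
        for (int i = 0; i < slotCount; i++) {
            slots.add(new ArrayList<>());
        }
    }

    /** Schedules a task to run on the delayTicks-th call to tick() from now (delayTicks >= 1). */
    void schedule(Runnable task, int delayTicks) {
        if (delayTicks < 1) {
            throw new IllegalArgumentException("delayTicks must be >= 1");
        }
        int slotCount = slots.size();
        int targetSlot = (currentSlot + delayTicks) % slotCount;
        int rounds = (delayTicks - 1) / slotCount;
        slots.get(targetSlot).add(new Entry(task, rounds));
    }

    /** Advances the wheel by one slot and runs every task in that slot that is due. */
    void tick() {
        currentSlot = (currentSlot + 1) % slots.size();
        List<Runnable> due = new ArrayList<>();
        Iterator<Entry> it = slots.get(currentSlot).iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (entry.remainingRounds == 0) {
                it.remove();
                due.add(entry.task);
            } else {
                entry.remainingRounds--;
            }
        }
        // Run after iterating so a task may safely schedule new work on the wheel.
        for (Runnable task : due) {
            task.run();
        }
    }
}
```

Scheduling costs O(1) regardless of how many tasks are pending, which is the usual reason to prefer a wheel over a sorted queue when tasks are created very frequently. A real scheduler would call `tick()` from a timer thread at a fixed interval.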

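Project Structure:

├── LICENSE
├── k-job-common // shared dependencies for all components; developers do not need to be aware of it
├── k-job-nameServer // registration center for servers and workers; provides load balancing
├── k-job-producer // plain JAR; provides the OpenAPI, with built-in asynchronous sending through the message queue
├── k-job-server // scheduling server built on Spring Boot
├── k-job-worker-boot-starter // spring-boot-starter for kjob-worker; Spring Boot applications can add this dependency to connect to kjob-server in one step
├── k-job-worker // plain JAR; applications that connect to kjob-server must depend on it
└── pom.xml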

Key Features:

Load balancing for massive concurrent tasks using a minimum‑schedule‑times strategy and server‑side grouping.

Automatic app‑group splitting when worker count exceeds a threshold, with dynamic server assignment.

Asynchronous task creation and modification via a custom producer that uses gRPC future stubs and Reactor for non‑blocking handling.

Reliable message queue with mmap‑based commit log, consumer queue, multi‑level delay queues, and a dead‑letter queue to guarantee delivery and persistence.
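The "asynchronous + timeout retry" sending behavior can be illustrated without gRPC. The following is a minimal sketch, not k-job's producer code: it swaps the gRPC future stub and Reactor for the JDK's `CompletableFuture` (Java 9 or later), and `AsyncRetrySender`, `sendWithRetry`, and its parameters are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper: non-blocking send with a per-attempt timeout and bounded retries. */
final class AsyncRetrySender {
    /**
     * Calls sendOnce; if the attempt fails or takes longer than timeoutMillis,
     * tries again until retriesLeft is used up. The caller is never blocked.
     */
    static <T> CompletableFuture<T> sendWithRetry(
            Supplier<CompletableFuture<T>> sendOnce, long timeoutMillis, int retriesLeft) {
        CompletableFuture<T> attempt =
                sendOnce.get().orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        return attempt.handle((value, error) -> {
            CompletableFuture<T> next;
            if (error == null) {
                next = CompletableFuture.completedFuture(value);   // success: pass the value on
            } else if (retriesLeft <= 0) {
                next = CompletableFuture.failedFuture(error);      // out of retries: surface the error
            } else {
                next = sendWithRetry(sendOnce, timeoutMillis, retriesLeft - 1);
            }
            return next;
        }).thenCompose(next -> next);
    }
}
```

A caller would pass a supplier that issues one request per invocation, for example `sendWithRetry(() -> client.createJob(request), 3000, 2)`, where `client.createJob` stands in for whatever asynchronous call the producer actually makes.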

Key load‑balancing code:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public ReBalanceInfo getServerAddressReBalanceList(String serverAddress, String appName) {
    // First request: the worker has no server yet, so return the full server list
    if (serverAddress.isEmpty()) {
        ReBalanceInfo reBalanceInfo = new ReBalanceInfo();
        reBalanceInfo.setSplit(false);
        reBalanceInfo.setServerIpList(new ArrayList<>(serverAddressSet));
        reBalanceInfo.setSubAppName("");
        return reBalanceInfo;
    }
    ReBalanceInfo reBalanceInfo = new ReBalanceInfo();
    // Servers sorted by schedule times in ascending order (least-scheduled first).
    // comparingLong avoids the overflow risk of casting a long difference to int.
    List<String> newServerIpList = serverAddress2ScheduleTimesMap.keySet().stream()
        .sorted(Comparator.comparingLong(serverAddress2ScheduleTimesMap::get))
        .collect(Collectors.toList());
    // Split the app group when its worker count has just passed a multiple of maxWorkerNum.
    // containsKey guards against a NullPointerException for an unknown appName.
    if (appName2WorkerNumMap.containsKey(appName) && appName2WorkerNumMap.get(appName) > maxWorkerNum && appName2WorkerNumMap.get(appName) % maxWorkerNum == 1) {
        // Return the sorted list so the new sub-group lands on the least-loaded server
        reBalanceInfo.setSplit(true);
        reBalanceInfo.setChangeServer(false);
        reBalanceInfo.setServerIpList(newServerIpList);
        reBalanceInfo.setSubAppName(appName + ":" + appName2WorkerNumMap.size());
        return reBalanceInfo;
    }
    // Check whether the worker should switch servers. The list is ascending, so the
    // least-scheduled server is at index 0 (the original read the last element, i.e. the busiest).
    Long leastScheduleTimes = serverAddress2ScheduleTimesMap.get(newServerIpList.get(0));
    Long comparedScheduleTimes = leastScheduleTimes == 0 ? 1 : leastScheduleTimes;
    if (serverAddress2ScheduleTimesMap.get(serverAddress) / comparedScheduleTimes > 2) {
        reBalanceInfo.setSplit(false);
        reBalanceInfo.setChangeServer(true);
        // The first server in the list is the target: the one with the fewest schedule times
        reBalanceInfo.setServerIpList(newServerIpList);
        reBalanceInfo.setSubAppName("");
        return reBalanceInfo;
    }
    // Otherwise return the default (unsorted) list
    reBalanceInfo.setSplit(false);
    reBalanceInfo.setServerIpList(new ArrayList<>(serverAddressSet));
    reBalanceInfo.setSubAppName("");
    return reBalanceInfo;
}
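The three decisions in the method above (sort by schedule times, split the app group, change server) can be tried in isolation with plain maps. This is a minimal sketch assuming the maps hold `Long` counters keyed by address; `ReBalanceRules` and its method names are hypothetical, and the ratio of 2 mirrors the threshold in the method above.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical stand-alone version of the rebalance rules, for illustration only. */
final class ReBalanceRules {
    /** Server addresses ordered from fewest to most scheduled tasks. */
    static List<String> sortByScheduleTimes(Map<String, Long> scheduleTimes) {
        return scheduleTimes.entrySet().stream()
                .sorted(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    /** A new sub-group starts when the worker count has just passed a multiple of maxWorkerNum. */
    static boolean shouldSplit(int workerCount, int maxWorkerNum) {
        return workerCount > maxWorkerNum && workerCount % maxWorkerNum == 1;
    }

    /** True when the current server has scheduled more than `ratio` times the least-loaded one. */
    static boolean shouldChangeServer(Map<String, Long> scheduleTimes, String current, long ratio) {
        long least = scheduleTimes.values().stream().mapToLong(Long::longValue).min().orElse(0L);
        long divisor = least == 0 ? 1 : least; // avoid dividing by zero for an idle server
        return scheduleTimes.getOrDefault(current, 0L) / divisor > ratio;
    }
}
```

With `maxWorkerNum = 3`, worker counts of 4 and 7 trigger a split while 5 and 6 do not, so each sub-group is created exactly once as workers register one by one. Because the change-server check uses integer division, a server at 7 schedules against a minimum of 3 gives a ratio of 2 and does not trigger a switch.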

Message‑queue implementation (commit log, consumer queue, delay queues) includes the following core structures:

private MappedByteBuffer commitLogBuffer;  // commitlog file mapped to memory
private MappedByteBuffer consumerQueueBuffer; // consumerQueue file mapped to memory
private final AtomicLong commitLogBufferPosition = new AtomicLong(0);
private final AtomicLong commitLogCurPosition = new AtomicLong(0);
private final AtomicLong lastProcessedOffset = new AtomicLong(0);
private final AtomicLong currentConsumerQueuePosition = new AtomicLong(0);
private final AtomicLong consumerPosition = new AtomicLong(0);
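To show how a memory-mapped commit log with synchronous flushing can work, here is a minimal sketch built on `FileChannel.map` and `MappedByteBuffer.force()`. It is not k-job's storage code: `MappedCommitLog`, the length-prefixed record layout, and the temp-file factory are assumptions made for illustration, and the offset returned by `append` plays the role an entry in the consumer queue would play.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical append-only log: each record is [4-byte length][UTF-8 payload]. */
final class MappedCommitLog implements AutoCloseable {
    private final FileChannel channel;
    private final MappedByteBuffer buffer;
    private int writePosition = 0;

    private MappedCommitLog(Path file, int capacityBytes) throws IOException {
        channel = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE);
        buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, capacityBytes);
    }

    /** Creates a log backed by a temporary file (an assumption to keep the sketch self-contained). */
    static MappedCommitLog createTemp(int capacityBytes) {
        try {
            Path file = Files.createTempFile("commitlog", ".bin");
            file.toFile().deleteOnExit();
            return new MappedCommitLog(file, capacityBytes);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Appends one record, forces it to disk, and returns the record's start offset. */
    synchronized int append(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        int recordSize = Integer.BYTES + payload.length;
        if (writePosition + recordSize > buffer.capacity()) {
            throw new IllegalStateException("commit log is full");
        }
        int offset = writePosition;
        buffer.position(offset);
        buffer.putInt(payload.length);
        buffer.put(payload);
        buffer.force(); // synchronous flush: return only after the OS has written the pages
        writePosition = offset + recordSize;
        return offset;
    }

    /** Reads the record that starts at the given offset. */
    synchronized String read(int offset) {
        ByteBuffer view = buffer.duplicate(); // independent position, same mapped memory
        view.position(offset);
        byte[] payload = new byte[view.getInt()];
        view.get(payload);
        return new String(payload, StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
        try {
            channel.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `force()` on every append trades throughput for durability; batching several appends per flush is a common alternative when losing the last few messages on a crash is acceptable.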

Delay‑queue based retry logic:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;

// Consumer is the project's own consumer type (it exposes consume), not java.util.function.Consumer
private static final Deque<MqCausa.Message> deadMessageQueue = new ArrayDeque<>();
private static final List<DelayQueue<DelayedMessage>> delayQueueList = new ArrayList<>(2);
// Delay in ms, indexed by the retries remaining after this attempt: 0 -> 10 s, 1 -> 5 s
private static final List<Long> delayTimes = Lists.newArrayList(10000L, 5000L);
public static void init(Consumer consumer) {
    // One delay queue and one consumer thread per retry level, so no level is left unconsumed
    for (int i = 0; i < delayTimes.size(); i++) {
        DelayQueue<DelayedMessage> delayQueue = new DelayQueue<>();
        delayQueueList.add(delayQueue);
        Thread consumerThread = new Thread(() -> {
            try {
                while (true) {
                    // take() blocks until a message's delay has expired and removes it,
                    // so no isEmpty() polling or explicit remove() is needed
                    DelayedMessage message = delayQueue.take();
                    consumer.consume(message.getMessage());
                    System.out.println("Consumed: " + message.getMessage() + " at " + System.currentTimeMillis());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("Consumer thread interrupted");
            }
        });
        consumerThread.start();
    }
}
public static void reConsume(MqCausa.Message msg) {
    // No retries left: move the message to the dead-letter queue
    if (msg.getRetryTime() == 0) {
        log.error("msg : {} is dead", msg);
        deadMessageQueue.add(msg);
        return;
    }
    // Decrement the retry counter and enqueue at the level matching the retries that remain
    MqCausa.Message build = msg.toBuilder().setRetryTime(msg.getRetryTime() - 1).build();
    int level = build.getRetryTime();
    DelayedMessage delayedMessage = new DelayedMessage(build, delayTimes.get(level));
    delayQueueList.get(level).add(delayedMessage);
}
static class DelayedMessage implements Delayed {
    private final MqCausa.Message message;
    private final long triggerTime; // absolute time in ms at which the message becomes available
    public DelayedMessage(MqCausa.Message message, long delayTime) {
        this.message = message;
        this.triggerTime = System.currentTimeMillis() + delayTime;
    }
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
    @Override
    public int compareTo(Delayed other) {
        return Long.compare(this.triggerTime, ((DelayedMessage) other).triggerTime);
    }
    public MqCausa.Message getMessage() { return message; }
}
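The retry flow above can be exercised end to end with a compact, single-threaded sketch. It is an illustration under stated assumptions rather than the framework's code: messages are plain strings instead of `MqCausa.Message`, a `Predicate` stands in for the consumer, and `RetryFlowDemo`, `RetryItem`, and `deliver` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

/** Hypothetical single-threaded model of multi-level delayed retry with a dead-letter queue. */
final class RetryFlowDemo {
    static final class RetryItem implements Delayed {
        final String payload;
        final int retriesLeft;
        final long triggerTime; // absolute time in ms at which the item becomes available

        RetryItem(String payload, int retriesLeft, long delayMillis) {
            this.payload = payload;
            this.retriesLeft = retriesLeft;
            this.triggerTime = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    /**
     * Hands each payload to the handler. A payload the handler rejects is retried after
     * delaysMillis[retries remaining]; once retries are exhausted it is dead-lettered.
     */
    static Deque<String> deliver(List<String> payloads, List<Long> delaysMillis, Predicate<String> handler) {
        DelayQueue<RetryItem> queue = new DelayQueue<>();
        Deque<String> deadLetters = new ArrayDeque<>();
        for (String payload : payloads) {
            queue.add(new RetryItem(payload, delaysMillis.size(), 0L));
        }
        try {
            while (!queue.isEmpty()) {
                RetryItem item = queue.take(); // blocks until the head item's delay has expired
                if (handler.test(item.payload)) {
                    continue; // consumed successfully
                }
                if (item.retriesLeft == 0) {
                    deadLetters.add(item.payload);
                    continue;
                }
                int left = item.retriesLeft - 1;
                queue.add(new RetryItem(item.payload, left, delaysMillis.get(left)));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return deadLetters;
    }
}
```

With two delay levels, a payload that always fails is attempted three times (once immediately, then once per level) before it reaches the dead-letter queue. The `isEmpty()` check before `take()` is safe here only because a single thread both adds and removes items.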

The article concludes with architecture diagrams for service discovery and scheduling, and provides the open‑source repository URL: https://github.com/karatttt/k-job .
