
How ClickHouse Local Join Cuts Query Time and Memory Usage in Supply‑Chain Planning

This article explains how moving aggregation logic from in‑memory processing to ClickHouse SQL, synchronizing configuration data, and leveraging ClickHouse ReplacingMergeTree tables with local joins dramatically reduces query latency and memory consumption for large‑scale supply‑chain planning workloads.

JD Cloud Developers

Introduction

This article discusses bottlenecks encountered in a supply‑chain planning system and shares tools and methods used to resolve them, focusing on practical solutions rather than deep theoretical details.

Business Background

The system stores planning, dimensional, and configuration data in TiDB, while large historical reference data resides in ClickHouse. As business grew, requirements for filtering historical data and updating business tags caused long T+1 activation cycles, excessive memory usage, and query latencies exceeding 10 seconds.

Solution

Experiments showed that performing aggregation directly in SQL, on either TiDB or ClickHouse, is far faster than in-memory aggregation, cutting execution time from about 5 seconds to roughly 300 ms. The main optimization is therefore to synchronize business configuration data to ClickHouse and join the results from TiDB and ClickHouse.
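As a rough illustration of the idea (table and column names here are hypothetical, not the article's actual schema), an aggregation that previously required pulling raw rows into application memory can instead be expressed as a single pushed-down query:

```sql
-- Instead of fetching raw rows and summing in application memory,
-- push the aggregation down into ClickHouse (hypothetical schema):
SELECT
    dept_id_3,
    saler,
    SUM(sale_amount) AS total_sale_amount
FROM sop_pre_mix.history
WHERE dt = '2023-12-16'
GROUP BY dept_id_3, saler;
```

Only the small aggregated result set crosses the wire, which is what drives the 5 s to 300 ms difference described above.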

1. ClickHouse ReplacingMergeTree Table and Dimension‑Table Mode

After reviewing the official documentation, we chose ReplacingMergeTree with the FINAL keyword for deduplication. The following DDL creates a replicated table suitable for production use:

<code>CREATE TABLE IF NOT EXISTS library.blacklist ON CLUSTER xx (
    `dept_id_1` Int32 COMMENT 'level-1 department ID',
    `dept_id_2` Int32 COMMENT 'level-2 department ID',
    `dept_id_3` Int32 COMMENT 'level-3 department ID',
    `saler` String COMMENT 'sales ERP account',
    `pur_controller` String COMMENT 'purchasing-control ERP account',
    `update_time` DateTime COMMENT 'update time',
    `is_deleted` UInt8 COMMENT 'validity flag, 0: not deleted, 1: deleted'
) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/xx/jdob_ha/sop_pre_mix/blacklist/{shard_dict}', '{replica_dict}', update_time)
ORDER BY (dept_id_3, saler, pur_controller)
SETTINGS storage_policy = 'jdob_ha';</code>

Key points:

- Use ReplicatedReplacingMergeTree instead of plain ReplacingMergeTree.
- The path parameters are JD-specific metadata keys and can be left as shown.
- update_time is passed to the engine as the version column, so deduplication keeps the row with the latest update_time for each sorting key.
- This table follows the dimension-table pattern, storing a full copy on each node.
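To read the deduplicated view of such a table at query time, apply the FINAL keyword mentioned above; a minimal sketch against the table just created:

```sql
-- FINAL applies ReplacingMergeTree deduplication at read time, so only
-- the row with the greatest update_time per sorting key is returned.
SELECT dept_id_3, saler, pur_controller
FROM library.blacklist FINAL
WHERE is_deleted = 0;
```

FINAL adds merge work to the read path, but on a small dimension table (thousands of rows) the cost is negligible.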

2. ClickHouse Local Join

Local join executes the join on each distributed node before sending results to the aggregating node, offering far lower latency and resource consumption than a Global Join, which ships the right-hand table's data across the cluster.

Requirements for Data Sharding

Both tables must share the same sharding function; rand() cannot be used. A consistent hash algorithm is recommended.
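A sketch of what a co-sharded pair could look like (table names are hypothetical): both distributed tables use the same deterministic hash of the join key, here ClickHouse's built-in cityHash64, so matching rows land on the same shard and can be joined locally.

```sql
-- Both distributed tables shard by the same hash of the join key
-- (dept_id_3), so rows with equal keys are co-located per shard.
CREATE TABLE db.fact_dist ON CLUSTER xx AS db.fact_local
ENGINE = Distributed('xx', 'db', 'fact_local', cityHash64(dept_id_3));

CREATE TABLE db.dim_dist ON CLUSTER xx AS db.dim_local
ENGINE = Distributed('xx', 'db', 'dim_local', cityHash64(dept_id_3));
```

With rand() as the sharding key, a given join key may land on any shard, which is why it cannot satisfy this requirement. Note that the dimension-table pattern used in this article sidesteps the requirement entirely by replicating the full right-hand table on every node.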

<code>-- Distributed table example
CREATE TABLE IF NOT EXISTS sop_pre_mix.history ON CLUSTER xx
AS sop_pre_mix.history_local
ENGINE = Distributed('xx', 'sop_pre_mix', 'history_local', rand());</code>

Local Join Syntax Pitfalls

The correct pattern is:

<code>SELECT *
FROM a.dis AS a
JOIN (SELECT * FROM b.local WHERE cond2) AS b ON a.key = b.key
WHERE a.cond1;</code>

Filters on the distributed left table must appear in the outer WHERE clause; wrapping the left side in a subquery breaks the local join.

In older ClickHouse versions, the dimension table must be referenced with its database prefix (e.g., sop_pre_mix.sop_sale_plan_rule_core_dim) or the engine will report "table not found".
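For example, a minimal sketch of the safe, fully qualified form (using the article's table names; the unqualified variant in the comment is the one that may fail):

```sql
-- Unqualified reference may fail on older versions:
--   ... LEFT JOIN sop_sale_plan_rule_core_dim FINAL ...
-- Qualified reference with the database prefix is safe:
SELECT count()
FROM sop_prod_mix.sop_sale_history_week_local AS a
LEFT JOIN (
    SELECT dept_id_3
    FROM sop_pre_mix.sop_sale_plan_rule_core_dim FINAL
) AS c ON a.dept_id_3 = c.dept_id_3;
```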

Final Local Join Presentation

Left table (distributed, ~2.9 billion rows):

<code>CREATE TABLE IF NOT EXISTS sop_prod_mix.sop_sale_history_week_local ON CLUSTER xx (
    dept_id_1 Int32 COMMENT 'level-1 department ID',
    dept_name_1 String COMMENT 'level-1 department name',
    ...
    sale_amount_lunar_sp Decimal(20,2) COMMENT 'self-operated sales outbound amount, same period last year',
    dt String COMMENT 'data date'
) ENGINE = ReplicatedMergeTree('/clickhouse/LFRH_CK_Pub_115/jdob_ha/sop_prod_mix/sop_sale_history_week_local/{shard}', '{replica}')
PARTITION BY dt
ORDER BY (dept_id_1,dept_id_2,dept_id_3,saler,pur_controller,cate_id_3,ym,ymw)
SETTINGS storage_policy = 'jdob_ha', index_granularity = 8192;

CREATE TABLE IF NOT EXISTS sop_prod_mix.sop_sale_history_week ON CLUSTER xx AS sop_prod_mix.sop_sale_history_week_local ENGINE = Distributed('xx','sop_prod_mix','sop_sale_history_week_local',rand());</code>

Right table (dimension, ~4,500 rows):

<code>CREATE TABLE IF NOT EXISTS sop_pre_mix.sop_dim_blacklist ON CLUSTER xx (
    `dept_id_1` Int32 COMMENT 'level-1 department ID',
    `dept_id_2` Int32 COMMENT 'level-2 department ID',
    `dept_id_3` Int32 COMMENT 'level-3 department ID',
    `saler` String COMMENT 'sales ERP account',
    `pur_controller` String COMMENT 'purchasing-control ERP account',
    `update_time` DateTime COMMENT 'update time',
    `is_deleted` UInt8 COMMENT 'validity flag, 0: not deleted, 1: deleted'
) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/xx/jdob_ha/sop_pre_mix/sop_dim_blacklist/{shard_dict}', '{replica_dict}', update_time)
ORDER BY (dept_id_3, saler, pur_controller)
SETTINGS storage_policy = 'jdob_ha';</code>

Sample query that leverages the local join:

<code>SELECT a.ymw AS ymw,
       a.dept_id_2 AS dept_id_2,
       a.dept_id_3 AS dept_id_3,
       a.week AS week,
       a.cold_type AS cold_type,
       a.year AS year,
       a.net_type AS net_type,
       a.saler AS saler,
       a.pur_controller AS pur_controller,
       a.dept_id_1 AS dept_id_1,
       a.month AS month,
       a.ym AS ym,
       CASE WHEN a.dept_id_1 != c.dept_id_1 OR c.dept_id_1 IS NULL THEN -100 ELSE c.core_dim_id END AS core_dim_id,
       CASE WHEN a.dept_id_1 != c.dept_id_1 OR c.dept_id_1 IS NULL THEN -100 ELSE c.core_dim_id END AS brand_id,
       SUM(initial_inv_amount) AS initial_inv_amount,
       SUM(gmv_lunar_sp) AS gmvLunarSp
FROM sop_pur_history_week a
LEFT JOIN (
    SELECT dept_id_2, dept_id_3, pur_controller, saler
    FROM sop_pre_mix.sop_dim_blacklist FINAL
    WHERE is_deleted = 0 AND dept_id_3 IN (12345,23456,...)
) b ON a.dept_id_3 = b.dept_id_3 AND a.pur_controller = b.pur_controller AND a.saler = b.saler
LEFT JOIN (
    SELECT dept_id_1, dept_id_2, dept_id_3, pur_controller, saler, core_dim_id
    FROM sop_pre_mix.sop_sale_plan_rule_core_dim FINAL
    WHERE is_deleted = 0 AND dept_id_3 IN (12345,23456,...) AND core_dim_id IN (12310) AND plan_dim = 'brand'
) c ON a.dept_id_1 = c.dept_id_1 AND a.dept_id_2 = c.dept_id_2 AND a.dept_id_3 = c.dept_id_3 AND a.saler = c.saler AND a.pur_controller = c.pur_controller AND a.brand_id = c.core_dim_id
WHERE dt = '2023-12-16'
  AND a.dept_id_3 IN (12345,23456,...)
  AND a.brand_id IN (12310)
  AND (a.dept_id_3 != b.dept_id_3 OR b.dept_id_3 IS NULL)
GROUP BY a.ymw, a.dept_id_2, a.dept_id_3, a.week, a.cold_type, a.year, a.net_type, a.saler, a.pur_controller, a.dept_id_1, a.month, a.ym, c.dept_id_1, core_dim_id;</code>

Resource comparison on a 9-shard, 18-node cluster shows that the local-join version reads far fewer rows and consumes far less memory, with savings exceeding tenfold as the shard count increases.

Final Optimization Effects

The changes eliminated frequent OOM incidents during regular queries and delivered stable performance improvements. Test‑environment benchmarks (shown in the following chart) confirm the gains, and production results are even better.

Tags: performance optimization, ClickHouse, TiDB, Database Engineering, Local Join
Written by

JD Cloud Developers

JD Cloud Developers (Developer of JD Technology) is a JD Technology Group platform offering technical sharing and communication for AI, cloud computing, IoT and related developers. It publishes JD product technical information, industry content, and tech event news. Embrace technology and partner with developers to envision the future.
