How ClickHouse Distributed Tables Write Data: Sync vs Async Explained
This article dissects ClickHouse's Distributed table write path, detailing how the Distributed engine creates tables, decides between synchronous and asynchronous insertion, manages local and remote shards, handles sharding keys, and ensures atomic data distribution across the cluster.
ClickHouse, like Elasticsearch, uses data sharding as a core feature of its distributed storage, improving efficiency through parallel reads and writes. The Distributed engine implements a distributed table mechanism that creates a view over all local tables for convenient distributed queries.
Distributed Table Engine Overview
The Distributed table engine does not store data itself; it reads from or writes to remote nodes' tables. Its creation depends on existing local tables, with a statement similar to:
<code>CREATE TABLE {table} ON CLUSTER {cluster}
AS {local_table}
ENGINE = Distributed({cluster}, {database}, {local_table}, {policy})
</code>
The policy can be a random function (e.g., rand()) or a hash function (e.g., halfMD5(id)).
Cluster Configuration
ClickHouse cluster nodes are defined in the remote_servers XML configuration, for example:
<code><remote_servers>
    <logs>
        <shard>
            <weight>1</weight>
            <internal_replication>true</internal_replication>
            <replica>
                <priority>1</priority>
                <host>example01-01-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-01-2</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <weight>2</weight>
            <internal_replication>true</internal_replication>
            <replica>
                <host>example01-02-1</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>example01-02-2</host>
                <port>9000</port>
            </replica>
        </shard>
    </logs>
</remote_servers>
</code>
Write Path Entry Point
The write process starts with the constructor of
DistributedBlockOutputStream:
<code>DistributedBlockOutputStream(const Context &context_, StorageDistributed &storage_, const ASTPtr &query_ast_, const ClusterPtr &cluster_, bool insert_sync_, UInt64 insert_timeout_);
</code>
If insert_sync_ is true, the insertion is synchronous; otherwise it is asynchronous. The flag is derived from the insert_distributed_sync setting and from the presence of an owned_cluster (set when the table is created through a table function).
Synchronous vs Asynchronous Insertion
Synchronous insertion writes directly to the target table, while asynchronous insertion first writes to a local temporary file and later distributes the data to remote nodes.
<code>DistributedBlockOutputStream::write()
↓
if insert_sync
↓
writeSync()
else
↓
writeAsync()
</code>
Asynchronous Write Implementation
The core of asynchronous writing is
writeAsyncImpl(). It checks whether the shard has internal replication and then either writes to the local node first or iterates over all shards:
<code>writeAsyncImpl()
  ↓
if shard_info.hasInternalReplication()
    writeToLocal()
    writeToShard()
else
    for each shard { writeToShard() }
</code>
writeToLocal() prefers a local replica when available, while writeToShard() handles the actual file creation and hard-linking to ensure atomicity.
<code>void DistributedBlockOutputStream::writeToShard(const Block &block, const std::vector<std::string> &dir_names)
{
    /// The block is serialized once into a temporary file, then hard-linked
    /// into each target directory.
    std::string first_file_tmp_path{};
    bool first = true;
    for (const auto &dir_name : dir_names)
    {
        const auto &path = storage.getPath() + dir_name + '/';

        /// If the directory was just created, start a monitor for it.
        if (Poco::File(path).createDirectory())
            storage.requireDirectoryMonitor(dir_name);

        const auto &file_name = toString(storage.file_names_increment.get()) + ".bin";
        const auto &block_file_path = path + file_name;

        if (first)
        {
            first = false;

            /// Write the block to a temporary file under tmp/ first.
            const auto &tmp_path = path + "tmp/";
            Poco::File(tmp_path).createDirectory();
            const auto &block_file_tmp_path = tmp_path + file_name;
            first_file_tmp_path = block_file_tmp_path;

            WriteBufferFromFile out{block_file_tmp_path};
            CompressedWriteBuffer compress{out};
            NativeBlockOutputStream stream{compress, ClickHouseRevision::get(), block.cloneEmpty()};

            writeVarUInt(UInt64(DBMS_DISTRIBUTED_SENDS_MAGIC_NUMBER), out);
            context.getSettingsRef().serialize(out);
            writeStringBinary(query_string, out);

            stream.writePrefix();
            stream.write(block);
            stream.writeSuffix();
        }

        /// Hard-link the fully written temporary file into the shard directory.
        if (link(first_file_tmp_path.data(), block_file_path.data()))
            throwFromErrnoWithPath("Could not link " + block_file_path + " to " + first_file_tmp_path, block_file_path, ErrorCodes::CANNOT_LINK);
    }
    // ...
}
</code>
Data files are stored under /var/lib/clickhouse/data/{database}/{table}/, with a separate directory for each shard (e.g., 'default@ck2-0:9000,default@ck2-1:9000'). Each shard directory contains a tmp subdirectory used to write temporary files before hard-linking them into the final location, guaranteeing that only complete files are visible to the distribution thread.
<code># tree
.
├── 'default@ck2-0:9000,default@ck2-1:9000'
│ ├── 25.bin
│ └── tmp
│ └── 26.bin
└── 'default@ck3-0:9000,default@ck3-1:9000'
└── tmp
</code>
Sharding Key Processing
If a sharding key is defined and the cluster has more than one shard, the block is split before asynchronous distribution:
<code>writeAsync()
  ↓
if storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1)
    writeSplitAsync(block)  // split by sharding key, then writeAsyncImpl per sub-block
else
    writeAsyncImpl(block)   // normal path
</code>
The sharding key expression is built when the table is created:
<code>sharding_key_expr = buildShardingKeyExpression(sharding_key_, global_context, getColumns().getAllPhysical(), false);
</code>
During splitting, a selector is created that maps each row to a shard based on the evaluated sharding key and the cluster's slot-to-shard mapping:
<code>IColumn::Selector DistributedBlockOutputStream::createSelector(const Block &source_block)
{
    Block current_block_with_sharding_key_expr = source_block;
    storage.getShardingKeyExpr()->execute(current_block_with_sharding_key_expr);

    const auto &key_column = current_block_with_sharding_key_expr.getByName(storage.getShardingKeyColumnName());
    const auto &slot_to_shard = cluster->getSlotToShard();

    // ... selector construction logic for each integer key type ...

    throw Exception{"Sharding key expression does not evaluate to an integer type", ErrorCodes::TYPE_MISMATCH};
}
</code>
Rows are then scattered to the appropriate shards using the selector, respecting the weight configuration defined in remote_servers.
Practical Recommendations
Writing to a Distributed table creates temporary local data, increasing CPU and memory usage on the initiating node, so minimize write operations against such tables. Because each block is split by sharding key and weight, many smaller blocks are generated, which increases merge load on the remote nodes. Also note that a Distributed table created via a table function performs synchronous writes.
Understanding these mechanisms helps you use Distributed tables efficiently and troubleshoot performance issues.
JD Cloud Developers
JD Cloud Developers is a JD Technology Group platform for technical sharing and communication among AI, cloud computing, IoT and related developers. It publishes JD product technical information, industry content, and tech event news.