Elasticsearch Data Write Process: Coordinating Node Phase Explained
This article explains how Elasticsearch handles data ingestion by detailing the roles of cluster nodes, the coordinating node workflow, index structures, shard allocation, request validation, pipeline processing, automatic index creation, and the underlying code paths involved in the write operation.
Background
The author explores how external data is written to Elasticsearch and persisted to storage, motivated by curiosity after reading the "Elasticsearch: The Definitive Guide" and Lucene documentation, and decides to study the source code directly.
Related Concepts
Elasticsearch clusters consist of several node roles: Master, Data, Ingest (pre‑processing), Coordinating, and Tribe nodes. Each role has specific responsibilities and configuration settings.
Master Node
Responsible for cluster‑level operations such as index creation and cluster state updates. A node is master‑eligible when node.master: true, and the current master can be checked via GET _cat/nodes?v&h=name,node.role,master . Example configuration for a dedicated master node:
node.master: true
node.data: false
To avoid split‑brain, discovery.zen.minimum_master_nodes should be set to (master_eligible_nodes / 2) + 1; with three master‑eligible nodes, for example, that is 2.
Data Node
Stores data and executes CRUD, search, and aggregation operations. Configuration for a dedicated data node:
node.master: false
node.data: true
node.ingest: false
Ingest (Pre‑processing) Node
Introduced in ES 5.0, it runs pipelines of processors before indexing documents. Dedicated ingest node configuration:
node.master: false
node.data: false
node.ingest: true
Coordinating Node
Any node can act as a coordinating node: it receives client requests, routes them to the shards that hold the relevant data, gathers and merges the shard‑level responses, and returns the final result to the client. A dedicated coordinating node can be configured as:
node.master: false
node.data: false
node.ingest: false
Tribe Node
Acts as a federated client across multiple clusters, allowing unified queries. Example configuration includes HTTP and transport ports and the definition of each remote cluster (t1, t2, …).
http.port: 9200
http.publish_port: 9200
transport.tcp.port: 9300
node.master: false
node.data: false
tribe.t1.cluster.name: cluster_test1
tribe.t1.network.host: 192.168.70.109
... (additional tribe settings)
Index Level
An index can be thought of as a database (pre‑6.0) or, since each index now holds a single type, as a table (6.0 and later). A document is identified by _index , _type (deprecated), and _id . Documents are stored in shards, each shard being a Lucene index composed of multiple segments.
Shard and Replica
Data is split into primary shards and replica shards to enable horizontal scaling and high availability. Primary shards handle write operations first; replicas are updated afterward. Elasticsearch provides weak consistency: reads from the primary see the latest data, while reads from replicas may be stale.
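As an illustration, shard and replica counts are fixed per index at creation time (the index name and counts below are arbitrary examples):

```
PUT my_index
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  }
}
```

Note that number_of_shards cannot be changed after the index is created, while number_of_replicas can be adjusted at any time.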
Data Write Process
Writes are handled via Index requests (single document) or Bulk requests (multiple documents). The process is divided into three phases: coordinating node handling, primary shard handling, and replica handling. This article focuses on the first phase.
Basic Write Flow
1. The client sends a request to a node in the cluster.
2. That node determines the target shard from the document _id (or an explicit routing value) and forwards the request to the node holding the primary shard.
3. The primary shard processes the request, forwards it to the replica shards, and reports success back to the coordinating node, which finally responds to the client.
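The shard selection in step 2 follows the routing formula shard_num = hash(_routing) % number_of_primary_shards, where _routing defaults to the document _id and the hash is Murmur3. A minimal, runnable sketch of the idea (using String.hashCode() as a stand‑in for Murmur3, so the shard numbers will not match a real cluster):

```java
public class RoutingSketch {
    // Simplified version of Elasticsearch's routing formula:
    //   shard_num = hash(_routing) % number_of_primary_shards
    // String.hashCode() stands in for the Murmur3 hash used by the real
    // implementation, so only the shape of the calculation is accurate.
    static int shardId(String routing, int numberOfPrimaryShards) {
        return Math.floorMod(routing.hashCode(), numberOfPrimaryShards);
    }

    public static void main(String[] args) {
        // The same _id always maps to the same shard, which is why
        // number_of_primary_shards cannot change after index creation.
        for (String id : new String[] {"doc-1", "doc-2", "doc-1"}) {
            System.out.println(id + " -> shard " + shardId(id, 5));
        }
    }
}
```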
Detailed Coordinating Node Workflow
The coordinating node performs parameter validation (e.g., IndexRequest.validate() ), handles ingest pipelines, checks automatic index creation settings, and groups bulk requests by shard to minimize network round‑trips.
Parameter validation example (excerpt):
// Parent validation
ActionRequestValidationException validationException = super.validate();
// Type must not be null
if (type == null) {
validationException = addValidationError("type is missing", validationException);
}
// Source (document body) must not be null
if (source == null) {
validationException = addValidationError("source is missing", validationException);
}
// Content type must be specified
if (contentType == null) {
validationException = addValidationError("content type is missing", validationException);
}
... (additional checks)

Ingest pipeline example:
PUT _ingest/pipeline/pipelineA
{
"description": "inner pipeline",
"processors": [
{ "set": { "field": "inner_pipeline_set", "value": "inner" } }
]
}
When a document is indexed with ?pipeline=pipelineA , the set processor adds the field "inner_pipeline_set": "inner" before the document is stored.
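For example, indexing a document through the pipeline (the index name and field below are arbitrary):

```
PUT my_index/_doc/1?pipeline=pipelineA
{
  "message": "hello"
}
```

The stored document then contains both the original message field and the pipeline‑added inner_pipeline_set field.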
Automatic index creation (default enabled) collects all target indices from the bulk request, filters those that already exist, and creates missing ones via the master node. Example code for creating missing indices uses an AtomicInteger counter to track completion.
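A self‑contained sketch of that counting pattern (the class and method names here are illustrative, not Elasticsearch's):

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

public class AutoCreateSketch {
    // Start one create-index operation per missing index; the shared
    // AtomicInteger reaches zero when the last one completes, which is
    // the signal to proceed with the bulk request itself.
    static String createThenExecute(Set<String> missingIndices) {
        StringBuilder log = new StringBuilder();
        AtomicInteger counter = new AtomicInteger(missingIndices.size());
        for (String index : missingIndices) {
            // In Elasticsearch this is an async createIndex call whose
            // listener decrements the counter on success or failure.
            log.append("created ").append(index).append('\n');
            if (counter.decrementAndGet() == 0) {
                log.append("all indices ready, executing bulk");
            }
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(createThenExecute(Set.of("logs-2024", "metrics-2024")));
    }
}
```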
// Collect indices
final Set<String> indices = bulkRequest.requests.stream()
.filter(request -> request.opType() != DocWriteRequest.OpType.DELETE ||
request.versionType() == VersionType.EXTERNAL ||
request.versionType() == VersionType.EXTERNAL_GTE)
.map(DocWriteRequest::index)
.collect(Collectors.toSet());
... // Create missing indices and decrement counter
After validation and possible index creation, the coordinating node groups bulk items by ShardId :
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest request = bulkRequest.requests.get(i);
if (request == null) continue;
String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
ShardId shardId = clusterService.operationRouting()
.indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
requestsByShard.computeIfAbsent(shardId, s -> new ArrayList<>())
.add(new BulkItemRequest(i, request));
}
Each shard‑specific bulk request ( BulkShardRequest ) is then sent to TransportShardBulkAction , which routes it to the node holding the primary shard via the ReroutePhase of TransportReplicationAction . The primary shard executes the operation, writes the translog, and replicates the change to its replicas.
Summary
The coordinating node in Elasticsearch orchestrates the write path by validating requests, applying ingest pipelines, optionally creating indices, routing requests to the correct primary shard, and merging shard‑level responses. Understanding this phase helps developers tune bulk indexing performance, configure dedicated coordinating nodes, and avoid resource contention in large clusters.
Xueersi Online School Tech Team
The Xueersi Online School Tech Team, dedicated to innovating and promoting internet education technology.