Six Common Approaches to Synchronize MySQL Data to Elasticsearch
This article reviews six mainstream solutions for keeping MySQL and Elasticsearch in sync—including synchronous double‑write, asynchronous MQ‑based double‑write, Logstash polling, Canal binlog listening, DataX batch migration, and Flink stream processing—detailing their scenarios, advantages, drawbacks, and practical code examples to guide optimal technical selection.
In distributed architectures, combining MySQL with Elasticsearch (ES) is a standard pattern for handling high‑concurrency queries and complex searches, but efficient data synchronization remains a challenging design problem.
Solution 1: Synchronous Double‑Write
Scenario: Real‑time critical data with simple business logic, such as financial transaction records.
Write to MySQL and ES simultaneously in the business code.
Code example:
@Transactional
public void createOrder(Order order) {
// write to MySQL
orderMapper.insert(order);
// sync to ES
IndexRequest request = new IndexRequest("orders")
.id(String.valueOf(order.getId()))
.source(JSON.toJSONString(order), XContentType.JSON);
client.index(request, RequestOptions.DEFAULT);
}
Pain points:
Hard‑coded intrusion: Every write operation must add ES logic.
Performance bottleneck: The double write extends transaction time and can cut TPS by 30% or more.
Data consistency risk: If ES write fails, a compensation mechanism (e.g., local transaction table + retry) is required.
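The compensation mechanism mentioned above can be sketched as follows. Everything here is an illustrative stand-in: `EsCompensation`, its in-memory pending queue (a real system would persist failed writes in a MySQL "local transaction table"), and the `Predicate`-based ES call (standing in for the actual client call).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

public class EsCompensation {
    // Stand-in for a local transaction table row: the doc to retry and how
    // many attempts have been made so far.
    public static class PendingWrite {
        public final String docId;
        public int attempts;
        public PendingWrite(String docId) { this.docId = docId; }
    }

    // In-memory stand-in for the local transaction table.
    private final Deque<PendingWrite> pendingTable = new ArrayDeque<>();

    // Called from the business code: if the ES write fails, record the doc id
    // for later retry instead of failing the MySQL transaction.
    public void writeWithCompensation(String docId, Predicate<String> esWrite) {
        if (!esWrite.test(docId)) {
            pendingTable.add(new PendingWrite(docId));
        }
    }

    // Background retry job (would run on a schedule in a real system).
    // Entries that exceed maxAttempts are dropped here; a real system would
    // move them to a dead-letter table for manual handling.
    public int retryPending(Predicate<String> esWrite, int maxAttempts) {
        int n = pendingTable.size();
        for (int i = 0; i < n; i++) {
            PendingWrite p = pendingTable.poll();
            p.attempts++;
            if (!esWrite.test(p.docId) && p.attempts < maxAttempts) {
                pendingTable.add(p);
            }
        }
        return pendingTable.size();
    }
}
```

The key design point is that the MySQL write always commits; only the ES write is deferred and retried, trading a short window of inconsistency for availability.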
Solution 2: Asynchronous Double‑Write (MQ)
Scenario: E‑commerce order status updates need to be searchable by a customer service system.
Decouple using a message queue.
Code example:
// Producer
public void updateProduct(Product product) {
productMapper.update(product);
kafkaTemplate.send("product-update", product.getId());
}
// Consumer
@KafkaListener(topics = "product-update")
public void syncToEs(String productId) {
Product product = productMapper.selectById(productId);
esClient.index(product);
}
Advantages:
Throughput boost: MQ smooths spikes, handling tens of thousands of QPS.
Fault isolation: ES downtime does not affect the main business flow.
Drawbacks:
Message backlog: Traffic bursts may cause consumption delay (monitor lag).
Ordering issues: Need partition keys to guarantee order for the same data.
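Why a partition key addresses the ordering problem: Kafka's default partitioner maps a message key to a fixed partition, so all updates for the same product land in one partition and are consumed sequentially. The helper below mimics that routing; it uses `String.hashCode` in place of Kafka's murmur2 hash, and the class name is illustrative.

```java
public class PartitionKeyDemo {
    // Simplified version of Kafka's keyed routing: mask to non-negative,
    // then modulo the partition count (Kafka uses murmur2, not hashCode).
    public static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

Because the mapping is deterministic, every update for `productId` "p-1001" goes to the same partition; within a partition, Kafka preserves order, so the consumer applies updates to ES in the order they were produced.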
Solution 3: Logstash Periodic Pull
Scenario: T+1 analysis of user behavior logs.
Low‑intrusion but high latency.
Configuration example:
input {
jdbc {
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/log_db"
schedule => "*/5 * * * *" # every 5 minutes
statement => "SELECT * FROM user_log WHERE update_time > :sql_last_value"
}
}
output {
elasticsearch {
hosts => ["es-host:9200"]
index => "user_logs"
}
}
Pros: Zero code changes, suitable for historical data migration.
Cons: Minute‑level delay (cannot meet real‑time search) and full‑table scans impose heavy load unless incremental indexes are optimized.
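To avoid the full-table-scan load, the jdbc input can track an incremental column instead of re-reading the whole table on every run. The options below come from the logstash-input-jdbc plugin; the `update_time` column name carries over from the example above.

```
input {
  jdbc {
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/log_db"
    schedule => "*/5 * * * *"
    # Persist the last-seen update_time so each run reads only new rows
    use_column_value => true
    tracking_column => "update_time"
    tracking_column_type => "timestamp"
    statement => "SELECT * FROM user_log WHERE update_time > :sql_last_value ORDER BY update_time ASC"
  }
}
```

With `use_column_value`, `:sql_last_value` is the highest `update_time` seen so far rather than the last run's clock time, so rows are not skipped when a run is delayed. An index on `update_time` is still needed for the query to stay cheap.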
Solution 4: Canal Binlog Listening
Scenario: Real‑time search for social platform feeds (e.g., Weibo hot topics).
Tech stack: Canal + RocketMQ + ES.
High real‑time capability, low intrusion.
Key configuration:
# canal.properties
canal.instance.master.address=127.0.0.1:3306
canal.mq.topic=canal.es.sync
Pitfalls:
Data drift: Need to handle DDL changes via a schema registry.
Idempotent consumption: Use _id as a unique key to avoid duplicate writes.
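The idempotence point above can be sketched in two layers: derive the ES `_id` deterministically from the MySQL primary key (so a replayed binlog event overwrites the same document instead of duplicating it), and skip ids already handled. The class below is illustrative; the in-memory guard stands in for what would normally be Redis or a database table.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentConsumer {
    // Stand-in for a shared dedupe store (Redis/DB in a real deployment).
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    // Deterministic _id: the same MySQL row always maps to the same ES doc,
    // so re-delivered binlog events become harmless overwrites.
    public static String docId(String table, long primaryKey) {
        return table + ":" + primaryKey;
    }

    // Returns true only the first time a given id is seen.
    public boolean markProcessed(String id) {
        return processed.add(id);
    }
}
```

Note that the deterministic `_id` alone already makes inserts and updates idempotent in ES; the dedupe guard mainly saves redundant writes and protects delete/update ordering during replays.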
Solution 5: DataX Batch Sync
Scenario: Migrating historical order data from sharded MySQL to ES.
Preferred for large‑scale data migration.
Configuration snippet:
{
"job": {
"content": [{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "******",
"splitPk": "id",
"column": ["*"],
"connection": [{
"table": ["orders"],
"jdbcUrl": ["jdbc:mysql://localhost:3306/order_db"]
}]
}
},
"writer": {
"name": "elasticsearchwriter",
"parameter": {
"endpoint": "http://es-host:9200",
"index": "orders"
}
}
}]
}
}
Performance tuning: Adjust the channel count to match the MySQL shard count, and limit batch sizes to avoid OOM.
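The channel tuning above is set at the job level. A sketch of the job-level `setting` block, assuming four MySQL shards; the values are illustrative:

```json
"setting": {
  "speed": {
    "channel": 4
  },
  "errorLimit": {
    "record": 0
  }
}
```

With `splitPk` configured, DataX splits the source table into primary-key ranges and runs one range per channel, so setting `channel` to the shard count keeps all shards reading in parallel without oversubscribing them.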
Solution 6: Flink Stream Processing
Scenario: Real‑time recommendation scoring when product prices change, requiring user‑profile joins.
Suitable for complex ETL pipelines.
Code snippet:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new CanalSource())
.map(record -> parseToPriceEvent(record))
.keyBy(event -> event.getProductId())
.connect(userProfileBroadcastStream)
.process(new PriceRecommendationProcess())
.addSink(new ElasticsearchSink());
Advantages:
State management: Precise handling of out‑of‑order events via watermarks.
Dimension table join: Real‑time profile association using Broadcast State.
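The watermark handling mentioned above follows Flink's bounded out-of-orderness model: the watermark trails the highest event timestamp seen by a fixed delay, and an event counts as late only if it arrives after the watermark has already passed its timestamp. The class below simulates that rule for illustration; it is not the Flink API (which would be `WatermarkStrategy.forBoundedOutOfOrderness`).

```java
public class BoundedOutOfOrderness {
    private final long maxDelayMs;
    private long maxTimestampSeen = Long.MIN_VALUE;

    public BoundedOutOfOrderness(long maxDelayMs) {
        this.maxDelayMs = maxDelayMs;
    }

    // Called per event; the watermark is the max timestamp seen so far
    // minus the allowed out-of-orderness.
    public long onEvent(long eventTimestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
        return maxTimestampSeen - maxDelayMs;
    }

    // An event is late if the watermark has already passed its timestamp.
    public boolean isLate(long eventTimestampMs) {
        return eventTimestampMs < maxTimestampSeen - maxDelayMs;
    }
}
```

The practical effect: a price event arriving up to `maxDelayMs` behind the newest event is still processed in its event-time window; anything later is routed to late-data handling.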
Summary & Recommendation
The table below compares the six solutions on real‑time capability, intrusiveness, complexity, and suitable stages.
| Solution | Real‑time | Intrusiveness | Complexity | Applicable Stage |
| --- | --- | --- | --- | --- |
| Synchronous Double‑Write | Seconds | High | Low | Small monolith |
| MQ Asynchronous | Seconds | Medium | Medium | Mid‑size distributed system |
| Logstash | Minutes | None | Low | Offline analysis |
| Canal | Milliseconds | None | High | High‑concurrency production |
| DataX | Hours | None | Medium | Historical migration |
| Flink | Milliseconds | Low | Very High | Real‑time data warehouse |
Recommendation:
If the team lacks middleware ops capability → choose Logstash or synchronous double‑write.
Need second‑level latency and can refactor code → MQ asynchronous + local transaction table.
Seek ultimate real‑time with ample resources → combine Canal and Flink.