Investigation of Data Loss in a Flink Kafka Consumer Caused by Mixed Kafka Cluster Configuration
TL;DR: The data loss was caused by a misconfigured bootstrap.servers list that mixed production and pre‑release Kafka clusters. Different subtasks connected to different clusters, so partition discovery and offset fetching were inconsistent, and several partitions went unread until the list was corrected.
We encountered data loss in a simple Flink job that consumes data from Kafka and writes to Elasticsearch. No exceptions were observed, and the logs indicated that the messages never entered Flink, suggesting the issue occurred before the Flink pipeline.
By checking the consumption statistics, we discovered that only three of the five Kafka partitions were being consumed; the monitoring screenshots showed which partitions were missing.
Further investigation revealed that the bootstrap.servers configuration mixed addresses from both the production (pro) and pre‑release (pre) Kafka clusters:
bootstrap.servers=[business-s2-002-kafka1.xxxx.com.cn:9092,business-s2-002-kafka2.xxxx.com.cn:9092,business-s2-002-kafka3.xxxx.com.cn:9092,pre-kafka1.xxxx.com.cn:9092,pre-kafka2.xxxx.com.cn:9092,pre-kafka3.xxxx.com.cn:9092]
Kafka clients use bootstrap.servers only to establish the initial connection: the client picks one of the listed addresses to fetch cluster metadata, and in practice the choice is effectively random. With a mixed list, some sub‑tasks therefore bootstrapped from the pre cluster and others from the pro cluster.
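A minimal sketch of why the mixed list is dangerous: each new client effectively picks an arbitrary entry from the list, so connections are split across both clusters. The shuffle below stands in for the client's internal selection logic (an assumption about client internals, not a reproduction of it); the hostnames are the ones from the misconfigured list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class BootstrapPick {
    // The mixed list from the job's configuration.
    static final List<String> SERVERS = Arrays.asList(
            "business-s2-002-kafka1.xxxx.com.cn:9092",
            "business-s2-002-kafka2.xxxx.com.cn:9092",
            "business-s2-002-kafka3.xxxx.com.cn:9092",
            "pre-kafka1.xxxx.com.cn:9092",
            "pre-kafka2.xxxx.com.cn:9092",
            "pre-kafka3.xxxx.com.cn:9092");

    // Which cluster a bootstrap address belongs to, judged by hostname prefix.
    static String cluster(String address) {
        return address.startsWith("pre-") ? "pre" : "pro";
    }

    public static void main(String[] args) {
        // Stand-in for the client's bootstrap selection: shuffle and take the
        // first reachable entry. Half the list points at each cluster, so each
        // new client has roughly even odds of bootstrapping from either one.
        List<String> shuffled = new ArrayList<>(SERVERS);
        Collections.shuffle(shuffled);
        System.out.println("bootstrapped from: " + cluster(shuffled.get(0)));
    }
}
```

Because every Flink sub‑task creates its own Kafka client, each sub‑task rolls these dice independently.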
The consumer group sp2_group_name_G_2023_10_08_16_58_03 subscribed to test_topic_A. The production cluster has five partitions (0‑4), while the pre cluster has only three (0‑2).
Key log excerpts (shown in the original screenshots) pointed to the consumer's open() and run() methods. The relevant code fragments are:
LOG.info(
    "Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
    new Object[]{
        this.getRuntimeContext().getIndexOfThisSubtask(),
        this.subscribedPartitionsToStartOffsets.size(),
        this.specificStartupOffsets,
        this.subscribedPartitionsToStartOffsets.keySet()});
and
List<KafkaTopicPartition> allPartitions = this.partitionDiscoverer.discoverPartitions();
Analysis of the workflow:
Step 1 – The consumer resolves a start offset for each partition from the configured timestamps. The logs show offsets for partitions 0‑4.
Step 2 – Each sub‑task (0‑3) calls discoverPartitions() to obtain the partitions assigned to it. Because each sub‑task's client may bootstrap from either the pro or the pre cluster, the assignment becomes inconsistent. In one run, partition 4 was missing, most likely because the sub‑task it was assigned to had connected to the pre cluster, where that partition does not exist.
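Flink maps partitions to sub‑tasks with a static modulo scheme, so each partition has exactly one owner. The sketch below follows the shape of the connector's KafkaTopicPartitionAssigner.assign (the hash‑based start index is an approximation from memory, not a verbatim copy): if the single sub‑task that owns partition 4 happens to bootstrap from the pre cluster, no other sub‑task will ever pick that partition up.

```java
public class PartitionAssignment {
    // Approximation of Flink's static partition-to-subtask mapping:
    // partitions are spread round-robin starting from an index derived
    // from the topic name's hash.
    static int assign(String topic, int partition, int parallelism) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
        return (startIndex + partition) % parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // With the pro cluster's 5 partitions, every partition maps to
        // exactly one of the 4 sub-tasks.
        for (int p = 0; p < 5; p++) {
            System.out.println("partition " + p + " -> subtask "
                    + assign("test_topic_A", p, parallelism));
        }
        // If the sub-task that owns partition 4 bootstrapped from the pre
        // cluster (partitions 0-2 only), its discoverPartitions() call never
        // returns partition 4, and the partition is silently unread.
    }
}
```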
Step 3 – The fetcher opens a connection to pull data. If the start offsets were resolved against the pro cluster but the fetcher connects to the pre cluster, those offsets do not exist there, so no data is returned and the records appear to be lost.
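Step 3 can be illustrated with a toy model (all offset values below are made‑up numbers, not from the incident): start offsets resolved against the pro cluster are replayed against the pre cluster, where partitions 3‑4 do not exist and the remaining offsets lie past the log end, so every fetch comes back empty.

```java
import java.util.HashMap;
import java.util.Map;

public class FetchMismatch {
    // A fetch returns records only if the partition exists on the cluster
    // we connected to AND the requested offset is before its log end.
    static boolean hasData(Long logEndOffset, long startOffset) {
        return logEndOffset != null && startOffset < logEndOffset;
    }

    public static void main(String[] args) {
        // Hypothetical log-end offsets on the pre cluster (3 partitions).
        Map<Integer, Long> preLogEnd = new HashMap<>();
        preLogEnd.put(0, 100L);
        preLogEnd.put(1, 100L);
        preLogEnd.put(2, 100L);

        // Hypothetical start offsets resolved against the pro cluster (0-4).
        Map<Integer, Long> startOffsets = new HashMap<>();
        for (int p = 0; p < 5; p++) {
            startOffsets.put(p, 5000L);
        }

        // A fetcher connected to the pre cluster: partitions 3-4 are absent,
        // and offset 5000 is past the pre log end for 0-2, so nothing is read.
        for (Map.Entry<Integer, Long> e : startOffsets.entrySet()) {
            Long logEnd = preLogEnd.get(e.getKey());
            System.out.println("partition " + e.getKey() + ": "
                    + (hasData(logEnd, e.getValue())
                            ? "records available" : "nothing to fetch"));
        }
    }
}
```

No exception is raised in this scenario, which matches the symptom of silent data loss.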
To verify the hypothesis, a second Flink job with the same bootstrap.servers configuration was launched. Its logs showed that only partitions 0‑2 were discovered for offsets, confirming that the client had connected to the pre cluster for the offset‑fetch phase, while discoverPartitions() later connected to the pro cluster for partition assignment.
After correcting the bootstrap.servers list to contain only the production cluster addresses, the partition assignment became stable and the data loss issue disappeared.
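For reference, the corrected configuration keeps only the production brokers from the original list:

bootstrap.servers=[business-s2-002-kafka1.xxxx.com.cn:9092,business-s2-002-kafka2.xxxx.com.cn:9092,business-s2-002-kafka3.xxxx.com.cn:9092]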
In summary, the root cause was a misconfigured Kafka bootstrap server list that mixed production and pre‑release clusters, leading to inconsistent partition discovery and offset handling.
HelloTech
Official Hello technology account, sharing tech insights and developments.