
Flink Real-Time Data Development: Cases on Data Skew, Watermark Failure, and GroupBy Issues

The article walks through three Flink streaming pitfalls—data‑skew‑induced back‑pressure, lost watermarks after interval joins, and ineffective group‑by causing duplicate rows—and shows how to resolve them with two‑stage distinct aggregation, hash‑based key distribution, processing‑time windows or split jobs, and mini‑batch buffering.

DaTaobao Tech

Recently I worked on several real‑time data development tasks in Flink and ran into problems such as data skew causing back‑pressure and watermarks becoming ineffective after interval joins. Analyzing and solving these issues deepened my understanding of Flink's principles and mechanisms, so I am sharing the experience here.

The article presents three cases, each divided into background, problem analysis, and solution.

Case 1: Data Skew

Background: Real‑time exposure streams need to count UV and PV for each creative over the last 24 hours, updated every minute. The typical solution uses a HOP (sliding) window:

SELECT
    HOP_START(ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) AS window_start,
    HOP_END(ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) AS window_end,
    creative_id,
    COUNT(DISTINCT uid) AS exp_uv,
    COUNT(uid) AS exp_pv
FROM dwd_expos_detail
GROUP BY HOP(ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR), creative_id;

Problem: The GlobalWindowAggregate operator stays busy for long periods, causing back‑pressure and high latency.

Analysis: The bottleneck comes from data skew: grouping by an unevenly distributed key combined with COUNT DISTINCT routes a disproportionate share of records to a single subtask, which becomes overloaded.

Solution:

Enable PartialFinal for count distinct to split the aggregation into two stages:

table.optimizer.distinct-agg.split.enabled: true

When PartialFinal is not applicable (as with hop windows), manually hash the distinct key and add a modulo key to make the distribution uniform, then perform a two‑stage aggregation.

-- Stage 1: add a hash bucket to the group key so distinct counting is spread across subtasks
SELECT
    HOP_START(ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) AS window_start,
    HOP_END(ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) AS window_end,
    creative_id,
    COUNT(DISTINCT uid) AS exp_uv,
    COUNT(uid) AS exp_pv
FROM dwd_expos_detail
GROUP BY HOP(ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR), creative_id, MOD(HASH_CODE(uid), 1024);

-- Stage 2: merge the per-bucket partial results back to one row per creative
SELECT
    window_start,
    window_end,
    creative_id,
    SUM(exp_uv) AS exp_uv,
    SUM(exp_pv) AS exp_pv
FROM (
    ...same inner query as above...
) GROUP BY window_start, window_end, creative_id;
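The rewrite stays correct because each uid hashes into exactly one bucket, so summing the per-bucket distinct counts equals the global distinct count. A minimal Python sketch of that invariant (illustrative only, not Flink code):

```python
from collections import defaultdict

def two_stage_distinct(uids, buckets=1024):
    # Stage 1: count distinct uids per hash bucket (what each subtask does).
    per_bucket = defaultdict(set)
    for uid in uids:
        per_bucket[hash(uid) % buckets].add(uid)
    # Stage 2: sum the per-bucket distinct counts (the final aggregation).
    return sum(len(s) for s in per_bucket.values())

uids = ["u1", "u2", "u2", "u3", "u1", "u4"]
assert two_stage_distinct(uids) == len(set(uids))  # == 4
```

The same decomposition would not work for, say, a median, but COUNT DISTINCT partitions cleanly because the buckets are disjoint.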

The topology shows balanced subtask load after the fix.

Case 2: Watermark Failure

Background: Two real‑time streams are interval‑joined, then a hop window aggregates metrics per creative.

Problem: After the join, the window never fires because the watermark is lost.

Analysis: After a GroupBy, an interval join, or an OVER window, the event‑time attribute (and with it the watermark) is no longer propagated, so downstream event‑time windows cannot be triggered.

Solution:

Option 1: Add a processing‑time field after the join and use it for windowing (accuracy may suffer).
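A sketch of Option 1, with assumed view and column names (joined_result, proc_time — neither is from the original): attach a processing‑time attribute via PROCTIME() after the join, then window on it instead of the lost event time:

```sql
-- Sketch with assumed names: attach a processing-time attribute after the join.
CREATE TEMPORARY VIEW joined_result AS
SELECT p1.uid, p1.order_amount, p2.creative_id, PROCTIME() AS proc_time
FROM dwd_trade_detail p1
JOIN dwd_clk_uv_detail p2
  ON p1.uid = p2.uid
 AND p2.ts BETWEEN p1.ts - INTERVAL '6' HOUR AND p1.ts;

-- Window on processing time; results depend on arrival time, hence the accuracy caveat.
SELECT
    HOP_START(proc_time, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) AS window_start,
    creative_id,
    SUM(order_amount) AS total_gmv
FROM joined_result
GROUP BY HOP(proc_time, INTERVAL '1' MINUTE, INTERVAL '24' HOUR), creative_id;
```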

Option 2: Split the pipeline into two Flink jobs: the first performs the interval join and writes the result to an intermediate table (tt); the second consumes the tt with a proper event‑time watermark declared on its source and applies the hop window.

-- Job 1: interval join, writing to the intermediate table
INSERT INTO sink_dwd_pop_pay_detail_ri
SELECT p1.uid, p1.order_id, p1.order_amount, p1.ts, p2.creative_id
FROM dwd_trade_detail p1
JOIN dwd_clk_uv_detail p2
  ON p1.uid = p2.uid
 AND p2.ts BETWEEN p1.ts - INTERVAL '6' HOUR AND p1.ts;

-- Job 2: event-time hop window over the intermediate table
SELECT
    HOP_START(ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) AS window_start,
    HOP_END(ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR) AS window_end,
    creative_id,
    SUM(order_amount) AS total_gmv,
    COUNT(DISTINCT uid) AS cnt_order_uv,
    ROUND(SUM(order_amount) / COUNT(DISTINCT uid), 2) AS gmv_per_uv
FROM source_dwd_pop_pay_detail_ri
GROUP BY HOP(ts, INTERVAL '1' MINUTE, INTERVAL '24' HOUR), creative_id;
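For Option 2 to work, the second job's source table must re-declare an event-time attribute. A hedged DDL sketch — the field list, watermark delay, and connector are assumptions, not from the original:

```sql
CREATE TABLE source_dwd_pop_pay_detail_ri (
    uid          STRING,
    order_id     STRING,
    order_amount DOUBLE,
    ts           TIMESTAMP(3),
    creative_id  STRING,
    -- Re-declare the watermark so event-time hop windows can fire again downstream.
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    -- connector options for the intermediate (tt) table go here
);
```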

Case 3: Group‑By Ineffectiveness

Background: Real‑time streams need to label each material as filtered or not based on two array fields (lastValidPlanInfo, validPlanInfo). The intended result is a single row per material with the minimal is_filtered value.

Problem: The ODPS sink shows duplicate rows for the same material, because Flink writes each intermediate result directly (the sink supports no upsert) and the lateral table + UNION ALL expands one input row into multiple rows.

Analysis: The lateral table splits the array fields into multiple rows, and UNION ALL then produces two rows per material (one with is_filtered = 1, the other with is_filtered = 0). Since ODPS can neither retract nor update, both rows are persisted.

Solution: Use Flink mini‑batch (micro‑batch) to buffer records and perform a single group‑by aggregation before writing (note that mini‑batch also requires a buffer size to take effect):

table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 1s
table.exec.mini-batch.size: 20000

This reduces the number of writes and ensures only one output per material.
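What the buffered group-by computes can be sketched in plain Python (names are illustrative): per material, keep the minimal is_filtered value so only one row survives the expansion.

```python
from collections import defaultdict

def dedup_min_filtered(rows):
    # rows: (material_id, is_filtered) pairs produced by the lateral/UNION ALL expansion.
    result = defaultdict(lambda: 1)
    for material_id, is_filtered in rows:
        # Keep the minimal is_filtered per material: 0 (not filtered) wins over 1.
        result[material_id] = min(result[material_id], is_filtered)
    return dict(result)

rows = [("m1", 1), ("m1", 0), ("m2", 1)]
# m1 appears twice after expansion; only the minimal value is kept.
assert dedup_min_filtered(rows) == {"m1": 0, "m2": 1}
```

Mini-batch performs the equivalent aggregation over each buffered batch, so the sink sees one consolidated row per material instead of every intermediate result.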

Conclusion: Flink SQL is powerful for real‑time data needs, but its streaming semantics differ from batch SQL (e.g., ODPS). Understanding watermarks, back‑pressure, and state handling is essential to avoid "mysterious" results and to write efficient, correct streaming jobs.

Tags: Optimization, real-time, Flink SQL, watermark, data skew
Written by DaTaobao Tech

Official account of DaTaobao Technology