Understanding Top‑N Optimization in Flink SQL: Logical and Physical Plans
This article explains how Flink SQL implements Top‑N queries, shows the standard SQL syntax, analyzes the logical and physical execution plans generated by the optimizer, and details the internal Rank node, optimization rules, state handling, and configuration options for efficient stream processing.
Top‑N is a common pattern when developing business logic with Flink. The DataStream API already provides mature solutions, and Flink SQL supports it as well through a standard syntax described in the official documentation.
The SQL syntax is:
SELECT [column_list]
FROM (
  SELECT [column_list],
    ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
      ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
  FROM table_name)
WHERE rownum <= N [AND conditions]
Using the EXPLAIN statement on a sample query reveals the logical plan, optimized logical plan, and physical execution plan. The key part is that the row_number() OVER(PARTITION BY ...) clause is transformed into a Rank RelNode during logical optimization.
EXPLAIN PLAN FOR SELECT * FROM (
  SELECT *,
    row_number() OVER(PARTITION BY merchandiseId ORDER BY totalQuantity DESC) AS rownum
  FROM (
    SELECT merchandiseId, sum(quantity) AS totalQuantity
    FROM rtdw_dwd.kafka_order_done_log
    GROUP BY merchandiseId
  )
) WHERE rownum <= 10
== Abstract Syntax Tree ==
LogicalProject(merchandiseId=[$0], totalQuantity=[$1], rownum=[$2])
+- LogicalFilter(condition=[<=($2, 10)])
   +- LogicalProject(merchandiseId=[$0], totalQuantity=[$1], rownum=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY $1 DESC NULLS LAST)])
      +- ...
== Optimized Logical Plan ==
Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[merchandiseId], orderBy=[totalQuantity DESC], select=[merchandiseId, totalQuantity, w0$o0])
+- Exchange(distribution=[hash[merchandiseId]])
   +- ...
== Physical Execution Plan ==
Stage 1 : Data Source
  ...
Stage 2 : Operator
  ...
Stage 4 : Operator
  ...
Stage 6 : Operator
  content : Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=10], partitionBy=[merchandiseId], orderBy=[totalQuantity DESC], select=[merchandiseId, totalQuantity, w0$o0])
  ship_strategy : HASH
The Rank node records several important fields: partitionKey (grouping key), orderKey (sorting key and order), rankType (ROW_NUMBER, RANK, DENSE_RANK), rankRange (the N in Top‑N), strategy (the result update strategy: AppendFast, Retract, or UpdateFast), and outputRankNumber (whether the ranking number is emitted).
During physical planning, StreamPhysicalRankRule converts the logical FlinkLogicalRank node into a physical StreamExecRank node. For partitioned Top‑N, rows are hash-distributed by partitionKey; for global Top‑N, parallelism is forced to 1. Of the rank types listed above, only ROW_NUMBER is currently supported.
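The distribution choice described above can be illustrated with a small sketch. It is a simplification under stated assumptions: the class and method names are hypothetical, and the plain modulo on hashCode() only stands in for Flink's real key-to-subtask assignment, which goes through key groups.

```java
/** Simplified model of how rows reach Rank subtasks (not Flink's actual assignment logic). */
class RankShuffleSketch {

    /** Partitioned Top-N: rows with the same partition key always land on the same subtask. */
    static int subtaskForPartitioned(Object partitionKey, int parallelism) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(partitionKey.hashCode(), parallelism);
    }

    /** Global Top-N: there is no partition key, so every row goes to the single subtask 0. */
    static int subtaskForGlobal() {
        return 0;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (String merchandiseId : new String[] {"m-1", "m-2", "m-1"}) {
            System.out.println(merchandiseId + " -> subtask "
                    + subtaskForPartitioned(merchandiseId, parallelism));
        }
        System.out.println("global -> subtask " + subtaskForGlobal());
    }
}
```

Because all rows of one partition meet on one subtask, each subtask can rank its partitions independently; a global Top‑N has no such key to split on, which is why it cannot be parallelized.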
The execution strategy determines which ProcessFunction is used. The most common is RetractableTopNFunction, which maintains two states: dataState (a MapState<RowData, List<RowData>> storing all rows of the current partition, grouped by sort key) and treeMap (a ValueState<SortedMap<RowData, Long>> that maps each sort key to its row count and is backed by a red‑black tree, so the Top‑N results can be produced efficiently). The tree map gives logarithmic insertion and lookup at the cost of extra state, which suits Flink’s time‑sensitive but space‑tolerant environment.
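To make the interplay of the two states concrete, here is a minimal single-partition sketch in plain Java. It is not the code of RetractableTopNFunction: the class name and methods are hypothetical, sort keys are reduced to long values, rows to strings, and ordinary collections replace MapState and ValueState.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified, single-partition model of a retractable Top-N buffer (not Flink's code). */
class RetractableTopNSketch {
    // Plays the role of dataState: sort key -> rows that share this sort key.
    private final Map<Long, List<String>> dataState = new HashMap<>();
    // Plays the role of treeMap: sort key -> row count, kept in descending key order.
    private final TreeMap<Long, Long> treeMap = new TreeMap<>(Comparator.reverseOrder());
    private final int topN;

    RetractableTopNSketch(int topN) {
        this.topN = topN;
    }

    /** Handles an insert message. */
    void accumulate(long sortKey, String row) {
        dataState.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(row);
        treeMap.merge(sortKey, 1L, Long::sum);
    }

    /** Handles a retract message by removing one occurrence of the row, if present. */
    void retract(long sortKey, String row) {
        List<String> rows = dataState.get(sortKey);
        if (rows == null || !rows.remove(row)) {
            return; // nothing to retract
        }
        if (rows.isEmpty()) {
            dataState.remove(sortKey);
        }
        // Decrement the count; returning null drops the key once the count reaches zero.
        treeMap.computeIfPresent(sortKey, (k, count) -> count > 1 ? count - 1 : null);
    }

    /** Walks the sort keys in order and collects at most topN rows. */
    List<String> currentTopN() {
        List<String> result = new ArrayList<>();
        for (Long sortKey : treeMap.keySet()) {
            for (String row : dataState.get(sortKey)) {
                if (result.size() == topN) {
                    return result;
                }
                result.add(row);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        RetractableTopNSketch sketch = new RetractableTopNSketch(2);
        sketch.accumulate(5, "a");
        sketch.accumulate(9, "b");
        sketch.accumulate(7, "c");
        System.out.println(sketch.currentTopN());
        // An upstream aggregate changing its value arrives as a retract of the old row.
        sketch.retract(9, "b");
        System.out.println(sketch.currentTopN());
    }
}
```

Keeping every row, not just the current top N, is what allows a retraction to promote a row that previously ranked below N; it is also why the state can grow large.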
It is essential to enable idle state retention time so that dataState and treeMap do not grow indefinitely. For time‑windowed rankings (e.g., daily or hourly), the time dimension should be added to the PARTITION BY clause rather than relying solely on retention time.
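The effect of combining a time dimension in the partition key with idle state retention can be sketched as follows. This is an illustrative model only: the class, the string-concatenated key, and the explicit expire call are assumptions made for the example, whereas in Flink the cleanup is handled by the state backend.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of per-partition state that is dropped after a period without updates. */
class IdleStateRetentionSketch {
    // Composite partition key (day + merchandiseId) -> time of the last update in millis.
    private final Map<String, Long> lastUpdateMillis = new HashMap<>();
    private final long retentionMillis;

    IdleStateRetentionSketch(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    /** Records an update for the partition identified by the day and the business key. */
    void update(String day, long merchandiseId, long nowMillis) {
        lastUpdateMillis.put(day + "|" + merchandiseId, nowMillis);
    }

    /** Drops partitions that have been idle for at least the retention time. */
    int expire(long nowMillis) {
        int before = lastUpdateMillis.size();
        lastUpdateMillis.values().removeIf(last -> nowMillis - last >= retentionMillis);
        return before - lastUpdateMillis.size();
    }

    int livePartitions() {
        return lastUpdateMillis.size();
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        IdleStateRetentionSketch state = new IdleStateRetentionSketch(36 * hour);
        state.update("2024-01-01", 42L, 0L);        // day-one ranking for item 42
        state.update("2024-01-02", 42L, 24 * hour); // a new day opens a new partition
        int removed = state.expire(40 * hour);
        System.out.println("removed=" + removed + ", live=" + state.livePartitions());
    }
}
```

Because each day forms its own partition, yesterday's partitions stop receiving updates and become eligible for cleanup, while the same business key keeps a fresh ranking for the current day.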
The StreamExecRank operator also exposes a configurable parameter, table.exec.topn.cache-size (default 10000; newer Flink releases name it table.exec.rank.topn-cache-size), that controls the size of the Top‑N cache. Increasing it can reduce state accesses for large Top‑N sizes and improve performance, at the cost of more heap memory.
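Why a larger cache reduces state accesses can be seen in a generic bounded-cache sketch. It assumes a least-recently-used eviction policy and uses a plain HashMap as a stand-in for the state backend; the class and its methods are hypothetical and do not mirror Flink's internal cache classes.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Bounded LRU cache in front of a slower store, counting how often the store is read. */
class TopNCacheSketch {
    // Stands in for the state backend, where every read is comparatively expensive.
    private final Map<String, String> backingState = new HashMap<>();
    private final LinkedHashMap<String, String> cache;
    private int stateReads = 0;

    TopNCacheSketch(final int cacheSize) {
        // accessOrder=true makes iteration order least-recently-used first.
        this.cache = new LinkedHashMap<String, String>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > cacheSize;
            }
        };
    }

    void put(String key, String value) {
        backingState.put(key, value);
        cache.put(key, value);
    }

    String get(String key) {
        String cached = cache.get(key);
        if (cached != null) {
            return cached; // cache hit, no state access
        }
        stateReads++; // cache miss, fall back to the state backend
        String value = backingState.get(key);
        if (value != null) {
            cache.put(key, value);
        }
        return value;
    }

    int stateReads() {
        return stateReads;
    }

    public static void main(String[] args) {
        for (int size : new int[] {2, 3}) {
            TopNCacheSketch sketch = new TopNCacheSketch(size);
            sketch.put("a", "1");
            sketch.put("b", "2");
            sketch.put("c", "3");
            sketch.get("a");
            System.out.println("cacheSize=" + size + " stateReads=" + sketch.stateReads());
        }
    }
}
```

With a cache smaller than the working set, the oldest entry is evicted and the next lookup has to go back to state; once the cache covers the working set, those reads disappear.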
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.