Big Data 14 min read

Understanding Flink Join Types, Optimizations, and Physical Plan Translation

This article explains the different join types supported by Apache Flink—including regular, interval, temporal, and lookup joins—provides SQL examples, details how the Flink optimizer transforms logical plans into efficient physical plans, and describes the underlying code generation and execution mechanisms.

Big Data Technology & Architecture
Big Data Technology & Architecture
Big Data Technology & Architecture
Understanding Flink Join Types, Optimizations, and Physical Plan Translation

Flink offers several join mechanisms for stream processing: regular inner joins that keep both sides in state, interval joins that match records within a time window, temporal joins that use versioned tables, and lookup joins that query external dimension tables at join time.

Examples include a basic SQL inner join:

SELECT * FROM Orders
JOIN Product
ON Orders.productId = Product.id

and an interval join that restricts matches to one hour before or after the order time:

SELECT ...
FROM Orders AS o JOIN Payment AS p ON
  o.orderId = p.orderId AND
  p.payTime BETWEEN orderTime - INTERVAL '1' HOUR AND
  orderTime + INTERVAL '1' HOUR

Temporal joins require a versioned table with primary key and event‑time attributes; they are expressed with the FOR SYSTEM_TIME AS OF syntax. A sample DDL creates an orders table and a CDC‑backed currency_rates versioned table, then joins them on currency.

-- left table (append‑only)
CREATE TABLE orders (
    order_id STRING,
    price DECIMAL(32,2),
    currency STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time
) WITH (/* ... */);

-- right versioned table (CDC)
CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
    WATERMARK FOR update_time AS update_time,
    PRIMARY KEY(currency) NOT ENFORCED
) WITH (
   'connector' = 'kafka',
   'value.format' = 'debezium-json',
   /* ... */
);

SELECT order_id, price, currency, conversion_rate, order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;

Lookup joins are similar to temporal joins but treat the right side as a dimension table accessed via a hash map or external lookup function. The syntax also uses FOR SYSTEM_TIME AS OF and supports only equality conditions.

SELECT o.amount, o.currency, r.rate, o.amount * r.rate
FROM Orders AS o
JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
ON r.currency = o.currency;

The Flink optimizer first parses the SQL into a logical plan (e.g., LogicalSnapshot, LogicalCorrelate), then applies a series of logical rules such as LogicalCorrelateToJoinFromLookupTableRuleWithFilter to convert correlates into joins, pushes projections, and simplifies conditions.

// Example rule matching logic
class LogicalCorrelateToJoinFromLookupTableRuleWithFilter
  extends LogicalCorrelateToJoinFromLookupTemporalTableRule(
    operand(classOf[LogicalCorrelate],
      operand(classOf[RelNode], any()),
      operand(classOf[LogicalFilter],
        operand(classOf[LogicalSnapshot],
          operand(classOf[RelNode], any())))),
    "LogicalCorrelateToJoinFromLookupTableRuleWithFilter"
) { ... }

After logical optimization, physical rules convert the plan to a stream‑physical representation, producing operators such as StreamPhysicalLookupJoin and StreamPhysicalCalc. The final physical plan is then translated into an executable graph where lookup joins become StreamExecLookupJoin transformations that generate lookup functions via code generation.

// Core part of the physical lookup join creation
private def doTransform(
  join: FlinkLogicalJoin,
  input: FlinkLogicalRel,
  temporalTable: RelOptTable,
  calcProgram: Option[RexProgram]
): StreamPhysicalLookupJoin = {
  val joinInfo = join.analyzeCondition
  val cluster = join.getCluster
  val providedTrait = join.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
  val requiredTrait = input.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
  val convInput = RelOptRule.convert(input, requiredTrait)
  new StreamPhysicalLookupJoin(cluster, providedTrait, convInput, temporalTable, calcProgram, joinInfo, join.getJoinType)
}

Overall, the article walks through the end‑to‑end process of writing a Flink join query, understanding its logical representation, applying optimizer rules, and finally generating the runtime execution plan.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

Big DataFlinkSQLJoinLookup JoinTemporal Join
Big Data Technology & Architecture
Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.