nateab opened a new pull request, #27592:
URL: https://github.com/apache/flink/pull/27592

   ## What is the purpose of the change
   
   This pull request fixes 
[FLINK-26051](https://issues.apache.org/jira/browse/FLINK-26051): when a 
`ROW_NUMBER()` window function is wrapped in an outer query containing 
`CASE WHEN` expressions and a `WHERE` clause, the stream planner throws **"The 
window can only be ordered in ASCENDING mode."**
   
   The root cause is a rule ordering problem in the stream planner's 
optimization phases:
   
   1. **LOGICAL phase** (Volcano/cost-based): `FlinkCalcMergeRule` merges two 
Calc nodes, the inner ROW_NUMBER filter (`WHERE row_num <= 2`) and the outer 
`CASE WHEN`/`WHERE`, into a single complex Calc.
   2. **LOGICAL_REWRITE phase** (HEP/sequential): `FlinkLogicalRankRule` looks 
for the pattern `FlinkLogicalCalc` on `FlinkLogicalOverAggregate`. By this 
point, the merged Calc is too complex for the rule to match because the 
remaining predicates access the rank field, so `matches()` returns false.
   3. The query falls through to `StreamExecOverAggregate`, which enforces 
ascending-only ordering and therefore raises the error.
   
   The batch planner already handles this correctly by placing rank rules 
alongside `FlinkCalcMergeRule` in the same Volcano phase, so the optimizer 
explores both alternatives.
   
   The fix adds a new Volcano-safe rank rule variant 
(`FlinkLogicalRankRuleForConstantRangeAllFunctions`) to `LOGICAL_RULES` in the 
stream planner. This rule handles all `SqlRankFunction` types (ROW_NUMBER, 
RANK, DENSE_RANK) with constant rank ranges, and unlike 
`FlinkLogicalRankRuleForRangeEnd`, does not throw exceptions in `matches()`, 
making it safe for the Volcano optimizer. Placing it in the same phase as 
`FlinkCalcMergeRule` allows the optimizer to explore the Rank conversion path 
before or as an alternative to merging Calcs. The existing 
`FlinkLogicalRankRule.INSTANCE` is kept in `LOGICAL_REWRITE` as a safety net 
for variable rank ranges.
   
   ## Brief change log
   
     - Added `FlinkLogicalRankRuleForConstantRangeAllFunctions`: a new rule 
class that accepts all `SqlRankFunction` types with constant rank ranges, safe 
for the Volcano optimizer (no exceptions in `matches()`)
     - Added `FlinkLogicalRankRule.CONSTANT_RANGE_ALL_FUNCTIONS_INSTANCE` to 
`LOGICAL_RULES` in `FlinkStreamRuleSets`, along with `CalcRankTransposeRule` 
and `ConstantRankNumberColumnRemoveRule`, placed before the calc rules block, 
mirroring the batch planner pattern
     - Added regression test `testRowNumberWithCaseWhenAndWhereClause` in 
stream `RankTest`
     - Updated expected plans for `testRankFunctionInMiddle` (stream 
`RankTest`) and `testMultiSameRankFunctionsWithSameGroup` 
(`FlinkLogicalRankRuleForRangeEndTest`) to reflect cosmetic field-name 
changes that result from the rank conversion now happening earlier
   
   ## Verifying this change
   
   This change added a test and can be verified as follows:
   
     - Added `testRowNumberWithCaseWhenAndWhereClause` that reproduces the 
exact bug scenario: `ROW_NUMBER() OVER (PARTITION BY a ORDER BY c DESC)` with 
`WHERE row_num <= 2`, wrapped in a query with `CASE WHEN c > 10 THEN 'big' ELSE 
'small' END` and `WHERE b <> 'z'`. The optimized plan correctly shows a `Rank` 
node instead of `OverAggregate`.
     - Verified all 76 existing rank-related tests pass: stream `RankTest` (46 
tests), batch `RankTest` (16 tests), `FlinkLogicalRankRuleForRangeEndTest` (14 
tests)
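   
   For reference, a query of the shape described above could look roughly like the following sketch. The table name `MyTable`, the alias `size_label`, and the exact nesting are assumptions for illustration only; the query used in `testRowNumberWithCaseWhenAndWhereClause` may differ in detail.
   
   ```sql
   -- Hypothetical reproduction of the FLINK-26051 query shape.
   -- MyTable(a, b, c) is an assumed table: b is a string, c is numeric.
   SELECT
     a,
     b,
     CASE WHEN c > 10 THEN 'big' ELSE 'small' END AS size_label
   FROM (
     SELECT a, b, c
     FROM (
       SELECT
         a, b, c,
         -- Descending order is what the over-aggregate fallback rejects.
         ROW_NUMBER() OVER (PARTITION BY a ORDER BY c DESC) AS row_num
       FROM MyTable
     )
     -- Constant rank range: the Top-N filter the rank rule should pick up.
     WHERE row_num <= 2
   )
   -- Outer predicate that gets merged into the same Calc as the rank filter.
   WHERE b <> 'z'
   ```
   
   With the fix applied, the optimized stream plan for a query of this shape should contain a `Rank` node rather than an `OverAggregate`.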
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no (optimizer 
rule only, no runtime change)
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable

