This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4914947263 [multistage] Add support for the ranking ROW_NUMBER() 
window function (#10587)
4914947263 is described below

commit 49149472630159d7a7f5f49cdf81cbf9b0bd5e00
Author: Sonam Mandal <soman...@linkedin.com>
AuthorDate: Mon Apr 24 08:07:40 2023 -0700

    [multistage] Add support for the ranking ROW_NUMBER() window function 
(#10587)
---
 .../rules/PinotWindowExchangeNodeInsertRule.java   |  16 +-
 .../pinot/query/QueryEnvironmentTestBase.java      |   1 +
 .../resources/queries/WindowFunctionPlans.json     | 515 ++++++++++++++++-
 .../runtime/operator/WindowAggregateOperator.java  | 159 ++++--
 .../operator/WindowAggregateOperatorTest.java      |  69 ++-
 .../test/resources/queries/WindowFunctions.json    | 608 +++++++++++++++++++++
 6 files changed, 1322 insertions(+), 46 deletions(-)

diff --git 
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
 
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
index e9283db8f0..1532feca6d 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotWindowExchangeNodeInsertRule.java
@@ -19,6 +19,7 @@
 package org.apache.calcite.rel.rules;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import java.util.Collections;
 import java.util.HashSet;
@@ -49,7 +50,7 @@ public class PinotWindowExchangeNodeInsertRule extends 
RelOptRule {
   // Supported window functions
   // OTHER_FUNCTION supported are: BOOL_AND, BOOL_OR
   private static final Set<SqlKind> SUPPORTED_WINDOW_FUNCTION_KIND = 
ImmutableSet.of(SqlKind.SUM, SqlKind.SUM0,
-      SqlKind.MIN, SqlKind.MAX, SqlKind.COUNT, SqlKind.OTHER_FUNCTION);
+      SqlKind.MIN, SqlKind.MAX, SqlKind.COUNT, SqlKind.ROW_NUMBER, 
SqlKind.OTHER_FUNCTION);
 
   public PinotWindowExchangeNodeInsertRule(RelBuilderFactory factory) {
     super(operand(LogicalWindow.class, any()), factory, null);
@@ -145,19 +146,26 @@ public class PinotWindowExchangeNodeInsertRule extends 
RelOptRule {
   }
 
   private void validateWindowFrames(Window.Group windowGroup) {
+    // Has ROWS only aggregation call kind (e.g. ROW_NUMBER)?
+    boolean isRowsOnlyTypeAggregateCall = 
isRowsOnlyAggregationCallType(windowGroup.aggCalls);
     // For Phase 1 only the default frame is supported
-    Preconditions.checkState(!windowGroup.isRows, "Default frame must be of 
type RANGE and not ROWS");
+    Preconditions.checkState(!windowGroup.isRows || 
isRowsOnlyTypeAggregateCall,
+        "Default frame must be of type RANGE and not ROWS unless this is a 
ROWS only aggregation function");
     Preconditions.checkState(windowGroup.lowerBound.isPreceding() && 
windowGroup.lowerBound.isUnbounded(),
         String.format("Lower bound must be UNBOUNDED PRECEDING but it is: %s", 
windowGroup.lowerBound));
-    if (windowGroup.orderKeys.getKeys().isEmpty()) {
+    if (windowGroup.orderKeys.getKeys().isEmpty() && 
!isRowsOnlyTypeAggregateCall) {
       Preconditions.checkState(windowGroup.upperBound.isFollowing() && 
windowGroup.upperBound.isUnbounded(),
-          String.format("Upper bound must be UNBOUNDED PRECEDING but it is: 
%s", windowGroup.upperBound));
+          String.format("Upper bound must be UNBOUNDED FOLLOWING but it is: 
%s", windowGroup.upperBound));
     } else {
       Preconditions.checkState(windowGroup.upperBound.isCurrentRow(),
           String.format("Upper bound must be CURRENT ROW but it is: %s", 
windowGroup.upperBound));
     }
   }
 
+  private boolean 
isRowsOnlyAggregationCallType(ImmutableList<Window.RexWinAggCall> aggCalls) {
+    return aggCalls.stream().anyMatch(aggCall -> 
aggCall.getKind().equals(SqlKind.ROW_NUMBER));
+  }
+
   private boolean isPartitionByOnlyQuery(Window.Group windowGroup) {
     boolean isPartitionByOnly = false;
     if (windowGroup.orderKeys.getKeys().isEmpty()) {
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index 4a64e632aa..9b72e32613 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -114,6 +114,7 @@ public class QueryEnvironmentTestBase {
         new Object[]{"SELECT a.col1, SUM(a.col3) OVER (PARTITION BY a.col2, 
a.col1) FROM a"},
         new Object[]{"SELECT a.col1, SUM(a.col3) OVER (ORDER BY a.col2, 
a.col1), MIN(a.col3) OVER (ORDER BY a.col2, "
             + "a.col1) FROM a"},
+        new Object[]{"SELECT a.col1, ROW_NUMBER() OVER(PARTITION BY a.col2 
ORDER BY a.col3) FROM a"},
         new Object[]{"SELECT a.col1, SUM(a.col3) OVER (ORDER BY a.col2), 
MIN(a.col3) OVER (ORDER BY a.col2) FROM a"},
         new Object[]{"SELECT /*+ skipLeafStageGroupByAggregation */ a.col1, 
SUM(a.col3) FROM a WHERE a.col3 >= 0"
             + " AND a.col2 = 'a' GROUP BY a.col1"},
diff --git 
a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json 
b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
index ee277e54b0..0be92124b7 100644
--- a/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/WindowFunctionPlans.json
@@ -14,6 +14,19 @@
           "\n"
         ]
       },
+      {
+        "description": "single empty OVER() only row_number",
+        "sql": "EXPLAIN PLAN FOR SELECT ROW_NUMBER() OVER() FROM a",
+        "notes": "TODO: ROW_NUMBER() with empty OVER() and no other columns in 
select results in the leaf level LogicalProject not projecting any rows. This 
is incorrect since we need to project at least one column for assigning 
ROW_NUMBERS",
+        "output": [
+          "Execution Plan",
+          "\nLogicalWindow(window#0=[window( rows between UNBOUNDED PRECEDING 
and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n  LogicalExchange(distribution=[hash])",
+          "\n    LogicalProject",
+          "\n      LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "single empty OVER() and select col",
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER() FROM a",
@@ -27,6 +40,18 @@
           "\n"
         ]
       },
+      {
+        "description": "single empty OVER() row_number and select col",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, ROW_NUMBER() OVER() FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalWindow(window#0=[window( rows between UNBOUNDED PRECEDING 
and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n  LogicalExchange(distribution=[hash])",
+          "\n    LogicalProject(col1=[$0])",
+          "\n      LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "single empty OVER() and select col with select alias",
         "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, SUM(a.col3) OVER() 
AS sum FROM a",
@@ -71,6 +96,22 @@
           "\n"
         ]
       },
+      {
+        "description": "single empty OVER() row_number and select col with 
global order by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, ROW_NUMBER() OVER() FROM a 
ORDER BY a.col2",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
+          "\n      LogicalProject(col1=[$0], EXPR$1=[$2], col2=[$1])",
+          "\n        LogicalWindow(window#0=[window( rows between UNBOUNDED 
PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n          LogicalExchange(distribution=[hash])",
+          "\n            LogicalProject(col1=[$0], col2=[$1])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "single empty OVER() and select col with LIMIT",
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER() FROM a 
LIMIT 10",
@@ -130,6 +171,19 @@
           "\n"
         ]
       },
+      {
+        "description": "single empty OVER() row_number select col and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, ROW_NUMBER() OVER() FROM a 
WHERE a.col3 > 10",
+        "output": [
+          "Execution Plan",
+          "\nLogicalWindow(window#0=[window( rows between UNBOUNDED PRECEDING 
and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n  LogicalExchange(distribution=[hash])",
+          "\n    LogicalProject(col1=[$0])",
+          "\n      LogicalFilter(condition=[>($2, 10)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "single empty OVER() with select transform and filter",
         "sql": "EXPLAIN PLAN FOR SELECT CONCAT(a.col1, '-', a.col2), 
MIN(a.col3) OVER() FROM a where a.col1 IN ('foo', 'bar')",
@@ -224,6 +278,25 @@
           "\n"
         ]
       },
+      {
+        "description": "single empty OVER() row_number with aggregate avg and 
group by and order by",
+        "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3), ROW_NUMBER() OVER() FROM 
a GROUP BY a.col3 ORDER BY a.col3",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
+          "\n      LogicalProject(EXPR$0=[$1], EXPR$1=[$2], col3=[$0])",
+          "\n        LogicalWindow(window#0=[window( rows between UNBOUNDED 
PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n          LogicalExchange(distribution=[hash])",
+          "\n            LogicalProject(col3=[$0], EXPR$0=[/(CAST($1):DOUBLE 
NOT NULL, $2)])",
+          "\n              LogicalAggregate(group=[{0}], agg#0=[$SUM0($1)], 
agg#1=[$SUM0($2)])",
+          "\n                LogicalExchange(distribution=[hash[0]])",
+          "\n                  LogicalAggregate(group=[{2}], 
agg#0=[$SUM0($2)], agg#1=[COUNT()])",
+          "\n                    LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "multiple empty OVER()s only",
         "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(), COUNT(a.col2) 
OVER() FROM a",
@@ -367,6 +440,21 @@
           "\n"
         ]
       },
+      {
+        "description": "multiple empty OVER()s row_number with select 
transform and filter",
+        "notes": "ROW_NUMBER requires ROWS as the default frame, and the 
default frame cannot be overridden, thus it cannot be combined with other 
functions yet",
+        "sql": "EXPLAIN PLAN FOR SELECT LENGTH(CONCAT(a.col1, ' ', a.col2)), 
ROW_NUMBER() OVER(), ROW_NUMBER() OVER() FROM a where a.col1 NOT IN ('foo', 
'bar', 'baz')",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[$0], EXPR$1=[$1], EXPR$2=[$1])",
+          "\n  LogicalWindow(window#0=[window( rows between UNBOUNDED 
PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n    LogicalExchange(distribution=[hash])",
+          "\n      LogicalProject($0=[LENGTH(CONCAT($0, ' ', $1))])",
+          "\n        LogicalFilter(condition=[AND(<>($0, 'bar'), <>($0, 
'baz'), <>($0, 'foo'))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "multiple empty OVER()s with group by",
         "sql": "EXPLAIN PLAN FOR SELECT MIN(a.col3) OVER(), SUM(a.col3) OVER() 
FROM a GROUP BY a.col3",
@@ -460,6 +548,19 @@
           "\n"
         ]
       },
+      {
+        "description": "single OVER(PARTITION BY) row_number only",
+        "sql": "EXPLAIN PLAN FOR SELECT ROW_NUMBER() OVER(PARTITION BY a.col2) 
FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1])",
+          "\n  LogicalWindow(window#0=[window(partition {0} rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "single OVER(PARTITION BY) only with alias",
         "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3) OVER(PARTITION BY a.col2) 
AS sum FROM a",
@@ -500,6 +601,20 @@
           "\n"
         ]
       },
+      {
+        "description": "single OVER(PARTITION BY) row_number and select col 
with select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, ROW_NUMBER() 
OVER(PARTITION BY a.col2) AS row_num FROM a",
+        "notes": "TODO: Look into why aliases are getting ignored in the final 
plan",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$0], $1=[$2])",
+          "\n  LogicalWindow(window#0=[window(partition {1} rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col1=[$0], col2=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "single OVER(PARTITION BY) with default frame",
         "sql": "EXPLAIN PLAN FOR SELECT MAX(a.col3) OVER(PARTITION BY a.col1 
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM a",
@@ -531,7 +646,7 @@
         ]
       },
       {
-        "description": "single OVER(PARTITION BY) and select col",
+        "description": "single OVER(PARTITION BY) and select col with LIMIT",
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(PARTITION BY 
a.col2) FROM a LIMIT 10",
         "output": [
           "Execution Plan",
@@ -562,6 +677,22 @@
           "\n"
         ]
       },
+      {
+        "description": "single OVER(PARTITION BY) row_number and select col 
with global order by with LIMIT",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, ROW_NUMBER() OVER(PARTITION BY 
a.col1) FROM a ORDER BY a.col1 LIMIT 10",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0], fetch=[10])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n    LogicalSort(sort0=[$2], dir0=[ASC], fetch=[10])",
+          "\n      LogicalProject(col2=[$1], EXPR$1=[$2], col1=[$0])",
+          "\n        LogicalWindow(window#0=[window(partition {0} rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n            LogicalProject(col1=[$0], col2=[$1])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "single OVER(PARTITION BY) and transform col",
         "sql": "EXPLAIN PLAN FOR SELECT SUBSTR(a.col1, 0, 2), COUNT(a.col2) 
OVER(PARTITION BY a.col3) FROM a",
@@ -575,6 +706,19 @@
           "\n"
         ]
       },
+      {
+        "description": "single OVER(PARTITION BY) row_number and transform 
col",
+        "sql": "EXPLAIN PLAN FOR SELECT SUBSTR(a.col1, 0, 2), ROW_NUMBER() 
OVER(PARTITION BY a.col3) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1], $1=[$2])",
+          "\n  LogicalWindow(window#0=[window(partition {0} rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col3=[$2], $1=[SUBSTR($0, 0, 2)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "single OVER(PARTITION BY) select col and filter",
         "sql": "EXPLAIN PLAN FOR SELECT a.col2, AVG(a.col3) OVER(PARTITION BY 
a.col2) FROM a WHERE a.col3 > 10 AND a.col3 <= 500",
@@ -616,6 +760,19 @@
           "\n"
         ]
       },
+      {
+        "description": "single OVER(PARTITION BY) row_number with transform on 
partition key",
+        "sql": "EXPLAIN PLAN FOR SELECT ROW_NUMBER() OVER(PARTITION BY 
CONCAT(a.col1, '-', a.col2)) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1])",
+          "\n  LogicalWindow(window#0=[window(partition {0} rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject($0=[CONCAT($0, '-', $1)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "single OVER(PARTITION BY) with group by",
         "sql": "EXPLAIN PLAN FOR SELECT MIN(a.col3) OVER(PARTITION BY a.col3) 
FROM a GROUP BY a.col3",
@@ -646,6 +803,21 @@
           "\n"
         ]
       },
+      {
+        "description": "single OVER(PARTITION BY) row_number with select col 
and group by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, ROW_NUMBER() OVER(PARTITION BY 
a.col1) FROM a GROUP BY a.col1, a.col3",
+        "output": [
+          "Execution Plan",
+          "\nLogicalWindow(window#0=[window(partition {0} rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n    LogicalProject(col1=[$0])",
+          "\n      LogicalAggregate(group=[{0, 1}])",
+          "\n        LogicalExchange(distribution=[hash[0, 1]])",
+          "\n          LogicalAggregate(group=[{0, 2}])",
+          "\n            LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "single OVER(PARTITION BY) with aggregate and group by",
         "sql": "EXPLAIN PLAN FOR SELECT SUM(a.col3), MIN(a.col3) 
OVER(PARTITION BY a.col3) FROM a GROUP BY a.col3",
@@ -843,6 +1015,23 @@
           "\n"
         ]
       },
+      {
+        "description": "multiple OVER(PARTITION BY)s row_number on the same 
key but in reverse order and select col with global order by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, ROW_NUMBER() OVER(PARTITION BY 
a.col2, a.col1), ROW_NUMBER() OVER(PARTITION BY a.col1, a.col2) FROM a ORDER BY 
a.col1",
+        "notes": "ROW_NUMBER requires ROWS as the default frame, and the 
default frame cannot be overridden, thus it cannot be combined with other 
functions yet",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[0]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n    LogicalSort(sort0=[$0], dir0=[ASC])",
+          "\n      LogicalProject(col1=[$0], $1=[$2], $2=[$3])",
+          "\n        LogicalWindow(window#0=[window(partition {0, 1} rows 
between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER(), 
ROW_NUMBER()])])",
+          "\n          LogicalExchange(distribution=[hash[0, 1]])",
+          "\n            LogicalProject(col1=[$0], col2=[$1])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "multiple OVER(PARTITION BY)s on the same key but in 
reverse order and select col with global order by (avg agg)",
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, SUM(a.col3) OVER(PARTITION BY 
a.col2, a.col1), AVG(a.col3) OVER(PARTITION BY a.col1, a.col2) FROM a ORDER BY 
a.col1",
@@ -1049,6 +1238,19 @@
           "\n"
         ]
       },
+      {
+        "description": "single OVER(ORDER BY) row_number and select col",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, ROW_NUMBER() OVER(ORDER BY 
a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$0], $1=[$2])",
+          "\n  LogicalWindow(window#0=[window(order by [1] rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n      LogicalProject(col1=[$0], col2=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "single OVER(ORDER BY) and select col with select 
alias",
         "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, AVG(a.col3) 
OVER(ORDER BY a.col2) AS avg FROM a",
@@ -1062,6 +1264,20 @@
           "\n"
         ]
       },
+      {
+        "description": "single OVER(ORDER BY) row_number and select col with 
select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, ROW_NUMBER() 
OVER(ORDER BY a.col2) AS row_number FROM a",
+        "notes": "TODO: Look into why aliases are getting ignored in the final 
plan",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$0], $1=[$2])",
+          "\n  LogicalWindow(window#0=[window(order by [1] rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n      LogicalProject(col1=[$0], col2=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "single OVER(ORDER BY) with default frame",
         "sql": "EXPLAIN PLAN FOR SELECT MAX(a.col3) OVER(ORDER BY a.col1 RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM a",
@@ -1107,6 +1323,22 @@
           "\n"
         ]
       },
+      {
+        "description": "single OVER(ORDER BY) row_number and select col with 
LIMIT",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, ROW_NUMBER() OVER(ORDER BY 
a.col2) FROM a LIMIT 10",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(offset=[0], fetch=[10])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[]], 
isSortOnSender=[false], isSortOnReceiver=[false])",
+          "\n    LogicalSort(fetch=[10])",
+          "\n      LogicalProject(col1=[$0], $1=[$2])",
+          "\n        LogicalWindow(window#0=[window(order by [1] rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n          PinotLogicalSortExchange(distribution=[hash], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n            LogicalProject(col1=[$0], col2=[$1])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "single OVER(ORDER BY) and select col with global order 
by with LIMIT",
         "sql": "EXPLAIN PLAN FOR SELECT a.col2, MIN(a.col3) OVER(ORDER BY 
a.col1 DESC) FROM a ORDER BY a.col1 LIMIT 10",
@@ -1165,7 +1397,21 @@
         ]
       },
       {
-        "description": "single OVER(ORDER BY) with transform on partition key",
+        "description": "single OVER(ORDER BY) row_number with select transform 
and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT CONCAT(a.col1, '-', a.col2), 
ROW_NUMBER() OVER(ORDER BY a.col2) FROM a where a.col1 NOT IN ('foo', 'bar') OR 
a.col3 >= 42",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1], $1=[$2])",
+          "\n  LogicalWindow(window#0=[window(order by [0] rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], 
collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n      LogicalProject(col2=[$1], $1=[CONCAT($0, '-', $1)])",
+          "\n        LogicalFilter(condition=[OR(AND(<>($0, 'bar'), <>($0, 
'foo')), >=($2, 42))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(ORDER BY) with transform on order by key",
         "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3) OVER(ORDER BY 
CONCAT(a.col1, '-', a.col2)) FROM a",
         "output": [
           "Execution Plan",
@@ -1177,6 +1423,19 @@
           "\n"
         ]
       },
+      {
+        "description": "single OVER(ORDER BY) row_number with transform on 
order by key",
+        "sql": "EXPLAIN PLAN FOR SELECT ROW_NUMBER() OVER(ORDER BY 
CONCAT(a.col1, '-', a.col2)) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1])",
+          "\n  LogicalWindow(window#0=[window(order by [0] rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], 
collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n      LogicalProject($0=[CONCAT($0, '-', $1)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "multiple OVER(ORDER BY)s on the same key only",
         "sql": "EXPLAIN PLAN FOR SELECT MAX(a.col3) OVER(ORDER BY a.col1), 
COUNT(a.col2) OVER(ORDER BY a.col1) FROM a",
@@ -1304,6 +1563,20 @@
           "\n"
         ]
       },
+      {
+        "description": "multiple OVER(ORDER BY)s row_number on the same key 
and transform col",
+        "sql": "EXPLAIN PLAN FOR SELECT REVERSE(a.col1), ROW_NUMBER() 
OVER(ORDER BY a.col2), ROW_NUMBER() OVER(ORDER BY a.col2) FROM a",
+        "notes": "ROW_NUMBER requires ROWS as the default frame, and the 
default frame cannot be overridden, thus it cannot be combined with other 
functions yet",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$2])",
+          "\n  LogicalWindow(window#0=[window(order by [0] rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n    PinotLogicalSortExchange(distribution=[hash], 
collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n      LogicalProject(col2=[$1], $1=[REVERSE($0)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "multiple OVER(ORDER BY)s on the same key select col 
and filter",
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(ORDER BY 
a.col1), COUNT(a.col1) OVER(ORDER BY a.col1) FROM a WHERE a.col3 > 42 AND 
a.col1 IN ('vader', 'chewbacca', 'yoda')",
@@ -1372,6 +1645,20 @@
           "\n"
         ]
       },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k1) row_number 
only with alias",
+        "sql": "EXPLAIN PLAN FOR SELECT ROW_NUMBER() OVER(PARTITION BY a.col2 
ORDER BY a.col2) AS row_number FROM a",
+        "notes": "TODO: Look into why aliases are getting ignored in the final 
plan",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$1])",
+          "\n  LogicalWindow(window#0=[window(partition {0} order by [0] rows 
between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject(col2=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "single OVER(PARTITION BY k1 ORDER BY k1) and select 
col",
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(PARTITION BY 
a.col2 ORDER BY a.col2) FROM a",
@@ -1385,6 +1672,19 @@
           "\n"
         ]
       },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k1) row_number 
and select col",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, ROW_NUMBER() OVER(PARTITION BY 
a.col2 ORDER BY a.col2) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$0], $1=[$2])",
+          "\n  LogicalWindow(window#0=[window(partition {1} order by [1] rows 
between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col1=[$0], col2=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "single OVER(PARTITION BY k1 ORDER BY k1) and select 
col with select alias",
         "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, AVG(a.col3) 
OVER(PARTITION BY a.col2 ORDER BY a.col2) AS avg FROM a",
@@ -1459,6 +1759,22 @@
           "\n"
         ]
       },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k1) row_number 
and select col with global order by with LIMIT",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, ROW_NUMBER() OVER(PARTITION BY 
a.col1 ORDER BY a.col1) FROM a ORDER BY a.col1 LIMIT 10",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0], fetch=[10])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n    LogicalSort(sort0=[$2], dir0=[ASC], fetch=[10])",
+          "\n      LogicalProject(col2=[$1], EXPR$1=[$2], col1=[$0])",
+          "\n        LogicalWindow(window#0=[window(partition {0} order by [0] 
rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n          LogicalExchange(distribution=[hash[0]])",
+          "\n            LogicalProject(col1=[$0], col2=[$1])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "single OVER(PARTITION BY k1 ORDER BY k1) and transform 
col",
         "sql": "EXPLAIN PLAN FOR SELECT SUBSTR(a.col1, 0, 2), COUNT(a.col2) 
OVER(PARTITION BY a.col3 ORDER BY a.col3) FROM a",
@@ -1486,6 +1802,19 @@
           "\n"
         ]
       },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k1) row_number 
select col and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, ROW_NUMBER() OVER(PARTITION BY 
a.col2 ORDER BY a.col2) FROM a WHERE a.col3 > 10 AND a.col3 <= 500",
+        "output": [
+          "Execution Plan",
+          "\nLogicalWindow(window#0=[window(partition {0} order by [0] rows 
between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n  LogicalExchange(distribution=[hash[0]])",
+          "\n    LogicalProject(col2=[$1])",
+          "\n      LogicalFilter(condition=[AND(>($2, 10), <=($2, 500))])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "single OVER(PARTITION BY k1 ORDER BY k1) with select 
transform and filter",
         "sql": "EXPLAIN PLAN FOR SELECT CONCAT(a.col1, '-', a.col2), 
AVG(a.col3) OVER(PARTITION BY a.col2 ORDER BY a.col2) FROM a where a.col1 NOT 
IN ('foo', 'bar') OR a.col3 >= 42",
@@ -1526,6 +1855,19 @@
           "\n"
         ]
       },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k1) row_number 
but order by has different direction and select col",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, ROW_NUMBER() OVER(PARTITION BY 
a.col2 ORDER BY a.col2 DESC) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$0], $1=[$2])",
+          "\n  LogicalWindow(window#0=[window(partition {1} order by [1 DESC] 
rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n    LogicalExchange(distribution=[hash[1]])",
+          "\n      LogicalProject(col1=[$0], col2=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "single OVER(PARTITION BY k1 ORDER BY k1) but order by 
has different null direction and select col",
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(PARTITION BY 
a.col2 ORDER BY a.col2 NULLS FIRST) FROM a",
@@ -1720,6 +2062,20 @@
           "\n"
         ]
       },
+      {
+        "description": "multiple OVER(PARTITION BY k1 ORDER BY k1) row_number 
with transform on partition key",
+        "sql": "EXPLAIN PLAN FOR SELECT ROW_NUMBER() OVER(PARTITION BY 
REVERSE(CONCAT(a.col1, '-', a.col2)) ORDER BY REVERSE(CONCAT(a.col1, '-', 
a.col2))), ROW_NUMBER() OVER(PARTITION BY REVERSE(CONCAT(a.col1, '-', a.col2)) 
ORDER BY REVERSE(CONCAT(a.col1, '-', a.col2))) FROM a",
+        "notes": "ROW_NUMBER requires ROWS as the default frame, and the 
default frame cannot be overridden, thus it cannot be combined with other 
functions yet",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(EXPR$0=[$1], EXPR$1=[$1])",
+          "\n  LogicalWindow(window#0=[window(partition {0} order by [0] rows 
between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n    LogicalExchange(distribution=[hash[0]])",
+          "\n      LogicalProject($0=[REVERSE(CONCAT($0, '-', $1))])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "multiple OVER(PARTITION BY k1 ORDER BY k1)s on the 
same key but order by has different direction and select col",
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(PARTITION BY 
a.col1 ORDER BY a.col1 DESC), MIN(a.col3) OVER(PARTITION BY a.col1 ORDER BY 
a.col1 DESC) FROM a",
@@ -1785,6 +2141,19 @@
           "\n"
         ]
       },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k2) row_number 
only with select alias",
+        "sql": "EXPLAIN PLAN FOR SELECT ROW_NUMBER() OVER(PARTITION BY a.col2 
ORDER BY a.col1) AS row_number FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2])",
+          "\n  LogicalWindow(window#0=[window(partition {1} order by [0] rows 
between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[1]], 
collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n      LogicalProject(col1=[$0], col2=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "single OVER(PARTITION BY k1 ORDER BY k2) and select 
col",
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(PARTITION BY 
a.col2 ORDER BY a.col1) FROM a",
@@ -1798,6 +2167,19 @@
           "\n"
         ]
       },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k2) row_number 
and select col",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, ROW_NUMBER() OVER(PARTITION BY 
a.col2 ORDER BY a.col1) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$0], $1=[$2])",
+          "\n  LogicalWindow(window#0=[window(partition {1} order by [0] rows 
between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[1]], 
collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n      LogicalProject(col1=[$0], col2=[$1])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "single OVER(PARTITION BY k1 ORDER BY k2) and select 
col with select alias",
         "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, AVG(a.col3) 
OVER(PARTITION BY a.col2 ORDER BY a.col1) AS avg FROM a",
@@ -1840,6 +2222,22 @@
           "\n"
         ]
       },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k2) row_number 
and select col with global order by",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col2, ROW_NUMBER() OVER(PARTITION BY 
a.col1 ORDER BY a.col2) FROM a ORDER BY a.col1",
+        "output": [
+          "Execution Plan",
+          "\nLogicalSort(sort0=[$2], dir0=[ASC], offset=[0])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[2]], 
isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n    LogicalSort(sort0=[$2], dir0=[ASC])",
+          "\n      LogicalProject(col2=[$1], EXPR$1=[$2], col1=[$0])",
+          "\n        LogicalWindow(window#0=[window(partition {0} order by [1] 
rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n          PinotLogicalSortExchange(distribution=[hash[0]], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n            LogicalProject(col1=[$0], col2=[$1])",
+          "\n              LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "single OVER(PARTITION BY k1 ORDER BY k2) and select 
col with LIMIT",
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, AVG(a.col3) OVER(PARTITION BY 
a.col2 ORDER BY a.col1) FROM a LIMIT 10",
@@ -1914,7 +2312,21 @@
         ]
       },
       {
-        "description": "single OVER(PARTITION BY k1 ORDER BY k2) with 
transform on partition key",
+        "description": "single OVER(PARTITION BY k1 ORDER BY k2) row_number 
with select transform and filter",
+        "sql": "EXPLAIN PLAN FOR SELECT CONCAT(a.col1, '-', a.col2), 
ROW_NUMBER() OVER(PARTITION BY a.col2 ORDER BY a.col1) FROM a where a.col1 NOT 
IN ('foo', 'bar') OR a.col3 >= 42",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2], $1=[$3])",
+          "\n  LogicalWindow(window#0=[window(partition {1} order by [0] rows 
between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[1]], 
collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n      LogicalProject(col1=[$0], col2=[$1], $2=[CONCAT($0, '-', 
$1)])",
+          "\n        LogicalFilter(condition=[OR(AND(<>($0, 'bar'), <>($0, 
'foo')), >=($2, 42))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k2) with 
transform on partition key and order key",
         "sql": "EXPLAIN PLAN FOR SELECT AVG(a.col3) OVER(PARTITION BY 
CONCAT(a.col1, '-', a.col2) ORDER BY REVERSE(a.col2)) FROM a",
         "output": [
           "Execution Plan",
@@ -1926,6 +2338,19 @@
           "\n"
         ]
       },
+      {
+        "description": "single OVER(PARTITION BY k1 ORDER BY k2) row_number 
with transform on partition key and order key",
+        "sql": "EXPLAIN PLAN FOR SELECT ROW_NUMBER() OVER(PARTITION BY 
CONCAT(a.col1, '-', a.col2) ORDER BY REVERSE(a.col2)) FROM a",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject($0=[$2])",
+          "\n  LogicalWindow(window#0=[window(partition {1} order by [0] rows 
between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[1]], 
collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n      LogicalProject($0=[REVERSE($1)], $1=[CONCAT($0, '-', $1)])",
+          "\n        LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "multiple OVER(PARTITION BY k1 ORDER BY k2)s on the 
same key only (single window group)",
         "sql": "EXPLAIN PLAN FOR SELECT MAX(a.col3) OVER(PARTITION BY a.col1 
ORDER BY a.col3), COUNT(a.col2) OVER(PARTITION BY a.col1 ORDER BY a.col3) FROM 
a",
@@ -2067,6 +2492,20 @@
           "\n"
         ]
       },
+      {
+        "description": "multiple OVER(PARTITION BY k1 ORDER BY k2)s row_number 
on the same key select col and filter (single window group)",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, ROW_NUMBER() OVER(PARTITION BY 
a.col1 ORDER BY a.col2), ROW_NUMBER() OVER(PARTITION BY a.col1 ORDER BY a.col2) 
FROM a WHERE a.col3 > 42 AND a.col1 IN ('vader', 'chewbacca', 'yoda')",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$0], EXPR$1=[$2], EXPR$2=[$2])",
+          "\n  LogicalWindow(window#0=[window(partition {0} order by [1] rows 
between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n    PinotLogicalSortExchange(distribution=[hash[0]], 
collation=[[1]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n      LogicalProject(col1=[$0], col2=[$1])",
+          "\n        LogicalFilter(condition=[AND(>($2, 42), OR(=($0, 
'chewbacca':VARCHAR(9)), =($0, 'vader':VARCHAR(9)), =($0, 
'yoda':VARCHAR(9))))])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "multiple OVER(PARTITION BY k1 ORDER BY k2)s on the 
same key with select transform and filter (single window group)",
         "sql": "EXPLAIN PLAN FOR SELECT REVERSE(CONCAT(a.col1, ' ', a.col2)), 
MIN(a.col3) OVER(PARTITION BY a.col1 ORDER BY a.col2), MAX(a.col3) 
OVER(PARTITION BY a.col1 ORDER BY a.col2) FROM a where a.col2 NOT IN ('foo', 
'bar', 'baz')",
@@ -2128,6 +2567,21 @@
           "\n"
         ]
       },
+      {
+        "description": "Window function using row_number with GROUP BY example 
with aggregation used within ORDER BY clause in OVER",
+        "sql": "EXPLAIN PLAN FOR SELECT a.col1, COUNT(*), ROW_NUMBER() 
OVER(ORDER BY COUNT(*) desc, a.col1 asc) from a GROUP BY a.col1, a.col3",
+        "output": [
+          "Execution Plan",
+          "\nLogicalWindow(window#0=[window(order by [1 DESC, 0] rows between 
UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n  PinotLogicalSortExchange(distribution=[hash], collation=[[1 
DESC, 0]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n    LogicalProject(col1=[$0], EXPR$1=[$2])",
+          "\n      LogicalAggregate(group=[{0, 1}], EXPR$1=[$SUM0($2)])",
+          "\n        LogicalExchange(distribution=[hash[0, 1]])",
+          "\n          LogicalAggregate(group=[{0, 2}], EXPR$1=[COUNT()])",
+          "\n            LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
       {
         "description": "Window function with GROUP BY example with aggregation 
used within ORDER BY clause in OVER with PARTITION BY",
         "sql": "EXPLAIN PLAN FOR SELECT a.col1, COUNT(*), MAX(a.col3) 
OVER(PARTITION BY a.col1 ORDER BY COUNT(*) desc, a.col1 asc) from a GROUP BY 
a.col1, a.col3",
@@ -2142,6 +2596,34 @@
           "\n            LogicalTableScan(table=[[a]])",
           "\n"
         ]
+      },
+      {
+        "description": "Window function CTE: row_number WITH statement having 
OVER with PARTITION BY ORDER BY",
+        "sql": "EXPLAIN PLAN FOR WITH windowfunc AS (SELECT a.col1, 
ROW_NUMBER() OVER(PARTITION BY a.col2 ORDER BY a.col3) as rownum from a) SELECT 
a.col1, a.rownum FROM windowfunc AS a where a.rownum < 5",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(col1=[$0], $1=[$3])",
+          "\n  LogicalFilter(condition=[<($3, 5)])",
+          "\n    LogicalWindow(window#0=[window(partition {1} order by [2] 
rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n      PinotLogicalSortExchange(distribution=[hash[1]], 
collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n        LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "Window function subquery: row_number having OVER with 
PARTITION BY ORDER BY",
+        "sql": "EXPLAIN PLAN FOR SELECT row_number, col2, col3 FROM (SELECT 
ROW_NUMBER() OVER(PARTITION BY a.col2 ORDER BY a.col3 DESC) as row_number, 
a.col2, a.col3 FROM a) WHERE row_number <= 10",
+        "output": [
+          "Execution Plan",
+          "\nLogicalProject(row_number=[$2], col2=[$0], col3=[$1])",
+          "\n  LogicalFilter(condition=[<=($2, 10)])",
+          "\n    LogicalWindow(window#0=[window(partition {0} order by [1 
DESC] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n      PinotLogicalSortExchange(distribution=[hash[0]], 
collation=[[1 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
+          "\n        LogicalProject(col2=[$1], col3=[$2])",
+          "\n          LogicalTableScan(table=[[a]])",
+          "\n"
+        ]
       }
     ]
   },
@@ -2150,7 +2632,7 @@
       {
-        "description": "unsupported window functions such as row_number()",
+        "description": "unsupported window functions such as rank()",
         "notes": "not yet supported",
-        "sql": "EXPLAIN PLAN FOR SELECT ROW_NUMBER() OVER(PARTITION BY a.col1 
ORDER BY a.col3) FROM a",
+        "sql": "EXPLAIN PLAN FOR SELECT RANK() OVER(PARTITION BY a.col1 ORDER 
BY a.col3) FROM a",
         "expectedException": "Error explain query plan for.*"
       },
       {
@@ -2242,6 +2724,31 @@
         "description": "Wrong table",
         "sql": "EXPLAIN PLAN FOR SELECT MAX(b.col3) OVER(PARTITION BY b.col1 
ORDER BY b.col2) FROM a ORDER BY SUM(b.col3)",
         "expectedException": "Error explain query plan for.*"
+      },
+      {
+        "description": "Apache Calcite failures with ROW_NUMBER() window 
functions - even default frame cannot be specified",
+        "sql": "EXPLAIN PLAN FOR SELECT ROW_NUMBER() OVER(PARTITION BY a.col1 
ORDER BY a.col2 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM a",
+        "expectedException": "Error explain query plan for.*"
+      },
+      {
+        "description": "Apache Calcite failures with ROW_NUMBER() window 
functions - default frame for ROW_NUMBER is different from aggregation window 
functions, resulting in multiple window groups",
+        "sql": "EXPLAIN PLAN FOR SELECT ROW_NUMBER() OVER(PARTITION BY a.col1 
ORDER BY a.col2), SUM(a.col1) OVER(PARTITION BY a.col1 ORDER BY a.col2) FROM a",
+        "expectedException": "Error explain query plan for.*"
+      },
+      {
+        "description": "Apache Calcite failures with ROW_NUMBER() window 
functions - custom RANGE frames not allowed",
+        "sql": "EXPLAIN PLAN FOR SELECT ROW_NUMBER() OVER(PARTITION BY a.col1 
RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM a",
+        "expectedException": "Error explain query plan for.*"
+      },
+      {
+        "description": "Apache Calcite failures with ROW_NUMBER() window 
functions - custom ROWS frames not allowed",
+        "sql": "EXPLAIN PLAN FOR SELECT ROW_NUMBER() OVER(PARTITION BY a.col1 
ROWS 2 PRECEDING) FROM a",
+        "expectedException": "Error explain query plan for.*"
+      },
+      {
+        "description": "Apache Calcite failures with ROW_NUMBER() window 
functions - passing argument to ROW_NUMBER() should fail",
+        "sql": "EXPLAIN PLAN FOR SELECT ROW_NUMBER(a.col3) OVER(PARTITION BY 
a.col2) FROM a",
+        "expectedException": "Error explain query plan for.*"
       }
     ]
   }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
index d64f74b558..b22d1d55cd 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -52,8 +53,10 @@ import org.slf4j.LoggerFactory;
  * columns and in addition will add the aggregation columns to the output data.
  * [input columns, aggregate result1, ... aggregate resultN]
  *
- * The window functions supported today are 
SUM/COUNT/MIN/MAX/AVG/BOOL_OR/BOOL_AND aggregations. Window functions also
- * include other types of functions such as rank and value functions.
+ * The window functions supported today are:
+ * Aggregation: SUM/COUNT/MIN/MAX/AVG/BOOL_OR/BOOL_AND aggregations
+ * Ranking: ROW_NUMBER ranking functions
+ * Value: [none]
  *
  * Unlike the AggregateOperator which will output one row per group, the 
WindowAggregateOperator
  * will output as many rows as input rows.
@@ -67,7 +70,7 @@ import org.slf4j.LoggerFactory;
  * If the input is single value, the output type will be input type. 
Otherwise, the output type will be double.
  *
  * TODO:
- *     1. Add support for rank window functions
+ *     1. Add support for additional rank window functions
  *     2. Add support for value window functions
  *     3. Add support for custom frames (including ROWS support)
  *     4. Add support for null direction handling (even for PARTITION BY only 
queries with custom null direction)
@@ -77,6 +80,9 @@ public class WindowAggregateOperator extends 
MultiStageOperator {
   private static final String EXPLAIN_NAME = "WINDOW";
   private static final Logger LOGGER = 
LoggerFactory.getLogger(WindowAggregateOperator.class);
 
+  // List of window functions which can only be applied as ROWS window frame 
type
+  private static final Set<String> ROWS_ONLY_FUNCTION_NAMES = 
ImmutableSet.of("ROW_NUMBER");
+
   private final MultiStageOperator _inputOperator;
   private final List<RexExpression> _groupSet;
   private final OrderSetInfo _orderSetInfo;
@@ -118,8 +124,6 @@ public class WindowAggregateOperator extends 
MultiStageOperator {
     _orderSetInfo = new OrderSetInfo(orderSet, orderSetDirection, 
orderSetNullDirection, _isPartitionByOnly);
     _windowFrame = new WindowFrame(lowerBound, upperBound, windowFrameType);
 
-    Preconditions.checkState(_windowFrame.getWindowFrameType() == 
WindowNode.WindowFrameType.RANGE,
-        "Only RANGE type frames are supported at present");
     Preconditions.checkState(_windowFrame.isUnboundedPreceding(),
         "Only default frame is supported, lowerBound must be UNBOUNDED 
PRECEDING");
     Preconditions.checkState(_windowFrame.isUnboundedFollowing() || 
_windowFrame.isUpperBoundCurrentRow(),
@@ -135,9 +139,7 @@ public class WindowAggregateOperator extends 
MultiStageOperator {
     for (int i = 0; i < aggCallsSize; i++) {
       RexExpression.FunctionCall agg = _aggCalls.get(i);
       String functionName = agg.getFunctionName();
-      if (!mergers.containsKey(functionName)) {
-        throw new IllegalStateException("Unexpected aggregation function name: 
" + functionName);
-      }
+      validateAggregationCalls(functionName, mergers);
       _windowAccumulators[i] = new WindowAggregateAccumulator(agg, mergers, 
functionName, inputSchema, _orderSetInfo);
     }
 
@@ -182,6 +184,22 @@ public class WindowAggregateOperator extends 
MultiStageOperator {
     }
   }
 
+  private void validateAggregationCalls(String functionName,
+      Map<String, Function<DataSchema.ColumnDataType, 
AggregationUtils.Merger>> mergers) {
+    if (!mergers.containsKey(functionName)) {
+      throw new IllegalStateException("Unexpected aggregation function name: " 
+ functionName);
+    }
+
+    if (ROWS_ONLY_FUNCTION_NAMES.contains(functionName)) {
+      Preconditions.checkState(_windowFrame.getWindowFrameType() == 
WindowNode.WindowFrameType.ROW
+              && _windowFrame.isUpperBoundCurrentRow(),
+          String.format("%s must be of ROW frame type and have CURRENT ROW as 
the upper bound", functionName));
+    } else {
+      Preconditions.checkState(_windowFrame.getWindowFrameType() == 
WindowNode.WindowFrameType.RANGE,
+          String.format("Only RANGE type frames are supported at present for 
function: %s", functionName));
+    }
+  }
+
   private boolean isPartitionByOnlyQuery(List<RexExpression> groupSet, 
List<RexExpression> orderSet) {
     if (CollectionUtils.isEmpty(orderSet)) {
       return true;
@@ -205,18 +223,42 @@ public class WindowAggregateOperator extends 
MultiStageOperator {
   private TransferableBlock produceWindowAggregatedBlock() {
     Key emptyOrderKey = AggregationUtils.extractEmptyKey();
     List<Object[]> rows = new ArrayList<>(_numRows);
-    for (Map.Entry<Key, List<Object[]>> e : _partitionRows.entrySet()) {
-      Key partitionKey = e.getKey();
-      List<Object[]> rowList = e.getValue();
-      for (Object[] existingRow : rowList) {
-        Object[] row = new Object[existingRow.length + _aggCalls.size()];
-        Key orderKey = _isPartitionByOnly ? emptyOrderKey
-            : AggregationUtils.extractRowKey(existingRow, 
_orderSetInfo.getOrderSet());
-        System.arraycopy(existingRow, 0, row, 0, existingRow.length);
-        for (int i = 0; i < _windowAccumulators.length; i++) {
-          row[i + existingRow.length] = 
_windowAccumulators[i].getResultForKeys(partitionKey, orderKey);
+    if (_windowFrame.getWindowFrameType() == WindowNode.WindowFrameType.RANGE) 
{
+      for (Map.Entry<Key, List<Object[]>> e : _partitionRows.entrySet()) {
+        Key partitionKey = e.getKey();
+        List<Object[]> rowList = e.getValue();
+        for (Object[] existingRow : rowList) {
+          Object[] row = new Object[existingRow.length + _aggCalls.size()];
+          Key orderKey = _isPartitionByOnly ? emptyOrderKey
+              : AggregationUtils.extractRowKey(existingRow, 
_orderSetInfo.getOrderSet());
+          System.arraycopy(existingRow, 0, row, 0, existingRow.length);
+          for (int i = 0; i < _windowAccumulators.length; i++) {
+            row[i + existingRow.length] = 
_windowAccumulators[i].getRangeResultForKeys(partitionKey, orderKey);
+          }
+          rows.add(row);
+        }
+      }
+    } else {
+      Key previousPartitionKey = null;
+      Object[] previousRowValues = new Object[_windowAccumulators.length];
+      for (int i = 0; i < _windowAccumulators.length; i++) {
+        previousRowValues[i] = null;
+      }
+
+      for (Map.Entry<Key, List<Object[]>> e : _partitionRows.entrySet()) {
+        Key partitionKey = e.getKey();
+        List<Object[]> rowList = e.getValue();
+        for (Object[] existingRow : rowList) {
+          Object[] row = new Object[existingRow.length + _aggCalls.size()];
+          System.arraycopy(existingRow, 0, row, 0, existingRow.length);
+          for (int i = 0; i < _windowAccumulators.length; i++) {
+            row[i + existingRow.length] = 
_windowAccumulators[i].computeRowResultForCurrentRow(partitionKey,
+                previousPartitionKey, row, previousRowValues[i]);
+            previousRowValues[i] = row[i + existingRow.length];
+          }
+          previousPartitionKey = partitionKey;
+          rows.add(row);
         }
-        rows.add(row);
       }
     }
     _hasReturnedWindowAggregateBlock = true;
@@ -244,16 +286,29 @@ public class WindowAggregateOperator extends 
MultiStageOperator {
       }
 
       List<Object[]> container = block.getContainer();
-      for (Object[] row : container) {
-        _numRows++;
-        // TODO: Revisit null direction handling for all query types
-        Key key = AggregationUtils.extractRowKey(row, _groupSet);
-        Key orderKey = _isPartitionByOnly ? emptyOrderKey
-            : AggregationUtils.extractRowKey(row, _orderSetInfo.getOrderSet());
-        _partitionRows.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
-        int aggCallsSize = _aggCalls.size();
-        for (int i = 0; i < aggCallsSize; i++) {
-          _windowAccumulators[i].accumulate(key, orderKey, row);
+      if (_windowFrame.getWindowFrameType() == 
WindowNode.WindowFrameType.RANGE) {
+        // Only need to accumulate the aggregate function values for RANGE 
type. ROW type can be calculated as
+        // we output the rows since the aggregation value depends on the 
neighboring rows.
+        for (Object[] row : container) {
+          _numRows++;
+          // TODO: Revisit null direction handling for all query types
+          Key key = AggregationUtils.extractRowKey(row, _groupSet);
+          _partitionRows.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
+          // PARTITION BY only queries share one empty order key; otherwise
+          // the order key is extracted from the row using the order set.
+          Key orderKey = _isPartitionByOnly ? emptyOrderKey
+              : AggregationUtils.extractRowKey(row, 
_orderSetInfo.getOrderSet());
+          int aggCallsSize = _aggCalls.size();
+          for (int i = 0; i < aggCallsSize; i++) {
+            _windowAccumulators[i].accumulateRangeResults(key, orderKey, row);
+          }
+        }
+      } else {
+        for (Object[] row : container) {
+          _numRows++;
+          // TODO: Revisit null direction handling for all query types
+          Key key = AggregationUtils.extractRowKey(row, _groupSet);
+          _partitionRows.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
         }
       }
       block = _inputOperator.nextBlock();
@@ -342,25 +397,59 @@ public class WindowAggregateOperator extends 
MultiStageOperator {
     }
   }
 
+  private static class MergeRowNumber implements AggregationUtils.Merger {
+
+    @Override
+    public Object initialize(Object other, DataSchema.ColumnDataType dataType) 
{
+      return 1L;
+    }
+
+    @Override
+    public Object merge(Object left, Object right) {
+      return ((Number) left).longValue() + 1L;
+    }
+  }
+
   private static class WindowAggregateAccumulator extends 
AggregationUtils.Accumulator {
     private static final Map<String, Function<DataSchema.ColumnDataType, 
AggregationUtils.Merger>> WIN_AGG_MERGERS =
         ImmutableMap.<String, Function<DataSchema.ColumnDataType, 
AggregationUtils.Merger>>builder()
             .putAll(AggregationUtils.Accumulator.MERGERS)
+            .put("ROW_NUMBER", cdt -> new MergeRowNumber())
             .build();
 
-    private final Map<Key, OrderKeyResult> _orderByResults = new HashMap<>();
     private final boolean _isPartitionByOnly;
-    private final Key _emptyOrderKey;
+
+    // Fields needed only for RANGE frame type queries (ORDER BY)
+    private final Map<Key, OrderKeyResult> _orderByResults = new HashMap<>();
 
     WindowAggregateAccumulator(RexExpression.FunctionCall aggCall, Map<String,
         Function<DataSchema.ColumnDataType, AggregationUtils.Merger>> merger, 
String functionName,
         DataSchema inputSchema, OrderSetInfo orderSetInfo) {
       super(aggCall, merger, functionName, inputSchema);
       _isPartitionByOnly = CollectionUtils.isEmpty(orderSetInfo.getOrderSet()) 
|| orderSetInfo.isPartitionByOnly();
-      _emptyOrderKey = AggregationUtils.extractEmptyKey();
     }
 
-    public void accumulate(Key key, Key orderKey, Object[] row) {
+    /**
+     * For ROW type queries the aggregation function value depends on the 
order of the rows rather than on the actual
+     * keys. For such queries compute the current row value based on the 
previous row and previous partition key.
+     * This should only be called for ROW type queries.
+     */
+    public Object computeRowResultForCurrentRow(Key currentPartitionKey, Key 
previousPartitionKey, Object[] row,
+        Object previousRowOutputValue) {
+      Object value = _inputRef == -1 ? _literal : row[_inputRef];
+      if (previousPartitionKey == null || 
!currentPartitionKey.equals(previousPartitionKey)) {
+        return _merger.initialize(currentPartitionKey, _dataType);
+      } else {
+        return _merger.merge(previousRowOutputValue, value);
+      }
+    }
+
+    /**
+     * For RANGE type queries, accumulate the function values for each 
PARTITION BY key and ORDER BY key based on
+     * the current row. The aggregation values are tied to the RANGE key and 
not to the row ordering, so this
+     * should only be called for RANGE type queries.
+     */
+    public void accumulateRangeResults(Key key, Key orderKey, Object[] row) {
       if (_isPartitionByOnly) {
         accumulate(key, row);
         return;
@@ -388,7 +477,7 @@ public class WindowAggregateOperator extends 
MultiStageOperator {
       }
     }
 
-    public Object getResultForKeys(Key key, Key orderKey) {
+    public Object getRangeResultForKeys(Key key, Key orderKey) {
       if (_isPartitionByOnly) {
         return _results.get(key);
       } else {
@@ -396,7 +485,7 @@ public class WindowAggregateOperator extends 
MultiStageOperator {
       }
     }
 
-    public Map<Key, OrderKeyResult> getOrderByResults() {
+    public Map<Key, OrderKeyResult> getRangeOrderByResults() {
       return _orderByResults;
     }
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
index b8d5f58eb4..7b28eca581 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java
@@ -20,9 +20,12 @@ package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.pinot.common.utils.DataSchema;
@@ -43,6 +46,7 @@ import org.testng.annotations.Test;
 
 import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.DOUBLE;
 import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.LONG;
 import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.STRING;
 
 
@@ -385,12 +389,12 @@ public class WindowAggregateOperatorTest {
   }
 
   @Test(expectedExceptions = IllegalStateException.class, 
expectedExceptionsMessageRegExp = ".*Unexpected aggregation "
-      + "function name: ROW_NUMBER.*")
+      + "function name: RANK.*")
   public void testShouldThrowOnUnknownRankAggFunction() {
     // TODO: Remove this test when support is added for RANK functions
     // Given:
     List<RexExpression> calls = ImmutableList.of(
-        new RexExpression.FunctionCall(SqlKind.ROW_NUMBER, 
FieldSpec.DataType.INT, "ROW_NUMBER", ImmutableList.of()));
+        new RexExpression.FunctionCall(SqlKind.RANK, FieldSpec.DataType.INT, 
"RANK", ImmutableList.of()));
     List<RexExpression> group = ImmutableList.of(new 
RexExpression.InputRef(0));
     DataSchema outSchema = new DataSchema(new String[]{"unknown"}, new 
DataSchema.ColumnDataType[]{DOUBLE});
     DataSchema inSchema = new DataSchema(new String[]{"unknown"}, new 
DataSchema.ColumnDataType[]{DOUBLE});
@@ -402,6 +406,65 @@ public class WindowAggregateOperatorTest {
             WindowNode.WindowFrameType.RANGE, Collections.emptyList(), 
outSchema, inSchema);
   }
 
+  @Test
+  public void testRowNumberRankingFunction() {
+    // Given:
+    List<RexExpression> calls = ImmutableList.of(
+        new RexExpression.FunctionCall(SqlKind.ROW_NUMBER, 
FieldSpec.DataType.INT, "ROW_NUMBER", ImmutableList.of()));
+    List<RexExpression> group = ImmutableList.of(new 
RexExpression.InputRef(0));
+    List<RexExpression> order = ImmutableList.of(new 
RexExpression.InputRef(1));
+
+    DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new 
DataSchema.ColumnDataType[]{INT, STRING});
+    // Input should be in sorted order on the order by key as SortExchange 
will handle pre-sorting the data
+    Mockito.when(_input.nextBlock())
+        .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{3, "and"}, 
new Object[]{2, "bar"},
+            new Object[]{2, "foo"}))
+        .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{1, "foo"}, 
new Object[]{2, "foo"},
+            new Object[]{2, "the"}, new Object[]{3, "true"}))
+        .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock());
+
+    DataSchema outSchema =
+        new DataSchema(new String[]{"group", "arg", "row_number"}, new 
DataSchema.ColumnDataType[]{INT, STRING, LONG});
+
+    // When:
+    WindowAggregateOperator operator =
+        new WindowAggregateOperator(OperatorTestUtil.getDefaultContext(), 
_input, group, order,
+            Collections.emptyList(), Collections.emptyList(), calls, 
Integer.MIN_VALUE, 0,
+            WindowNode.WindowFrameType.ROW, Collections.emptyList(), 
outSchema, inSchema);
+
+    TransferableBlock result = operator.getNextBlock();
+    while (result.isNoOpBlock()) {
+      result = operator.getNextBlock();
+    }
+    TransferableBlock eosBlock = operator.getNextBlock();
+    List<Object[]> resultRows = result.getContainer();
+    Map<Integer, List<Object[]>> expectedPartitionToRowsMap = new HashMap<>();
+    expectedPartitionToRowsMap.put(1, Collections.singletonList(new 
Object[]{1, "foo", 1L}));
+    expectedPartitionToRowsMap.put(2, Arrays.asList(new Object[]{2, "bar", 
1L}, new Object[]{2, "foo", 2L},
+        new Object[]{2, "foo", 3L}, new Object[]{2, "the", 4L}));
+    expectedPartitionToRowsMap.put(3, Arrays.asList(new Object[]{3, "and", 
1L}, new Object[]{3, "true", 2L}));
+
+    Integer previousPartitionKey = null;
+    Map<Integer, List<Object[]>> resultsPartitionToRowsMap = new HashMap<>();
+    for (Object[] row : resultRows) {
+      Integer currentPartitionKey = (Integer) row[0];
+      if (!currentPartitionKey.equals(previousPartitionKey)) {
+        
Assert.assertFalse(resultsPartitionToRowsMap.containsKey(currentPartitionKey));
+      }
+      resultsPartitionToRowsMap.computeIfAbsent(currentPartitionKey, k -> new 
ArrayList<>()).add(row);
+      previousPartitionKey = currentPartitionKey;
+    }
+
+    resultsPartitionToRowsMap.forEach((key, value) -> {
+      List<Object[]> expectedRows = expectedPartitionToRowsMap.get(key);
+      Assert.assertEquals(value.size(), expectedRows.size());
+      for (int i = 0; i < value.size(); i++) {
+        Assert.assertEquals(value.get(i), expectedRows.get(i));
+      }
+    });
+    Assert.assertTrue(eosBlock.isEndOfStreamBlock(), "Second block is EOS 
(done processing)");
+  }
+
   @Test
   public void testNonEmptyOrderByKeysNotMatchingPartitionByKeys() {
     // Given:
@@ -482,7 +545,7 @@ public class WindowAggregateOperatorTest {
   }
 
   @Test(expectedExceptions = IllegalStateException.class, 
expectedExceptionsMessageRegExp = "Only RANGE type frames "
-      + "are supported at present")
+      + "are supported at present.*")
   public void testShouldThrowOnCustomFramesRows() {
     // TODO: Remove this test once custom frame support is added
     // Given:
diff --git 
a/pinot-query-runtime/src/test/resources/queries/WindowFunctions.json 
b/pinot-query-runtime/src/test/resources/queries/WindowFunctions.json
index 08b19bb26a..638079d7f4 100644
--- a/pinot-query-runtime/src/test/resources/queries/WindowFunctions.json
+++ b/pinot-query-runtime/src/test/resources/queries/WindowFunctions.json
@@ -51,6 +51,30 @@
           [768]
         ]
       },
+      {
+        "description": "Single empty OVER() row_number",
+        "sql": "SELECT ROW_NUMBER() OVER() FROM {tbl}",
+        "comments": "ROW_NUMBER() window function without any other columns in 
select or OVER or filter clauses fails as nothing is projected from the leaf",
+        "expectedException": "(?s)Received error query execution result 
block:.*",
+        "outputs": [
+          [1],
+          [2],
+          [3],
+          [4],
+          [5],
+          [6],
+          [7],
+          [8],
+          [9],
+          [10],
+          [11],
+          [12],
+          [13],
+          [14],
+          [15],
+          [16]
+        ]
+      },
       {
         "description": "Single OVER(ORDER BY) sum",
         "sql": "SELECT SUM(int_col) OVER(ORDER BY string_col) FROM {tbl}",
@@ -119,6 +143,29 @@
           ["h", 768, 150]
         ]
       },
+      {
+        "description": "Single OVER(ORDER BY) row_number with select columns 
(two ORDER BY columns for deterministic output)",
+        "sql": "SELECT string_col, ROW_NUMBER() OVER(ORDER BY string_col, 
int_col), int_col FROM {tbl}",
+        "keepOutputRowOrder": true,
+        "outputs": [
+          ["a", 1, 2],
+          ["a", 2, 2],
+          ["a", 3, 42],
+          ["a", 4, 42],
+          ["a", 5, 42],
+          ["b", 6, 3],
+          ["b", 7, 100],
+          ["c", 8, 2],
+          ["c", 9, 3],
+          ["c", 10, 101],
+          ["c", 11, 150],
+          ["d", 12, 42],
+          ["e", 13, 42],
+          ["e", 14, 42],
+          ["g", 15, 3],
+          ["h", 16, 150]
+        ]
+      },
       {
         "description": "Single OVER(ORDER BY) sum with one DESC column with 
select columns (two ORDER BY columns for deterministic output)",
         "sql": "SELECT string_col, SUM(int_col) OVER(ORDER BY string_col DESC, 
int_col), int_col FROM {tbl}",
@@ -165,6 +212,29 @@
           ["a", 768, 2]
         ]
       },
+      {
+        "description": "Single OVER(ORDER BY) row_number with two DESC columns 
with select columns (two ORDER BY columns for deterministic output)",
+        "sql": "SELECT string_col, ROW_NUMBER() OVER(ORDER BY string_col DESC, 
int_col DESC), int_col FROM {tbl}",
+        "keepOutputRowOrder": true,
+        "outputs": [
+          ["h", 1, 150],
+          ["g", 2, 3],
+          ["e", 3, 42],
+          ["e", 4, 42],
+          ["d", 5, 42],
+          ["c", 6, 150],
+          ["c", 7, 101],
+          ["c", 8, 3],
+          ["c", 9, 2],
+          ["b", 10, 100],
+          ["b", 11, 3],
+          ["a", 12, 42],
+          ["a", 13, 42],
+          ["a", 14, 42],
+          ["a", 15, 2],
+          ["a", 16, 2]
+        ]
+      },
       {
         "description": "Single OVER(ORDER BY) sum with second DESC column with 
select columns (two ORDER BY columns for deterministic output)",
         "sql": "SELECT string_col, SUM(int_col) OVER(ORDER BY string_col, 
int_col DESC), int_col FROM {tbl}",
@@ -326,6 +396,30 @@
           ["h", 106.69]
         ]
       },
+      {
+        "description": "Single OVER(ORDER BY) row_number with select col with 
global order by",
+        "sql": "SELECT string_col, ROW_NUMBER() OVER(ORDER BY string_col) FROM 
{tbl} ORDER BY string_col",
+        "keepOutputRowOrder": false,
+        "comments": "Can't enable keeping the row order as the ordering is 
only based on the string_col and can change on sorting",
+        "outputs": [
+          ["a", 1],
+          ["a", 2],
+          ["a", 3],
+          ["a", 4],
+          ["a", 5],
+          ["b", 6],
+          ["b", 7],
+          ["c", 8],
+          ["c", 9],
+          ["c", 10],
+          ["c", 11],
+          ["d", 12],
+          ["e", 13],
+          ["e", 14],
+          ["g", 15],
+          ["h", 16]
+        ]
+      },
       {
         "description": "Single empty OVER() count with select col with global 
order by with LIMIT",
         "sql": "SELECT string_col, COUNT(int_col) OVER() FROM {tbl} ORDER BY 
string_col LIMIT 5",
@@ -405,6 +499,14 @@
           ["a", 1]
         ]
       },
+      {
+        "description": "Single empty OVER() row_number with select col and 
filter",
+        "sql": "SELECT string_col, ROW_NUMBER() OVER() FROM {tbl} WHERE 
string_col = 'a' AND bool_col = false",
+        "comments": "Hard to test ROW_NUMBER() with empty OVER() since the 
assigned row number is purely dependent on order of processing the rows and is 
not deterministic",
+        "outputs": [
+          ["a", 1]
+        ]
+      },
       {
         "description": "Single OVER(ORDER BY) with select col and filter",
         "sql": "SELECT string_col, COUNT(bool_col) OVER(ORDER BY bool_col) 
FROM {tbl} WHERE string_col = 'a' AND bool_col = false",
@@ -477,6 +579,19 @@
           [400, 359]
         ]
       },
+      {
+        "description": "Single OVER(ORDER BY) row_number with select col and 
filter (two ORDER BY columns for deterministic output)",
+        "sql": "SELECT double_col, ROW_NUMBER() OVER(ORDER BY string_col, 
double_col) FROM {tbl} WHERE string_col IN ('b', 'c')",
+        "keepOutputRowOrder": true,
+        "outputs": [
+          [1, 1],
+          [100, 2],
+          [1.01, 3],
+          [1.5, 4],
+          [100, 5],
+          [400, 6]
+        ]
+      },
       {
         "description": "Single empty OVER() with select transform and filter",
         "sql": "SELECT CONCAT(string_col, bool_col, '-'), MAX(int_col) OVER() 
FROM {tbl} where int_col < 50 OR double_col = 1.01",
@@ -516,6 +631,26 @@
           ["g-true", 101]
         ]
       },
+      {
+        "description": "Single OVER(ORDER BY) row_number with select transform 
and filter (two ORDER BY columns for deterministic output)",
+        "sql": "SELECT CONCAT(string_col, bool_col, '-'), ROW_NUMBER() 
OVER(ORDER BY string_col, bool_col) FROM {tbl} where int_col < 50 OR double_col 
= 1.01",
+        "keepOutputRowOrder": true,
+        "outputs": [
+          ["a-false", 1],
+          ["a-true", 2],
+          ["a-true", 3],
+          ["a-true", 4],
+          ["a-true", 5],
+          ["b-false", 6],
+          ["c-false", 7],
+          ["c-false", 8],
+          ["c-true", 9],
+          ["d-false", 10],
+          ["e-false", 11],
+          ["e-true", 12],
+          ["g-true", 13]
+        ]
+      },
       {
         "description": "Single empty OVER() with group by",
         "sql": "SELECT MAX({tbl}.int_col) OVER() FROM {tbl} GROUP BY int_col",
@@ -541,6 +676,19 @@
           [150]
         ]
       },
+      {
+        "description": "Single OVER(ORDER BY) row_number with group by",
+        "sql": "SELECT ROW_NUMBER() OVER(ORDER BY {tbl}.int_col) FROM {tbl} 
GROUP BY int_col",
+        "keepOutputRowOrder": true,
+        "outputs": [
+          [1],
+          [2],
+          [3],
+          [4],
+          [5],
+          [6]
+        ]
+      },
       {
         "description": "Single empty OVER() with select col and group by",
         "sql": "SELECT string_col, MIN({tbl}.int_col) OVER() FROM {tbl} GROUP 
BY string_col, int_col",
@@ -603,6 +751,19 @@
           [300, 398]
         ]
       },
+      {
+        "description": "Single OVER(ORDER BY) row_number with agg col and 
group by",
+        "sql": "SELECT SUM(int_col), ROW_NUMBER() OVER(ORDER BY {tbl}.int_col) 
FROM {tbl} GROUP BY int_col",
+        "keepOutputRowOrder": true,
+        "outputs": [
+          [6, 1],
+          [9, 2],
+          [252, 3],
+          [100, 4],
+          [101, 5],
+          [300, 6]
+        ]
+      },
       {
         "description": "Single empty OVER() with select col, agg col and group 
by",
         "sql": "SELECT int_col, SUM(int_col), SUM({tbl}.int_col) OVER() FROM 
{tbl} GROUP BY int_col",
@@ -654,6 +815,19 @@
           [150, 300, 398]
         ]
       },
+      {
+        "description": "Single OVER(ORDER BY) row_number with select col, agg 
col and group by with global order by",
+        "sql": "SELECT int_col, SUM(int_col), ROW_NUMBER() OVER(ORDER BY 
{tbl}.int_col) FROM {tbl} GROUP BY int_col ORDER BY int_col",
+        "keepOutputRowOrder": true,
+        "outputs": [
+          [2, 6, 1],
+          [3, 9, 2],
+          [42, 252, 3],
+          [100, 100, 4],
+          [101, 101, 5],
+          [150, 300, 6]
+        ]
+      },
       {
         "description": "Single empty OVER() with select col, agg col and group 
by with a filter",
         "sql": "SELECT int_col, SUM(int_col), SUM({tbl}.int_col) OVER() FROM 
{tbl} WHERE int_col < 100 GROUP BY int_col",
@@ -673,6 +847,16 @@
           [42, 252, 47]
         ]
       },
+      {
+        "description": "Single OVER(ORDER BY) row_number with select col, agg 
col and group by with a filter",
+        "sql": "SELECT int_col, SUM(int_col), ROW_NUMBER() OVER(ORDER BY 
{tbl}.int_col) FROM {tbl} WHERE int_col < 100 GROUP BY int_col",
+        "keepOutputRowOrder": true,
+        "outputs": [
+          [2, 6, 1],
+          [3, 9, 2],
+          [42, 252, 3]
+        ]
+      },
       {
         "description": "Single empty OVER() with select col, agg col and group 
by with a filter that matches no rows",
         "sql": "SELECT int_col, SUM(int_col), SUM({tbl}.int_col) OVER() FROM 
{tbl} WHERE int_col > 200 GROUP BY int_col",
@@ -1063,6 +1247,14 @@
           ["a", 4, 50.5]
         ]
       },
+      {
+        "description": "Multiple empty OVER()s row_number with select col and 
filter",
+        "sql": "SELECT string_col, ROW_NUMBER() OVER(), ROW_NUMBER() OVER() 
FROM {tbl} WHERE string_col = 'a' AND bool_col = false",
+        "comments": "ROW_NUMBER requires ROWS as the default frame, and the 
default frame cannot be overridden, thus it cannot be combined with other 
functions yet",
+        "outputs": [
+          ["a", 1, 1]
+        ]
+      },
       {
         "description": "Multiple OVER(ORDER BY)s with select col and filter",
         "sql": "SELECT string_col, COUNT(bool_col) OVER(ORDER BY string_col), 
MIN(double_col) OVER(ORDER BY string_col) FROM {tbl} WHERE string_col = 'a' AND 
bool_col != false",
@@ -1074,6 +1266,18 @@
           ["a", 4, 50.5]
         ]
       },
+      {
+        "description": "Multiple OVER(ORDER BY)s row_number with select col 
and filter",
+        "sql": "SELECT string_col, ROW_NUMBER() OVER(ORDER BY string_col), 
ROW_NUMBER() OVER(ORDER BY string_col) FROM {tbl} WHERE string_col = 'a' AND 
bool_col != false",
+        "comments": "ROW_NUMBER requires ROWS as the default frame, and the 
default frame cannot be overridden, thus it cannot be combined with other 
functions yet",
+        "keepOutputRowOrder": true,
+        "outputs": [
+          ["a", 1, 1],
+          ["a", 2, 2],
+          ["a", 3, 3],
+          ["a", 4, 4]
+        ]
+      },
       {
         "description": "Multiple empty OVER()s with select col and filter that 
matches no rows",
         "sql": "SELECT string_col, COUNT(bool_col) OVER(), MIN(double_col) 
OVER() FROM {tbl} WHERE string_col = 'a' AND bool_col != false AND int_col > 
200",
@@ -1355,6 +1559,24 @@
           [400, 800, 972.54, 97.254]
         ]
       },
+      {
+        "description": "Multiple OVER(ORDER BY)s row_number with select col, 
agg col and group by with global order by",
+        "sql": "SELECT double_col, SUM(double_col), ROW_NUMBER() OVER(ORDER BY 
{tbl}.double_col), ROW_NUMBER() OVER(ORDER BY {tbl}.double_col) FROM {tbl} 
GROUP BY double_col ORDER BY double_col",
+        "comments": "ROW_NUMBER requires ROWS as the default frame, and the 
default frame cannot be overridden, thus it cannot be combined with other 
functions yet",
+        "keepOutputRowOrder": true,
+        "outputs": [
+          [1, 1, 1, 1],
+          [1.01, 1.01, 2, 2],
+          [1.5, 1.5, 3, 3],
+          [1.53, 1.53, 4, 4],
+          [42, 126, 5, 5],
+          [50.5, 101, 6, 6],
+          [75, 75, 7, 7],
+          [100, 300, 8, 8],
+          [300, 300, 9, 9],
+          [400, 800, 10, 10]
+        ]
+      },
       {
         "description": "Multiple empty OVER()s with select col, agg col and 
group by with a filter",
         "sql": "SELECT double_col, SUM(double_col), SUM({tbl}.double_col) 
OVER(), AVG({tbl}.double_col) OVER() FROM {tbl} WHERE double_col > 100 GROUP BY 
double_col",
@@ -1496,6 +1718,28 @@
           [54]
         ]
       },
+      {
+        "description": "Single OVER(PARTITION BY) row_number",
+        "sql": "SELECT ROW_NUMBER() OVER(PARTITION BY string_col) FROM {tbl}",
+        "outputs": [
+          [1],
+          [2],
+          [3],
+          [4],
+          [5],
+          [1],
+          [2],
+          [1],
+          [1],
+          [1],
+          [1],
+          [2],
+          [1],
+          [2],
+          [3],
+          [4]
+        ]
+      },
       {
         "description": "Single OVER(PARTITION BY k1 ORDER BY k2) sum",
         "sql": "SELECT SUM(int_col) OVER(PARTITION BY string_col ORDER BY 
int_col) FROM {tbl}",
@@ -1542,6 +1786,28 @@
           ["c", 54, 2]
         ]
       },
+      {
+        "description": "Single OVER(PARTITION BY) row_number with select 
columns",
+        "sql": "SELECT string_col, ROW_NUMBER() OVER(PARTITION BY string_col) 
FROM {tbl}",
+        "outputs": [
+          ["a", 1],
+          ["a", 2],
+          ["a", 3],
+          ["a", 4],
+          ["a", 5],
+          ["b", 1],
+          ["b", 2],
+          ["e", 1],
+          ["e", 2],
+          ["d", 1],
+          ["h", 1],
+          ["g", 1],
+          ["c", 1],
+          ["c", 2],
+          ["c", 3],
+          ["c", 4]
+        ]
+      },
       {
         "description": "Single OVER(PARTITION BY k1 ORDER BY k2) sum with 
select columns",
         "sql": "SELECT string_col, SUM(int_col) OVER(PARTITION BY string_col 
ORDER BY int_col), int_col FROM {tbl}",
@@ -1566,6 +1832,30 @@
           ["h", 150, 150]
         ]
       },
+      {
+        "description": "Single OVER(PARTITION BY k1 ORDER BY k2) row_number 
with select columns",
+        "sql": "SELECT string_col, ROW_NUMBER() OVER(PARTITION BY string_col 
ORDER BY int_col), int_col FROM {tbl}",
+        "comments": "Cannot enforce a global ordering as partitions aren't 
ordered, just keys within a partition are",
+        "keepOutputRowOrder": false,
+        "outputs": [
+          ["a", 1, 2],
+          ["a", 2, 2],
+          ["a", 3, 42],
+          ["a", 4, 42],
+          ["a", 5, 42],
+          ["b", 1, 3],
+          ["b", 2, 100],
+          ["c", 1, -101],
+          ["c", 2, 2],
+          ["c", 3, 3],
+          ["c", 4, 150],
+          ["d", 1, 42],
+          ["e", 1, 42],
+          ["e", 2, 42],
+          ["g", 1, 3],
+          ["h", 1, 150]
+        ]
+      },
       {
         "description": "Single OVER(PARTITION BY k1 ORDER BY k2, k3) sum with 
select columns",
         "sql": "SELECT string_col, SUM(int_col) OVER(PARTITION BY string_col 
ORDER BY int_col, bool_col), int_col, bool_col FROM {tbl}",
@@ -1614,6 +1904,30 @@
           ["h", 150, 150, false]
         ]
       },
+      {
+        "description": "Single OVER(PARTITION BY k1 ORDER BY k2, k3 DESC) 
row_number with select columns",
+        "sql": "SELECT string_col, ROW_NUMBER() OVER(PARTITION BY string_col 
ORDER BY int_col, bool_col DESC), int_col, bool_col FROM {tbl}",
+        "comments": "Cannot enforce a global ordering as partitions aren't 
ordered, just keys within a partition are",
+        "keepOutputRowOrder": false,
+        "outputs": [
+          ["a", 1, 2, true],
+          ["a", 2, 2, true],
+          ["a", 3, 42, true],
+          ["a", 4, 42, true],
+          ["a", 5, 42, false],
+          ["b", 1, 3, false],
+          ["b", 2, 100, false],
+          ["c", 1, -101, false],
+          ["c", 2, 2, false],
+          ["c", 3, 3, true],
+          ["c", 4, 150, false],
+          ["d", 1, 42, false],
+          ["e", 1, 42, true],
+          ["e", 2, 42, false],
+          ["g", 1, 3, true],
+          ["h", 1, 150, false]
+        ]
+      },
       {
         "description": "Single OVER(PARTITION BY k1 ORDER BY k2 DESC, k3 DESC) 
sum with select columns",
         "sql": "SELECT string_col, SUM(int_col) OVER(PARTITION BY string_col 
ORDER BY int_col DESC, bool_col DESC), int_col, bool_col FROM {tbl}",
@@ -1706,6 +2020,28 @@
           ["c", 54, 2]
         ]
       },
+      {
+        "description": "Single OVER(PARTITION BY k1 ORDER BY k1) row_number 
with select columns with order by DESC",
+        "sql": "SELECT string_col, ROW_NUMBER() OVER(PARTITION BY string_col 
ORDER BY string_col DESC) FROM {tbl}",
+        "outputs": [
+          ["a", 1],
+          ["a", 2],
+          ["a", 3],
+          ["a", 4],
+          ["a", 5],
+          ["b", 1],
+          ["b", 2],
+          ["e", 1],
+          ["e", 2],
+          ["d", 1],
+          ["h", 1],
+          ["g", 1],
+          ["c", 1],
+          ["c", 2],
+          ["c", 3],
+          ["c", 4]
+        ]
+      },
       {
         "description": "Single OVER(PARTITION BY) max with select columns with 
alias",
         "sql": "SELECT string_col AS str, MAX(double_col) OVER(PARTITION BY 
string_col) AS max, int_col FROM {tbl}",
@@ -1752,6 +2088,30 @@
           ["g", 100, 3]
         ]
       },
+      {
+        "description": "Single OVER(PARTITION BY k1 ORDER BY k2) row_number 
with select columns with alias",
+        "sql": "SELECT string_col AS str, ROW_NUMBER() OVER(PARTITION BY 
string_col ORDER BY int_col) AS row_number, int_col FROM {tbl}",
+        "comments": "Cannot enforce a global ordering as partitions aren't 
ordered, just keys within a partition are",
+        "keepOutputRowOrder": false,
+        "outputs": [
+          ["a", 1, 2],
+          ["a", 2, 2],
+          ["a", 3, 42],
+          ["a", 4, 42],
+          ["a", 5, 42],
+          ["b", 1, 3],
+          ["b", 2, 100],
+          ["e", 1, 42],
+          ["e", 2, 42],
+          ["d", 1, 42],
+          ["c", 1, -101],
+          ["c", 2, 2],
+          ["c", 3, 3],
+          ["c", 4, 150],
+          ["h", 1, 150],
+          ["g", 1, 3]
+        ]
+      },
       {
         "description": "Single OVER(PARTITION BY) min with select columns and 
default frame",
         "sql": "SELECT bool_col, MIN(int_col) OVER(PARTITION BY bool_col ROWS 
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), double_col FROM {tbl}",
@@ -1822,6 +2182,29 @@
           ["h", -1.53]
         ]
       },
+      {
+        "description": "Single OVER(PARTITION BY) row_number with select col 
with global order by",
+        "sql": "SELECT string_col, ROW_NUMBER() OVER(PARTITION BY string_col) 
as row_number FROM {tbl} ORDER BY string_col, row_number",
+        "keepOutputRowOrder": true,
+        "outputs": [
+          ["a", 1],
+          ["a", 2],
+          ["a", 3],
+          ["a", 4],
+          ["a", 5],
+          ["b", 1],
+          ["b", 2],
+          ["c", 1],
+          ["c", 2],
+          ["c", 3],
+          ["c", 4],
+          ["d", 1],
+          ["e", 1],
+          ["e", 2],
+          ["g", 1],
+          ["h", 1]
+        ]
+      },
       {
         "description": "Single OVER(PARTITION BY k1 ORDER BY k2) avg with 
select col with global order by (use two global ORDER BY keys for deterministic 
ordering)",
         "sql": "SELECT string_col, AVG(double_col) OVER(PARTITION BY 
string_col ORDER BY int_col) FROM {tbl} ORDER BY string_col, int_col",
@@ -1891,6 +2274,29 @@
           ["h", false, -1.53]
         ]
       },
+      {
+        "description": "Single OVER(PARTITION BY (2 keys) ORDER BY different 
key) row_number with select col with global order by (added int_col to global 
order by for deterministic results)",
+        "sql": "SELECT string_col, bool_col, ROW_NUMBER() OVER(PARTITION BY 
string_col, bool_col ORDER BY int_col) as row_number FROM {tbl} ORDER BY 
string_col, bool_col, int_col, row_number",
+        "keepOutputRowOrder": true,
+        "outputs": [
+          ["a", false, 1],
+          ["a", true, 1],
+          ["a", true, 2],
+          ["a", true, 3],
+          ["a", true, 4],
+          ["b", false, 1],
+          ["b", false, 2],
+          ["c", false, 1],
+          ["c", false, 2],
+          ["c", false, 3],
+          ["c", true, 1],
+          ["d", false, 1],
+          ["e", false, 1],
+          ["e", true, 1],
+          ["g", true, 1],
+          ["h", false, 1]
+        ]
+      },
       {
         "description": "Single OVER(PARTITION BY) count with select col with 
global order by with LIMIT",
         "sql": "SELECT string_col, COUNT(int_col) OVER(PARTITION BY 
string_col) FROM {tbl} ORDER BY string_col LIMIT 6",
@@ -1966,6 +2372,30 @@
           ["g-true", 3]
         ]
       },
+      {
+        "description": "Single OVER(PARTITION BY k1 ORDER BY k2) row_number 
and transform col",
+        "sql": "SELECT CONCAT(string_col, bool_col, '-'), ROW_NUMBER() 
OVER(PARTITION BY string_col ORDER BY bool_col) FROM {tbl}",
+        "comments": "Cannot enforce a global ordering as partitions aren't 
ordered, just keys within a partition are",
+        "keepOutputRowOrder": false,
+        "outputs": [
+          ["a-false", 1],
+          ["a-true", 2],
+          ["a-true", 3],
+          ["a-true", 4],
+          ["a-true", 5],
+          ["b-false", 1],
+          ["b-false", 2],
+          ["c-false", 1],
+          ["c-false", 2],
+          ["c-false", 3],
+          ["c-true", 4],
+          ["d-false", 1],
+          ["e-false", 1],
+          ["e-true", 2],
+          ["h-false", 1],
+          ["g-true", 1]
+        ]
+      },
       {
         "description": "Single OVER(PARTITION BY) with select col and filter",
         "sql": "SELECT string_col, COUNT(bool_col) OVER(PARTITION BY 
string_col) FROM {tbl} WHERE string_col = 'a' AND bool_col = false",
@@ -1973,6 +2403,13 @@
           ["a", 1]
         ]
       },
+      {
+        "description": "Single OVER(PARTITION BY) row_number with select col 
and filter",
+        "sql": "SELECT string_col, ROW_NUMBER() OVER(PARTITION BY string_col) 
FROM {tbl} WHERE string_col = 'a' AND bool_col = false",
+        "outputs": [
+          ["a", 1]
+        ]
+      },
       {
         "description": "Single OVER(PARTITION BY k1 ORDER BY k2) with select 
col and filter",
         "sql": "SELECT string_col, COUNT(bool_col) OVER(PARTITION BY 
string_col ORDER BY bool_col) FROM {tbl} WHERE string_col = 'a' AND bool_col = 
false",
@@ -2048,6 +2485,15 @@
           [400, 51]
         ]
       },
+      {
+        "description": "Single OVER(PARTITION BY k1 ORDER BY k1) row_number 
with select col and filter",
+        "sql": "SELECT double_col, ROW_NUMBER() OVER(PARTITION BY bool_col, 
string_col ORDER BY bool_col, string_col) FROM {tbl} WHERE string_col IN ('b', 
'c') AND int_col < 100 AND int_col > 0",
+        "outputs": [
+          [100, 1],
+          [100, 1],
+          [400, 1]
+        ]
+      },
       {
         "description": "Single OVER(PARTITION BY k1 ORDER BY k1) with select 
col and filter where ORDER BY is DESC",
         "sql": "SELECT double_col, SUM(int_col) OVER(PARTITION BY bool_col, 
string_col ORDER BY bool_col, string_col DESC) FROM {tbl} WHERE string_col IN 
('b', 'c')",
@@ -2115,6 +2561,28 @@
           ["g-true", 3]
         ]
       },
+      {
+        "description": "Single OVER(PARTITION BY k1 ORDER BY k2) row_number 
with select transform and filter",
+        "sql": "SELECT CONCAT(string_col, bool_col, '-'), ROW_NUMBER() 
OVER(PARTITION BY string_col, int_col ORDER BY bool_col) FROM {tbl} where 
int_col < 50 OR double_col = 1",
+        "comments": "Cannot enforce a global ordering as partitions aren't 
ordered, just keys within a partition are",
+        "keepOutputRowOrder": false,
+        "outputs": [
+          ["a-true", 1],
+          ["a-true", 2],
+          ["a-false", 1],
+          ["a-true", 2],
+          ["a-true", 3],
+          ["b-false", 1],
+          ["b-false", 1],
+          ["c-false", 1],
+          ["c-false", 1],
+          ["c-true", 1],
+          ["d-false", 1],
+          ["e-false", 1],
+          ["e-true", 2],
+          ["g-true", 1]
+        ]
+      },
       {
         "description": "Single OVER(PARTITION BY) with group by",
         "sql": "SELECT MAX({tbl}.int_col) OVER(PARTITION BY {tbl}.string_col) 
FROM {tbl} GROUP BY string_col, int_col",
@@ -2171,6 +2639,24 @@
           ["h", 150]
         ]
       },
+      {
+        "description": "Single OVER(PARTITION BY) row_number with select col 
and group by",
+        "sql": "SELECT string_col, ROW_NUMBER() OVER(PARTITION BY 
{tbl}.string_col) FROM {tbl} GROUP BY string_col, int_col",
+        "outputs": [
+          ["a", 1],
+          ["a", 2],
+          ["b", 1],
+          ["b", 2],
+          ["c", 1],
+          ["c", 2],
+          ["c", 3],
+          ["c", 4],
+          ["d", 1],
+          ["e", 1],
+          ["g", 1],
+          ["h", 1]
+        ]
+      },
       {
         "description": "Single OVER(PARTITION BY k1 ORDER BY k2) with select 
col and group by",
         "sql": "SELECT string_col, MIN({tbl}.int_col) OVER(PARTITION BY 
{tbl}.string_col ORDER BY {tbl}.int_col) FROM {tbl} GROUP BY string_col, 
int_col",
@@ -2223,6 +2709,26 @@
           [150, 150]
         ]
       },
+      {
+        "description": "Single OVER(PARTITION BY k1 ORDER BY k2) row_number 
with agg col and group by",
+        "sql": "SELECT SUM(int_col), ROW_NUMBER() OVER(PARTITION BY 
{tbl}.string_col ORDER BY {tbl}.int_col) FROM {tbl} GROUP BY string_col, 
int_col",
+        "comments": "Cannot enforce a global ordering as partitions aren't 
ordered, just keys within a partition are",
+        "keepOutputRowOrder": false,
+        "outputs": [
+          [4, 1],
+          [126, 2],
+          [3, 1],
+          [100, 2],
+          [-101, 1],
+          [2, 2],
+          [3, 3],
+          [150, 4],
+          [42, 1],
+          [84, 1],
+          [3, 1],
+          [150, 1]
+        ]
+      },
       {
         "description": "Single OVER(PARTITION BY) with select col, agg col and 
group by",
         "sql": "SELECT int_col, SUM(int_col), SUM({tbl}.int_col) 
OVER(PARTITION BY {tbl}.int_col) FROM {tbl} GROUP BY int_col",
@@ -2287,6 +2793,25 @@
           [150, 150, 150]
         ]
       },
+      {
+        "description": "Single OVER(PARTITION BY k1 ORDER BY k2) row_number 
with select col, agg col and group by with global order by",
+        "sql": "SELECT int_col, SUM(int_col), ROW_NUMBER() OVER(PARTITION BY 
{tbl}.string_col ORDER BY {tbl}.int_col) FROM {tbl} GROUP BY string_col, 
int_col ORDER BY string_col, int_col",
+        "keepOutputRowOrder": true,
+        "outputs": [
+          [2, 4, 1],
+          [42, 126, 2],
+          [3, 3, 1],
+          [100, 100, 2],
+          [-101, -101, 1],
+          [2, 2, 2],
+          [3, 3, 3],
+          [150, 150, 4],
+          [42, 42, 1],
+          [42, 84, 1],
+          [3, 3, 1],
+          [150, 150, 1]
+        ]
+      },
       {
         "description": "Single OVER(PARTITION BY) with select col, agg col and 
group by with a filter",
         "sql": "SELECT int_col, SUM(int_col), SUM({tbl}.int_col) 
OVER(PARTITION BY {tbl}.int_col) FROM {tbl} WHERE int_col >= 100 GROUP BY 
int_col",
@@ -2295,6 +2820,14 @@
           [150, 300, 150]
         ]
       },
+      {
+        "description": "Single OVER(PARTITION BY) row_number with select col, 
agg col and group by with a filter",
+        "sql": "SELECT int_col, SUM(int_col), ROW_NUMBER() OVER(PARTITION BY 
{tbl}.int_col) FROM {tbl} WHERE int_col >= 100 GROUP BY int_col",
+        "outputs": [
+          [100, 100, 1],
+          [150, 300, 1]
+        ]
+      },
       {
         "description": "Single OVER(PARTITION BY k1 ORDER BY k2) with select 
col, agg col and group by with a filter",
         "sql": "SELECT int_col, SUM(int_col), SUM({tbl}.int_col) 
OVER(PARTITION BY {tbl}.int_col ORDER BY {tbl}.string_col) FROM {tbl} WHERE 
int_col >= 100 GROUP BY string_col, int_col",
@@ -2406,6 +2939,30 @@
           ["h", 150, 150, -1.53]
         ]
       },
+      {
+        "description": "Multiple OVER(PARTITION BY k1 ORDER BY k2)s row_number 
with select columns",
+        "sql": "SELECT string_col, ROW_NUMBER() OVER(PARTITION BY string_col 
ORDER BY int_col), int_col, ROW_NUMBER() OVER(PARTITION BY string_col ORDER BY 
int_col) FROM {tbl}",
+        "comments": "Cannot enforce a global ordering as partitions aren't 
ordered, just keys within a partition are",
+        "keepOutputRowOrder": false,
+        "outputs": [
+          ["a", 1, 2, 1],
+          ["a", 2, 2, 2],
+          ["a", 3, 42, 3],
+          ["a", 4, 42, 4],
+          ["a", 5, 42, 5],
+          ["b", 1, 3, 1],
+          ["b", 2, 100, 2],
+          ["c", 1, -101, 1],
+          ["c", 2, 2, 2],
+          ["c", 3, 3, 3],
+          ["c", 4, 150, 4],
+          ["d", 1, 42, 1],
+          ["e", 1, 42, 1],
+          ["e", 2, 42, 2],
+          ["g", 1, 3, 1],
+          ["h", 1, 150, 1]
+        ]
+      },
       {
         "description": "Multiple OVER(PARTITION BY k1 ORDER BY k2, k3)s with 
select columns",
         "sql": "SELECT string_col, SUM(int_col) OVER(PARTITION BY string_col 
ORDER BY int_col, bool_col), int_col, MAX(double_col) OVER(PARTITION BY 
string_col ORDER BY int_col, bool_col) FROM {tbl}",
@@ -3128,6 +3685,57 @@
         "comments": "Cannot enforce a global ordering as partitions aren't 
ordered, just keys within a partition are",
         "keepOutputRowOrder": false,
         "outputs": []
+      },
+      {
+        "description": "Subquery with ROW_NUMBER window function to get all 
values with ROW_NUMBER <= value",
+        "sql": "SELECT row_number, string_col, int_col FROM (SELECT 
ROW_NUMBER() OVER(PARTITION BY string_col ORDER BY int_col) AS row_number, 
string_col, int_col from {tbl}) WHERE row_number <= 2",
+        "comments": "Cannot enforce a global ordering as partitions aren't 
ordered, just keys within a partition are",
+        "keepOutputRowOrder": false,
+        "outputs": [
+          [1, "a", 2],
+          [2, "a", 2],
+          [1, "b", 3],
+          [2, "b", 100],
+          [1, "c", -101],
+          [2, "c", 2],
+          [1, "d", 42],
+          [1, "e", 42],
+          [2, "e", 42],
+          [1, "g", 3],
+          [1, "h", 150]
+        ]
+      },
+      {
+        "description": "Subquery with ROW_NUMBER window function to get all 
values with ROW_NUMBER <= value where ORDER BY is DESC",
+        "sql": "SELECT row_number, string_col, int_col FROM (SELECT 
ROW_NUMBER() OVER(PARTITION BY string_col ORDER BY int_col DESC) AS row_number, 
string_col, int_col from {tbl}) WHERE row_number <= 2",
+        "comments": "Cannot enforce a global ordering as partitions aren't 
ordered, just keys within a partition are",
+        "keepOutputRowOrder": false,
+        "outputs": [
+          [1, "a", 42],
+          [2, "a", 42],
+          [1, "b", 100],
+          [2, "b", 3],
+          [1, "c", 150],
+          [2, "c", 3],
+          [1, "d", 42],
+          [1, "e", 42],
+          [2, "e", 42],
+          [1, "g", 3],
+          [1, "h", 150]
+        ]
+      },
+      {
+        "description": "CTE with ROW_NUMBER window function to get all values 
with ROW_NUMBER > value",
+        "sql": "WITH windowfunc AS (SELECT ROW_NUMBER() OVER(PARTITION BY 
string_col ORDER BY int_col) AS row_number, string_col, int_col from {tbl}) 
SELECT row_number, string_col, int_col FROM windowfunc WHERE row_number > 2",
+        "comments": "Cannot enforce a global ordering as partitions aren't 
ordered, just keys within a partition are",
+        "keepOutputRowOrder": false,
+        "outputs": [
+          [3, "a", 42],
+          [4, "a", 42],
+          [5, "a", 42],
+          [3, "c", 3],
+          [4, "c", 150]
+        ]
       }
     ]
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to