This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c6d5b11b7c [multistage] Push Collation from Exchange to Lite Mode Sort (#16551)
3c6d5b11b7c is described below

commit 3c6d5b11b7cf8aefb0e6251bb672b3325148e311
Author: Ankit Sultana <[email protected]>
AuthorDate: Mon Aug 11 15:05:11 2025 -0500

    [multistage] Push Collation from Exchange to Lite Mode Sort (#16551)
---
 .../physical/v2/opt/rules/LiteModeSortInsertRule.java     | 15 ++++++++++++++-
 .../test/resources/queries/PhysicalOptimizerPlans.json    | 15 +++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java
index b4d102a45dd..f356abcbb67 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.planner.physical.v2.opt.rules;
 import com.google.common.base.Preconditions;
 import java.util.List;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
@@ -29,6 +30,7 @@ import org.apache.pinot.query.context.PhysicalPlannerContext;
 import org.apache.pinot.query.planner.logical.RexExpressionUtils;
 import org.apache.pinot.query.planner.physical.v2.PRelNode;
 import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAggregate;
+import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalExchange;
 import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalSort;
 import org.apache.pinot.query.planner.physical.v2.opt.PRelOptRule;
 import org.apache.pinot.query.planner.physical.v2.opt.PRelOptRuleCall;
@@ -87,9 +89,20 @@ public class LiteModeSortInsertRule extends PRelOptRule {
       int limit = aggregate.getLimit() > 0 ? aggregate.getLimit() : 
serverStageLimit;
       return aggregate.withLimit(limit);
     }
+    RelCollation relCollation = RelCollations.EMPTY;
+    if (!call._parents.isEmpty()) {
+      // Pass collation from the Exchange above if it exists.
+      PRelNode parent = call._parents.getLast();
+      if (parent.unwrap() instanceof PhysicalExchange) {
+        PhysicalExchange physicalExchange = (PhysicalExchange) parent.unwrap();
+        if (physicalExchange.getRelCollation() != null) {
+          relCollation = physicalExchange.getRelCollation();
+        }
+      }
+    }
     PRelNode input = call._currentNode;
     return new PhysicalSort(input.unwrap().getCluster(), 
RelTraitSet.createEmpty(), List.of(),
-        RelCollations.EMPTY, null /* offset */, newFetch, input, nodeId(), 
input.getPinotDataDistributionOrThrow(),
+        relCollation, null /* offset */, newFetch, input, nodeId(), 
input.getPinotDataDistributionOrThrow(),
         true);
   }
 
diff --git a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index 106d0e7f9a9..e716a543bb0 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -805,6 +805,21 @@
           "\n                PhysicalTableScan(table=[[default, a]])",
           "\n"
         ]
+      },
+      {
+        "description": "Tests collation push down for Lite Mode. Collation 
from the exchange above should get pushed down to the sort below.",
+        "sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; EXPLAIN 
PLAN FOR WITH tmp AS (SELECT col1, ROW_NUMBER() OVER (ORDER BY ts DESC) as 
row_num FROM a) SELECT COUNT(*) FROM tmp WHERE row_num <= 100",
+        "output": [
+          "Execution Plan",
+          "\nPhysicalAggregate(group=[{}], agg#0=[COUNT()], aggType=[DIRECT])",
+          "\n  PhysicalFilter(condition=[<=($1, 100)])",
+          "\n    PhysicalWindow(window#0=[window(order by [0 DESC] rows 
between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n      PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], 
collation=[[0 DESC]])",
+          "\n        PhysicalSort(sort0=[$0], dir0=[DESC], fetch=[100000])",
+          "\n          PhysicalProject(ts=[$7])",
+          "\n            PhysicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
       }
     ]
   },


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to