This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2de0888e96 [multistage] Bug Fixes and Improvements to Physical Optimizer (#15813)
2de0888e96 is described below

commit 2de0888e96e8d8ae0b322cfdc7ca7ce466b03bce
Author: Ankit Sultana <ankitsult...@uber.com>
AuthorDate: Fri May 16 17:29:37 2025 -0500

    [multistage] Bug Fixes and Improvements to Physical Optimizer (#15813)
---
 .../planner/physical/v2/ExchangeStrategy.java      |  66 ++++++-
 .../v2/PlanFragmentAndMailboxAssignment.java       |  25 +--
 .../planner/physical/v2/RelToPRelConverter.java    |   4 +-
 .../physical/v2/nodes/PhysicalAggregate.java       |  15 +-
 .../physical/v2/nodes/PhysicalExchange.java        |  29 +--
 .../planner/physical/v2/ExchangeStrategyTest.java  |  69 +++++++
 .../resources/queries/PhysicalOptimizerPlans.json  | 205 +++++++++++++--------
 7 files changed, 281 insertions(+), 132 deletions(-)

diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategy.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategy.java
index 82441f47aa..43d7974750 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategy.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategy.java
@@ -18,6 +18,11 @@
  */
 package org.apache.pinot.query.planner.physical.v2;
 
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.rel.RelDistributions;
+
+
 /**
  * Defines how data is transferred across an Exchange.
  */
@@ -25,17 +30,17 @@ public enum ExchangeStrategy {
   /**
    * There's a single stream in the receiver, so each stream in the sender 
sends data to the same.
    */
-  SINGLETON_EXCHANGE,
+  SINGLETON_EXCHANGE(false),
   /**
    * stream-ID X sends data to stream-ID X. This cannot be modeled by 
PARTITIONING_EXCHANGE because the fan-out for
    * this type of exchange is 1:1.
    */
-  IDENTITY_EXCHANGE,
+  IDENTITY_EXCHANGE(false),
   /**
    * Each stream will partition the outgoing stream based on a set of keys and 
a hash function.
    * Fanout for this type of exchange is 1:all.
    */
-  PARTITIONING_EXCHANGE,
+  PARTITIONING_EXCHANGE(true),
   /**
    * stream-ID X will sub-partition: i.e. divide the stream so that the data 
is sent to the streams
    * {@code X, X + F, X + 2*F, ...}. Here F is the sub-partitioning factor. 
Records are assigned based on a
@@ -43,25 +48,70 @@ public enum ExchangeStrategy {
    * partition counts divides the other.
    * <b>Note:</b> This is different and better than partitioning exchange 
because the fanout is F, and not N * (N*F).
    */
-  SUB_PARTITIONING_HASH_EXCHANGE,
+  SUB_PARTITIONING_HASH_EXCHANGE(true),
   /**
    * Same as above but records are sub-partitioned in a round-robin way. This 
will increase parallelism but lose
    * data partitioning.
    */
-  SUB_PARTITIONING_RR_EXCHANGE,
+  SUB_PARTITIONING_RR_EXCHANGE(false),
   /**
    * Similar to sub-partitioning, except it does the inverse and merges 
partitions. Partitions are merged in a way
    * that we still preserve partitions, but only change the partition count. 
i.e. if current partition count is 16,
    * and we want 8 partitions, then partition-0 in the receiver will receive 
data from partition-0 and partition-8
    * in the sender.
    */
-  COALESCING_PARTITIONING_EXCHANGE,
+  COALESCING_PARTITIONING_EXCHANGE(true),
   /**
    * Each stream will send data to all receiving streams.
    */
-  BROADCAST_EXCHANGE,
+  BROADCAST_EXCHANGE(false),
   /**
    * Records are sent randomly from a given worker in the sender to some 
worker in the receiver.
    */
-  RANDOM_EXCHANGE
+  RANDOM_EXCHANGE(false);
+
+  /**
+   * This is true when the Exchange Strategy is such that it requires a 
List&lt;Integer&gt; representing the
+   * distribution keys. The list must be non-empty.
+   */
+  private final boolean _requireKeys;
+
+  /**
+   * See {@link #_requireKeys}.
+   */
+  public boolean isRequireKeys() {
+    return _requireKeys;
+  }
+
+  ExchangeStrategy(boolean requireKeys) {
+    _requireKeys = requireKeys;
+  }
+
+  public static RelDistribution getRelDistribution(ExchangeStrategy 
exchangeStrategy, List<Integer> keys) {
+    if (exchangeStrategy.isRequireKeys() && keys.isEmpty()) {
+      throw new IllegalStateException(String.format("ExchangeStrategy=%s 
requires distribution keys, but none found",
+          exchangeStrategy));
+    } else if (!exchangeStrategy.isRequireKeys() && !keys.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "ExchangeStrategy=%s does not require distribution keys but found 
%s", exchangeStrategy, keys));
+    }
+    switch (exchangeStrategy) {
+      case PARTITIONING_EXCHANGE:
+      case SUB_PARTITIONING_HASH_EXCHANGE:
+      case COALESCING_PARTITIONING_EXCHANGE:
+        return RelDistributions.hash(keys);
+      case IDENTITY_EXCHANGE:
+        return RelDistributions.hash(List.of());
+      case BROADCAST_EXCHANGE:
+        return RelDistributions.BROADCAST_DISTRIBUTED;
+      case SINGLETON_EXCHANGE:
+        return RelDistributions.SINGLETON;
+      case SUB_PARTITIONING_RR_EXCHANGE:
+        return RelDistributions.ROUND_ROBIN_DISTRIBUTED;
+      case RANDOM_EXCHANGE:
+        return RelDistributions.RANDOM_DISTRIBUTED;
+      default:
+        throw new IllegalStateException(String.format("Unexpected exchange 
strategy: %s", exchangeStrategy));
+    }
+  }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
index a7733b1a28..88761f0bcf 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
@@ -116,7 +116,8 @@ public class PlanFragmentAndMailboxAssignment {
       int senderFragmentId = context._planFragmentMap.size();
       final DataSchema inputFragmentSchema = 
PRelToPlanNodeConverter.toDataSchema(
           pRelNode.getPRelInput(0).unwrap().getRowType());
-      RelDistribution.Type distributionType = 
inferDistributionType(physicalExchange.getExchangeStrategy());
+      RelDistribution.Type distributionType = 
ExchangeStrategy.getRelDistribution(
+          physicalExchange.getExchangeStrategy(), 
physicalExchange.getDistributionKeys()).getType();
       List<PlanNode> inputs = new ArrayList<>();
       MailboxSendNode sendNode = new MailboxSendNode(senderFragmentId, 
inputFragmentSchema, inputs, currentFragmentId,
           PinotRelExchangeType.getDefaultExchangeType(), distributionType, 
physicalExchange.getDistributionKeys(),
@@ -156,27 +157,6 @@ public class PlanFragmentAndMailboxAssignment {
     return fragment;
   }
 
-  private RelDistribution.Type inferDistributionType(ExchangeStrategy desc) {
-    RelDistribution.Type distributionType;
-    switch (desc) {
-      case PARTITIONING_EXCHANGE:
-        distributionType = RelDistribution.Type.HASH_DISTRIBUTED;
-        break;
-      case IDENTITY_EXCHANGE:
-        distributionType = RelDistribution.Type.HASH_DISTRIBUTED;
-        break;
-      case SINGLETON_EXCHANGE:
-        distributionType = RelDistribution.Type.SINGLETON;
-        break;
-      case BROADCAST_EXCHANGE:
-        distributionType = RelDistribution.Type.BROADCAST_DISTRIBUTED;
-        break;
-      default:
-        throw new IllegalStateException("");
-    }
-    return distributionType;
-  }
-
   private Map<Integer, QueryServerInstance> createWorkerMap(List<String> 
workers, Context context) {
     Map<Integer, QueryServerInstance> mp = new HashMap<>();
     for (String instance : workers) {
@@ -225,6 +205,7 @@ public class PlanFragmentAndMailboxAssignment {
             .put(senderStageId, new 
SharedMailboxInfos(mailboxInfoListForReceiver));
         break;
       }
+      case RANDOM_EXCHANGE:
       case PARTITIONING_EXCHANGE:
       case BROADCAST_EXCHANGE: {
         MailboxInfos mailboxInfoListForSender = new 
MailboxInfos(createMailboxInfo(receiverWorkers));
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java
index 67a24c75fe..f3cd36452e 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java
@@ -94,9 +94,11 @@ public class RelToPRelConverter {
     } else if (relNode instanceof PinotLogicalAggregate) {
       Preconditions.checkState(inputs.size() == 1, "Expected exactly 1 input 
of agg. Found: %s", inputs);
       PinotLogicalAggregate aggRel = (PinotLogicalAggregate) relNode;
+      // Use AggType.DIRECT here because at this point aggregation split 
hasn't happened yet.
+      AggregateNode.AggType aggType = AggregateNode.AggType.DIRECT;
       return new PhysicalAggregate(aggRel.getCluster(), aggRel.getTraitSet(), 
aggRel.getHints(), aggRel.getGroupSet(),
           aggRel.getGroupSets(), aggRel.getAggCallList(), 
nodeIdGenerator.get(), inputs.get(0), null, false,
-          AggregateNode.AggType.DIRECT, false, List.of(), 0);
+          aggType, aggRel.isLeafReturnFinalResult(), aggRel.getCollations(), 
aggRel.getLimit());
     } else if (relNode instanceof Join) {
       Preconditions.checkState(relNode.getInputs().size() == 2, "Expected 
exactly 2 inputs to join. Found: %s", inputs);
       Join join = (Join) relNode;
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalAggregate.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalAggregate.java
index 1306c8d794..149795eb46 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalAggregate.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalAggregate.java
@@ -24,6 +24,7 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.hint.RelHint;
@@ -51,7 +52,7 @@ public class PhysicalAggregate extends Aggregate implements 
PRelNode {
   public PhysicalAggregate(RelOptCluster cluster, RelTraitSet traitSet, 
List<RelHint> hints,
       ImmutableBitSet groupSet, @Nullable List<ImmutableBitSet> groupSets, 
List<AggregateCall> aggCalls,
       int nodeId, PRelNode pRelInput, @Nullable PinotDataDistribution 
pinotDataDistribution, boolean leafStage,
-      AggregateNode.AggType aggType, boolean leafReturnFinalResult, 
List<RelFieldCollation> collations,
+      AggregateNode.AggType aggType, boolean leafReturnFinalResult, @Nullable 
List<RelFieldCollation> collations,
       int limit) {
     super(cluster, traitSet, hints, pRelInput.unwrap(), groupSet, groupSets, 
aggCalls);
     _nodeId = nodeId;
@@ -60,7 +61,7 @@ public class PhysicalAggregate extends Aggregate implements 
PRelNode {
     _leafStage = leafStage;
     _aggType = aggType;
     _leafReturnFinalResult = leafReturnFinalResult;
-    _collations = collations;
+    _collations = collations == null ? List.of() : collations;
     _limit = limit;
   }
 
@@ -114,6 +115,16 @@ public class PhysicalAggregate extends Aggregate 
implements PRelNode {
         _collations, _limit);
   }
 
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    RelWriter relWriter = super.explainTerms(pw);
+    relWriter.item("aggType", _aggType);
+    relWriter.itemIf("leafReturnFinalResult", true, _leafReturnFinalResult);
+    relWriter.itemIf("collations", _collations, !_collations.isEmpty());
+    relWriter.itemIf("limit", _limit, _limit > 0);
+    return relWriter;
+  }
+
   public AggregateNode.AggType getAggType() {
     return _aggType;
   }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalExchange.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalExchange.java
index 66846faff7..b00d786f00 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalExchange.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalExchange.java
@@ -26,10 +26,10 @@ import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelDistribution;
-import org.apache.calcite.rel.RelDistributions;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.Exchange;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
 import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTrait;
 import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTraitDef;
@@ -81,7 +81,7 @@ public class PhysicalExchange extends Exchange implements 
PRelNode {
       List<Integer> distributionKeys, ExchangeStrategy exchangeStrategy, 
@Nullable RelCollation relCollation,
       PinotExecStrategyTrait execStrategyTrait) {
     super(input.unwrap().getCluster(), 
EMPTY_TRAIT_SET.plus(execStrategyTrait), input.unwrap(),
-        getRelDistribution(exchangeStrategy, distributionKeys));
+        ExchangeStrategy.getRelDistribution(exchangeStrategy, 
distributionKeys));
     _nodeId = nodeId;
     _pRelInputs = Collections.singletonList(input);
     _pinotDataDistribution = pinotDataDistribution;
@@ -161,26 +161,9 @@ public class PhysicalExchange extends Exchange implements 
PRelNode {
   @Override public RelWriter explainTerms(RelWriter pw) {
     return pw.item("input", input)
         .item("exchangeStrategy", _exchangeStrategy)
-        .item("distKeys", _distributionKeys)
-        .item("execStrategy", getRelExchangeType())
-        .item("collation", _relCollation);
-  }
-
-  private static RelDistribution getRelDistribution(ExchangeStrategy 
exchangeStrategy, List<Integer> keys) {
-    switch (exchangeStrategy) {
-      case IDENTITY_EXCHANGE:
-      case PARTITIONING_EXCHANGE:
-      case SUB_PARTITIONING_HASH_EXCHANGE:
-      case COALESCING_PARTITIONING_EXCHANGE:
-        return RelDistributions.hash(keys);
-      case BROADCAST_EXCHANGE:
-        return RelDistributions.BROADCAST_DISTRIBUTED;
-      case SINGLETON_EXCHANGE:
-        return RelDistributions.SINGLETON;
-      case SUB_PARTITIONING_RR_EXCHANGE:
-        return RelDistributions.ROUND_ROBIN_DISTRIBUTED;
-      default:
-        throw new IllegalStateException(String.format("Unexpected exchange 
strategy: %s", exchangeStrategy));
-    }
+        .itemIf("distKeys", _distributionKeys, 
CollectionUtils.isNotEmpty(_distributionKeys))
+        .itemIf("execStrategy", getRelExchangeType(),
+            getRelExchangeType() != 
PinotRelExchangeType.getDefaultExchangeType())
+        .itemIf("collation", _relCollation, 
CollectionUtils.isNotEmpty(_relCollation.getFieldCollations()));
   }
 }
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategyTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategyTest.java
new file mode 100644
index 0000000000..b0bc6ea11c
--- /dev/null
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategyTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2;
+
+import java.util.List;
+import org.apache.calcite.rel.RelDistribution;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class ExchangeStrategyTest {
+  @Test
+  public void testGetRelDistribution() {
+    // Ensure each exchange strategy can be mapped to a RelDistribution.
+    for (ExchangeStrategy exchangeStrategy : ExchangeStrategy.values()) {
+      List<Integer> keys = exchangeStrategy.isRequireKeys() ? List.of(1) : 
List.of();
+      assertNotNull(ExchangeStrategy.getRelDistribution(exchangeStrategy, 
keys));
+    }
+    // Manual check for each exchange strategy.
+    // Start with hash types
+    List<ExchangeStrategy> hashExchangeStrategies = 
List.of(ExchangeStrategy.PARTITIONING_EXCHANGE,
+        ExchangeStrategy.SUB_PARTITIONING_HASH_EXCHANGE, 
ExchangeStrategy.COALESCING_PARTITIONING_EXCHANGE);
+    for (ExchangeStrategy exchangeStrategy : hashExchangeStrategies) {
+      assertEquals(ExchangeStrategy.getRelDistribution(exchangeStrategy, 
List.of(1)).getType(),
+          RelDistribution.Type.HASH_DISTRIBUTED);
+    }
+    // Singleton exchange
+    
assertEquals(ExchangeStrategy.getRelDistribution(ExchangeStrategy.SINGLETON_EXCHANGE,
 List.of()).getType(),
+        RelDistribution.Type.SINGLETON);
+    // Random exchange
+    
assertEquals(ExchangeStrategy.getRelDistribution(ExchangeStrategy.RANDOM_EXCHANGE,
 List.of()).getType(),
+        RelDistribution.Type.RANDOM_DISTRIBUTED);
+    // Broadcast exchange
+    
assertEquals(ExchangeStrategy.getRelDistribution(ExchangeStrategy.BROADCAST_EXCHANGE,
 List.of()).getType(),
+        RelDistribution.Type.BROADCAST_DISTRIBUTED);
+    // Sub-partitioning round robin exchange
+    
assertEquals(ExchangeStrategy.getRelDistribution(ExchangeStrategy.SUB_PARTITIONING_RR_EXCHANGE,
+        List.of()).getType(), RelDistribution.Type.ROUND_ROBIN_DISTRIBUTED);
+  }
+
+  @Test
+  public void testGetRelDistributionInvalid() {
+    // Test case when empty keys list but strategy requires keys
+    assertThrows(IllegalStateException.class, () -> {
+      
ExchangeStrategy.getRelDistribution(ExchangeStrategy.PARTITIONING_EXCHANGE, 
List.of());
+    });
+    // Test case when non-empty keys list but strategy does not accept keys
+    assertThrows(IllegalStateException.class, () -> {
+      ExchangeStrategy.getRelDistribution(ExchangeStrategy.IDENTITY_EXCHANGE, 
List.of(1));
+    });
+  }
+}
diff --git 
a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json 
b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index af9048bec3..acab86c39c 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -7,13 +7,13 @@
         "output": [
           "Execution Plan",
           "\nPhysicalSort(sort0=[$0], dir0=[ASC])",
-          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
           "\n    PhysicalProject(col1=[$0], ts=[$1], col3=[$3])",
           "\n      PhysicalJoin(condition=[=($0, $2)], joinType=[inner])",
-          "\n        
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], 
execStrategy=[STREAMING], collation=[[]])",
+          "\n        
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
           "\n          PhysicalProject(col1=[$0], ts=[$7])",
           "\n            PhysicalTableScan(table=[[default, a]])",
-          "\n        
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], 
execStrategy=[STREAMING], collation=[[]])",
+          "\n        
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
           "\n          PhysicalProject(col2=[$1], col3=[$2])",
           "\n            PhysicalTableScan(table=[[default, b]])",
           "\n"
@@ -25,13 +25,13 @@
         "output": [
           "Execution Plan",
           "\nPhysicalSort(sort0=[$0], dir0=[ASC])",
-          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
           "\n    PhysicalProject(value1=[$0], ts1=[$1], col3=[$3])",
           "\n      PhysicalJoin(condition=[=($0, $2)], joinType=[inner])",
-          "\n        
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], 
execStrategy=[STREAMING], collation=[[]])",
+          "\n        
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
           "\n          PhysicalProject(col1=[$0], ts=[$7])",
           "\n            PhysicalTableScan(table=[[default, a]])",
-          "\n        
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], 
execStrategy=[STREAMING], collation=[[]])",
+          "\n        
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
           "\n          PhysicalProject(col2=[$1], col3=[$2])",
           "\n            PhysicalTableScan(table=[[default, b]])",
           "\n"
@@ -43,9 +43,9 @@
         "output": [
           "Execution Plan",
           "\nPhysicalJoin(condition=[=($0, $10)], joinType=[inner])",
-          "\n  PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])",
+          "\n  PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
           "\n    PhysicalTableScan(table=[[default, a]])",
-          "\n  PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[1]], execStrategy=[STREAMING], collation=[[]])",
+          "\n  PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[1]])",
           "\n    PhysicalTableScan(table=[[default, b]])",
           "\n"
         ]
@@ -56,10 +56,10 @@
         "output": [
           "Execution Plan",
           "\nPhysicalJoin(condition=[=($0, $10)], joinType=[inner])",
-          "\n  PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])",
+          "\n  PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
           "\n    PhysicalFilter(condition=[>=($2, 0)])",
           "\n      PhysicalTableScan(table=[[default, a]])",
-          "\n  PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[1]], execStrategy=[STREAMING], collation=[[]])",
+          "\n  PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[1]])",
           "\n    PhysicalTableScan(table=[[default, b]])",
           "\n"
         ]
@@ -70,10 +70,10 @@
         "output": [
           "Execution Plan",
           "\nPhysicalJoin(condition=[AND(=($0, $10), >($2, $11))], 
joinType=[inner])",
-          "\n  PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+          "\n  PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
           "\n    PhysicalFilter(condition=[>=($2, 0)])",
           "\n      PhysicalTableScan(table=[[default, a]])",
-          "\n  PhysicalExchange(exchangeStrategy=[BROADCAST_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+          "\n  PhysicalExchange(exchangeStrategy=[BROADCAST_EXCHANGE])",
           "\n    PhysicalTableScan(table=[[default, b]])",
           "\n"
         ]
@@ -84,9 +84,9 @@
         "output": [
           "Execution Plan",
           "\nPhysicalJoin(condition=[AND(=($0, $9), =($1, $10))], 
joinType=[inner])",
-          "\n  PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0, 1]], execStrategy=[STREAMING], collation=[[]])",
+          "\n  PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0, 1]])",
           "\n    PhysicalTableScan(table=[[default, a]])",
-          "\n  PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0, 1]], execStrategy=[STREAMING], collation=[[]])",
+          "\n  PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0, 1]])",
           "\n    PhysicalTableScan(table=[[default, b]])",
           "\n"
         ]
@@ -102,10 +102,10 @@
           "Execution Plan",
           "\nPhysicalProject(col1=[$0], col2=[$1])",
           "\n  PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
-          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[2]], execStrategy=[STREAMING], collation=[[]])",
+          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[2]])",
           "\n      PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n        PhysicalTableScan(table=[[default, a]])",
-          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])",
+          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
           "\n      PhysicalProject(col3=[$2])",
           "\n        PhysicalTableScan(table=[[default, b]])",
           "\n"
@@ -118,10 +118,10 @@
           "Execution Plan",
           "\nPhysicalProject(col1=[$0], col2=[$1])",
           "\n  PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
-          "\n    PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+          "\n    PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
           "\n      PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n        PhysicalTableScan(table=[[default, b]])",
-          "\n    PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+          "\n    PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
           "\n      PhysicalProject(col3=[$2])",
           "\n        PhysicalTableScan(table=[[default, b]])",
           "\n"
@@ -136,18 +136,18 @@
           "\n  PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
           "\n    PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
           "\n      PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
-          "\n        
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]], 
execStrategy=[STREAMING], collation=[[]])",
+          "\n        
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])",
           "\n          PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n            PhysicalTableScan(table=[[default, a]])",
-          "\n        
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], 
execStrategy=[STREAMING], collation=[[]])",
+          "\n        
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
           "\n          PhysicalProject(col3=[$2])",
           "\n            PhysicalFilter(condition=[=($1, _UTF-8'foo')])",
           "\n              PhysicalTableScan(table=[[default, b]])",
-          "\n      PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])",
+          "\n      PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
           "\n        PhysicalProject(col3=[$2])",
           "\n          PhysicalFilter(condition=[=($1, _UTF-8'bar')])",
           "\n            PhysicalTableScan(table=[[default, b]])",
-          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])",
+          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
           "\n      PhysicalProject(col3=[$2])",
           "\n        PhysicalFilter(condition=[=($1, _UTF-8'lorem')])",
           "\n          PhysicalTableScan(table=[[default, b]])",
@@ -166,19 +166,19 @@
           "\n        PhysicalJoin(condition=[=($3, $4)], joinType=[left])",
           "\n          PhysicalProject(col1=[$0], col2=[$1], col3=[$2], 
col31=[$2])",
           "\n            PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
-          "\n              
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]], 
execStrategy=[STREAMING], collation=[[]])",
+          "\n              
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])",
           "\n                PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n                  PhysicalTableScan(table=[[default, a]])",
-          "\n              
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], 
execStrategy=[STREAMING], collation=[[]])",
+          "\n              
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
           "\n                PhysicalProject(col3=[$2])",
           "\n                  PhysicalFilter(condition=[=($1, _UTF-8'foo')])",
           "\n                    PhysicalTableScan(table=[[default, b]])",
-          "\n          
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], 
execStrategy=[STREAMING], collation=[[]])",
-          "\n            PhysicalAggregate(group=[{0}], agg#0=[MIN($1)])",
+          "\n          
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+          "\n            PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], 
aggType=[DIRECT])",
           "\n              PhysicalProject(col3=[$2], $f1=[true])",
           "\n                PhysicalFilter(condition=[=($1, _UTF-8'bar')])",
           "\n                  PhysicalTableScan(table=[[default, b]])",
-          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])",
+          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
           "\n      PhysicalProject(col3=[$2])",
           "\n        PhysicalFilter(condition=[=($1, _UTF-8'lorem')])",
           "\n          PhysicalTableScan(table=[[default, b]])",
@@ -197,20 +197,20 @@
           "\n        PhysicalJoin(condition=[=($3, $4)], joinType=[left])",
           "\n          PhysicalProject(col1=[$0], col2=[$1], col3=[$2], 
col31=[$2])",
           "\n            PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
-          "\n              
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]], 
execStrategy=[STREAMING], collation=[[]])",
+          "\n              
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])",
           "\n                PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
           "\n                  PhysicalTableScan(table=[[default, a]])",
-          "\n              
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], 
execStrategy=[STREAMING], collation=[[]])",
+          "\n              
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
           "\n                PhysicalProject(col3=[$2])",
           "\n                  PhysicalFilter(condition=[=($1, _UTF-8'foo')])",
           "\n                    PhysicalTableScan(table=[[default, b]])",
-          "\n          PhysicalAggregate(group=[{0}], agg#0=[MIN($1)])",
-          "\n            
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], 
execStrategy=[STREAMING], collation=[[]])",
-          "\n              PhysicalAggregate(group=[{0}], agg#0=[MIN($1)])",
+          "\n          PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], 
aggType=[FINAL])",
+          "\n            
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+          "\n              PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], 
aggType=[LEAF])",
           "\n                PhysicalProject(col3=[$2], $f1=[true])",
           "\n                  PhysicalFilter(condition=[=($1, _UTF-8'bar')])",
           "\n                    PhysicalTableScan(table=[[default, a]])",
-          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])",
+          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
           "\n      PhysicalProject(col3=[$2])",
           "\n        PhysicalFilter(condition=[=($1, _UTF-8'lorem')])",
           "\n          PhysicalTableScan(table=[[default, b]])",
@@ -228,14 +228,14 @@
           "Execution Plan",
           "\nPhysicalJoin(condition=[=($1, $2)], joinType=[semi])",
           "\n  PhysicalJoin(condition=[=($1, $2)], joinType=[semi])",
-          "\n    PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+          "\n    PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
           "\n      PhysicalProject(col1=[$0], col2=[$1])",
           "\n        PhysicalTableScan(table=[[default, a]])",
-          "\n    PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+          "\n    PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
           "\n      PhysicalProject(col2=[$1])",
           "\n        PhysicalFilter(condition=[=($2, CAST(_UTF-8'foo'):INTEGER 
NOT NULL)])",
           "\n          PhysicalTableScan(table=[[default, a]])",
-          "\n  PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+          "\n  PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
           "\n    PhysicalProject(col2=[$1])",
           "\n      PhysicalFilter(condition=[=($2, CAST(_UTF-8'bar'):INTEGER 
NOT NULL)])",
           "\n        PhysicalTableScan(table=[[default, a]])",
@@ -253,19 +253,19 @@
           "\n      PhysicalJoin(condition=[=($2, $3)], joinType=[left])",
           "\n        PhysicalProject(col1=[$0], col2=[$1], col21=[$1])",
           "\n          PhysicalJoin(condition=[=($1, $2)], joinType=[semi])",
-          "\n            
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], 
execStrategy=[STREAMING], collation=[[]])",
+          "\n            
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
           "\n              PhysicalProject(col1=[$0], col2=[$1])",
           "\n                PhysicalTableScan(table=[[default, a]])",
-          "\n            
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], 
execStrategy=[STREAMING], collation=[[]])",
+          "\n            
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
           "\n              PhysicalProject(col2=[$1])",
           "\n                PhysicalFilter(condition=[=($2, 
CAST(_UTF-8'foo'):INTEGER NOT NULL)])",
           "\n                  PhysicalTableScan(table=[[default, a]])",
-          "\n        PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
-          "\n          PhysicalAggregate(group=[{0}], agg#0=[MIN($1)])",
+          "\n        PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
+          "\n          PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], 
aggType=[DIRECT])",
           "\n            PhysicalProject(col2=[$1], $f1=[true])",
           "\n              PhysicalFilter(condition=[=($2, 
CAST(_UTF-8'bar'):INTEGER NOT NULL)])",
           "\n                PhysicalTableScan(table=[[default, a]])",
-          "\n  PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+          "\n  PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
           "\n    PhysicalProject(col2=[$1])",
           "\n      PhysicalFilter(condition=[=($2, CAST(_UTF-8'lorem'):INTEGER 
NOT NULL)])",
           "\n        PhysicalTableScan(table=[[default, a]])",
@@ -277,28 +277,28 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col1, COUNT(*) FROM a WHERE col2 IN (SELECT col2 FROM a WHERE col3 = 'foo') AND col2 NOT IN (SELECT col2 FROM a WHERE col3 = 'bar') AND col2 IN (SELECT col2 FROM a WHERE col3 = 'lorem') GROUP BY col1",
         "output": [
           "Execution Plan",
-          "\nPhysicalAggregate(group=[{0}], agg#0=[COUNT($1)])",
-          "\n  PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])",
-          "\n    PhysicalAggregate(group=[{0}], agg#0=[COUNT()])",
+          "\nPhysicalAggregate(group=[{0}], agg#0=[COUNT($1)], 
aggType=[FINAL])",
+          "\n  PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
+          "\n    PhysicalAggregate(group=[{0}], agg#0=[COUNT()], 
aggType=[LEAF])",
           "\n      PhysicalJoin(condition=[=($1, $2)], joinType=[semi])",
           "\n        PhysicalProject(col1=[$0], col2=[$1])",
           "\n          PhysicalFilter(condition=[IS NOT TRUE($4)])",
           "\n            PhysicalJoin(condition=[=($2, $3)], joinType=[left])",
           "\n              PhysicalProject(col1=[$0], col2=[$1], col21=[$1])",
           "\n                PhysicalJoin(condition=[=($1, $2)], 
joinType=[semi])",
-          "\n                  
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], 
execStrategy=[STREAMING], collation=[[]])",
+          "\n                  
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
           "\n                    PhysicalProject(col1=[$0], col2=[$1])",
           "\n                      PhysicalTableScan(table=[[default, a]])",
-          "\n                  
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], 
execStrategy=[STREAMING], collation=[[]])",
+          "\n                  
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
           "\n                    PhysicalProject(col2=[$1])",
           "\n                      PhysicalFilter(condition=[=($2, 
CAST(_UTF-8'foo'):INTEGER NOT NULL)])",
           "\n                        PhysicalTableScan(table=[[default, a]])",
-          "\n              
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], 
execStrategy=[STREAMING], collation=[[]])",
-          "\n                PhysicalAggregate(group=[{0}], agg#0=[MIN($1)])",
+          "\n              
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
+          "\n                PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], 
aggType=[DIRECT])",
           "\n                  PhysicalProject(col2=[$1], $f1=[true])",
           "\n                    PhysicalFilter(condition=[=($2, 
CAST(_UTF-8'bar'):INTEGER NOT NULL)])",
           "\n                      PhysicalTableScan(table=[[default, a]])",
-          "\n        PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+          "\n        PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
           "\n          PhysicalProject(col2=[$1])",
           "\n            PhysicalFilter(condition=[=($2, 
CAST(_UTF-8'lorem'):INTEGER NOT NULL)])",
           "\n              PhysicalTableScan(table=[[default, a]])",
@@ -307,6 +307,59 @@
       }
     ]
   },
+  "physical_opt_group_trim_enabled": {
+    "queries": [
+      {
+        "description": "SQL hint based group by optimization with partitioned aggregated values and group trim enabled",
+        "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_leaf_return_final_result='true', is_enable_group_trim='true') */ col1, COUNT(DISTINCT col2) AS cnt FROM a WHERE col3 >= 0 GROUP BY col1 ORDER BY cnt DESC LIMIT 10",
+        "output": [
+          "Execution Plan",
+          "\nPhysicalSort(sort0=[$1], dir0=[DESC], fetch=[10])",
+          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n    PhysicalSort(sort0=[$1], dir0=[DESC], fetch=[10])",
+          "\n      PhysicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)], aggType=[FINAL], leafReturnFinalResult=[true], limit=[10])",
+          "\n        PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+          "\n          PhysicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)], aggType=[LEAF], leafReturnFinalResult=[true], collations=[[1 DESC]], limit=[10])",
+          "\n            PhysicalFilter(condition=[>=($2, 0)])",
+          "\n              PhysicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "SQL hint based group by optimization with group trim enabled without returning group key",
+        "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_enable_group_trim='true') */ COUNT(DISTINCT col2) AS cnt FROM a WHERE a.col3 >= 0 GROUP BY col1 ORDER BY cnt DESC LIMIT 10",
+        "output": [
+          "Execution Plan",
+          "\nPhysicalSort(sort0=[$0], dir0=[DESC], fetch=[10])",
+          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n    PhysicalSort(sort0=[$0], dir0=[DESC], fetch=[10])",
+          "\n      PhysicalProject(cnt=[$1])",
+          "\n        PhysicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)], aggType=[FINAL], limit=[10])",
+          "\n          PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+          "\n            PhysicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)], aggType=[LEAF], collations=[[1 DESC]], limit=[10])",
+          "\n              PhysicalFilter(condition=[>=($2, 0)])",
+          "\n                PhysicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
+      },
+      {
+        "description": "SQL hint based distinct optimization with group trim enabled",
+        "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_enable_group_trim='true') */ DISTINCT col1, col2 FROM a WHERE col3 >= 0 LIMIT 10",
+        "output": [
+          "Execution Plan",
+          "\nPhysicalSort(fetch=[10])",
+          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n    PhysicalSort(fetch=[10])",
+          "\n      PhysicalAggregate(group=[{0, 1}], aggType=[FINAL], limit=[10])",
+          "\n        PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1]])",
+          "\n          PhysicalAggregate(group=[{0, 1}], aggType=[LEAF], limit=[10])",
+          "\n            PhysicalFilter(condition=[>=($2, 0)])",
+          "\n              PhysicalTableScan(table=[[default, a]])",
+          "\n"
+        ]
+      }
+    ]
+  },
   "physical_opt_misc_auto_identity": {
     "queries": [
       {
@@ -315,20 +368,20 @@
         "output": [
           "Execution Plan",
           "\nPhysicalProject(EXPR$0=[$1], col3=[$0])",
-          "\n  PhysicalAggregate(group=[{0}], agg#0=[COUNT($1)])",
-          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])",
-          "\n      PhysicalAggregate(group=[{1}], agg#0=[COUNT()])",
+          "\n  PhysicalAggregate(group=[{0}], agg#0=[COUNT($1)], 
aggType=[FINAL])",
+          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
+          "\n      PhysicalAggregate(group=[{1}], agg#0=[COUNT()], 
aggType=[LEAF])",
           "\n        PhysicalJoin(condition=[=($0, $2)], joinType=[semi])",
-          "\n          PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+          "\n          PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
           "\n            PhysicalProject(col2=[$1], col3=[$2])",
           "\n              PhysicalTableScan(table=[[default, a]])",
-          "\n          PhysicalAggregate(group=[{0}])",
+          "\n          PhysicalAggregate(group=[{0}], aggType=[DIRECT])",
           "\n            PhysicalUnion(all=[true])",
-          "\n              
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], 
execStrategy=[STREAMING], collation=[[]])",
+          "\n              
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
           "\n                PhysicalProject(col2=[$1])",
           "\n                  PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
           "\n                    PhysicalTableScan(table=[[default, a]])",
-          "\n              
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], 
execStrategy=[STREAMING], collation=[[]])",
+          "\n              
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
           "\n                PhysicalProject(col2=[$1])",
           "\n                  PhysicalFilter(condition=[=($2, 
CAST(_UTF-8'bar'):INTEGER NOT NULL)])",
           "\n                    PhysicalTableScan(table=[[default, a]])",
@@ -345,7 +398,7 @@
         "output": [
           "Execution Plan",
           "\nPhysicalSort(sort0=[$0], dir0=[ASC])",
-          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
           "\n    PhysicalProject(col2=[$1], col3=[$2])",
           "\n      PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
           "\n        PhysicalTableScan(table=[[default, a]])",
@@ -358,7 +411,7 @@
         "output": [
           "Execution Plan",
           "\nPhysicalSort(sort0=[$0], dir0=[ASC], fetch=[10])",
-          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
           "\n    PhysicalSort(sort0=[$0], dir0=[ASC], fetch=[10])",
           "\n      PhysicalProject(col2=[$1], col3=[$2])",
           "\n        PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
@@ -372,7 +425,7 @@
         "output": [
           "Execution Plan",
           "\nPhysicalSort(sort0=[$0], dir0=[ASC], offset=[10], fetch=[11])",
-          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
           "\n    PhysicalSort(sort0=[$0], dir0=[ASC], fetch=[21])",
           "\n      PhysicalProject(col2=[$1], col3=[$2])",
           "\n        PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
@@ -386,7 +439,7 @@
         "output": [
           "Execution Plan",
           "\nPhysicalSort(offset=[10], fetch=[11])",
-          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
           "\n    PhysicalSort(fetch=[21])",
           "\n      PhysicalProject(col2=[$1], col3=[$2])",
           "\n        PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
@@ -400,9 +453,9 @@
         "output": [
           "Execution Plan",
           "\nPhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])",
-          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
           "\n    PhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])",
-          "\n      PhysicalAggregate(group=[{1}], agg#0=[COUNT()])",
+          "\n      PhysicalAggregate(group=[{1}], agg#0=[COUNT()], 
aggType=[DIRECT])",
           "\n        PhysicalTableScan(table=[[default, a]])",
           "\n"
         ]
@@ -413,9 +466,9 @@
         "output": [
           "Execution Plan",
           "\nPhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])",
-          "\n  PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+          "\n  PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
           "\n    PhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])",
-          "\n      PhysicalAggregate(group=[{1}], agg#0=[COUNT()])",
+          "\n      PhysicalAggregate(group=[{1}], agg#0=[COUNT()], 
aggType=[DIRECT])",
           "\n        PhysicalTableScan(table=[[default, b]])",
           "\n"
         ]
@@ -441,7 +494,7 @@
         "sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; EXPLAIN PLAN FOR SELECT col2, COUNT(*) FROM a WHERE col1 = 'foo' GROUP BY col2",
         "output": [
           "Execution Plan",
-          "\nPhysicalAggregate(group=[{1}], agg#0=[COUNT()])",
+          "\nPhysicalAggregate(group=[{1}], agg#0=[COUNT()], aggType=[DIRECT], 
limit=[100000])",
           "\n  PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
           "\n    PhysicalTableScan(table=[[default, a]])",
           "\n"
@@ -455,9 +508,9 @@
           "\nPhysicalProject(rnk=[$3], col1=[$0])",
           "\n  PhysicalFilter(condition=[=($3, 1)])",
           "\n    PhysicalWindow(window#0=[window(partition {1} order by [2] 
rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n      PhysicalAggregate(group=[{0, 1, 2}])",
-          "\n        PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
-          "\n          PhysicalAggregate(group=[{0, 1, 2}])",
+          "\n      PhysicalAggregate(group=[{0, 1, 2}], aggType=[FINAL])",
+          "\n        PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n          PhysicalAggregate(group=[{0, 1, 2}], aggType=[LEAF], 
limit=[100000])",
           "\n            PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
           "\n              PhysicalTableScan(table=[[default, a]])",
           "\n"
@@ -469,9 +522,9 @@
         "output": [
           "Execution Plan",
           "\nPhysicalSort(offset=[100], fetch=[400])",
-          "\n  PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT($3)])",
-          "\n    PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
-          "\n      PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT()])",
+          "\n  PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT($3)], 
aggType=[FINAL])",
+          "\n    PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n      PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT()], 
aggType=[LEAF], limit=[100000])",
           "\n        PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
           "\n          PhysicalTableScan(table=[[default, a]])",
           "\n"
@@ -487,12 +540,12 @@
         "output": [
           "Execution Plan",
           "\nPhysicalJoin(condition=[=($0, $2)], joinType=[semi])",
-          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
           "\n    PhysicalSort(fetch=[100000])",
           "\n      PhysicalProject(col2=[$1], col3=[$2])",
           "\n        PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
           "\n          PhysicalTableScan(table=[[default, a]])",
-          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
           "\n    PhysicalSort(fetch=[100000])",
           "\n      PhysicalProject(col1=[$0])",
           "\n        PhysicalTableScan(table=[[default, b]])",
@@ -505,14 +558,14 @@
         "output": [
           "Execution Plan",
           "\nPhysicalProject(EXPR$0=[$1], col2=[$0])",
-          "\n  PhysicalAggregate(group=[{0}], agg#0=[COUNT()])",
+          "\n  PhysicalAggregate(group=[{0}], agg#0=[COUNT()], 
aggType=[DIRECT])",
           "\n    PhysicalJoin(condition=[=($0, $1)], joinType=[semi])",
-          "\n      PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+          "\n      PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
           "\n        PhysicalSort(fetch=[100000])",
           "\n          PhysicalProject(col2=[$1])",
           "\n            PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
           "\n              PhysicalTableScan(table=[[default, a]])",
-          "\n      PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], 
distKeys=[[]], execStrategy=[STREAMING], collation=[[]])",
+          "\n      PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
           "\n        PhysicalSort(fetch=[100000])",
           "\n          PhysicalProject(col1=[$0])",
           "\n            PhysicalTableScan(table=[[default, b]])",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to