This is an automated email from the ASF dual-hosted git repository.

chrispeck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ea9407819d [multistage] Cleanup Plan Fragmenter Logic + Mailbox Receive Sort Fixes (#15943)
ea9407819d is described below

commit ea9407819dbc8ac7b3b53dc9deb768be91c972db
Author: Ankit Sultana <ankitsult...@uber.com>
AuthorDate: Mon Jun 2 16:59:49 2025 -0500

    [multistage] Cleanup Plan Fragmenter Logic + Mailbox Receive Sort Fixes (#15943)
    
    * [multistage] Cleanup Plan Fragmenter Logic
    
    * minor refactor
    
    * cleanup the cleanup
    
    * bug fixes
    
    * undo collation fix in lite-mode
---
 .../query/context/PhysicalPlannerContext.java      |   5 +
 .../v2/PlanFragmentAndMailboxAssignment.java       | 145 ++---
 .../physical/v2/opt/PhysicalOptRuleSet.java        |   2 +
 .../v2/opt/rules/RootExchangeInsertRule.java       |  78 +++
 .../resources/queries/PhysicalOptimizerPlans.json  | 604 +++++++++++----------
 5 files changed, 484 insertions(+), 350 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
index 363eaa2055..66809336e1 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
@@ -82,6 +82,7 @@ public class PhysicalPlannerContext {
     _instanceId = instanceId;
     _queryOptions = queryOptions == null ? Map.of() : queryOptions;
     _useLiteMode = PhysicalPlannerContext.useLiteMode(queryOptions);
+    _instanceIdToQueryServerInstance.put(instanceId, 
getBrokerQueryServerInstance());
   }
 
   public Supplier<Integer> getNodeIdGenerator() {
@@ -128,6 +129,10 @@ public class PhysicalPlannerContext {
     return 
Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_PHYSICAL_OPTIMIZER,
 "false"));
   }
 
+  private QueryServerInstance getBrokerQueryServerInstance() {
+    return new QueryServerInstance(_instanceId, _hostName, _port, _port);
+  }
+
   private static boolean useLiteMode(@Nullable Map<String, String> 
queryOptions) {
     if (queryOptions == null) {
       return false;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
index 88761f0bcf..abaa363daf 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
@@ -20,7 +20,6 @@ package org.apache.pinot.query.planner.physical.v2;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -28,9 +27,7 @@ import java.util.Map;
 import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.RelDistribution;
-import org.apache.calcite.rel.core.Exchange;
 import org.apache.calcite.rel.core.TableScan;
-import org.apache.commons.collections4.MapUtils;
 import org.apache.pinot.calcite.rel.hint.PinotHintOptions;
 import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
 import org.apache.pinot.common.utils.DataSchema;
@@ -38,6 +35,7 @@ import org.apache.pinot.query.context.PhysicalPlannerContext;
 import org.apache.pinot.query.planner.PlanFragment;
 import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
 import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalExchange;
+import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalTableScan;
 import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
 import org.apache.pinot.query.planner.plannode.MailboxSendNode;
 import org.apache.pinot.query.planner.plannode.PlanNode;
@@ -48,112 +46,133 @@ import org.apache.pinot.query.routing.QueryServerInstance;
 import org.apache.pinot.query.routing.SharedMailboxInfos;
 
 
+/**
+ * <h1>Responsibilities</h1>
+ * This does the following:
+ * <ul>
+ *   <li>Splits plan around PhysicalExchange nodes to create plan fragments.</li>
+ *   <li>Converts PRelNodes to PlanNodes.</li>
+ *   <li>
+ *     Creates mailboxes for connecting plan fragments. This is done simply based on the workers in the send/receive
+ *     plan nodes, and the exchange strategy (identity, partitioning, etc.).
+ *   </li>
+ *   <li>
+ *     Creates metadata for each plan fragment, which includes the scanned tables, unavailable segments, etc.
+ *   </li>
+ * </ul>
+ * <h1>Design Note</h1>
+ * This class is completely un-opinionated. The old optimizer had a lot of custom logic added to mailbox assignment,
+ * but this class instead doesn't do any special handling, apart from assigning mailboxes based on the exchange
+ * strategy. This is an important and conscious design choice, because it ensures division of responsibilities and
+ * allows optimizer rules like worker assignment to completely own their responsibilities. This is also important for
+ * keeping the optimizer maximally pluggable. (e.g. you can swap out the default worker assignment rule with a
+ * custom rule like the LiteMode worker assignment rule).
+ */
 public class PlanFragmentAndMailboxAssignment {
   private static final int ROOT_FRAGMENT_ID = 0;
-  private static final int FIRST_NON_ROOT_FRAGMENT_ID = 1;
 
   public Result compute(PRelNode rootPRelNode, PhysicalPlannerContext 
physicalPlannerContext) {
-    Preconditions.checkState(!(rootPRelNode.unwrap() instanceof Exchange), 
"root node should never be exchange");
-    final DataSchema rootDataSchema = 
PRelToPlanNodeConverter.toDataSchema(rootPRelNode.unwrap().getRowType());
-    // Create input fragment's send node.
-    MailboxSendNode sendNode = new MailboxSendNode(FIRST_NON_ROOT_FRAGMENT_ID, 
rootDataSchema, new ArrayList<>(),
-        ROOT_FRAGMENT_ID, PinotRelExchangeType.getDefaultExchangeType(), 
RelDistribution.Type.SINGLETON,
-        null, false, null, false);
-    // Create root receive node.
-    MailboxReceiveNode rootReceiveNode = new 
MailboxReceiveNode(ROOT_FRAGMENT_ID, rootDataSchema,
-        FIRST_NON_ROOT_FRAGMENT_ID, 
PinotRelExchangeType.getDefaultExchangeType(),
-        RelDistribution.Type.BROADCAST_DISTRIBUTED, null, null, false, false, 
sendNode);
     // Create the first two fragments.
     Context context = new Context(physicalPlannerContext);
-    PlanFragment rootFragment = createFragment(ROOT_FRAGMENT_ID, 
rootReceiveNode, new ArrayList<>(), context);
-    PlanFragment firstInputFragment = 
createFragment(FIRST_NON_ROOT_FRAGMENT_ID, sendNode, new ArrayList<>(), 
context);
-    rootFragment.getChildren().add(firstInputFragment);
-    QueryServerInstance brokerInstance = new 
QueryServerInstance(physicalPlannerContext.getInstanceId(),
-        physicalPlannerContext.getHostName(), 
physicalPlannerContext.getPort(), physicalPlannerContext.getPort());
-    computeMailboxInfos(FIRST_NON_ROOT_FRAGMENT_ID, ROOT_FRAGMENT_ID,
-        
createWorkerMap(rootPRelNode.getPinotDataDistributionOrThrow().getWorkers(), 
context),
-        ImmutableMap.of(0, brokerInstance), 
ExchangeStrategy.SINGLETON_EXCHANGE, context);
     // Traverse entire tree.
-    
context._fragmentMetadataMap.get(ROOT_FRAGMENT_ID).setWorkerIdToServerInstanceMap(ImmutableMap.of(
-        0, brokerInstance));
-    visit(rootPRelNode, sendNode, firstInputFragment, context);
+    process(rootPRelNode, null, ROOT_FRAGMENT_ID, context);
     Result result = new Result();
     result._fragmentMetadataMap = context._fragmentMetadataMap;
     result._planFragmentMap = context._planFragmentMap;
     return result;
   }
 
-  /**
-   * Invariants: 1. Parent PlanNode does not have current node in input yet. 
2. This node is NOT the fragment root. This
-   * is because each fragment root is a MailboxSendNode.
-   */
-  private void visit(PRelNode pRelNode, @Nullable PlanNode parent, 
PlanFragment currentFragment, Context context) {
-    int currentFragmentId = currentFragment.getFragmentId();
-    DispatchablePlanMetadata fragmentMetadata = 
context._fragmentMetadataMap.get(currentFragmentId);
-    if (MapUtils.isEmpty(fragmentMetadata.getWorkerIdToServerInstanceMap())) {
-      // TODO: This is quite a complex invariant.
-      fragmentMetadata.setWorkerIdToServerInstanceMap(createWorkerMap(
-          pRelNode.getPinotDataDistributionOrThrow().getWorkers(), context));
-    }
+  private void process(PRelNode pRelNode, @Nullable PlanNode parent, int 
currentFragmentId, Context context) {
     if (pRelNode.unwrap() instanceof TableScan) {
-      TableScanMetadata tableScanMetadata = 
Objects.requireNonNull(pRelNode.getTableScanMetadata(),
-          "No metadata in table scan PRelNode");
-      String tableName = 
tableScanMetadata.getScannedTables().stream().findFirst().orElseThrow();
-      if (!tableScanMetadata.getUnavailableSegmentsMap().isEmpty()) {
-        fragmentMetadata.addUnavailableSegments(tableName,
-            tableScanMetadata.getUnavailableSegmentsMap().get(tableName));
-      }
-      fragmentMetadata.addScannedTable(tableName);
-      
fragmentMetadata.setWorkerIdToSegmentsMap(tableScanMetadata.getWorkedIdToSegmentsMap());
-      NodeHint nodeHint = NodeHint.fromRelHints(((TableScan) 
pRelNode.unwrap()).getHints());
-      
fragmentMetadata.setTableOptions(nodeHint.getHintOptions().get(PinotHintOptions.TABLE_HINT_OPTIONS));
-      if (tableScanMetadata.getTimeBoundaryInfo() != null) {
-        
fragmentMetadata.setTimeBoundaryInfo(tableScanMetadata.getTimeBoundaryInfo());
-      }
+      processTableScan((PhysicalTableScan) pRelNode.unwrap(), 
currentFragmentId, context);
     }
     if (pRelNode.unwrap() instanceof PhysicalExchange) {
+      // Split an exchange into two fragments: one for the sender and one for 
the receiver.
+      // The sender fragment will have a MailboxSendNode and receiver a 
MailboxReceiveNode.
+      // It is possible that the receiver fragment doesn't exist yet (e.g. 
when PhysicalExchange is the root node).
+      // In that case, we also create it here. If it exists already, we simply 
re-use it.
       PhysicalExchange physicalExchange = (PhysicalExchange) pRelNode.unwrap();
-      int senderFragmentId = context._planFragmentMap.size();
+      PlanFragment receiverFragment = 
context._planFragmentMap.get(currentFragmentId);
+      int senderFragmentId = context._planFragmentMap.size() + 
(receiverFragment == null ? 1 : 0);
       final DataSchema inputFragmentSchema = 
PRelToPlanNodeConverter.toDataSchema(
           pRelNode.getPRelInput(0).unwrap().getRowType());
       RelDistribution.Type distributionType = 
ExchangeStrategy.getRelDistribution(
           physicalExchange.getExchangeStrategy(), 
physicalExchange.getDistributionKeys()).getType();
-      List<PlanNode> inputs = new ArrayList<>();
-      MailboxSendNode sendNode = new MailboxSendNode(senderFragmentId, 
inputFragmentSchema, inputs, currentFragmentId,
-          PinotRelExchangeType.getDefaultExchangeType(), distributionType, 
physicalExchange.getDistributionKeys(),
-          false, physicalExchange.getRelCollation().getFieldCollations(), 
false /* todo: set sortOnSender */);
+      MailboxSendNode sendNode = new MailboxSendNode(senderFragmentId, 
inputFragmentSchema, new ArrayList<>(),
+          currentFragmentId, PinotRelExchangeType.getDefaultExchangeType(), 
distributionType,
+          physicalExchange.getDistributionKeys(), false, 
physicalExchange.getRelCollation().getFieldCollations(),
+          false /* sort on sender */);
       MailboxReceiveNode receiveNode = new 
MailboxReceiveNode(currentFragmentId, inputFragmentSchema,
           senderFragmentId, PinotRelExchangeType.getDefaultExchangeType(), 
distributionType,
           physicalExchange.getDistributionKeys(), 
physicalExchange.getRelCollation().getFieldCollations(),
-          false /* TODO: set sort on receiver */, false /* TODO: set sort on 
sender */, sendNode);
-      PlanFragment newPlanFragment = createFragment(senderFragmentId, 
sendNode, new ArrayList<>(), context);
+          !physicalExchange.getRelCollation().getFieldCollations().isEmpty(), 
false, sendNode);
+      if (receiverFragment == null) {
+        /*
+         * If the root node is an exchange, then the root fragment will not 
exist yet. We create it here.
+         */
+        receiverFragment = createFragment(currentFragmentId, receiveNode, new 
ArrayList<>(), context,
+            pRelNode.getPinotDataDistributionOrThrow().getWorkers());
+      }
+      PlanFragment newPlanFragment = createFragment(senderFragmentId, 
sendNode, new ArrayList<>(), context,
+          
physicalExchange.getPRelInputs().get(0).getPinotDataDistributionOrThrow().getWorkers());
       Map<Integer, QueryServerInstance> senderWorkers = 
createWorkerMap(pRelNode.getPRelInput(0)
           .getPinotDataDistributionOrThrow().getWorkers(), context);
       Map<Integer, QueryServerInstance> receiverWorkers = 
createWorkerMap(pRelNode.getPinotDataDistributionOrThrow()
           .getWorkers(), context);
       computeMailboxInfos(senderFragmentId, currentFragmentId, senderWorkers, 
receiverWorkers,
           physicalExchange.getExchangeStrategy(), context);
-      currentFragment.getChildren().add(newPlanFragment);
-      visit(pRelNode.getPRelInput(0), sendNode, newPlanFragment, context);
+      
context._planFragmentMap.get(currentFragmentId).getChildren().add(newPlanFragment);
+      process(pRelNode.getPRelInput(0), sendNode, 
newPlanFragment.getFragmentId(), context);
       if (parent != null) {
         parent.getInputs().add(receiveNode);
       }
       return;
     }
+    // Convert PRelNode to PlanNode, and create parent/input PlanNode tree.
     PlanNode planNode = PRelToPlanNodeConverter.toPlanNode(pRelNode, 
currentFragmentId);
+    if (context._planFragmentMap.isEmpty()) {
+      /*
+       * If the root-node is NOT an exchange, then we create the root fragment 
here. If it's an exchange, it will be
+       * created in the process of handling the exchange.
+       */
+      createFragment(ROOT_FRAGMENT_ID, planNode, new ArrayList<>(), context,
+          pRelNode.getPinotDataDistributionOrThrow().getWorkers());
+    }
     for (PRelNode input : pRelNode.getPRelInputs()) {
-      visit(input, planNode, currentFragment, context);
+      process(input, planNode, currentFragmentId, context);
     }
     if (parent != null) {
       parent.getInputs().add(planNode);
     }
   }
 
+  private void processTableScan(PhysicalTableScan tableScan, int 
currentFragmentId, Context context) {
+    DispatchablePlanMetadata fragmentMetadata = 
context._fragmentMetadataMap.get(currentFragmentId);
+    TableScanMetadata tableScanMetadata = 
Objects.requireNonNull(tableScan.getTableScanMetadata(),
+        "No metadata in table scan PRelNode");
+    String tableName = 
tableScanMetadata.getScannedTables().stream().findFirst().orElseThrow();
+    if (!tableScanMetadata.getUnavailableSegmentsMap().isEmpty()) {
+      fragmentMetadata.addUnavailableSegments(tableName,
+          tableScanMetadata.getUnavailableSegmentsMap().get(tableName));
+    }
+    fragmentMetadata.addScannedTable(tableName);
+    
fragmentMetadata.setWorkerIdToSegmentsMap(tableScanMetadata.getWorkedIdToSegmentsMap());
+    NodeHint nodeHint = NodeHint.fromRelHints(tableScan.getHints());
+    
fragmentMetadata.setTableOptions(nodeHint.getHintOptions().get(PinotHintOptions.TABLE_HINT_OPTIONS));
+    if (tableScanMetadata.getTimeBoundaryInfo() != null) {
+      
fragmentMetadata.setTimeBoundaryInfo(tableScanMetadata.getTimeBoundaryInfo());
+    }
+  }
+
   private PlanFragment createFragment(int fragmentId, PlanNode planNode, 
List<PlanFragment> inputFragments,
-      Context context) {
+      Context context, List<String> workers) {
+    // track new plan fragment
     PlanFragment fragment = new PlanFragment(fragmentId, planNode, 
inputFragments);
     context._planFragmentMap.put(fragmentId, fragment);
-    context._fragmentMetadataMap.put(fragmentId, new 
DispatchablePlanMetadata());
+    // add fragment metadata
+    DispatchablePlanMetadata fragmentMetadata = new DispatchablePlanMetadata();
+    fragmentMetadata.setWorkerIdToServerInstanceMap(createWorkerMap(workers, 
context));
+    context._fragmentMetadataMap.put(fragmentId, fragmentMetadata);
     return fragment;
   }
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/PhysicalOptRuleSet.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/PhysicalOptRuleSet.java
index aa8420e198..c3dd8f9de8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/PhysicalOptRuleSet.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/PhysicalOptRuleSet.java
@@ -28,6 +28,7 @@ import 
org.apache.pinot.query.planner.physical.v2.opt.rules.LeafStageBoundaryRul
 import 
org.apache.pinot.query.planner.physical.v2.opt.rules.LeafStageWorkerAssignmentRule;
 import 
org.apache.pinot.query.planner.physical.v2.opt.rules.LiteModeSortInsertRule;
 import 
org.apache.pinot.query.planner.physical.v2.opt.rules.LiteModeWorkerAssignmentRule;
+import 
org.apache.pinot.query.planner.physical.v2.opt.rules.RootExchangeInsertRule;
 import org.apache.pinot.query.planner.physical.v2.opt.rules.SortPushdownRule;
 import 
org.apache.pinot.query.planner.physical.v2.opt.rules.WorkerExchangeAssignmentRule;
 
@@ -48,6 +49,7 @@ public class PhysicalOptRuleSet {
     if (context.isUseLiteMode()) {
       transformers.add(create(new LiteModeSortInsertRule(context), 
RuleExecutors.Type.POST_ORDER, context));
     }
+    transformers.add(new RootExchangeInsertRule(context));
     return transformers;
   }
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/RootExchangeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/RootExchangeInsertRule.java
new file mode 100644
index 0000000000..714f096a31
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/RootExchangeInsertRule.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2.opt.rules;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTrait;
+import org.apache.pinot.query.context.PhysicalPlannerContext;
+import org.apache.pinot.query.planner.physical.v2.ExchangeStrategy;
+import org.apache.pinot.query.planner.physical.v2.PRelNode;
+import org.apache.pinot.query.planner.physical.v2.PinotDataDistribution;
+import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalExchange;
+import org.apache.pinot.query.planner.physical.v2.opt.PRelNodeTransformer;
+
+
+/**
+ * Adds an exchange node at the root of the plan if the root node is not already located as a singleton on the broker.
+ * This is because the entire data needs to be returned by the broker to the client.
+ */
+public class RootExchangeInsertRule implements PRelNodeTransformer {
+  private final PhysicalPlannerContext _context;
+
+  public RootExchangeInsertRule(PhysicalPlannerContext context) {
+    _context = context;
+  }
+
+  @Override
+  public PRelNode execute(PRelNode currentNode) {
+    PinotDataDistribution rootDataDistribution = 
currentNode.getPinotDataDistributionOrThrow();
+    List<String> workers = List.of(brokerWorkerId());
+    if (rootDataDistribution.getWorkers().equals(workers)) {
+      // If the root node is already distributed to the broker, no need to 
insert an exchange.
+      return currentNode;
+    }
+    PinotDataDistribution pinotDataDistribution = new 
PinotDataDistribution(RelDistribution.Type.SINGLETON,
+        workers, workers.hashCode(), null, inferCollation(currentNode));
+    return new PhysicalExchange(nodeId(), currentNode, pinotDataDistribution, 
List.of(),
+        ExchangeStrategy.SINGLETON_EXCHANGE, null, 
PinotExecStrategyTrait.getDefaultExecStrategy());
+  }
+
+  private String brokerWorkerId() {
+    return String.format("0@%s", _context.getInstanceId());
+  }
+
+  private int nodeId() {
+    return _context.getNodeIdGenerator().get();
+  }
+
+  /**
+   * If the current node is distributed to a single worker, inherit the collation trait from it. Otherwise, return null.
+   */
+  @Nullable
+  private RelCollation inferCollation(PRelNode currentNode) {
+    // Infer collation from the current node if needed.
+    if (currentNode.getPinotDataDistributionOrThrow().getWorkers().size() != 
1) {
+      return null;
+    }
+    return currentNode.getPinotDataDistributionOrThrow().getCollation();
+  }
+}
diff --git a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index acab86c39c..d9ef04fa2a 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -6,16 +6,17 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT a.col1, 
a.ts, b.col3 FROM a JOIN b ON a.col1 = b.col2 ORDER BY a.col1",
         "output": [
           "Execution Plan",
-          "\nPhysicalSort(sort0=[$0], dir0=[ASC])",
-          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
-          "\n    PhysicalProject(col1=[$0], ts=[$1], col3=[$3])",
-          "\n      PhysicalJoin(condition=[=($0, $2)], joinType=[inner])",
-          "\n        
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
-          "\n          PhysicalProject(col1=[$0], ts=[$7])",
-          "\n            PhysicalTableScan(table=[[default, a]])",
-          "\n        
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
-          "\n          PhysicalProject(col2=[$1], col3=[$2])",
-          "\n            PhysicalTableScan(table=[[default, b]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalSort(sort0=[$0], dir0=[ASC])",
+          "\n    PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n      PhysicalProject(col1=[$0], ts=[$1], col3=[$3])",
+          "\n        PhysicalJoin(condition=[=($0, $2)], joinType=[inner])",
+          "\n          
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+          "\n            PhysicalProject(col1=[$0], ts=[$7])",
+          "\n              PhysicalTableScan(table=[[default, a]])",
+          "\n          
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+          "\n            PhysicalProject(col2=[$1], col3=[$2])",
+          "\n              PhysicalTableScan(table=[[default, b]])",
           "\n"
         ]
       },
@@ -24,16 +25,17 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT a.col1 
AS value1, a.ts AS ts1, b.col3 FROM a JOIN b ON a.col1 = b.col2 ORDER BY 
a.col1",
         "output": [
           "Execution Plan",
-          "\nPhysicalSort(sort0=[$0], dir0=[ASC])",
-          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
-          "\n    PhysicalProject(value1=[$0], ts1=[$1], col3=[$3])",
-          "\n      PhysicalJoin(condition=[=($0, $2)], joinType=[inner])",
-          "\n        
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
-          "\n          PhysicalProject(col1=[$0], ts=[$7])",
-          "\n            PhysicalTableScan(table=[[default, a]])",
-          "\n        
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
-          "\n          PhysicalProject(col2=[$1], col3=[$2])",
-          "\n            PhysicalTableScan(table=[[default, b]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalSort(sort0=[$0], dir0=[ASC])",
+          "\n    PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n      PhysicalProject(value1=[$0], ts1=[$1], col3=[$3])",
+          "\n        PhysicalJoin(condition=[=($0, $2)], joinType=[inner])",
+          "\n          
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+          "\n            PhysicalProject(col1=[$0], ts=[$7])",
+          "\n              PhysicalTableScan(table=[[default, a]])",
+          "\n          
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+          "\n            PhysicalProject(col2=[$1], col3=[$2])",
+          "\n              PhysicalTableScan(table=[[default, b]])",
           "\n"
         ]
       },
@@ -42,11 +44,12 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT * FROM 
a JOIN b ON a.col1 = b.col2",
         "output": [
           "Execution Plan",
-          "\nPhysicalJoin(condition=[=($0, $10)], joinType=[inner])",
-          "\n  PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
-          "\n    PhysicalTableScan(table=[[default, a]])",
-          "\n  PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[1]])",
-          "\n    PhysicalTableScan(table=[[default, b]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalJoin(condition=[=($0, $10)], joinType=[inner])",
+          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
+          "\n      PhysicalTableScan(table=[[default, a]])",
+          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[1]])",
+          "\n      PhysicalTableScan(table=[[default, b]])",
           "\n"
         ]
       },
@@ -55,12 +58,13 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT * FROM 
a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0",
         "output": [
           "Execution Plan",
-          "\nPhysicalJoin(condition=[=($0, $10)], joinType=[inner])",
-          "\n  PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
-          "\n    PhysicalFilter(condition=[>=($2, 0)])",
-          "\n      PhysicalTableScan(table=[[default, a]])",
-          "\n  PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[1]])",
-          "\n    PhysicalTableScan(table=[[default, b]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalJoin(condition=[=($0, $10)], joinType=[inner])",
+          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
+          "\n      PhysicalFilter(condition=[>=($2, 0)])",
+          "\n        PhysicalTableScan(table=[[default, a]])",
+          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[1]])",
+          "\n      PhysicalTableScan(table=[[default, b]])",
           "\n"
         ]
       },
@@ -69,12 +73,13 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT * FROM 
a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0 AND a.col3 > b.col3",
         "output": [
           "Execution Plan",
-          "\nPhysicalJoin(condition=[AND(=($0, $10), >($2, $11))], 
joinType=[inner])",
-          "\n  PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
-          "\n    PhysicalFilter(condition=[>=($2, 0)])",
-          "\n      PhysicalTableScan(table=[[default, a]])",
-          "\n  PhysicalExchange(exchangeStrategy=[BROADCAST_EXCHANGE])",
-          "\n    PhysicalTableScan(table=[[default, b]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalJoin(condition=[AND(=($0, $10), >($2, $11))], 
joinType=[inner])",
+          "\n    PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
+          "\n      PhysicalFilter(condition=[>=($2, 0)])",
+          "\n        PhysicalTableScan(table=[[default, a]])",
+          "\n    PhysicalExchange(exchangeStrategy=[BROADCAST_EXCHANGE])",
+          "\n      PhysicalTableScan(table=[[default, b]])",
           "\n"
         ]
       },
@@ -83,11 +88,12 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT * FROM 
a JOIN b on a.col1 = b.col1 AND a.col2 = b.col2",
         "output": [
           "Execution Plan",
-          "\nPhysicalJoin(condition=[AND(=($0, $9), =($1, $10))], 
joinType=[inner])",
-          "\n  PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0, 1]])",
-          "\n    PhysicalTableScan(table=[[default, a]])",
-          "\n  PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0, 1]])",
-          "\n    PhysicalTableScan(table=[[default, b]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalJoin(condition=[AND(=($0, $9), =($1, $10))], 
joinType=[inner])",
+          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0, 1]])",
+          "\n      PhysicalTableScan(table=[[default, a]])",
+          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0, 1]])",
+          "\n      PhysicalTableScan(table=[[default, b]])",
           "\n"
         ]
       }
@@ -100,14 +106,15 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col1, 
col2 FROM a WHERE col3 IN (SELECT col3 FROM b)",
         "output": [
           "Execution Plan",
-          "\nPhysicalProject(col1=[$0], col2=[$1])",
-          "\n  PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
-          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[2]])",
-          "\n      PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n        PhysicalTableScan(table=[[default, a]])",
-          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
-          "\n      PhysicalProject(col3=[$2])",
-          "\n        PhysicalTableScan(table=[[default, b]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalProject(col1=[$0], col2=[$1])",
+          "\n    PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
+          "\n      PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[2]])",
+          "\n        PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n          PhysicalTableScan(table=[[default, a]])",
+          "\n      PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
+          "\n        PhysicalProject(col3=[$2])",
+          "\n          PhysicalTableScan(table=[[default, b]])",
           "\n"
         ]
       },
@@ -116,14 +123,15 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col1, 
col2 FROM b WHERE col3 IN (SELECT col3 FROM b)",
         "output": [
           "Execution Plan",
-          "\nPhysicalProject(col1=[$0], col2=[$1])",
-          "\n  PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
-          "\n    PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
-          "\n      PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n        PhysicalTableScan(table=[[default, b]])",
-          "\n    PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
-          "\n      PhysicalProject(col3=[$2])",
-          "\n        PhysicalTableScan(table=[[default, b]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalProject(col1=[$0], col2=[$1])",
+          "\n    PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
+          "\n      PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
+          "\n        PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n          PhysicalTableScan(table=[[default, b]])",
+          "\n      PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
+          "\n        PhysicalProject(col3=[$2])",
+          "\n          PhysicalTableScan(table=[[default, b]])",
           "\n"
         ]
       },
@@ -132,25 +140,26 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col1, 
col2 FROM a WHERE col3 IN (SELECT col3 FROM b WHERE col2='foo') AND col3 IN 
(SELECT col3 FROM b WHERE col2='bar') AND col3 IN (SELECT col3 FROM b WHERE 
col2='lorem')",
         "output": [
           "Execution Plan",
-          "\nPhysicalProject(col1=[$0], col2=[$1])",
-          "\n  PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalProject(col1=[$0], col2=[$1])",
           "\n    PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
           "\n      PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
-          "\n        
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])",
-          "\n          PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n            PhysicalTableScan(table=[[default, a]])",
+          "\n        PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
+          "\n          
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])",
+          "\n            PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n              PhysicalTableScan(table=[[default, a]])",
+          "\n          
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+          "\n            PhysicalProject(col3=[$2])",
+          "\n              PhysicalFilter(condition=[=($1, _UTF-8'foo')])",
+          "\n                PhysicalTableScan(table=[[default, b]])",
           "\n        
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
           "\n          PhysicalProject(col3=[$2])",
-          "\n            PhysicalFilter(condition=[=($1, _UTF-8'foo')])",
+          "\n            PhysicalFilter(condition=[=($1, _UTF-8'bar')])",
           "\n              PhysicalTableScan(table=[[default, b]])",
           "\n      PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
           "\n        PhysicalProject(col3=[$2])",
-          "\n          PhysicalFilter(condition=[=($1, _UTF-8'bar')])",
+          "\n          PhysicalFilter(condition=[=($1, _UTF-8'lorem')])",
           "\n            PhysicalTableScan(table=[[default, b]])",
-          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
-          "\n      PhysicalProject(col3=[$2])",
-          "\n        PhysicalFilter(condition=[=($1, _UTF-8'lorem')])",
-          "\n          PhysicalTableScan(table=[[default, b]])",
           "\n"
         ]
       },
@@ -159,29 +168,30 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col1, 
col2 FROM a WHERE col3 IN (SELECT col3 FROM b WHERE col2='foo') AND col3 NOT IN 
(SELECT col3 FROM b WHERE col2='bar') AND col3 IN (SELECT col3 FROM b WHERE 
col2='lorem')",
         "output": [
           "Execution Plan",
-          "\nPhysicalProject(col1=[$0], col2=[$1])",
-          "\n  PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
-          "\n    PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n      PhysicalFilter(condition=[IS NOT TRUE($5)])",
-          "\n        PhysicalJoin(condition=[=($3, $4)], joinType=[left])",
-          "\n          PhysicalProject(col1=[$0], col2=[$1], col3=[$2], 
col31=[$2])",
-          "\n            PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
-          "\n              
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])",
-          "\n                PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n                  PhysicalTableScan(table=[[default, a]])",
-          "\n              
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
-          "\n                PhysicalProject(col3=[$2])",
-          "\n                  PhysicalFilter(condition=[=($1, _UTF-8'foo')])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalProject(col1=[$0], col2=[$1])",
+          "\n    PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
+          "\n      PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n        PhysicalFilter(condition=[IS NOT TRUE($5)])",
+          "\n          PhysicalJoin(condition=[=($3, $4)], joinType=[left])",
+          "\n            PhysicalProject(col1=[$0], col2=[$1], col3=[$2], 
col31=[$2])",
+          "\n              PhysicalJoin(condition=[=($2, $3)], 
joinType=[semi])",
+          "\n                
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])",
+          "\n                  PhysicalProject(col1=[$0], col2=[$1], 
col3=[$2])",
+          "\n                    PhysicalTableScan(table=[[default, a]])",
+          "\n                
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+          "\n                  PhysicalProject(col3=[$2])",
+          "\n                    PhysicalFilter(condition=[=($1, 
_UTF-8'foo')])",
+          "\n                      PhysicalTableScan(table=[[default, b]])",
+          "\n            
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+          "\n              PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], 
aggType=[DIRECT])",
+          "\n                PhysicalProject(col3=[$2], $f1=[true])",
+          "\n                  PhysicalFilter(condition=[=($1, _UTF-8'bar')])",
           "\n                    PhysicalTableScan(table=[[default, b]])",
-          "\n          
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
-          "\n            PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], 
aggType=[DIRECT])",
-          "\n              PhysicalProject(col3=[$2], $f1=[true])",
-          "\n                PhysicalFilter(condition=[=($1, _UTF-8'bar')])",
-          "\n                  PhysicalTableScan(table=[[default, b]])",
-          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
-          "\n      PhysicalProject(col3=[$2])",
-          "\n        PhysicalFilter(condition=[=($1, _UTF-8'lorem')])",
-          "\n          PhysicalTableScan(table=[[default, b]])",
+          "\n      PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
+          "\n        PhysicalProject(col3=[$2])",
+          "\n          PhysicalFilter(condition=[=($1, _UTF-8'lorem')])",
+          "\n            PhysicalTableScan(table=[[default, b]])",
           "\n"
         ]
       },
@@ -190,30 +200,31 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col1, 
col2 FROM a WHERE col3 IN (SELECT col3 FROM b WHERE col2='foo') AND col3 NOT IN 
(SELECT col3 FROM a WHERE col2='bar') AND col3 IN (SELECT col3 FROM b WHERE 
col2='lorem')",
         "output": [
           "Execution Plan",
-          "\nPhysicalProject(col1=[$0], col2=[$1])",
-          "\n  PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
-          "\n    PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n      PhysicalFilter(condition=[IS NOT TRUE($5)])",
-          "\n        PhysicalJoin(condition=[=($3, $4)], joinType=[left])",
-          "\n          PhysicalProject(col1=[$0], col2=[$1], col3=[$2], 
col31=[$2])",
-          "\n            PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
-          "\n              
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])",
-          "\n                PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
-          "\n                  PhysicalTableScan(table=[[default, a]])",
-          "\n              
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
-          "\n                PhysicalProject(col3=[$2])",
-          "\n                  PhysicalFilter(condition=[=($1, _UTF-8'foo')])",
-          "\n                    PhysicalTableScan(table=[[default, b]])",
-          "\n          PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], 
aggType=[FINAL])",
-          "\n            
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
-          "\n              PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], 
aggType=[LEAF])",
-          "\n                PhysicalProject(col3=[$2], $f1=[true])",
-          "\n                  PhysicalFilter(condition=[=($1, _UTF-8'bar')])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalProject(col1=[$0], col2=[$1])",
+          "\n    PhysicalJoin(condition=[=($2, $3)], joinType=[semi])",
+          "\n      PhysicalProject(col1=[$0], col2=[$1], col3=[$2])",
+          "\n        PhysicalFilter(condition=[IS NOT TRUE($5)])",
+          "\n          PhysicalJoin(condition=[=($3, $4)], joinType=[left])",
+          "\n            PhysicalProject(col1=[$0], col2=[$1], col3=[$2], 
col31=[$2])",
+          "\n              PhysicalJoin(condition=[=($2, $3)], 
joinType=[semi])",
+          "\n                
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])",
+          "\n                  PhysicalProject(col1=[$0], col2=[$1], 
col3=[$2])",
           "\n                    PhysicalTableScan(table=[[default, a]])",
-          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
-          "\n      PhysicalProject(col3=[$2])",
-          "\n        PhysicalFilter(condition=[=($1, _UTF-8'lorem')])",
-          "\n          PhysicalTableScan(table=[[default, b]])",
+          "\n                
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+          "\n                  PhysicalProject(col3=[$2])",
+          "\n                    PhysicalFilter(condition=[=($1, 
_UTF-8'foo')])",
+          "\n                      PhysicalTableScan(table=[[default, b]])",
+          "\n            PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], 
aggType=[FINAL])",
+          "\n              
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+          "\n                PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], 
aggType=[LEAF])",
+          "\n                  PhysicalProject(col3=[$2], $f1=[true])",
+          "\n                    PhysicalFilter(condition=[=($1, 
_UTF-8'bar')])",
+          "\n                      PhysicalTableScan(table=[[default, a]])",
+          "\n      PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
+          "\n        PhysicalProject(col3=[$2])",
+          "\n          PhysicalFilter(condition=[=($1, _UTF-8'lorem')])",
+          "\n            PhysicalTableScan(table=[[default, b]])",
           "\n"
         ]
       }
@@ -226,19 +237,20 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col1, 
col2 FROM a WHERE col2 IN (SELECT col2 FROM a WHERE col3 = 'foo') AND col2 IN 
(SELECT col2 FROM a WHERE col3 = 'bar')",
         "output": [
           "Execution Plan",
-          "\nPhysicalJoin(condition=[=($1, $2)], joinType=[semi])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
           "\n  PhysicalJoin(condition=[=($1, $2)], joinType=[semi])",
-          "\n    PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
-          "\n      PhysicalProject(col1=[$0], col2=[$1])",
-          "\n        PhysicalTableScan(table=[[default, a]])",
+          "\n    PhysicalJoin(condition=[=($1, $2)], joinType=[semi])",
+          "\n      PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
+          "\n        PhysicalProject(col1=[$0], col2=[$1])",
+          "\n          PhysicalTableScan(table=[[default, a]])",
+          "\n      PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
+          "\n        PhysicalProject(col2=[$1])",
+          "\n          PhysicalFilter(condition=[=($2, 
CAST(_UTF-8'foo'):INTEGER NOT NULL)])",
+          "\n            PhysicalTableScan(table=[[default, a]])",
           "\n    PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
           "\n      PhysicalProject(col2=[$1])",
-          "\n        PhysicalFilter(condition=[=($2, CAST(_UTF-8'foo'):INTEGER 
NOT NULL)])",
+          "\n        PhysicalFilter(condition=[=($2, CAST(_UTF-8'bar'):INTEGER 
NOT NULL)])",
           "\n          PhysicalTableScan(table=[[default, a]])",
-          "\n  PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
-          "\n    PhysicalProject(col2=[$1])",
-          "\n      PhysicalFilter(condition=[=($2, CAST(_UTF-8'bar'):INTEGER 
NOT NULL)])",
-          "\n        PhysicalTableScan(table=[[default, a]])",
           "\n"
         ]
       },
@@ -247,28 +259,29 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col1, 
col2 FROM a WHERE col2 IN (SELECT col2 FROM a WHERE col3 = 'foo') AND col2 NOT 
IN (SELECT col2 FROM a WHERE col3 = 'bar') AND col2 IN (SELECT col2 FROM a 
WHERE col3 = 'lorem')",
         "output": [
           "Execution Plan",
-          "\nPhysicalJoin(condition=[=($1, $2)], joinType=[semi])",
-          "\n  PhysicalProject(col1=[$0], col2=[$1])",
-          "\n    PhysicalFilter(condition=[IS NOT TRUE($4)])",
-          "\n      PhysicalJoin(condition=[=($2, $3)], joinType=[left])",
-          "\n        PhysicalProject(col1=[$0], col2=[$1], col21=[$1])",
-          "\n          PhysicalJoin(condition=[=($1, $2)], joinType=[semi])",
-          "\n            
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
-          "\n              PhysicalProject(col1=[$0], col2=[$1])",
-          "\n                PhysicalTableScan(table=[[default, a]])",
-          "\n            
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
-          "\n              PhysicalProject(col2=[$1])",
-          "\n                PhysicalFilter(condition=[=($2, 
CAST(_UTF-8'foo'):INTEGER NOT NULL)])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalJoin(condition=[=($1, $2)], joinType=[semi])",
+          "\n    PhysicalProject(col1=[$0], col2=[$1])",
+          "\n      PhysicalFilter(condition=[IS NOT TRUE($4)])",
+          "\n        PhysicalJoin(condition=[=($2, $3)], joinType=[left])",
+          "\n          PhysicalProject(col1=[$0], col2=[$1], col21=[$1])",
+          "\n            PhysicalJoin(condition=[=($1, $2)], joinType=[semi])",
+          "\n              
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
+          "\n                PhysicalProject(col1=[$0], col2=[$1])",
           "\n                  PhysicalTableScan(table=[[default, a]])",
-          "\n        PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
-          "\n          PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], 
aggType=[DIRECT])",
-          "\n            PhysicalProject(col2=[$1], $f1=[true])",
-          "\n              PhysicalFilter(condition=[=($2, 
CAST(_UTF-8'bar'):INTEGER NOT NULL)])",
-          "\n                PhysicalTableScan(table=[[default, a]])",
-          "\n  PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
-          "\n    PhysicalProject(col2=[$1])",
-          "\n      PhysicalFilter(condition=[=($2, CAST(_UTF-8'lorem'):INTEGER 
NOT NULL)])",
-          "\n        PhysicalTableScan(table=[[default, a]])",
+          "\n              
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
+          "\n                PhysicalProject(col2=[$1])",
+          "\n                  PhysicalFilter(condition=[=($2, 
CAST(_UTF-8'foo'):INTEGER NOT NULL)])",
+          "\n                    PhysicalTableScan(table=[[default, a]])",
+          "\n          PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
+          "\n            PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], 
aggType=[DIRECT])",
+          "\n              PhysicalProject(col2=[$1], $f1=[true])",
+          "\n                PhysicalFilter(condition=[=($2, 
CAST(_UTF-8'bar'):INTEGER NOT NULL)])",
+          "\n                  PhysicalTableScan(table=[[default, a]])",
+          "\n    PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
+          "\n      PhysicalProject(col2=[$1])",
+          "\n        PhysicalFilter(condition=[=($2, 
CAST(_UTF-8'lorem'):INTEGER NOT NULL)])",
+          "\n          PhysicalTableScan(table=[[default, a]])",
           "\n"
         ]
       },
@@ -277,31 +290,32 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col1, 
COUNT(*) FROM a WHERE col2 IN (SELECT col2 FROM a WHERE col3 = 'foo') AND col2 
NOT IN (SELECT col2 FROM a WHERE col3 = 'bar') AND col2 IN (SELECT col2 FROM a 
WHERE col3 = 'lorem') GROUP BY col1",
         "output": [
           "Execution Plan",
-          "\nPhysicalAggregate(group=[{0}], agg#0=[COUNT($1)], 
aggType=[FINAL])",
-          "\n  PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
-          "\n    PhysicalAggregate(group=[{0}], agg#0=[COUNT()], 
aggType=[LEAF])",
-          "\n      PhysicalJoin(condition=[=($1, $2)], joinType=[semi])",
-          "\n        PhysicalProject(col1=[$0], col2=[$1])",
-          "\n          PhysicalFilter(condition=[IS NOT TRUE($4)])",
-          "\n            PhysicalJoin(condition=[=($2, $3)], joinType=[left])",
-          "\n              PhysicalProject(col1=[$0], col2=[$1], col21=[$1])",
-          "\n                PhysicalJoin(condition=[=($1, $2)], 
joinType=[semi])",
-          "\n                  
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
-          "\n                    PhysicalProject(col1=[$0], col2=[$1])",
-          "\n                      PhysicalTableScan(table=[[default, a]])",
-          "\n                  
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
-          "\n                    PhysicalProject(col2=[$1])",
-          "\n                      PhysicalFilter(condition=[=($2, 
CAST(_UTF-8'foo'):INTEGER NOT NULL)])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalAggregate(group=[{0}], agg#0=[COUNT($1)], 
aggType=[FINAL])",
+          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
+          "\n      PhysicalAggregate(group=[{0}], agg#0=[COUNT()], 
aggType=[LEAF])",
+          "\n        PhysicalJoin(condition=[=($1, $2)], joinType=[semi])",
+          "\n          PhysicalProject(col1=[$0], col2=[$1])",
+          "\n            PhysicalFilter(condition=[IS NOT TRUE($4)])",
+          "\n              PhysicalJoin(condition=[=($2, $3)], 
joinType=[left])",
+          "\n                PhysicalProject(col1=[$0], col2=[$1], 
col21=[$1])",
+          "\n                  PhysicalJoin(condition=[=($1, $2)], 
joinType=[semi])",
+          "\n                    
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
+          "\n                      PhysicalProject(col1=[$0], col2=[$1])",
           "\n                        PhysicalTableScan(table=[[default, a]])",
-          "\n              
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
-          "\n                PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], 
aggType=[DIRECT])",
-          "\n                  PhysicalProject(col2=[$1], $f1=[true])",
-          "\n                    PhysicalFilter(condition=[=($2, 
CAST(_UTF-8'bar'):INTEGER NOT NULL)])",
-          "\n                      PhysicalTableScan(table=[[default, a]])",
-          "\n        PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
-          "\n          PhysicalProject(col2=[$1])",
-          "\n            PhysicalFilter(condition=[=($2, 
CAST(_UTF-8'lorem'):INTEGER NOT NULL)])",
-          "\n              PhysicalTableScan(table=[[default, a]])",
+          "\n                    
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
+          "\n                      PhysicalProject(col2=[$1])",
+          "\n                        PhysicalFilter(condition=[=($2, 
CAST(_UTF-8'foo'):INTEGER NOT NULL)])",
+          "\n                          PhysicalTableScan(table=[[default, 
a]])",
+          "\n                
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
+          "\n                  PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], 
aggType=[DIRECT])",
+          "\n                    PhysicalProject(col2=[$1], $f1=[true])",
+          "\n                      PhysicalFilter(condition=[=($2, 
CAST(_UTF-8'bar'):INTEGER NOT NULL)])",
+          "\n                        PhysicalTableScan(table=[[default, a]])",
+          "\n          PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
+          "\n            PhysicalProject(col2=[$1])",
+          "\n              PhysicalFilter(condition=[=($2, 
CAST(_UTF-8'lorem'):INTEGER NOT NULL)])",
+          "\n                PhysicalTableScan(table=[[default, a]])",
           "\n"
         ]
       }
@@ -314,14 +328,15 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT /*+ 
aggOptions(is_leaf_return_final_result='true', is_enable_group_trim='true') */ 
col1, COUNT(DISTINCT col2) AS cnt FROM a WHERE col3 >= 0 GROUP BY col1 ORDER BY 
cnt DESC LIMIT 10",
         "output": [
           "Execution Plan",
-          "\nPhysicalSort(sort0=[$1], dir0=[DESC], fetch=[10])",
-          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
-          "\n    PhysicalSort(sort0=[$1], dir0=[DESC], fetch=[10])",
-          "\n      PhysicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)], 
aggType=[FINAL], leafReturnFinalResult=[true], limit=[10])",
-          "\n        
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
-          "\n          PhysicalAggregate(group=[{0}], 
agg#0=[DISTINCTCOUNT($1)], aggType=[LEAF], leafReturnFinalResult=[true], 
collations=[[1 DESC]], limit=[10])",
-          "\n            PhysicalFilter(condition=[>=($2, 0)])",
-          "\n              PhysicalTableScan(table=[[default, a]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalSort(sort0=[$1], dir0=[DESC], fetch=[10])",
+          "\n    PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n      PhysicalSort(sort0=[$1], dir0=[DESC], fetch=[10])",
+          "\n        PhysicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)], 
aggType=[FINAL], leafReturnFinalResult=[true], limit=[10])",
+          "\n          
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+          "\n            PhysicalAggregate(group=[{0}], 
agg#0=[DISTINCTCOUNT($1)], aggType=[LEAF], leafReturnFinalResult=[true], 
collations=[[1 DESC]], limit=[10])",
+          "\n              PhysicalFilter(condition=[>=($2, 0)])",
+          "\n                PhysicalTableScan(table=[[default, a]])",
           "\n"
         ]
       },
@@ -330,15 +345,16 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT /*+ 
aggOptions(is_enable_group_trim='true') */ COUNT(DISTINCT col2) AS cnt FROM a 
WHERE a.col3 >= 0 GROUP BY col1 ORDER BY cnt DESC LIMIT 10",
         "output": [
           "Execution Plan",
-          "\nPhysicalSort(sort0=[$0], dir0=[DESC], fetch=[10])",
-          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
-          "\n    PhysicalSort(sort0=[$0], dir0=[DESC], fetch=[10])",
-          "\n      PhysicalProject(cnt=[$1])",
-          "\n        PhysicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)], 
aggType=[FINAL], limit=[10])",
-          "\n          
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
-          "\n            PhysicalAggregate(group=[{0}], 
agg#0=[DISTINCTCOUNT($1)], aggType=[LEAF], collations=[[1 DESC]], limit=[10])",
-          "\n              PhysicalFilter(condition=[>=($2, 0)])",
-          "\n                PhysicalTableScan(table=[[default, a]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalSort(sort0=[$0], dir0=[DESC], fetch=[10])",
+          "\n    PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n      PhysicalSort(sort0=[$0], dir0=[DESC], fetch=[10])",
+          "\n        PhysicalProject(cnt=[$1])",
+          "\n          PhysicalAggregate(group=[{0}], 
agg#0=[DISTINCTCOUNT($1)], aggType=[FINAL], limit=[10])",
+          "\n            
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])",
+          "\n              PhysicalAggregate(group=[{0}], 
agg#0=[DISTINCTCOUNT($1)], aggType=[LEAF], collations=[[1 DESC]], limit=[10])",
+          "\n                PhysicalFilter(condition=[>=($2, 0)])",
+          "\n                  PhysicalTableScan(table=[[default, a]])",
           "\n"
         ]
       },
@@ -347,14 +363,15 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT /*+ 
aggOptions(is_enable_group_trim='true') */ DISTINCT col1, col2 FROM a WHERE 
col3 >= 0 LIMIT 10",
         "output": [
           "Execution Plan",
-          "\nPhysicalSort(fetch=[10])",
-          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
-          "\n    PhysicalSort(fetch=[10])",
-          "\n      PhysicalAggregate(group=[{0, 1}], aggType=[FINAL], 
limit=[10])",
-          "\n        
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1]])",
-          "\n          PhysicalAggregate(group=[{0, 1}], aggType=[LEAF], 
limit=[10])",
-          "\n            PhysicalFilter(condition=[>=($2, 0)])",
-          "\n              PhysicalTableScan(table=[[default, a]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalSort(fetch=[10])",
+          "\n    PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n      PhysicalSort(fetch=[10])",
+          "\n        PhysicalAggregate(group=[{0, 1}], aggType=[FINAL], 
limit=[10])",
+          "\n          
PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1]])",
+          "\n            PhysicalAggregate(group=[{0, 1}], aggType=[LEAF], 
limit=[10])",
+          "\n              PhysicalFilter(condition=[>=($2, 0)])",
+          "\n                PhysicalTableScan(table=[[default, a]])",
           "\n"
         ]
       }
@@ -367,24 +384,25 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR WITH tmp AS 
(SELECT col2 FROM a WHERE col1 = 'foo' UNION ALL SELECT col2 FROM a WHERE col3 
= 'bar'), tmp2 AS (SELECT DISTINCT col2 FROM tmp) SELECT COUNT(*), col3 FROM a 
WHERE col2 IN (SELECT col2 FROM tmp2) GROUP BY col3",
         "output": [
           "Execution Plan",
-          "\nPhysicalProject(EXPR$0=[$1], col3=[$0])",
-          "\n  PhysicalAggregate(group=[{0}], agg#0=[COUNT($1)], 
aggType=[FINAL])",
-          "\n    PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
-          "\n      PhysicalAggregate(group=[{1}], agg#0=[COUNT()], 
aggType=[LEAF])",
-          "\n        PhysicalJoin(condition=[=($0, $2)], joinType=[semi])",
-          "\n          PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
-          "\n            PhysicalProject(col2=[$1], col3=[$2])",
-          "\n              PhysicalTableScan(table=[[default, a]])",
-          "\n          PhysicalAggregate(group=[{0}], aggType=[DIRECT])",
-          "\n            PhysicalUnion(all=[true])",
-          "\n              
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
-          "\n                PhysicalProject(col2=[$1])",
-          "\n                  PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
-          "\n                    PhysicalTableScan(table=[[default, a]])",
-          "\n              
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
-          "\n                PhysicalProject(col2=[$1])",
-          "\n                  PhysicalFilter(condition=[=($2, 
CAST(_UTF-8'bar'):INTEGER NOT NULL)])",
-          "\n                    PhysicalTableScan(table=[[default, a]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalProject(EXPR$0=[$1], col3=[$0])",
+          "\n    PhysicalAggregate(group=[{0}], agg#0=[COUNT($1)], 
aggType=[FINAL])",
+          "\n      PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], 
distKeys=[[0]])",
+          "\n        PhysicalAggregate(group=[{1}], agg#0=[COUNT()], 
aggType=[LEAF])",
+          "\n          PhysicalJoin(condition=[=($0, $2)], joinType=[semi])",
+          "\n            
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
+          "\n              PhysicalProject(col2=[$1], col3=[$2])",
+          "\n                PhysicalTableScan(table=[[default, a]])",
+          "\n            PhysicalAggregate(group=[{0}], aggType=[DIRECT])",
+          "\n              PhysicalUnion(all=[true])",
+          "\n                
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
+          "\n                  PhysicalProject(col2=[$1])",
+          "\n                    PhysicalFilter(condition=[=($0, 
_UTF-8'foo')])",
+          "\n                      PhysicalTableScan(table=[[default, a]])",
+          "\n                
PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
+          "\n                  PhysicalProject(col2=[$1])",
+          "\n                    PhysicalFilter(condition=[=($2, 
CAST(_UTF-8'bar'):INTEGER NOT NULL)])",
+          "\n                      PhysicalTableScan(table=[[default, a]])",
           "\n"
         ]
       }
@@ -397,11 +415,12 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col2, 
col3 FROM a WHERE col1 = 'foo' ORDER BY col2",
         "output": [
           "Execution Plan",
-          "\nPhysicalSort(sort0=[$0], dir0=[ASC])",
-          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
-          "\n    PhysicalProject(col2=[$1], col3=[$2])",
-          "\n      PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
-          "\n        PhysicalTableScan(table=[[default, a]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalSort(sort0=[$0], dir0=[ASC])",
+          "\n    PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n      PhysicalProject(col2=[$1], col3=[$2])",
+          "\n        PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
+          "\n          PhysicalTableScan(table=[[default, a]])",
           "\n"
         ]
       },
@@ -410,12 +429,13 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col2, 
col3 FROM a WHERE col1 = 'foo' ORDER BY col2 LIMIT 10",
         "output": [
           "Execution Plan",
-          "\nPhysicalSort(sort0=[$0], dir0=[ASC], fetch=[10])",
-          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
-          "\n    PhysicalSort(sort0=[$0], dir0=[ASC], fetch=[10])",
-          "\n      PhysicalProject(col2=[$1], col3=[$2])",
-          "\n        PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
-          "\n          PhysicalTableScan(table=[[default, a]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalSort(sort0=[$0], dir0=[ASC], fetch=[10])",
+          "\n    PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n      PhysicalSort(sort0=[$0], dir0=[ASC], fetch=[10])",
+          "\n        PhysicalProject(col2=[$1], col3=[$2])",
+          "\n          PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
+          "\n            PhysicalTableScan(table=[[default, a]])",
           "\n"
         ]
       },
@@ -424,12 +444,13 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col2, 
col3 FROM a WHERE col1 = 'foo' ORDER BY col2 LIMIT 10, 11",
         "output": [
           "Execution Plan",
-          "\nPhysicalSort(sort0=[$0], dir0=[ASC], offset=[10], fetch=[11])",
-          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
-          "\n    PhysicalSort(sort0=[$0], dir0=[ASC], fetch=[21])",
-          "\n      PhysicalProject(col2=[$1], col3=[$2])",
-          "\n        PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
-          "\n          PhysicalTableScan(table=[[default, a]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalSort(sort0=[$0], dir0=[ASC], offset=[10], fetch=[11])",
+          "\n    PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n      PhysicalSort(sort0=[$0], dir0=[ASC], fetch=[21])",
+          "\n        PhysicalProject(col2=[$1], col3=[$2])",
+          "\n          PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
+          "\n            PhysicalTableScan(table=[[default, a]])",
           "\n"
         ]
       },
@@ -438,12 +459,13 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col2, 
col3 FROM a WHERE col1 = 'foo' LIMIT 10, 11",
         "output": [
           "Execution Plan",
-          "\nPhysicalSort(offset=[10], fetch=[11])",
-          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
-          "\n    PhysicalSort(fetch=[21])",
-          "\n      PhysicalProject(col2=[$1], col3=[$2])",
-          "\n        PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
-          "\n          PhysicalTableScan(table=[[default, a]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalSort(offset=[10], fetch=[11])",
+          "\n    PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n      PhysicalSort(fetch=[21])",
+          "\n        PhysicalProject(col2=[$1], col3=[$2])",
+          "\n          PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
+          "\n            PhysicalTableScan(table=[[default, a]])",
           "\n"
         ]
       },
@@ -452,11 +474,12 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col2, 
COUNT(*) as cnt FROM a GROUP BY col2 ORDER BY cnt LIMIT 100",
         "output": [
           "Execution Plan",
-          "\nPhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])",
-          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
-          "\n    PhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])",
-          "\n      PhysicalAggregate(group=[{1}], agg#0=[COUNT()], 
aggType=[DIRECT])",
-          "\n        PhysicalTableScan(table=[[default, a]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])",
+          "\n    PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n      PhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])",
+          "\n        PhysicalAggregate(group=[{1}], agg#0=[COUNT()], 
aggType=[DIRECT])",
+          "\n          PhysicalTableScan(table=[[default, a]])",
           "\n"
         ]
       },
@@ -465,11 +488,12 @@
         "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col2, 
COUNT(*) as cnt FROM b GROUP BY col2 ORDER BY cnt LIMIT 100",
         "output": [
           "Execution Plan",
-          "\nPhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])",
-          "\n  PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
-          "\n    PhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])",
-          "\n      PhysicalAggregate(group=[{1}], agg#0=[COUNT()], 
aggType=[DIRECT])",
-          "\n        PhysicalTableScan(table=[[default, b]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])",
+          "\n    PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])",
+          "\n      PhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])",
+          "\n        PhysicalAggregate(group=[{1}], agg#0=[COUNT()], 
aggType=[DIRECT])",
+          "\n          PhysicalTableScan(table=[[default, b]])",
           "\n"
         ]
       }
@@ -482,10 +506,11 @@
         "sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; EXPLAIN 
PLAN FOR SELECT col2, col3 FROM a WHERE col1 = 'foo'",
         "output": [
           "Execution Plan",
-          "\nPhysicalSort(fetch=[100000])",
-          "\n  PhysicalProject(col2=[$1], col3=[$2])",
-          "\n    PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
-          "\n      PhysicalTableScan(table=[[default, a]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalSort(fetch=[100000])",
+          "\n    PhysicalProject(col2=[$1], col3=[$2])",
+          "\n      PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
+          "\n        PhysicalTableScan(table=[[default, a]])",
           "\n"
         ]
       },
@@ -494,9 +519,10 @@
         "sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; EXPLAIN 
PLAN FOR SELECT col2, COUNT(*) FROM a WHERE col1 = 'foo' GROUP BY col2",
         "output": [
           "Execution Plan",
-          "\nPhysicalAggregate(group=[{1}], agg#0=[COUNT()], aggType=[DIRECT], 
limit=[100000])",
-          "\n  PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
-          "\n    PhysicalTableScan(table=[[default, a]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalAggregate(group=[{1}], agg#0=[COUNT()], 
aggType=[DIRECT], limit=[100000])",
+          "\n    PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
+          "\n      PhysicalTableScan(table=[[default, a]])",
           "\n"
         ]
       },
@@ -505,14 +531,15 @@
         "sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; EXPLAIN 
PLAN FOR WITH tmp AS (SELECT col1, col2, col3, COUNT(*) FROM a WHERE col1 = 
'foo' GROUP BY col1, col2, col3) SELECT * FROM (SELECT ROW_NUMBER() OVER 
(PARTITION BY col2 ORDER BY col3) as rnk, col1 FROM tmp) WHERE rnk = 1",
         "output": [
           "Execution Plan",
-          "\nPhysicalProject(rnk=[$3], col1=[$0])",
-          "\n  PhysicalFilter(condition=[=($3, 1)])",
-          "\n    PhysicalWindow(window#0=[window(partition {1} order by [2] 
rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
-          "\n      PhysicalAggregate(group=[{0, 1, 2}], aggType=[FINAL])",
-          "\n        PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
-          "\n          PhysicalAggregate(group=[{0, 1, 2}], aggType=[LEAF], 
limit=[100000])",
-          "\n            PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
-          "\n              PhysicalTableScan(table=[[default, a]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalProject(rnk=[$3], col1=[$0])",
+          "\n    PhysicalFilter(condition=[=($3, 1)])",
+          "\n      PhysicalWindow(window#0=[window(partition {1} order by [2] 
rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
+          "\n        PhysicalAggregate(group=[{0, 1, 2}], aggType=[FINAL])",
+          "\n          
PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n            PhysicalAggregate(group=[{0, 1, 2}], aggType=[LEAF], 
limit=[100000])",
+          "\n              PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
+          "\n                PhysicalTableScan(table=[[default, a]])",
           "\n"
         ]
       },
@@ -521,12 +548,13 @@
         "sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; EXPLAIN 
PLAN FOR WITH tmp AS (SELECT col1, col2, col3, COUNT(*) FROM a WHERE col1 = 
'foo' GROUP BY col1, col2, col3 ORDER BY col2) SELECT * FROM tmp LIMIT 100,400",
         "output": [
           "Execution Plan",
-          "\nPhysicalSort(offset=[100], fetch=[400])",
-          "\n  PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT($3)], 
aggType=[FINAL])",
-          "\n    PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
-          "\n      PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT()], 
aggType=[LEAF], limit=[100000])",
-          "\n        PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
-          "\n          PhysicalTableScan(table=[[default, a]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalSort(offset=[100], fetch=[400])",
+          "\n    PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT($3)], 
aggType=[FINAL])",
+          "\n      PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n        PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT()], 
aggType=[LEAF], limit=[100000])",
+          "\n          PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
+          "\n            PhysicalTableScan(table=[[default, a]])",
           "\n"
         ]
       }
@@ -539,16 +567,17 @@
         "sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; EXPLAIN 
PLAN FOR SELECT col2, col3 FROM a WHERE col1 = 'foo' AND col2 IN (SELECT col1 
FROM b)",
         "output": [
           "Execution Plan",
-          "\nPhysicalJoin(condition=[=($0, $2)], joinType=[semi])",
-          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
-          "\n    PhysicalSort(fetch=[100000])",
-          "\n      PhysicalProject(col2=[$1], col3=[$2])",
-          "\n        PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
-          "\n          PhysicalTableScan(table=[[default, a]])",
-          "\n  PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
-          "\n    PhysicalSort(fetch=[100000])",
-          "\n      PhysicalProject(col1=[$0])",
-          "\n        PhysicalTableScan(table=[[default, b]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalJoin(condition=[=($0, $2)], joinType=[semi])",
+          "\n    PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n      PhysicalSort(fetch=[100000])",
+          "\n        PhysicalProject(col2=[$1], col3=[$2])",
+          "\n          PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
+          "\n            PhysicalTableScan(table=[[default, a]])",
+          "\n    PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n      PhysicalSort(fetch=[100000])",
+          "\n        PhysicalProject(col1=[$0])",
+          "\n          PhysicalTableScan(table=[[default, b]])",
           "\n"
         ]
       },
@@ -557,18 +586,19 @@
         "sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; EXPLAIN 
PLAN FOR SELECT COUNT(*), col2 FROM a WHERE col1 = 'foo' AND col2 IN (SELECT 
col1 FROM b) GROUP BY col2",
         "output": [
           "Execution Plan",
-          "\nPhysicalProject(EXPR$0=[$1], col2=[$0])",
-          "\n  PhysicalAggregate(group=[{0}], agg#0=[COUNT()], 
aggType=[DIRECT])",
-          "\n    PhysicalJoin(condition=[=($0, $1)], joinType=[semi])",
-          "\n      PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
-          "\n        PhysicalSort(fetch=[100000])",
-          "\n          PhysicalProject(col2=[$1])",
-          "\n            PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
-          "\n              PhysicalTableScan(table=[[default, a]])",
-          "\n      PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
-          "\n        PhysicalSort(fetch=[100000])",
-          "\n          PhysicalProject(col1=[$0])",
-          "\n            PhysicalTableScan(table=[[default, b]])",
+          "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n  PhysicalProject(EXPR$0=[$1], col2=[$0])",
+          "\n    PhysicalAggregate(group=[{0}], agg#0=[COUNT()], 
aggType=[DIRECT])",
+          "\n      PhysicalJoin(condition=[=($0, $1)], joinType=[semi])",
+          "\n        PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n          PhysicalSort(fetch=[100000])",
+          "\n            PhysicalProject(col2=[$1])",
+          "\n              PhysicalFilter(condition=[=($0, _UTF-8'foo')])",
+          "\n                PhysicalTableScan(table=[[default, a]])",
+          "\n        PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])",
+          "\n          PhysicalSort(fetch=[100000])",
+          "\n            PhysicalProject(col1=[$0])",
+          "\n              PhysicalTableScan(table=[[default, b]])",
           "\n"
         ]
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to