This is an automated email from the ASF dual-hosted git repository. chrispeck pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new ea9407819d [multistage] Cleanup Plan Fragmenter Logic + Mailbox Receive Sort Fixes (#15943) ea9407819d is described below commit ea9407819dbc8ac7b3b53dc9deb768be91c972db Author: Ankit Sultana <ankitsult...@uber.com> AuthorDate: Mon Jun 2 16:59:49 2025 -0500 [multistage] Cleanup Plan Fragmenter Logic + Mailbox Receive Sort Fixes (#15943) * [multistage] Cleanup Plan Fragmenter Logic * minor refactor * cleanup the cleanup * bug fixes * undo collation fix in lite-mode --- .../query/context/PhysicalPlannerContext.java | 5 + .../v2/PlanFragmentAndMailboxAssignment.java | 145 ++--- .../physical/v2/opt/PhysicalOptRuleSet.java | 2 + .../v2/opt/rules/RootExchangeInsertRule.java | 78 +++ .../resources/queries/PhysicalOptimizerPlans.json | 604 +++++++++++---------- 5 files changed, 484 insertions(+), 350 deletions(-) diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java index 363eaa2055..66809336e1 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java @@ -82,6 +82,7 @@ public class PhysicalPlannerContext { _instanceId = instanceId; _queryOptions = queryOptions == null ? Map.of() : queryOptions; _useLiteMode = PhysicalPlannerContext.useLiteMode(queryOptions); + _instanceIdToQueryServerInstance.put(instanceId, getBrokerQueryServerInstance()); } public Supplier<Integer> getNodeIdGenerator() { @@ -128,6 +129,10 @@ public class PhysicalPlannerContext { return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_PHYSICAL_OPTIMIZER, "false")); } + private QueryServerInstance getBrokerQueryServerInstance() { + return new QueryServerInstance(_instanceId, _hostName, _port, _port); + } + private static boolean useLiteMode(@Nullable Map<String, String> queryOptions) { if (queryOptions == null) { return false; diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java index 88761f0bcf..abaa363daf 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java @@ -20,7 +20,6 @@ package org.apache.pinot.query.planner.physical.v2; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -28,9 +27,7 @@ import java.util.Map; import java.util.Objects; import javax.annotation.Nullable; import org.apache.calcite.rel.RelDistribution; -import org.apache.calcite.rel.core.Exchange; import org.apache.calcite.rel.core.TableScan; -import org.apache.commons.collections4.MapUtils; import org.apache.pinot.calcite.rel.hint.PinotHintOptions; import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType; import org.apache.pinot.common.utils.DataSchema; @@ -38,6 +35,7 @@ import org.apache.pinot.query.context.PhysicalPlannerContext; import org.apache.pinot.query.planner.PlanFragment; import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata; import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalExchange; +import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalTableScan; import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; import org.apache.pinot.query.planner.plannode.MailboxSendNode; import org.apache.pinot.query.planner.plannode.PlanNode; @@ -48,112 +46,133 @@ import org.apache.pinot.query.routing.QueryServerInstance; import org.apache.pinot.query.routing.SharedMailboxInfos; +/** + * <h1>Responsibilities</h1> + * This does the following: + * <ul> + * <li>Splits plan around PhysicalExchange nodes to create plan fragments.</li> + * <li>Converts PRelNodes to PlanNodes.</li> + * <li> + * Creates mailboxes for connecting plan fragments. This is done simply based on the workers in the send/receive + * plan nodes, and the exchange strategy (identity, partitioning, etc.). + * </li> + * <li> + * Creates metadata for each plan fragment, which includes the scanned tables, unavailable segments, etc. + * </li> + * </ul> + * <h1>Design Note</h1> + * This class is completely un-opinionated. The old optimizer had a lot of custom logic added to mailbox assignment, + * but this class instead doesn't do any special handling, apart from assigning mailboxes based on the exchange + * strategy. This is an important and conscious design choice, because it ensures division of responsibilities and + * allows optimizer rules like worker assignment to completely own their responsibilities. This is also important for + * keeping the optimizer maximally pluggable. (e.g. you can swap out the default worker assignment rule with a + * custom rule like the LiteMode worker assignment rule). + */ public class PlanFragmentAndMailboxAssignment { private static final int ROOT_FRAGMENT_ID = 0; - private static final int FIRST_NON_ROOT_FRAGMENT_ID = 1; public Result compute(PRelNode rootPRelNode, PhysicalPlannerContext physicalPlannerContext) { - Preconditions.checkState(!(rootPRelNode.unwrap() instanceof Exchange), "root node should never be exchange"); - final DataSchema rootDataSchema = PRelToPlanNodeConverter.toDataSchema(rootPRelNode.unwrap().getRowType()); - // Create input fragment's send node. - MailboxSendNode sendNode = new MailboxSendNode(FIRST_NON_ROOT_FRAGMENT_ID, rootDataSchema, new ArrayList<>(), - ROOT_FRAGMENT_ID, PinotRelExchangeType.getDefaultExchangeType(), RelDistribution.Type.SINGLETON, - null, false, null, false); - // Create root receive node. - MailboxReceiveNode rootReceiveNode = new MailboxReceiveNode(ROOT_FRAGMENT_ID, rootDataSchema, - FIRST_NON_ROOT_FRAGMENT_ID, PinotRelExchangeType.getDefaultExchangeType(), - RelDistribution.Type.BROADCAST_DISTRIBUTED, null, null, false, false, sendNode); // Create the first two fragments. Context context = new Context(physicalPlannerContext); - PlanFragment rootFragment = createFragment(ROOT_FRAGMENT_ID, rootReceiveNode, new ArrayList<>(), context); - PlanFragment firstInputFragment = createFragment(FIRST_NON_ROOT_FRAGMENT_ID, sendNode, new ArrayList<>(), context); - rootFragment.getChildren().add(firstInputFragment); - QueryServerInstance brokerInstance = new QueryServerInstance(physicalPlannerContext.getInstanceId(), - physicalPlannerContext.getHostName(), physicalPlannerContext.getPort(), physicalPlannerContext.getPort()); - computeMailboxInfos(FIRST_NON_ROOT_FRAGMENT_ID, ROOT_FRAGMENT_ID, - createWorkerMap(rootPRelNode.getPinotDataDistributionOrThrow().getWorkers(), context), - ImmutableMap.of(0, brokerInstance), ExchangeStrategy.SINGLETON_EXCHANGE, context); // Traverse entire tree. - context._fragmentMetadataMap.get(ROOT_FRAGMENT_ID).setWorkerIdToServerInstanceMap(ImmutableMap.of( - 0, brokerInstance)); - visit(rootPRelNode, sendNode, firstInputFragment, context); + process(rootPRelNode, null, ROOT_FRAGMENT_ID, context); Result result = new Result(); result._fragmentMetadataMap = context._fragmentMetadataMap; result._planFragmentMap = context._planFragmentMap; return result; } - /** - * Invariants: 1. Parent PlanNode does not have current node in input yet. 2. This node is NOT the fragment root. This - * is because each fragment root is a MailboxSendNode. - */ - private void visit(PRelNode pRelNode, @Nullable PlanNode parent, PlanFragment currentFragment, Context context) { - int currentFragmentId = currentFragment.getFragmentId(); - DispatchablePlanMetadata fragmentMetadata = context._fragmentMetadataMap.get(currentFragmentId); - if (MapUtils.isEmpty(fragmentMetadata.getWorkerIdToServerInstanceMap())) { - // TODO: This is quite a complex invariant. - fragmentMetadata.setWorkerIdToServerInstanceMap(createWorkerMap( - pRelNode.getPinotDataDistributionOrThrow().getWorkers(), context)); - } + private void process(PRelNode pRelNode, @Nullable PlanNode parent, int currentFragmentId, Context context) { if (pRelNode.unwrap() instanceof TableScan) { - TableScanMetadata tableScanMetadata = Objects.requireNonNull(pRelNode.getTableScanMetadata(), - "No metadata in table scan PRelNode"); - String tableName = tableScanMetadata.getScannedTables().stream().findFirst().orElseThrow(); - if (!tableScanMetadata.getUnavailableSegmentsMap().isEmpty()) { - fragmentMetadata.addUnavailableSegments(tableName, - tableScanMetadata.getUnavailableSegmentsMap().get(tableName)); - } - fragmentMetadata.addScannedTable(tableName); - fragmentMetadata.setWorkerIdToSegmentsMap(tableScanMetadata.getWorkedIdToSegmentsMap()); - NodeHint nodeHint = NodeHint.fromRelHints(((TableScan) pRelNode.unwrap()).getHints()); - fragmentMetadata.setTableOptions(nodeHint.getHintOptions().get(PinotHintOptions.TABLE_HINT_OPTIONS)); - if (tableScanMetadata.getTimeBoundaryInfo() != null) { - fragmentMetadata.setTimeBoundaryInfo(tableScanMetadata.getTimeBoundaryInfo()); - } + processTableScan((PhysicalTableScan) pRelNode.unwrap(), currentFragmentId, context); } if (pRelNode.unwrap() instanceof PhysicalExchange) { + // Split an exchange into two fragments: one for the sender and one for the receiver. + // The sender fragment will have a MailboxSendNode and receiver a MailboxReceiveNode. + // It is possible that the receiver fragment doesn't exist yet (e.g. when PhysicalExchange is the root node). + // In that case, we also create it here. If it exists already, we simply re-use it. PhysicalExchange physicalExchange = (PhysicalExchange) pRelNode.unwrap(); - int senderFragmentId = context._planFragmentMap.size(); + PlanFragment receiverFragment = context._planFragmentMap.get(currentFragmentId); + int senderFragmentId = context._planFragmentMap.size() + (receiverFragment == null ? 1 : 0); final DataSchema inputFragmentSchema = PRelToPlanNodeConverter.toDataSchema( pRelNode.getPRelInput(0).unwrap().getRowType()); RelDistribution.Type distributionType = ExchangeStrategy.getRelDistribution( physicalExchange.getExchangeStrategy(), physicalExchange.getDistributionKeys()).getType(); - List<PlanNode> inputs = new ArrayList<>(); - MailboxSendNode sendNode = new MailboxSendNode(senderFragmentId, inputFragmentSchema, inputs, currentFragmentId, - PinotRelExchangeType.getDefaultExchangeType(), distributionType, physicalExchange.getDistributionKeys(), - false, physicalExchange.getRelCollation().getFieldCollations(), false /* todo: set sortOnSender */); + MailboxSendNode sendNode = new MailboxSendNode(senderFragmentId, inputFragmentSchema, new ArrayList<>(), + currentFragmentId, PinotRelExchangeType.getDefaultExchangeType(), distributionType, + physicalExchange.getDistributionKeys(), false, physicalExchange.getRelCollation().getFieldCollations(), + false /* sort on sender */); MailboxReceiveNode receiveNode = new MailboxReceiveNode(currentFragmentId, inputFragmentSchema, senderFragmentId, PinotRelExchangeType.getDefaultExchangeType(), distributionType, physicalExchange.getDistributionKeys(), physicalExchange.getRelCollation().getFieldCollations(), - false /* TODO: set sort on receiver */, false /* TODO: set sort on sender */, sendNode); - PlanFragment newPlanFragment = createFragment(senderFragmentId, sendNode, new ArrayList<>(), context); + !physicalExchange.getRelCollation().getFieldCollations().isEmpty(), false, sendNode); + if (receiverFragment == null) { + /* + * If the root node is an exchange, then the root fragment will not exist yet. We create it here. + */ + receiverFragment = createFragment(currentFragmentId, receiveNode, new ArrayList<>(), context, + pRelNode.getPinotDataDistributionOrThrow().getWorkers()); + } + PlanFragment newPlanFragment = createFragment(senderFragmentId, sendNode, new ArrayList<>(), context, + physicalExchange.getPRelInputs().get(0).getPinotDataDistributionOrThrow().getWorkers()); Map<Integer, QueryServerInstance> senderWorkers = createWorkerMap(pRelNode.getPRelInput(0) .getPinotDataDistributionOrThrow().getWorkers(), context); Map<Integer, QueryServerInstance> receiverWorkers = createWorkerMap(pRelNode.getPinotDataDistributionOrThrow() .getWorkers(), context); computeMailboxInfos(senderFragmentId, currentFragmentId, senderWorkers, receiverWorkers, physicalExchange.getExchangeStrategy(), context); - currentFragment.getChildren().add(newPlanFragment); - visit(pRelNode.getPRelInput(0), sendNode, newPlanFragment, context); + context._planFragmentMap.get(currentFragmentId).getChildren().add(newPlanFragment); + process(pRelNode.getPRelInput(0), sendNode, newPlanFragment.getFragmentId(), context); if (parent != null) { parent.getInputs().add(receiveNode); } return; } + // Convert PRelNode to PlanNode, and create parent/input PlanNode tree. PlanNode planNode = PRelToPlanNodeConverter.toPlanNode(pRelNode, currentFragmentId); + if (context._planFragmentMap.isEmpty()) { + /* + * If the root-node is NOT an exchange, then we create the root fragment here. If it's an exchange, it will be + * created in the process of handling the exchange. + */ + createFragment(ROOT_FRAGMENT_ID, planNode, new ArrayList<>(), context, + pRelNode.getPinotDataDistributionOrThrow().getWorkers()); + } for (PRelNode input : pRelNode.getPRelInputs()) { - visit(input, planNode, currentFragment, context); + process(input, planNode, currentFragmentId, context); } if (parent != null) { parent.getInputs().add(planNode); } } + private void processTableScan(PhysicalTableScan tableScan, int currentFragmentId, Context context) { + DispatchablePlanMetadata fragmentMetadata = context._fragmentMetadataMap.get(currentFragmentId); + TableScanMetadata tableScanMetadata = Objects.requireNonNull(tableScan.getTableScanMetadata(), + "No metadata in table scan PRelNode"); + String tableName = tableScanMetadata.getScannedTables().stream().findFirst().orElseThrow(); + if (!tableScanMetadata.getUnavailableSegmentsMap().isEmpty()) { + fragmentMetadata.addUnavailableSegments(tableName, + tableScanMetadata.getUnavailableSegmentsMap().get(tableName)); + } + fragmentMetadata.addScannedTable(tableName); + fragmentMetadata.setWorkerIdToSegmentsMap(tableScanMetadata.getWorkedIdToSegmentsMap()); + NodeHint nodeHint = NodeHint.fromRelHints(tableScan.getHints()); + fragmentMetadata.setTableOptions(nodeHint.getHintOptions().get(PinotHintOptions.TABLE_HINT_OPTIONS)); + if (tableScanMetadata.getTimeBoundaryInfo() != null) { + fragmentMetadata.setTimeBoundaryInfo(tableScanMetadata.getTimeBoundaryInfo()); + } + } + private PlanFragment createFragment(int fragmentId, PlanNode planNode, List<PlanFragment> inputFragments, - Context context) { + Context context, List<String> workers) { + // track new plan fragment PlanFragment fragment = new PlanFragment(fragmentId, planNode, inputFragments); context._planFragmentMap.put(fragmentId, fragment); - context._fragmentMetadataMap.put(fragmentId, new DispatchablePlanMetadata()); + // add fragment metadata + DispatchablePlanMetadata fragmentMetadata = new DispatchablePlanMetadata(); + fragmentMetadata.setWorkerIdToServerInstanceMap(createWorkerMap(workers, context)); + context._fragmentMetadataMap.put(fragmentId, fragmentMetadata); return fragment; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/PhysicalOptRuleSet.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/PhysicalOptRuleSet.java index aa8420e198..c3dd8f9de8 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/PhysicalOptRuleSet.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/PhysicalOptRuleSet.java @@ -28,6 +28,7 @@ import org.apache.pinot.query.planner.physical.v2.opt.rules.LeafStageBoundaryRul import org.apache.pinot.query.planner.physical.v2.opt.rules.LeafStageWorkerAssignmentRule; import org.apache.pinot.query.planner.physical.v2.opt.rules.LiteModeSortInsertRule; import org.apache.pinot.query.planner.physical.v2.opt.rules.LiteModeWorkerAssignmentRule; +import org.apache.pinot.query.planner.physical.v2.opt.rules.RootExchangeInsertRule; import org.apache.pinot.query.planner.physical.v2.opt.rules.SortPushdownRule; import org.apache.pinot.query.planner.physical.v2.opt.rules.WorkerExchangeAssignmentRule; @@ -48,6 +49,7 @@ public class PhysicalOptRuleSet { if (context.isUseLiteMode()) { transformers.add(create(new LiteModeSortInsertRule(context), RuleExecutors.Type.POST_ORDER, context)); } + transformers.add(new RootExchangeInsertRule(context)); return transformers; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/RootExchangeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/RootExchangeInsertRule.java new file mode 100644 index 0000000000..714f096a31 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/RootExchangeInsertRule.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.physical.v2.opt.rules; + +import java.util.List; +import javax.annotation.Nullable; +import org.apache.calcite.rel.RelCollation; +import org.apache.calcite.rel.RelDistribution; +import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTrait; +import org.apache.pinot.query.context.PhysicalPlannerContext; +import org.apache.pinot.query.planner.physical.v2.ExchangeStrategy; +import org.apache.pinot.query.planner.physical.v2.PRelNode; +import org.apache.pinot.query.planner.physical.v2.PinotDataDistribution; +import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalExchange; +import org.apache.pinot.query.planner.physical.v2.opt.PRelNodeTransformer; + + +/** + * Adds an exchange node at the root of the plan if the root node is not already located as a singleton on the broker. + * This is because the entire data needs to be returned by the broker to the client. + */ +public class RootExchangeInsertRule implements PRelNodeTransformer { + private final PhysicalPlannerContext _context; + + public RootExchangeInsertRule(PhysicalPlannerContext context) { + _context = context; + } + + @Override + public PRelNode execute(PRelNode currentNode) { + PinotDataDistribution rootDataDistribution = currentNode.getPinotDataDistributionOrThrow(); + List<String> workers = List.of(brokerWorkerId()); + if (rootDataDistribution.getWorkers().equals(workers)) { + // If the root node is already distributed to the broker, no need to insert an exchange. + return currentNode; + } + PinotDataDistribution pinotDataDistribution = new PinotDataDistribution(RelDistribution.Type.SINGLETON, + workers, workers.hashCode(), null, inferCollation(currentNode)); + return new PhysicalExchange(nodeId(), currentNode, pinotDataDistribution, List.of(), + ExchangeStrategy.SINGLETON_EXCHANGE, null, PinotExecStrategyTrait.getDefaultExecStrategy()); + } + + private String brokerWorkerId() { + return String.format("0@%s", _context.getInstanceId()); + } + + private int nodeId() { + return _context.getNodeIdGenerator().get(); + } + + /** + * If the current node is distributed to a single worker, inherit the collation trait from it. Otherwise, return null. + */ + @Nullable + private RelCollation inferCollation(PRelNode currentNode) { + // Infer collation from the current node if needed. + if (currentNode.getPinotDataDistributionOrThrow().getWorkers().size() != 1) { + return null; + } + return currentNode.getPinotDataDistributionOrThrow().getCollation(); + } +} diff --git a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json index acab86c39c..d9ef04fa2a 100644 --- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json +++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json @@ -6,16 +6,17 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT a.col1, a.ts, b.col3 FROM a JOIN b ON a.col1 = b.col2 ORDER BY a.col1", "output": [ "Execution Plan", - "\nPhysicalSort(sort0=[$0], dir0=[ASC])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", - "\n PhysicalProject(col1=[$0], ts=[$1], col3=[$3])", - "\n PhysicalJoin(condition=[=($0, $2)], joinType=[inner])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", - "\n PhysicalProject(col1=[$0], ts=[$7])", - "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", - "\n PhysicalProject(col2=[$1], col3=[$2])", - "\n PhysicalTableScan(table=[[default, b]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(sort0=[$0], dir0=[ASC])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalProject(col1=[$0], ts=[$1], col3=[$3])", + "\n PhysicalJoin(condition=[=($0, $2)], joinType=[inner])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalProject(col1=[$0], ts=[$7])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalProject(col2=[$1], col3=[$2])", + "\n PhysicalTableScan(table=[[default, b]])", "\n" ] }, @@ -24,16 +25,17 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT a.col1 AS value1, a.ts AS ts1, b.col3 FROM a JOIN b ON a.col1 = b.col2 ORDER BY a.col1", "output": [ "Execution Plan", - "\nPhysicalSort(sort0=[$0], dir0=[ASC])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", - "\n PhysicalProject(value1=[$0], ts1=[$1], col3=[$3])", - "\n PhysicalJoin(condition=[=($0, $2)], joinType=[inner])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", - "\n PhysicalProject(col1=[$0], ts=[$7])", - "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", - "\n PhysicalProject(col2=[$1], col3=[$2])", - "\n PhysicalTableScan(table=[[default, b]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(sort0=[$0], dir0=[ASC])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalProject(value1=[$0], ts1=[$1], col3=[$3])", + "\n PhysicalJoin(condition=[=($0, $2)], joinType=[inner])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalProject(col1=[$0], ts=[$7])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalProject(col2=[$1], col3=[$2])", + "\n PhysicalTableScan(table=[[default, b]])", "\n" ] }, @@ -42,11 +44,12 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT * FROM a JOIN b ON a.col1 = b.col2", "output": [ "Execution Plan", - "\nPhysicalJoin(condition=[=($0, $10)], joinType=[inner])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", - "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[1]])", - "\n PhysicalTableScan(table=[[default, b]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalJoin(condition=[=($0, $10)], joinType=[inner])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[1]])", + "\n PhysicalTableScan(table=[[default, b]])", "\n" ] }, @@ -55,12 +58,13 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0", "output": [ "Execution Plan", - "\nPhysicalJoin(condition=[=($0, $10)], joinType=[inner])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", - "\n PhysicalFilter(condition=[>=($2, 0)])", - "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[1]])", - "\n PhysicalTableScan(table=[[default, b]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalJoin(condition=[=($0, $10)], joinType=[inner])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalFilter(condition=[>=($2, 0)])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[1]])", + "\n PhysicalTableScan(table=[[default, b]])", "\n" ] }, @@ -69,12 +73,13 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 >= 0 AND a.col3 > b.col3", "output": [ "Execution Plan", - "\nPhysicalJoin(condition=[AND(=($0, $10), >($2, $11))], joinType=[inner])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", - "\n PhysicalFilter(condition=[>=($2, 0)])", - "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[BROADCAST_EXCHANGE])", - "\n PhysicalTableScan(table=[[default, b]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalJoin(condition=[AND(=($0, $10), >($2, $11))], joinType=[inner])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", + "\n PhysicalFilter(condition=[>=($2, 0)])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalExchange(exchangeStrategy=[BROADCAST_EXCHANGE])", + "\n PhysicalTableScan(table=[[default, b]])", "\n" ] }, @@ -83,11 +88,12 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT * FROM a JOIN b on a.col1 = b.col1 AND a.col2 = b.col2", "output": [ "Execution Plan", - "\nPhysicalJoin(condition=[AND(=($0, $9), =($1, $10))], joinType=[inner])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1]])", - "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1]])", - "\n PhysicalTableScan(table=[[default, b]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalJoin(condition=[AND(=($0, $9), =($1, $10))], joinType=[inner])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1]])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1]])", + "\n PhysicalTableScan(table=[[default, b]])", "\n" ] } @@ -100,14 +106,15 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col1, col2 FROM a WHERE col3 IN (SELECT col3 FROM b)", "output": [ "Execution Plan", - "\nPhysicalProject(col1=[$0], col2=[$1])", - "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])", - "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])", - "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", - "\n PhysicalProject(col3=[$2])", - "\n PhysicalTableScan(table=[[default, b]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalProject(col1=[$0], col2=[$1])", + "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])", + "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalProject(col3=[$2])", + "\n PhysicalTableScan(table=[[default, b]])", "\n" ] }, @@ -116,14 +123,15 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col1, col2 FROM b WHERE col3 IN (SELECT col3 FROM b)", "output": [ "Execution Plan", - "\nPhysicalProject(col1=[$0], col2=[$1])", - "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", - "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])", - "\n PhysicalTableScan(table=[[default, b]])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", - "\n PhysicalProject(col3=[$2])", - "\n PhysicalTableScan(table=[[default, b]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalProject(col1=[$0], col2=[$1])", + "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", + "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])", + "\n PhysicalTableScan(table=[[default, b]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", + "\n PhysicalProject(col3=[$2])", + "\n PhysicalTableScan(table=[[default, b]])", "\n" ] }, @@ -132,25 +140,26 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col1, col2 FROM a WHERE col3 IN (SELECT col3 FROM b WHERE col2='foo') AND col3 IN (SELECT col3 FROM b WHERE col2='bar') AND col3 IN (SELECT col3 FROM b WHERE col2='lorem')", "output": [ "Execution Plan", - "\nPhysicalProject(col1=[$0], col2=[$1])", - "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalProject(col1=[$0], col2=[$1])", "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])", - "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])", - "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])", + "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalProject(col3=[$2])", + "\n PhysicalFilter(condition=[=($1, _UTF-8'foo')])", + "\n PhysicalTableScan(table=[[default, b]])", "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", "\n PhysicalProject(col3=[$2])", - "\n PhysicalFilter(condition=[=($1, _UTF-8'foo')])", + "\n PhysicalFilter(condition=[=($1, _UTF-8'bar')])", "\n PhysicalTableScan(table=[[default, b]])", "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", "\n PhysicalProject(col3=[$2])", - "\n PhysicalFilter(condition=[=($1, _UTF-8'bar')])", + "\n PhysicalFilter(condition=[=($1, _UTF-8'lorem')])", "\n PhysicalTableScan(table=[[default, b]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", - "\n PhysicalProject(col3=[$2])", - "\n PhysicalFilter(condition=[=($1, _UTF-8'lorem')])", - "\n PhysicalTableScan(table=[[default, b]])", "\n" ] }, @@ -159,29 +168,30 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col1, col2 FROM a WHERE col3 IN (SELECT col3 FROM b WHERE col2='foo') AND col3 NOT IN (SELECT col3 FROM b WHERE col2='bar') AND col3 IN (SELECT col3 FROM b WHERE col2='lorem')", "output": [ "Execution Plan", - "\nPhysicalProject(col1=[$0], col2=[$1])", - "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", - "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])", - "\n PhysicalFilter(condition=[IS NOT TRUE($5)])", - "\n PhysicalJoin(condition=[=($3, $4)], joinType=[left])", - "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2], col31=[$2])", - "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])", - "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])", - "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", - "\n PhysicalProject(col3=[$2])", - "\n PhysicalFilter(condition=[=($1, _UTF-8'foo')])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalProject(col1=[$0], col2=[$1])", + "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", + "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])", + "\n PhysicalFilter(condition=[IS NOT TRUE($5)])", + "\n PhysicalJoin(condition=[=($3, $4)], joinType=[left])", + "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2], col31=[$2])", + "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])", + "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalProject(col3=[$2])", + "\n PhysicalFilter(condition=[=($1, _UTF-8'foo')])", + "\n PhysicalTableScan(table=[[default, b]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], aggType=[DIRECT])", + "\n PhysicalProject(col3=[$2], $f1=[true])", + "\n PhysicalFilter(condition=[=($1, _UTF-8'bar')])", "\n PhysicalTableScan(table=[[default, b]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", - "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], aggType=[DIRECT])", - "\n PhysicalProject(col3=[$2], $f1=[true])", - "\n PhysicalFilter(condition=[=($1, _UTF-8'bar')])", - "\n PhysicalTableScan(table=[[default, b]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", - "\n PhysicalProject(col3=[$2])", - "\n PhysicalFilter(condition=[=($1, _UTF-8'lorem')])", - "\n PhysicalTableScan(table=[[default, b]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalProject(col3=[$2])", + "\n PhysicalFilter(condition=[=($1, _UTF-8'lorem')])", + "\n PhysicalTableScan(table=[[default, b]])", "\n" ] }, @@ -190,30 +200,31 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col1, col2 FROM a WHERE col3 IN (SELECT col3 FROM b WHERE col2='foo') AND col3 NOT IN (SELECT col3 FROM a WHERE col2='bar') AND col3 IN (SELECT col3 FROM b WHERE col2='lorem')", "output": [ "Execution Plan", - "\nPhysicalProject(col1=[$0], col2=[$1])", - "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", - "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])", - "\n PhysicalFilter(condition=[IS NOT TRUE($5)])", - "\n PhysicalJoin(condition=[=($3, $4)], joinType=[left])", - "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2], col31=[$2])", - "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])", - "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])", - "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", - "\n PhysicalProject(col3=[$2])", - "\n PhysicalFilter(condition=[=($1, _UTF-8'foo')])", - "\n PhysicalTableScan(table=[[default, b]])", - "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], aggType=[FINAL])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", - "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], aggType=[LEAF])", - "\n PhysicalProject(col3=[$2], $f1=[true])", - "\n PhysicalFilter(condition=[=($1, _UTF-8'bar')])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalProject(col1=[$0], col2=[$1])", + "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", + "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])", + "\n PhysicalFilter(condition=[IS NOT TRUE($5)])", + "\n PhysicalJoin(condition=[=($3, $4)], joinType=[left])", + "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2], col31=[$2])", + "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])", + "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", - "\n PhysicalProject(col3=[$2])", - "\n PhysicalFilter(condition=[=($1, _UTF-8'lorem')])", - "\n PhysicalTableScan(table=[[default, b]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalProject(col3=[$2])", + "\n PhysicalFilter(condition=[=($1, _UTF-8'foo')])", + "\n PhysicalTableScan(table=[[default, b]])", + "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], aggType=[FINAL])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], aggType=[LEAF])", + "\n PhysicalProject(col3=[$2], $f1=[true])", + "\n PhysicalFilter(condition=[=($1, _UTF-8'bar')])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalProject(col3=[$2])", + "\n PhysicalFilter(condition=[=($1, _UTF-8'lorem')])", + "\n PhysicalTableScan(table=[[default, b]])", "\n" ] } @@ -226,19 +237,20 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col1, col2 FROM a WHERE col2 IN (SELECT col2 FROM a WHERE col3 = 'foo') AND col2 IN (SELECT col2 FROM a WHERE col3 = 'bar')", "output": [ "Execution Plan", - "\nPhysicalJoin(condition=[=($1, $2)], joinType=[semi])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", "\n PhysicalJoin(condition=[=($1, $2)], joinType=[semi])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", - "\n PhysicalProject(col1=[$0], col2=[$1])", - "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalJoin(condition=[=($1, $2)], joinType=[semi])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", + "\n PhysicalProject(col1=[$0], col2=[$1])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", + "\n PhysicalProject(col2=[$1])", + "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'foo'):INTEGER NOT NULL)])", + "\n PhysicalTableScan(table=[[default, a]])", "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", "\n PhysicalProject(col2=[$1])", - "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'foo'):INTEGER NOT NULL)])", + "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'bar'):INTEGER NOT NULL)])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", - "\n PhysicalProject(col2=[$1])", - "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'bar'):INTEGER NOT NULL)])", - "\n PhysicalTableScan(table=[[default, a]])", "\n" ] }, @@ -247,28 +259,29 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col1, col2 FROM a WHERE col2 IN (SELECT col2 FROM a WHERE col3 = 'foo') AND col2 NOT IN (SELECT col2 FROM a WHERE col3 = 'bar') AND col2 IN (SELECT col2 FROM a WHERE col3 = 'lorem')", "output": [ "Execution Plan", - "\nPhysicalJoin(condition=[=($1, $2)], joinType=[semi])", - "\n PhysicalProject(col1=[$0], col2=[$1])", - "\n PhysicalFilter(condition=[IS NOT TRUE($4)])", - "\n PhysicalJoin(condition=[=($2, $3)], joinType=[left])", - "\n PhysicalProject(col1=[$0], col2=[$1], col21=[$1])", - "\n PhysicalJoin(condition=[=($1, $2)], joinType=[semi])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", - "\n PhysicalProject(col1=[$0], col2=[$1])", - "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", - "\n PhysicalProject(col2=[$1])", - "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'foo'):INTEGER NOT NULL)])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalJoin(condition=[=($1, $2)], joinType=[semi])", + "\n PhysicalProject(col1=[$0], col2=[$1])", + "\n PhysicalFilter(condition=[IS NOT TRUE($4)])", + "\n PhysicalJoin(condition=[=($2, $3)], joinType=[left])", + "\n PhysicalProject(col1=[$0], col2=[$1], col21=[$1])", + "\n PhysicalJoin(condition=[=($1, $2)], joinType=[semi])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", + "\n PhysicalProject(col1=[$0], col2=[$1])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", - "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], aggType=[DIRECT])", - "\n PhysicalProject(col2=[$1], $f1=[true])", - "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'bar'):INTEGER NOT NULL)])", - "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", - "\n PhysicalProject(col2=[$1])", - "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'lorem'):INTEGER NOT NULL)])", - "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", + "\n PhysicalProject(col2=[$1])", + "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'foo'):INTEGER NOT NULL)])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", + "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], aggType=[DIRECT])", + "\n PhysicalProject(col2=[$1], $f1=[true])", + "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'bar'):INTEGER NOT NULL)])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", + "\n PhysicalProject(col2=[$1])", + "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'lorem'):INTEGER NOT NULL)])", + "\n PhysicalTableScan(table=[[default, a]])", "\n" ] }, @@ -277,31 +290,32 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col1, COUNT(*) FROM a WHERE col2 IN (SELECT col2 FROM a WHERE col3 = 'foo') AND col2 NOT IN (SELECT col2 FROM a WHERE col3 = 'bar') AND col2 IN (SELECT col2 FROM a WHERE col3 = 'lorem') GROUP BY col1", "output": [ "Execution Plan", - "\nPhysicalAggregate(group=[{0}], agg#0=[COUNT($1)], aggType=[FINAL])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", - "\n PhysicalAggregate(group=[{0}], agg#0=[COUNT()], aggType=[LEAF])", - "\n PhysicalJoin(condition=[=($1, $2)], joinType=[semi])", - "\n PhysicalProject(col1=[$0], col2=[$1])", - "\n PhysicalFilter(condition=[IS NOT TRUE($4)])", - "\n PhysicalJoin(condition=[=($2, $3)], joinType=[left])", - "\n PhysicalProject(col1=[$0], col2=[$1], col21=[$1])", - "\n PhysicalJoin(condition=[=($1, $2)], joinType=[semi])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", - "\n PhysicalProject(col1=[$0], col2=[$1])", - "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", - "\n PhysicalProject(col2=[$1])", - "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'foo'):INTEGER NOT NULL)])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalAggregate(group=[{0}], agg#0=[COUNT($1)], aggType=[FINAL])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalAggregate(group=[{0}], agg#0=[COUNT()], aggType=[LEAF])", + "\n PhysicalJoin(condition=[=($1, $2)], joinType=[semi])", + "\n PhysicalProject(col1=[$0], col2=[$1])", + "\n PhysicalFilter(condition=[IS NOT TRUE($4)])", + "\n PhysicalJoin(condition=[=($2, $3)], joinType=[left])", + "\n PhysicalProject(col1=[$0], col2=[$1], col21=[$1])", + "\n PhysicalJoin(condition=[=($1, $2)], joinType=[semi])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", + "\n PhysicalProject(col1=[$0], col2=[$1])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", - "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], aggType=[DIRECT])", - "\n PhysicalProject(col2=[$1], $f1=[true])", - "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'bar'):INTEGER NOT NULL)])", - "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", - "\n PhysicalProject(col2=[$1])", - "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'lorem'):INTEGER NOT NULL)])", - "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", + "\n PhysicalProject(col2=[$1])", + "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'foo'):INTEGER NOT NULL)])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", + "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], aggType=[DIRECT])", + "\n PhysicalProject(col2=[$1], $f1=[true])", + "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'bar'):INTEGER NOT NULL)])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", + "\n PhysicalProject(col2=[$1])", + "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'lorem'):INTEGER NOT NULL)])", + "\n PhysicalTableScan(table=[[default, a]])", "\n" ] } @@ -314,14 +328,15 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_leaf_return_final_result='true', is_enable_group_trim='true') */ col1, COUNT(DISTINCT col2) AS cnt FROM a WHERE col3 >= 0 GROUP BY col1 ORDER BY cnt DESC LIMIT 10", "output": [ "Execution Plan", - "\nPhysicalSort(sort0=[$1], dir0=[DESC], fetch=[10])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", - "\n PhysicalSort(sort0=[$1], dir0=[DESC], fetch=[10])", - "\n PhysicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)], aggType=[FINAL], leafReturnFinalResult=[true], limit=[10])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", - "\n PhysicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)], aggType=[LEAF], leafReturnFinalResult=[true], collations=[[1 DESC]], limit=[10])", - "\n PhysicalFilter(condition=[>=($2, 0)])", - "\n PhysicalTableScan(table=[[default, a]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(sort0=[$1], dir0=[DESC], fetch=[10])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(sort0=[$1], dir0=[DESC], fetch=[10])", + "\n PhysicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)], aggType=[FINAL], leafReturnFinalResult=[true], limit=[10])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)], aggType=[LEAF], leafReturnFinalResult=[true], collations=[[1 DESC]], limit=[10])", + "\n PhysicalFilter(condition=[>=($2, 0)])", + "\n PhysicalTableScan(table=[[default, a]])", "\n" ] }, @@ -330,15 +345,16 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_enable_group_trim='true') */ COUNT(DISTINCT col2) AS cnt FROM a WHERE a.col3 >= 0 GROUP BY col1 ORDER BY cnt DESC LIMIT 10", "output": [ "Execution Plan", - "\nPhysicalSort(sort0=[$0], dir0=[DESC], fetch=[10])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", - "\n PhysicalSort(sort0=[$0], dir0=[DESC], fetch=[10])", - "\n PhysicalProject(cnt=[$1])", - "\n PhysicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)], aggType=[FINAL], limit=[10])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", - "\n PhysicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)], aggType=[LEAF], collations=[[1 DESC]], limit=[10])", - "\n PhysicalFilter(condition=[>=($2, 0)])", - "\n PhysicalTableScan(table=[[default, a]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(sort0=[$0], dir0=[DESC], fetch=[10])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(sort0=[$0], dir0=[DESC], fetch=[10])", + "\n PhysicalProject(cnt=[$1])", + "\n PhysicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)], aggType=[FINAL], limit=[10])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)], aggType=[LEAF], collations=[[1 DESC]], limit=[10])", + "\n PhysicalFilter(condition=[>=($2, 0)])", + "\n PhysicalTableScan(table=[[default, a]])", "\n" ] }, @@ -347,14 +363,15 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_enable_group_trim='true') */ DISTINCT col1, col2 FROM a WHERE col3 >= 0 LIMIT 10", "output": [ "Execution Plan", - "\nPhysicalSort(fetch=[10])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", - "\n PhysicalSort(fetch=[10])", - "\n PhysicalAggregate(group=[{0, 1}], aggType=[FINAL], limit=[10])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1]])", - "\n PhysicalAggregate(group=[{0, 1}], aggType=[LEAF], limit=[10])", - "\n PhysicalFilter(condition=[>=($2, 0)])", - "\n PhysicalTableScan(table=[[default, a]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(fetch=[10])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(fetch=[10])", + "\n PhysicalAggregate(group=[{0, 1}], aggType=[FINAL], limit=[10])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1]])", + "\n PhysicalAggregate(group=[{0, 1}], aggType=[LEAF], limit=[10])", + "\n PhysicalFilter(condition=[>=($2, 0)])", + "\n PhysicalTableScan(table=[[default, a]])", "\n" ] } @@ -367,24 +384,25 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR WITH tmp AS (SELECT col2 FROM a WHERE col1 = 'foo' UNION ALL SELECT col2 FROM a WHERE col3 = 'bar'), tmp2 AS (SELECT DISTINCT col2 FROM tmp) SELECT COUNT(*), col3 FROM a WHERE col2 IN (SELECT col2 FROM tmp2) GROUP BY col3", "output": [ "Execution Plan", - "\nPhysicalProject(EXPR$0=[$1], col3=[$0])", - "\n PhysicalAggregate(group=[{0}], agg#0=[COUNT($1)], aggType=[FINAL])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", - "\n PhysicalAggregate(group=[{1}], agg#0=[COUNT()], aggType=[LEAF])", - "\n PhysicalJoin(condition=[=($0, $2)], joinType=[semi])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", - "\n PhysicalProject(col2=[$1], col3=[$2])", - "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalAggregate(group=[{0}], aggType=[DIRECT])", - "\n PhysicalUnion(all=[true])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", - "\n PhysicalProject(col2=[$1])", - "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", - "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", - "\n PhysicalProject(col2=[$1])", - "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'bar'):INTEGER NOT NULL)])", - "\n PhysicalTableScan(table=[[default, a]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalProject(EXPR$0=[$1], col3=[$0])", + "\n PhysicalAggregate(group=[{0}], agg#0=[COUNT($1)], aggType=[FINAL])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalAggregate(group=[{1}], agg#0=[COUNT()], aggType=[LEAF])", + "\n PhysicalJoin(condition=[=($0, $2)], joinType=[semi])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", + "\n PhysicalProject(col2=[$1], col3=[$2])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalAggregate(group=[{0}], aggType=[DIRECT])", + "\n PhysicalUnion(all=[true])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", + "\n PhysicalProject(col2=[$1])", + "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", + "\n PhysicalProject(col2=[$1])", + "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'bar'):INTEGER NOT NULL)])", + "\n PhysicalTableScan(table=[[default, a]])", "\n" ] } @@ -397,11 +415,12 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col2, col3 FROM a WHERE col1 = 'foo' ORDER BY col2", "output": [ "Execution Plan", - "\nPhysicalSort(sort0=[$0], dir0=[ASC])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", - "\n PhysicalProject(col2=[$1], col3=[$2])", - "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", - "\n PhysicalTableScan(table=[[default, a]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(sort0=[$0], dir0=[ASC])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalProject(col2=[$1], col3=[$2])", + "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", + "\n PhysicalTableScan(table=[[default, a]])", "\n" ] }, @@ -410,12 +429,13 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col2, col3 FROM a WHERE col1 = 'foo' ORDER BY col2 LIMIT 10", "output": [ "Execution Plan", - "\nPhysicalSort(sort0=[$0], dir0=[ASC], fetch=[10])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", - "\n PhysicalSort(sort0=[$0], dir0=[ASC], fetch=[10])", - "\n PhysicalProject(col2=[$1], col3=[$2])", - "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", - "\n PhysicalTableScan(table=[[default, a]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(sort0=[$0], dir0=[ASC], fetch=[10])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(sort0=[$0], dir0=[ASC], fetch=[10])", + "\n PhysicalProject(col2=[$1], col3=[$2])", + "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", + "\n PhysicalTableScan(table=[[default, a]])", "\n" ] }, @@ -424,12 +444,13 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col2, col3 FROM a WHERE col1 = 'foo' ORDER BY col2 LIMIT 10, 11", "output": [ "Execution Plan", - "\nPhysicalSort(sort0=[$0], dir0=[ASC], offset=[10], fetch=[11])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", - "\n PhysicalSort(sort0=[$0], dir0=[ASC], fetch=[21])", - "\n PhysicalProject(col2=[$1], col3=[$2])", - "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", - "\n PhysicalTableScan(table=[[default, a]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(sort0=[$0], dir0=[ASC], offset=[10], fetch=[11])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(sort0=[$0], dir0=[ASC], fetch=[21])", + "\n PhysicalProject(col2=[$1], col3=[$2])", + "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", + "\n PhysicalTableScan(table=[[default, a]])", "\n" ] }, @@ -438,12 +459,13 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col2, col3 FROM a WHERE col1 = 'foo' LIMIT 10, 11", "output": [ "Execution Plan", - "\nPhysicalSort(offset=[10], fetch=[11])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", - "\n PhysicalSort(fetch=[21])", - "\n PhysicalProject(col2=[$1], col3=[$2])", - "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", - "\n PhysicalTableScan(table=[[default, a]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(offset=[10], fetch=[11])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(fetch=[21])", + "\n PhysicalProject(col2=[$1], col3=[$2])", + "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", + "\n PhysicalTableScan(table=[[default, a]])", "\n" ] }, @@ -452,11 +474,12 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col2, COUNT(*) as cnt FROM a GROUP BY col2 ORDER BY cnt LIMIT 100", "output": [ "Execution Plan", - "\nPhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", - "\n PhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])", - "\n PhysicalAggregate(group=[{1}], agg#0=[COUNT()], aggType=[DIRECT])", - "\n PhysicalTableScan(table=[[default, a]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])", + "\n PhysicalAggregate(group=[{1}], agg#0=[COUNT()], aggType=[DIRECT])", + "\n PhysicalTableScan(table=[[default, a]])", "\n" ] }, @@ -465,11 +488,12 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col2, COUNT(*) as cnt FROM b GROUP BY col2 ORDER BY cnt LIMIT 100", "output": [ "Execution Plan", - "\nPhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", - "\n PhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])", - "\n PhysicalAggregate(group=[{1}], agg#0=[COUNT()], aggType=[DIRECT])", - "\n PhysicalTableScan(table=[[default, b]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", + "\n PhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])", + "\n PhysicalAggregate(group=[{1}], agg#0=[COUNT()], aggType=[DIRECT])", + "\n PhysicalTableScan(table=[[default, b]])", "\n" ] } @@ -482,10 +506,11 @@ "sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; EXPLAIN PLAN FOR SELECT col2, col3 FROM a WHERE col1 = 'foo'", "output": [ "Execution Plan", - "\nPhysicalSort(fetch=[100000])", - "\n PhysicalProject(col2=[$1], col3=[$2])", - "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", - "\n PhysicalTableScan(table=[[default, a]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(fetch=[100000])", + "\n PhysicalProject(col2=[$1], col3=[$2])", + "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", + "\n PhysicalTableScan(table=[[default, a]])", "\n" ] }, @@ -494,9 +519,10 @@ "sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; EXPLAIN PLAN FOR SELECT col2, COUNT(*) FROM a WHERE col1 = 'foo' GROUP BY col2", "output": [ "Execution Plan", - "\nPhysicalAggregate(group=[{1}], agg#0=[COUNT()], aggType=[DIRECT], limit=[100000])", - "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", - "\n PhysicalTableScan(table=[[default, a]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalAggregate(group=[{1}], agg#0=[COUNT()], aggType=[DIRECT], limit=[100000])", + "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", + "\n PhysicalTableScan(table=[[default, a]])", "\n" ] }, @@ -505,14 +531,15 @@ "sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; EXPLAIN PLAN FOR WITH tmp AS (SELECT col1, col2, col3, COUNT(*) FROM a WHERE col1 = 'foo' GROUP BY col1, col2, col3) SELECT * FROM (SELECT ROW_NUMBER() OVER (PARTITION BY col2 ORDER BY col3) as rnk, col1 FROM tmp) WHERE rnk = 1", "output": [ "Execution Plan", - "\nPhysicalProject(rnk=[$3], col1=[$0])", - "\n PhysicalFilter(condition=[=($3, 1)])", - "\n PhysicalWindow(window#0=[window(partition {1} order by [2] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])", - "\n PhysicalAggregate(group=[{0, 1, 2}], aggType=[FINAL])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", - "\n PhysicalAggregate(group=[{0, 1, 2}], aggType=[LEAF], limit=[100000])", - "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", - "\n PhysicalTableScan(table=[[default, a]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalProject(rnk=[$3], col1=[$0])", + "\n PhysicalFilter(condition=[=($3, 1)])", + "\n PhysicalWindow(window#0=[window(partition {1} order by [2] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])", + "\n PhysicalAggregate(group=[{0, 1, 2}], aggType=[FINAL])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalAggregate(group=[{0, 1, 2}], aggType=[LEAF], limit=[100000])", + "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", + "\n PhysicalTableScan(table=[[default, a]])", "\n" ] }, @@ -521,12 +548,13 @@ "sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; EXPLAIN PLAN FOR WITH tmp AS (SELECT col1, col2, col3, COUNT(*) FROM a WHERE col1 = 'foo' GROUP BY col1, col2, col3 ORDER BY col2) SELECT * FROM tmp LIMIT 100,400", "output": [ "Execution Plan", - "\nPhysicalSort(offset=[100], fetch=[400])", - "\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT($3)], aggType=[FINAL])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", - "\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT()], aggType=[LEAF], limit=[100000])", - "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", - "\n PhysicalTableScan(table=[[default, a]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(offset=[100], fetch=[400])", + "\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT($3)], aggType=[FINAL])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT()], aggType=[LEAF], limit=[100000])", + "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", + "\n PhysicalTableScan(table=[[default, a]])", "\n" ] } @@ -539,16 +567,17 @@ "sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; EXPLAIN PLAN FOR SELECT col2, col3 FROM a WHERE col1 = 'foo' AND col2 IN (SELECT col1 FROM b)", "output": [ "Execution Plan", - "\nPhysicalJoin(condition=[=($0, $2)], joinType=[semi])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", - "\n PhysicalSort(fetch=[100000])", - "\n PhysicalProject(col2=[$1], col3=[$2])", - "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", - "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", - "\n PhysicalSort(fetch=[100000])", - "\n PhysicalProject(col1=[$0])", - "\n PhysicalTableScan(table=[[default, b]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalJoin(condition=[=($0, $2)], joinType=[semi])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(fetch=[100000])", + "\n PhysicalProject(col2=[$1], col3=[$2])", + "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(fetch=[100000])", + "\n PhysicalProject(col1=[$0])", + "\n PhysicalTableScan(table=[[default, b]])", "\n" ] }, @@ -557,18 +586,19 @@ "sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; EXPLAIN PLAN FOR SELECT COUNT(*), col2 FROM a WHERE col1 = 'foo' AND col2 IN (SELECT col1 FROM b) GROUP BY col2", "output": [ "Execution Plan", - "\nPhysicalProject(EXPR$0=[$1], col2=[$0])", - "\n PhysicalAggregate(group=[{0}], agg#0=[COUNT()], aggType=[DIRECT])", - "\n PhysicalJoin(condition=[=($0, $1)], joinType=[semi])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", - "\n PhysicalSort(fetch=[100000])", - "\n PhysicalProject(col2=[$1])", - "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", - "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", - "\n PhysicalSort(fetch=[100000])", - "\n PhysicalProject(col1=[$0])", - "\n PhysicalTableScan(table=[[default, b]])", + "\nPhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalProject(EXPR$0=[$1], col2=[$0])", + "\n PhysicalAggregate(group=[{0}], agg#0=[COUNT()], aggType=[DIRECT])", + "\n PhysicalJoin(condition=[=($0, $1)], joinType=[semi])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(fetch=[100000])", + "\n PhysicalProject(col2=[$1])", + "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(fetch=[100000])", + "\n PhysicalProject(col1=[$0])", + "\n PhysicalTableScan(table=[[default, b]])", "\n" ] } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org