This is an automated email from the ASF dual-hosted git repository. ankitsultana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 2de0888e96 [multistage] Bug Fixes and Improvements to Physical Optimizer (#15813) 2de0888e96 is described below commit 2de0888e96e8d8ae0b322cfdc7ca7ce466b03bce Author: Ankit Sultana <ankitsult...@uber.com> AuthorDate: Fri May 16 17:29:37 2025 -0500 [multistage] Bug Fixes and Improvements to Physical Optimizer (#15813) --- .../planner/physical/v2/ExchangeStrategy.java | 66 ++++++- .../v2/PlanFragmentAndMailboxAssignment.java | 25 +-- .../planner/physical/v2/RelToPRelConverter.java | 4 +- .../physical/v2/nodes/PhysicalAggregate.java | 15 +- .../physical/v2/nodes/PhysicalExchange.java | 29 +-- .../planner/physical/v2/ExchangeStrategyTest.java | 69 +++++++ .../resources/queries/PhysicalOptimizerPlans.json | 205 +++++++++++++-------- 7 files changed, 281 insertions(+), 132 deletions(-) diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategy.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategy.java index 82441f47aa..43d7974750 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategy.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategy.java @@ -18,6 +18,11 @@ */ package org.apache.pinot.query.planner.physical.v2; +import java.util.List; +import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.RelDistributions; + + /** * Defines how data is transferred across an Exchange. */ @@ -25,17 +30,17 @@ public enum ExchangeStrategy { /** * There's a single stream in the receiver, so each stream in the sender sends data to the same. */ - SINGLETON_EXCHANGE, + SINGLETON_EXCHANGE(false), /** * stream-ID X sends data to stream-ID X. This cannot be modeled by PARTITIONING_EXCHANGE because the fan-out for * this type of exchange is 1:1. */ - IDENTITY_EXCHANGE, + IDENTITY_EXCHANGE(false), /** * Each stream will partition the outgoing stream based on a set of keys and a hash function. * Fanout for this type of exchange is 1:all. */ - PARTITIONING_EXCHANGE, + PARTITIONING_EXCHANGE(true), /** * stream-ID X will sub-partition: i.e. divide the stream so that the data is sent to the streams * {@code X, X + F, X + 2*F, ...}. Here F is the sub-partitioning factor. Records are assigned based on a @@ -43,25 +48,70 @@ public enum ExchangeStrategy { * partition counts divides the other. * <b>Note:</b> This is different and better than partitioning exchange because the fanout is F, and not N * (N*F). */ - SUB_PARTITIONING_HASH_EXCHANGE, + SUB_PARTITIONING_HASH_EXCHANGE(true), /** * Same as above but records are sub-partitioned in a round-robin way. This will increase parallelism but lose * data partitioning. */ - SUB_PARTITIONING_RR_EXCHANGE, + SUB_PARTITIONING_RR_EXCHANGE(false), /** * Similar to sub-partitioning, except it does the inverse and merges partitions. Partitions are merged in a way * that we still preserve partitions, but only change the partition count. i.e. if current partition count is 16, * and we want 8 partitions, then partition-0 in the receiver will receive data from partition-0 and partition-8 * in the sender. */ - COALESCING_PARTITIONING_EXCHANGE, + COALESCING_PARTITIONING_EXCHANGE(true), /** * Each stream will send data to all receiving streams. */ - BROADCAST_EXCHANGE, + BROADCAST_EXCHANGE(false), /** * Records are sent randomly from a given worker in the sender to some worker in the receiver. */ - RANDOM_EXCHANGE + RANDOM_EXCHANGE(false); + + /** + * This is true when the Exchange Strategy is such that it requires a List<Integer> representing the + * distribution keys. The list must be non-empty. + */ + private final boolean _requireKeys; + + /** + * See {@link #_requireKeys}. + */ + public boolean isRequireKeys() { + return _requireKeys; + } + + ExchangeStrategy(boolean requireKeys) { + _requireKeys = requireKeys; + } + + public static RelDistribution getRelDistribution(ExchangeStrategy exchangeStrategy, List<Integer> keys) { + if (exchangeStrategy.isRequireKeys() && keys.isEmpty()) { + throw new IllegalStateException(String.format("ExchangeStrategy=%s requires distribution keys, but none found", + exchangeStrategy)); + } else if (!exchangeStrategy.isRequireKeys() && !keys.isEmpty()) { + throw new IllegalStateException(String.format( + "ExchangeStrategy=%s does not require distribution keys but found %s", exchangeStrategy, keys)); + } + switch (exchangeStrategy) { + case PARTITIONING_EXCHANGE: + case SUB_PARTITIONING_HASH_EXCHANGE: + case COALESCING_PARTITIONING_EXCHANGE: + return RelDistributions.hash(keys); + case IDENTITY_EXCHANGE: + return RelDistributions.hash(List.of()); + case BROADCAST_EXCHANGE: + return RelDistributions.BROADCAST_DISTRIBUTED; + case SINGLETON_EXCHANGE: + return RelDistributions.SINGLETON; + case SUB_PARTITIONING_RR_EXCHANGE: + return RelDistributions.ROUND_ROBIN_DISTRIBUTED; + case RANDOM_EXCHANGE: + return RelDistributions.RANDOM_DISTRIBUTED; + default: + throw new IllegalStateException(String.format("Unexpected exchange strategy: %s", exchangeStrategy)); + } + } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java index a7733b1a28..88761f0bcf 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java @@ -116,7 +116,8 @@ public class PlanFragmentAndMailboxAssignment { int senderFragmentId = context._planFragmentMap.size(); final DataSchema inputFragmentSchema = PRelToPlanNodeConverter.toDataSchema( pRelNode.getPRelInput(0).unwrap().getRowType()); - RelDistribution.Type distributionType = inferDistributionType(physicalExchange.getExchangeStrategy()); + RelDistribution.Type distributionType = ExchangeStrategy.getRelDistribution( + physicalExchange.getExchangeStrategy(), physicalExchange.getDistributionKeys()).getType(); List<PlanNode> inputs = new ArrayList<>(); MailboxSendNode sendNode = new MailboxSendNode(senderFragmentId, inputFragmentSchema, inputs, currentFragmentId, PinotRelExchangeType.getDefaultExchangeType(), distributionType, physicalExchange.getDistributionKeys(), @@ -156,27 +157,6 @@ public class PlanFragmentAndMailboxAssignment { return fragment; } - private RelDistribution.Type inferDistributionType(ExchangeStrategy desc) { - RelDistribution.Type distributionType; - switch (desc) { - case PARTITIONING_EXCHANGE: - distributionType = RelDistribution.Type.HASH_DISTRIBUTED; - break; - case IDENTITY_EXCHANGE: - distributionType = RelDistribution.Type.HASH_DISTRIBUTED; - break; - case SINGLETON_EXCHANGE: - distributionType = RelDistribution.Type.SINGLETON; - break; - case BROADCAST_EXCHANGE: - distributionType = RelDistribution.Type.BROADCAST_DISTRIBUTED; - break; - default: - throw new IllegalStateException(""); - } - return distributionType; - } - private Map<Integer, QueryServerInstance> createWorkerMap(List<String> workers, Context context) { Map<Integer, QueryServerInstance> mp = new HashMap<>(); for (String instance : workers) { @@ -225,6 +205,7 @@ public class PlanFragmentAndMailboxAssignment { .put(senderStageId, new SharedMailboxInfos(mailboxInfoListForReceiver)); break; } + case RANDOM_EXCHANGE: case PARTITIONING_EXCHANGE: case BROADCAST_EXCHANGE: { MailboxInfos mailboxInfoListForSender = new MailboxInfos(createMailboxInfo(receiverWorkers)); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java index 67a24c75fe..f3cd36452e 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java @@ -94,9 +94,11 @@ public class RelToPRelConverter { } else if (relNode instanceof PinotLogicalAggregate) { Preconditions.checkState(inputs.size() == 1, "Expected exactly 1 input of agg. Found: %s", inputs); PinotLogicalAggregate aggRel = (PinotLogicalAggregate) relNode; + // Use AggType.DIRECT here because at this point aggregation split hasn't happened yet. + AggregateNode.AggType aggType = AggregateNode.AggType.DIRECT; return new PhysicalAggregate(aggRel.getCluster(), aggRel.getTraitSet(), aggRel.getHints(), aggRel.getGroupSet(), aggRel.getGroupSets(), aggRel.getAggCallList(), nodeIdGenerator.get(), inputs.get(0), null, false, - AggregateNode.AggType.DIRECT, false, List.of(), 0); + aggType, aggRel.isLeafReturnFinalResult(), aggRel.getCollations(), aggRel.getLimit()); } else if (relNode instanceof Join) { Preconditions.checkState(relNode.getInputs().size() == 2, "Expected exactly 2 inputs to join. Found: %s", inputs); Join join = (Join) relNode; diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalAggregate.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalAggregate.java index 1306c8d794..149795eb46 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalAggregate.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalAggregate.java @@ -24,6 +24,7 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelFieldCollation; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.hint.RelHint; @@ -51,7 +52,7 @@ public class PhysicalAggregate extends Aggregate implements PRelNode { public PhysicalAggregate(RelOptCluster cluster, RelTraitSet traitSet, List<RelHint> hints, ImmutableBitSet groupSet, @Nullable List<ImmutableBitSet> groupSets, List<AggregateCall> aggCalls, int nodeId, PRelNode pRelInput, @Nullable PinotDataDistribution pinotDataDistribution, boolean leafStage, - AggregateNode.AggType aggType, boolean leafReturnFinalResult, List<RelFieldCollation> collations, + AggregateNode.AggType aggType, boolean leafReturnFinalResult, @Nullable List<RelFieldCollation> collations, int limit) { super(cluster, traitSet, hints, pRelInput.unwrap(), groupSet, groupSets, aggCalls); _nodeId = nodeId; @@ -60,7 +61,7 @@ public class PhysicalAggregate extends Aggregate implements PRelNode { _leafStage = leafStage; _aggType = aggType; _leafReturnFinalResult = leafReturnFinalResult; - _collations = collations; + _collations = collations == null ? List.of() : collations; _limit = limit; } @@ -114,6 +115,16 @@ public class PhysicalAggregate extends Aggregate implements PRelNode { _collations, _limit); } + @Override + public RelWriter explainTerms(RelWriter pw) { + RelWriter relWriter = super.explainTerms(pw); + relWriter.item("aggType", _aggType); + relWriter.itemIf("leafReturnFinalResult", true, _leafReturnFinalResult); + relWriter.itemIf("collations", _collations, !_collations.isEmpty()); + relWriter.itemIf("limit", _limit, _limit > 0); + return relWriter; + } + public AggregateNode.AggType getAggType() { return _aggType; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalExchange.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalExchange.java index 66846faff7..b00d786f00 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalExchange.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/nodes/PhysicalExchange.java @@ -26,10 +26,10 @@ import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelCollations; import org.apache.calcite.rel.RelDistribution; -import org.apache.calcite.rel.RelDistributions; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelWriter; import org.apache.calcite.rel.core.Exchange; +import org.apache.commons.collections4.CollectionUtils; import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType; import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTrait; import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTraitDef; @@ -81,7 +81,7 @@ public class PhysicalExchange extends Exchange implements PRelNode { List<Integer> distributionKeys, ExchangeStrategy exchangeStrategy, @Nullable RelCollation relCollation, PinotExecStrategyTrait execStrategyTrait) { super(input.unwrap().getCluster(), EMPTY_TRAIT_SET.plus(execStrategyTrait), input.unwrap(), - getRelDistribution(exchangeStrategy, distributionKeys)); + ExchangeStrategy.getRelDistribution(exchangeStrategy, distributionKeys)); _nodeId = nodeId; _pRelInputs = Collections.singletonList(input); _pinotDataDistribution = pinotDataDistribution; @@ -161,26 +161,9 @@ public class PhysicalExchange extends Exchange implements PRelNode { @Override public RelWriter explainTerms(RelWriter pw) { return pw.item("input", input) .item("exchangeStrategy", _exchangeStrategy) - .item("distKeys", _distributionKeys) - .item("execStrategy", getRelExchangeType()) - .item("collation", _relCollation); - } - - private static RelDistribution getRelDistribution(ExchangeStrategy exchangeStrategy, List<Integer> keys) { - switch (exchangeStrategy) { - case IDENTITY_EXCHANGE: - case PARTITIONING_EXCHANGE: - case SUB_PARTITIONING_HASH_EXCHANGE: - case COALESCING_PARTITIONING_EXCHANGE: - return RelDistributions.hash(keys); - case BROADCAST_EXCHANGE: - return RelDistributions.BROADCAST_DISTRIBUTED; - case SINGLETON_EXCHANGE: - return RelDistributions.SINGLETON; - case SUB_PARTITIONING_RR_EXCHANGE: - return RelDistributions.ROUND_ROBIN_DISTRIBUTED; - default: - throw new IllegalStateException(String.format("Unexpected exchange strategy: %s", exchangeStrategy)); - } + .itemIf("distKeys", _distributionKeys, CollectionUtils.isNotEmpty(_distributionKeys)) + .itemIf("execStrategy", getRelExchangeType(), + getRelExchangeType() != PinotRelExchangeType.getDefaultExchangeType()) + .itemIf("collation", _relCollation, CollectionUtils.isNotEmpty(_relCollation.getFieldCollations())); } } diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategyTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategyTest.java new file mode 100644 index 0000000000..b0bc6ea11c --- /dev/null +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategyTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.physical.v2; + +import java.util.List; +import org.apache.calcite.rel.RelDistribution; +import org.testng.annotations.Test; + +import static org.testng.Assert.*; + + +public class ExchangeStrategyTest { + @Test + public void testGetRelDistribution() { + // Ensure each exchange strategy can be mapped to a RelDistribution. + for (ExchangeStrategy exchangeStrategy : ExchangeStrategy.values()) { + List<Integer> keys = exchangeStrategy.isRequireKeys() ? List.of(1) : List.of(); + assertNotNull(ExchangeStrategy.getRelDistribution(exchangeStrategy, keys)); + } + // Manual check for each exchange strategy. + // Start with hash types + List<ExchangeStrategy> hashExchangeStrategies = List.of(ExchangeStrategy.PARTITIONING_EXCHANGE, + ExchangeStrategy.SUB_PARTITIONING_HASH_EXCHANGE, ExchangeStrategy.COALESCING_PARTITIONING_EXCHANGE); + for (ExchangeStrategy exchangeStrategy : hashExchangeStrategies) { + assertEquals(ExchangeStrategy.getRelDistribution(exchangeStrategy, List.of(1)).getType(), + RelDistribution.Type.HASH_DISTRIBUTED); + } + // Singleton exchange + assertEquals(ExchangeStrategy.getRelDistribution(ExchangeStrategy.SINGLETON_EXCHANGE, List.of()).getType(), + RelDistribution.Type.SINGLETON); + // Random exchange + assertEquals(ExchangeStrategy.getRelDistribution(ExchangeStrategy.RANDOM_EXCHANGE, List.of()).getType(), + RelDistribution.Type.RANDOM_DISTRIBUTED); + // Broadcast exchange + assertEquals(ExchangeStrategy.getRelDistribution(ExchangeStrategy.BROADCAST_EXCHANGE, List.of()).getType(), + RelDistribution.Type.BROADCAST_DISTRIBUTED); + // Sub-partitioning round robin exchange + assertEquals(ExchangeStrategy.getRelDistribution(ExchangeStrategy.SUB_PARTITIONING_RR_EXCHANGE, + List.of()).getType(), RelDistribution.Type.ROUND_ROBIN_DISTRIBUTED); + } + + @Test + public void testGetRelDistributionInvalid() { + // Test case when empty keys list but strategy requires keys + assertThrows(IllegalStateException.class, () -> { + ExchangeStrategy.getRelDistribution(ExchangeStrategy.PARTITIONING_EXCHANGE, List.of()); + }); + // Test case when non-empty keys list but strategy does not accept keys + assertThrows(IllegalStateException.class, () -> { + ExchangeStrategy.getRelDistribution(ExchangeStrategy.IDENTITY_EXCHANGE, List.of(1)); + }); + } +} diff --git a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json index af9048bec3..acab86c39c 100644 --- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json +++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json @@ -7,13 +7,13 @@ "output": [ "Execution Plan", "\nPhysicalSort(sort0=[$0], dir0=[ASC])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", "\n PhysicalProject(col1=[$0], ts=[$1], col3=[$3])", "\n PhysicalJoin(condition=[=($0, $2)], joinType=[inner])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", "\n PhysicalProject(col1=[$0], ts=[$7])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", "\n PhysicalProject(col2=[$1], col3=[$2])", "\n PhysicalTableScan(table=[[default, b]])", "\n" @@ -25,13 +25,13 @@ "output": [ "Execution Plan", "\nPhysicalSort(sort0=[$0], dir0=[ASC])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", "\n PhysicalProject(value1=[$0], ts1=[$1], col3=[$3])", "\n PhysicalJoin(condition=[=($0, $2)], joinType=[inner])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", "\n PhysicalProject(col1=[$0], ts=[$7])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", "\n PhysicalProject(col2=[$1], col3=[$2])", "\n PhysicalTableScan(table=[[default, b]])", "\n" @@ -43,9 +43,9 @@ "output": [ "Execution Plan", "\nPhysicalJoin(condition=[=($0, $10)], joinType=[inner])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[1]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[1]])", "\n PhysicalTableScan(table=[[default, b]])", "\n" ] @@ -56,10 +56,10 @@ "output": [ "Execution Plan", "\nPhysicalJoin(condition=[=($0, $10)], joinType=[inner])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", "\n PhysicalFilter(condition=[>=($2, 0)])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[1]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[1]])", "\n PhysicalTableScan(table=[[default, b]])", "\n" ] @@ -70,10 +70,10 @@ "output": [ "Execution Plan", "\nPhysicalJoin(condition=[AND(=($0, $10), >($2, $11))], joinType=[inner])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", "\n PhysicalFilter(condition=[>=($2, 0)])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[BROADCAST_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[BROADCAST_EXCHANGE])", "\n PhysicalTableScan(table=[[default, b]])", "\n" ] @@ -84,9 +84,9 @@ "output": [ "Execution Plan", "\nPhysicalJoin(condition=[AND(=($0, $9), =($1, $10))], joinType=[inner])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1]])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1]])", "\n PhysicalTableScan(table=[[default, b]])", "\n" ] @@ -102,10 +102,10 @@ "Execution Plan", "\nPhysicalProject(col1=[$0], col2=[$1])", "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])", "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", "\n PhysicalProject(col3=[$2])", "\n PhysicalTableScan(table=[[default, b]])", "\n" @@ -118,10 +118,10 @@ "Execution Plan", "\nPhysicalProject(col1=[$0], col2=[$1])", "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])", "\n PhysicalTableScan(table=[[default, b]])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", "\n PhysicalProject(col3=[$2])", "\n PhysicalTableScan(table=[[default, b]])", "\n" @@ -136,18 +136,18 @@ "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])", "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", "\n PhysicalProject(col3=[$2])", "\n PhysicalFilter(condition=[=($1, _UTF-8'foo')])", "\n PhysicalTableScan(table=[[default, b]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", "\n PhysicalProject(col3=[$2])", "\n PhysicalFilter(condition=[=($1, _UTF-8'bar')])", "\n PhysicalTableScan(table=[[default, b]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", "\n PhysicalProject(col3=[$2])", "\n PhysicalFilter(condition=[=($1, _UTF-8'lorem')])", "\n PhysicalTableScan(table=[[default, b]])", @@ -166,19 +166,19 @@ "\n PhysicalJoin(condition=[=($3, $4)], joinType=[left])", "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2], col31=[$2])", "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])", "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", "\n PhysicalProject(col3=[$2])", "\n PhysicalFilter(condition=[=($1, _UTF-8'foo')])", "\n PhysicalTableScan(table=[[default, b]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])", - "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], aggType=[DIRECT])", "\n PhysicalProject(col3=[$2], $f1=[true])", "\n PhysicalFilter(condition=[=($1, _UTF-8'bar')])", "\n PhysicalTableScan(table=[[default, b]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", "\n PhysicalProject(col3=[$2])", "\n PhysicalFilter(condition=[=($1, _UTF-8'lorem')])", "\n PhysicalTableScan(table=[[default, b]])", @@ -197,20 +197,20 @@ "\n PhysicalJoin(condition=[=($3, $4)], joinType=[left])", "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2], col31=[$2])", "\n PhysicalJoin(condition=[=($2, $3)], joinType=[semi])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[2]])", "\n PhysicalProject(col1=[$0], col2=[$1], col3=[$2])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", "\n PhysicalProject(col3=[$2])", "\n PhysicalFilter(condition=[=($1, _UTF-8'foo')])", "\n PhysicalTableScan(table=[[default, b]])", - "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])", - "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], aggType=[FINAL])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], aggType=[LEAF])", "\n PhysicalProject(col3=[$2], $f1=[true])", "\n PhysicalFilter(condition=[=($1, _UTF-8'bar')])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", "\n PhysicalProject(col3=[$2])", "\n PhysicalFilter(condition=[=($1, _UTF-8'lorem')])", "\n PhysicalTableScan(table=[[default, b]])", @@ -228,14 +228,14 @@ "Execution Plan", "\nPhysicalJoin(condition=[=($1, $2)], joinType=[semi])", "\n PhysicalJoin(condition=[=($1, $2)], joinType=[semi])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", "\n PhysicalProject(col1=[$0], col2=[$1])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", "\n PhysicalProject(col2=[$1])", "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'foo'):INTEGER NOT NULL)])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", "\n PhysicalProject(col2=[$1])", "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'bar'):INTEGER NOT NULL)])", "\n PhysicalTableScan(table=[[default, a]])", @@ -253,19 +253,19 @@ "\n PhysicalJoin(condition=[=($2, $3)], joinType=[left])", "\n PhysicalProject(col1=[$0], col2=[$1], col21=[$1])", "\n PhysicalJoin(condition=[=($1, $2)], joinType=[semi])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", "\n PhysicalProject(col1=[$0], col2=[$1])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", "\n PhysicalProject(col2=[$1])", "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'foo'):INTEGER NOT NULL)])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", - "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", + "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], aggType=[DIRECT])", "\n PhysicalProject(col2=[$1], $f1=[true])", "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'bar'):INTEGER NOT NULL)])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", "\n PhysicalProject(col2=[$1])", "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'lorem'):INTEGER NOT NULL)])", "\n PhysicalTableScan(table=[[default, a]])", @@ -277,28 +277,28 @@ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT col1, COUNT(*) FROM a WHERE col2 IN (SELECT col2 FROM a WHERE col3 = 'foo') AND col2 NOT IN (SELECT col2 FROM a WHERE col3 = 'bar') AND col2 IN (SELECT col2 FROM a WHERE col3 = 'lorem') GROUP BY col1", "output": [ "Execution Plan", - "\nPhysicalAggregate(group=[{0}], agg#0=[COUNT($1)])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])", - "\n PhysicalAggregate(group=[{0}], agg#0=[COUNT()])", + "\nPhysicalAggregate(group=[{0}], agg#0=[COUNT($1)], aggType=[FINAL])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalAggregate(group=[{0}], agg#0=[COUNT()], aggType=[LEAF])", "\n PhysicalJoin(condition=[=($1, $2)], joinType=[semi])", "\n PhysicalProject(col1=[$0], col2=[$1])", "\n PhysicalFilter(condition=[IS NOT TRUE($4)])", "\n PhysicalJoin(condition=[=($2, $3)], joinType=[left])", "\n PhysicalProject(col1=[$0], col2=[$1], col21=[$1])", "\n PhysicalJoin(condition=[=($1, $2)], joinType=[semi])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", "\n PhysicalProject(col1=[$0], col2=[$1])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", "\n PhysicalProject(col2=[$1])", "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'foo'):INTEGER NOT NULL)])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", - "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", + "\n PhysicalAggregate(group=[{0}], agg#0=[MIN($1)], aggType=[DIRECT])", "\n PhysicalProject(col2=[$1], $f1=[true])", "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'bar'):INTEGER NOT NULL)])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", "\n PhysicalProject(col2=[$1])", "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'lorem'):INTEGER NOT NULL)])", "\n PhysicalTableScan(table=[[default, a]])", @@ -307,6 +307,59 @@ } ] }, + "physical_opt_group_trim_enabled": { + "queries": [ + { + "description": "SQL hint based group by optimization with partitioned aggregated values and group trim enabled", + "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_leaf_return_final_result='true', is_enable_group_trim='true') */ col1, COUNT(DISTINCT col2) AS cnt FROM a WHERE col3 >= 0 GROUP BY col1 ORDER BY cnt DESC LIMIT 10", + "output": [ + "Execution Plan", + "\nPhysicalSort(sort0=[$1], dir0=[DESC], fetch=[10])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(sort0=[$1], dir0=[DESC], fetch=[10])", + "\n PhysicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)], aggType=[FINAL], leafReturnFinalResult=[true], limit=[10])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)], aggType=[LEAF], leafReturnFinalResult=[true], collations=[[1 DESC]], limit=[10])", + "\n PhysicalFilter(condition=[>=($2, 0)])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n" + ] + }, + { + "description": "SQL hint based group by optimization with group trim enabled without returning group key", + "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_enable_group_trim='true') */ COUNT(DISTINCT col2) AS cnt FROM a WHERE a.col3 >= 0 GROUP BY col1 ORDER BY cnt DESC LIMIT 10", + "output": [ + "Execution Plan", + "\nPhysicalSort(sort0=[$0], dir0=[DESC], fetch=[10])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(sort0=[$0], dir0=[DESC], fetch=[10])", + "\n PhysicalProject(cnt=[$1])", + "\n PhysicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)], aggType=[FINAL], limit=[10])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalAggregate(group=[{0}], agg#0=[DISTINCTCOUNT($1)], aggType=[LEAF], collations=[[1 DESC]], limit=[10])", + "\n PhysicalFilter(condition=[>=($2, 0)])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n" + ] + }, + { + "description": "SQL hint based distinct optimization with group trim enabled", + "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT /*+ aggOptions(is_enable_group_trim='true') */ DISTINCT col1, col2 FROM a WHERE col3 >= 0 LIMIT 10", + "output": [ + "Execution Plan", + "\nPhysicalSort(fetch=[10])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalSort(fetch=[10])", + "\n PhysicalAggregate(group=[{0, 1}], aggType=[FINAL], limit=[10])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0, 1]])", + "\n PhysicalAggregate(group=[{0, 1}], aggType=[LEAF], limit=[10])", + "\n PhysicalFilter(condition=[>=($2, 0)])", + "\n PhysicalTableScan(table=[[default, a]])", + "\n" + ] + } + ] + }, "physical_opt_misc_auto_identity": { "queries": [ { @@ -315,20 +368,20 @@ "output": [ "Execution Plan", "\nPhysicalProject(EXPR$0=[$1], col3=[$0])", - "\n PhysicalAggregate(group=[{0}], agg#0=[COUNT($1)])", - "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]], execStrategy=[STREAMING], collation=[[]])", - "\n PhysicalAggregate(group=[{1}], agg#0=[COUNT()])", + "\n PhysicalAggregate(group=[{0}], agg#0=[COUNT($1)], aggType=[FINAL])", + "\n PhysicalExchange(exchangeStrategy=[PARTITIONING_EXCHANGE], distKeys=[[0]])", + "\n PhysicalAggregate(group=[{1}], agg#0=[COUNT()], aggType=[LEAF])", "\n PhysicalJoin(condition=[=($0, $2)], joinType=[semi])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", "\n PhysicalProject(col2=[$1], col3=[$2])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalAggregate(group=[{0}])", + "\n PhysicalAggregate(group=[{0}], aggType=[DIRECT])", "\n PhysicalUnion(all=[true])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", "\n PhysicalProject(col2=[$1])", "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", "\n PhysicalProject(col2=[$1])", "\n PhysicalFilter(condition=[=($2, CAST(_UTF-8'bar'):INTEGER NOT NULL)])", "\n PhysicalTableScan(table=[[default, a]])", @@ -345,7 +398,7 @@ "output": [ "Execution Plan", "\nPhysicalSort(sort0=[$0], dir0=[ASC])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", "\n PhysicalProject(col2=[$1], col3=[$2])", "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", "\n PhysicalTableScan(table=[[default, a]])", @@ -358,7 +411,7 @@ "output": [ "Execution Plan", "\nPhysicalSort(sort0=[$0], dir0=[ASC], fetch=[10])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", "\n PhysicalSort(sort0=[$0], dir0=[ASC], fetch=[10])", "\n PhysicalProject(col2=[$1], col3=[$2])", "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", @@ -372,7 +425,7 @@ "output": [ "Execution Plan", "\nPhysicalSort(sort0=[$0], dir0=[ASC], offset=[10], fetch=[11])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", "\n PhysicalSort(sort0=[$0], dir0=[ASC], fetch=[21])", "\n PhysicalProject(col2=[$1], col3=[$2])", "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", @@ -386,7 +439,7 @@ "output": [ "Execution Plan", "\nPhysicalSort(offset=[10], fetch=[11])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", "\n PhysicalSort(fetch=[21])", "\n PhysicalProject(col2=[$1], col3=[$2])", "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", @@ -400,9 +453,9 @@ "output": [ "Execution Plan", "\nPhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", "\n PhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])", - "\n PhysicalAggregate(group=[{1}], agg#0=[COUNT()])", + "\n PhysicalAggregate(group=[{1}], agg#0=[COUNT()], aggType=[DIRECT])", "\n PhysicalTableScan(table=[[default, a]])", "\n" ] @@ -413,9 +466,9 @@ "output": [ "Execution Plan", "\nPhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])", - "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[IDENTITY_EXCHANGE])", "\n PhysicalSort(sort0=[$1], dir0=[ASC], fetch=[100])", - "\n PhysicalAggregate(group=[{1}], agg#0=[COUNT()])", + "\n PhysicalAggregate(group=[{1}], agg#0=[COUNT()], aggType=[DIRECT])", "\n PhysicalTableScan(table=[[default, b]])", "\n" ] @@ -441,7 +494,7 @@ "sql": "SET usePhysicalOptimizer=true; SET useLiteMode=true; EXPLAIN PLAN FOR SELECT col2, COUNT(*) FROM a WHERE col1 = 'foo' GROUP BY col2", "output": [ "Execution Plan", - "\nPhysicalAggregate(group=[{1}], agg#0=[COUNT()])", + "\nPhysicalAggregate(group=[{1}], agg#0=[COUNT()], aggType=[DIRECT], limit=[100000])", "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", "\n PhysicalTableScan(table=[[default, a]])", "\n" @@ -455,9 +508,9 @@ "\nPhysicalProject(rnk=[$3], col1=[$0])", "\n PhysicalFilter(condition=[=($3, 1)])", "\n PhysicalWindow(window#0=[window(partition {1} order by [2] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])", - "\n PhysicalAggregate(group=[{0, 1, 2}])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", - "\n PhysicalAggregate(group=[{0, 1, 2}])", + "\n PhysicalAggregate(group=[{0, 1, 2}], aggType=[FINAL])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalAggregate(group=[{0, 1, 2}], aggType=[LEAF], limit=[100000])", "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", "\n PhysicalTableScan(table=[[default, a]])", "\n" @@ -469,9 +522,9 @@ "output": [ "Execution Plan", "\nPhysicalSort(offset=[100], fetch=[400])", - "\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT($3)])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", - "\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT()])", + "\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT($3)], aggType=[FINAL])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", + "\n PhysicalAggregate(group=[{0, 1, 2}], agg#0=[COUNT()], aggType=[LEAF], limit=[100000])", "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", "\n PhysicalTableScan(table=[[default, a]])", "\n" @@ -487,12 +540,12 @@ "output": [ "Execution Plan", "\nPhysicalJoin(condition=[=($0, $2)], joinType=[semi])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", "\n PhysicalSort(fetch=[100000])", "\n PhysicalProject(col2=[$1], col3=[$2])", "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", "\n PhysicalSort(fetch=[100000])", "\n PhysicalProject(col1=[$0])", "\n PhysicalTableScan(table=[[default, b]])", @@ -505,14 +558,14 @@ "output": [ "Execution Plan", "\nPhysicalProject(EXPR$0=[$1], col2=[$0])", - "\n PhysicalAggregate(group=[{0}], agg#0=[COUNT()])", + "\n PhysicalAggregate(group=[{0}], agg#0=[COUNT()], aggType=[DIRECT])", "\n PhysicalJoin(condition=[=($0, $1)], joinType=[semi])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", "\n PhysicalSort(fetch=[100000])", "\n PhysicalProject(col2=[$1])", "\n PhysicalFilter(condition=[=($0, _UTF-8'foo')])", "\n PhysicalTableScan(table=[[default, a]])", - "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE], distKeys=[[]], execStrategy=[STREAMING], collation=[[]])", + "\n PhysicalExchange(exchangeStrategy=[SINGLETON_EXCHANGE])", "\n PhysicalSort(fetch=[100000])", "\n PhysicalProject(col1=[$0])", "\n PhysicalTableScan(table=[[default, b]])", --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org