This is an automated email from the ASF dual-hosted git repository. jakevin pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 991b6a72741 [feature](Nereids): add ColumnPruningPostProcessor. (#32800) (#34077) 991b6a72741 is described below commit 991b6a727413d1e22d291f92cb4a1974bebbaa1a Author: jakevin <jakevin...@gmail.com> AuthorDate: Thu Apr 25 11:26:14 2024 +0800 [feature](Nereids): add ColumnPruningPostProcessor. (#32800) (#34077) (cherry picked from commit 5970f983db5dc95341c16f3459d883ce0f9676b3) --- .../processor/post/ColumnPruningPostProcessor.java | 107 +++++++++++++++++++++ .../nereids/processor/post/PlanPostProcessors.java | 1 + .../doris/nereids/processor/post/TopNScanOpt.java | 6 +- .../trees/plans/physical/AbstractPhysicalPlan.java | 2 +- .../ColumnPruningPostProcessorTest.java | 62 ++++++++++++ .../data/nereids_ssb_shape_sf100_p0/shape/q3.4.out | 13 +-- .../data/nereids_ssb_shape_sf100_p0/shape/q4.3.out | 25 ++--- 7 files changed, 195 insertions(+), 21 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/ColumnPruningPostProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/ColumnPruningPostProcessor.java new file mode 100644 index 00000000000..40d25ddd748 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/ColumnPruningPostProcessor.java @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.processor.post; + +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.annotation.DependsRules; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin; +import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalPlan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; +import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Prune column for Join-Cluster + */ +@DependsRules({ + MergeProjectPostProcessor.class +}) +public class ColumnPruningPostProcessor extends PlanPostProcessor { + @Override + public PhysicalProject visitPhysicalProject(PhysicalProject<? extends Plan> project, CascadesContext ctx) { + Plan child = project.child(); + Plan newChild = child.accept(this, ctx); + if (newChild instanceof AbstractPhysicalJoin) { + AbstractPhysicalJoin<? extends Plan, ? extends Plan> join = (AbstractPhysicalJoin) newChild; + Plan left = join.left(); + Plan right = join.right(); + Set<Slot> leftOutput = left.getOutputSet(); + Set<Slot> rightOutput = right.getOutputSet(); + + Set<Slot> usedSlots = project.getProjects().stream().flatMap(ne -> ne.getInputSlots().stream()) + .collect(Collectors.toSet()); + + Stream.concat(join.getHashJoinConjuncts().stream(), join.getOtherJoinConjuncts().stream()) + .flatMap(expr -> expr.getInputSlots().stream()) + .forEach(usedSlots::add); + join.getMarkJoinSlotReference().ifPresent(usedSlots::add); + + List<NamedExpression> leftNewProjections = new ArrayList<>(); + List<NamedExpression> rightNewProjections = new ArrayList<>(); + + for (Slot usedSlot : usedSlots) { + if (leftOutput.contains(usedSlot)) { + leftNewProjections.add(usedSlot); + } else if (rightOutput.contains(usedSlot)) { + rightNewProjections.add(usedSlot); + } + } + + Plan newLeft; + if (left instanceof PhysicalDistribute) { + newLeft = leftNewProjections.size() != leftOutput.size() && !leftNewProjections.isEmpty() + ? left.withChildren(new PhysicalProject<>(leftNewProjections, + left.getLogicalProperties(), left.child(0))) + : left; + } else { + newLeft = leftNewProjections.size() != leftOutput.size() && !leftNewProjections.isEmpty() + ? new PhysicalProject<>(leftNewProjections, left.getLogicalProperties(), + left).copyStatsAndGroupIdFrom((AbstractPhysicalPlan) left) + : left; + } + Plan newRight; + if (right instanceof PhysicalDistribute) { + newRight = rightNewProjections.size() != rightOutput.size() && !rightNewProjections.isEmpty() + ? right.withChildren(new PhysicalProject<>(rightNewProjections, + right.getLogicalProperties(), right.child(0))) + : right; + } else { + newRight = rightNewProjections.size() != rightOutput.size() && !rightNewProjections.isEmpty() + ? new PhysicalProject<>(rightNewProjections, right.getLogicalProperties(), + right).copyStatsAndGroupIdFrom((AbstractPhysicalPlan) right) + : right; + } + + if (newLeft != left || newRight != right) { + return (PhysicalProject) project.withChildren(join.withChildren(newLeft, newRight)); + } else { + return project; + } + } + return project; + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java index c6e4b245447..c5b2cf8456c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java @@ -59,6 +59,7 @@ public class PlanPostProcessors { // add processor if we need Builder<PlanPostProcessor> builder = ImmutableList.builder(); builder.add(new PushdownFilterThroughProject()); + builder.add(new ColumnPruningPostProcessor()); builder.add(new MergeProjectPostProcessor()); builder.add(new RecomputeLogicalPropertiesProcessor()); builder.add(new AddOffsetIntoDistribute()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java index 458682273d1..7201934b031 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java @@ -79,7 +79,8 @@ public class TopNScanOpt extends PlanPostProcessor { Plan child = topN.child(); topN = rewriteTopN(topN); if (child != topN.child()) { - topN = ((PhysicalTopN<? extends Plan>) topN.withChildren(child)).copyStatsAndGroupIdFrom(topN); + topN = (PhysicalTopN<? extends Plan>) ((PhysicalTopN<? extends Plan>) topN.withChildren( + child)).copyStatsAndGroupIdFrom(topN); } return topN; } else if (topN.getSortPhase() == SortPhase.MERGE_SORT) { @@ -94,7 +95,8 @@ public class TopNScanOpt extends PlanPostProcessor { if (topN.getSortPhase() == SortPhase.LOCAL_SORT) { PhysicalTopN<? extends Plan> rewrittenTopN = rewriteTopN(topN.getPhysicalTopN()); if (topN.getPhysicalTopN() != rewrittenTopN) { - topN = topN.withPhysicalTopN(rewrittenTopN).copyStatsAndGroupIdFrom(topN); + topN = (PhysicalDeferMaterializeTopN<? extends Plan>) topN.withPhysicalTopN(rewrittenTopN) + .copyStatsAndGroupIdFrom(topN); } return topN; } else if (topN.getSortPhase() == SortPhase.MERGE_SORT) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java index 6a0ccd09341..fc8a3c27c24 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java @@ -66,7 +66,7 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi return this; } - public <T extends AbstractPhysicalPlan> T copyStatsAndGroupIdFrom(T from) { + public <T extends AbstractPhysicalPlan> AbstractPhysicalPlan copyStatsAndGroupIdFrom(T from) { T newPlan = (T) withPhysicalPropertiesAndStats( from.getPhysicalProperties(), from.getStats()); newPlan.setMutableState(MutableState.KEY_GROUP, from.getGroupIdAsString()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/ColumnPruningPostProcessorTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/ColumnPruningPostProcessorTest.java new file mode 100644 index 00000000000..0e65aa6d582 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/ColumnPruningPostProcessorTest.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.postprocess; + +import org.apache.doris.nereids.processor.post.ColumnPruningPostProcessor; +import org.apache.doris.nereids.rules.rewrite.InferFilterNotNull; +import org.apache.doris.nereids.trees.plans.JoinType; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; +import org.apache.doris.nereids.util.LogicalPlanBuilder; +import org.apache.doris.nereids.util.MemoPatternMatchSupported; +import org.apache.doris.nereids.util.MemoTestUtils; +import org.apache.doris.nereids.util.PlanChecker; +import org.apache.doris.nereids.util.PlanConstructor; + +import com.google.common.collect.ImmutableList; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class ColumnPruningPostProcessorTest implements MemoPatternMatchSupported { + private final LogicalOlapScan scan1 = PlanConstructor.newLogicalOlapScan(0, "t1", 0); + private final LogicalOlapScan scan2 = PlanConstructor.newLogicalOlapScan(1, "t2", 0); + + @Test + void test() { + LogicalPlan plan = new LogicalPlanBuilder(scan1) + .join(scan2, JoinType.INNER_JOIN, ImmutableList.of()) + .project(ImmutableList.of(0, 2)) + .build(); + + PhysicalPlan physicalPlan = PlanChecker.from(MemoTestUtils.createConnectContext(), plan) + .applyTopDown(new InferFilterNotNull()) + .implement() + .getPhysicalPlan(); + + ColumnPruningPostProcessor processor = new ColumnPruningPostProcessor(); + PhysicalPlan newPlan = (PhysicalPlan) physicalPlan.accept(processor, null); + + Assertions.assertTrue(newPlan instanceof PhysicalProject); + Assertions.assertTrue(newPlan.child(0) instanceof PhysicalNestedLoopJoin); + Assertions.assertTrue(newPlan.child(0).child(0) instanceof PhysicalProject); + Assertions.assertTrue(newPlan.child(0).child(1) instanceof PhysicalProject); + } +} diff --git a/regression-test/data/nereids_ssb_shape_sf100_p0/shape/q3.4.out b/regression-test/data/nereids_ssb_shape_sf100_p0/shape/q3.4.out index 6e73de650a5..8cfeb063775 100644 --- a/regression-test/data/nereids_ssb_shape_sf100_p0/shape/q3.4.out +++ b/regression-test/data/nereids_ssb_shape_sf100_p0/shape/q3.4.out @@ -12,13 +12,14 @@ PhysicalResultSink ------------------PhysicalDistribute --------------------PhysicalProject ----------------------hashJoin[INNER_JOIN](lineorder.lo_orderdate = dates.d_datekey) -------------------------hashJoin[INNER_JOIN](lineorder.lo_suppkey = supplier.s_suppkey) ---------------------------PhysicalProject -----------------------------PhysicalOlapScan[lineorder] ---------------------------PhysicalDistribute +------------------------PhysicalProject +--------------------------hashJoin[INNER_JOIN](lineorder.lo_suppkey = supplier.s_suppkey) ----------------------------PhysicalProject -------------------------------filter(s_city IN ('UNITED KI1', 'UNITED KI5')) ---------------------------------PhysicalOlapScan[supplier] +------------------------------PhysicalOlapScan[lineorder] +----------------------------PhysicalDistribute +------------------------------PhysicalProject +--------------------------------filter(s_city IN ('UNITED KI1', 'UNITED KI5')) +----------------------------------PhysicalOlapScan[supplier] ------------------------PhysicalDistribute --------------------------PhysicalProject ----------------------------filter((dates.d_yearmonth = 'Dec1997')) diff --git a/regression-test/data/nereids_ssb_shape_sf100_p0/shape/q4.3.out b/regression-test/data/nereids_ssb_shape_sf100_p0/shape/q4.3.out index 04f46355d63..a25a04a7d31 100644 --- a/regression-test/data/nereids_ssb_shape_sf100_p0/shape/q4.3.out +++ b/regression-test/data/nereids_ssb_shape_sf100_p0/shape/q4.3.out @@ -14,19 +14,20 @@ PhysicalResultSink ------------------PhysicalDistribute --------------------PhysicalProject ----------------------hashJoin[INNER_JOIN](lineorder.lo_orderdate = dates.d_datekey) -------------------------hashJoin[INNER_JOIN](lineorder.lo_partkey = part.p_partkey) ---------------------------PhysicalDistribute -----------------------------hashJoin[INNER_JOIN](lineorder.lo_suppkey = supplier.s_suppkey) -------------------------------PhysicalProject ---------------------------------PhysicalOlapScan[lineorder] -------------------------------PhysicalDistribute +------------------------PhysicalProject +--------------------------hashJoin[INNER_JOIN](lineorder.lo_partkey = part.p_partkey) +----------------------------PhysicalDistribute +------------------------------hashJoin[INNER_JOIN](lineorder.lo_suppkey = supplier.s_suppkey) --------------------------------PhysicalProject -----------------------------------filter((supplier.s_nation = 'UNITED STATES')) -------------------------------------PhysicalOlapScan[supplier] ---------------------------PhysicalDistribute -----------------------------PhysicalProject -------------------------------filter((part.p_category = 'MFGR#14')) ---------------------------------PhysicalOlapScan[part] +----------------------------------PhysicalOlapScan[lineorder] +--------------------------------PhysicalDistribute +----------------------------------PhysicalProject +------------------------------------filter((supplier.s_nation = 'UNITED STATES')) +--------------------------------------PhysicalOlapScan[supplier] +----------------------------PhysicalDistribute +------------------------------PhysicalProject +--------------------------------filter((part.p_category = 'MFGR#14')) +----------------------------------PhysicalOlapScan[part] ------------------------PhysicalDistribute --------------------------PhysicalProject ----------------------------filter(d_year IN (1997, 1998)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org