This is an automated email from the ASF dual-hosted git repository. jakevin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c1dd1fc331 [fix](nereids): fix all bugs in `mergeGroup()`. (#16079) c1dd1fc331 is described below commit c1dd1fc33146bded9d3e17392fb70cba2e82565b Author: jakevin <jakevin...@gmail.com> AuthorDate: Thu Jan 19 19:15:05 2023 +0800 [fix](nereids): fix all bugs in `mergeGroup()`. (#16079) * [fix](Nereids): fix mergeGroup() * polish code * fix replace children of PhysicalEnforcer * delete `deleteBestPlan` * delete `getInputProperties` * after merge GroupExpression, clear owner Group --- .../doris/nereids/jobs/cascades/ApplyRuleJob.java | 3 +- .../nereids/jobs/cascades/CostAndEnforcerJob.java | 4 ++ .../nereids/jobs/cascades/DeriveStatsJob.java | 4 +- .../java/org/apache/doris/nereids/memo/Group.java | 63 ++++++++++++++++--- .../apache/doris/nereids/memo/GroupExpression.java | 46 ++++++++++++++ .../java/org/apache/doris/nereids/memo/Memo.java | 44 ++++++++----- .../org/apache/doris/nereids/memo/MemoTest.java | 73 +++++++++++++++------- 7 files changed, 188 insertions(+), 49 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/ApplyRuleJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/ApplyRuleJob.java index b4d7f09867..1f653b22f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/ApplyRuleJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/ApplyRuleJob.java @@ -58,7 +58,8 @@ public class ApplyRuleJob extends Job { @Override public void execute() throws AnalysisException { - if (groupExpression.hasApplied(rule)) { + if (groupExpression.hasApplied(rule) + || groupExpression.isUnused()) { return; } countJobExecutionTimesOfGroupExpressions(groupExpression); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java index f9d43f19aa..2f1c26472d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java @@ -104,6 +104,10 @@ public class CostAndEnforcerJob extends Job implements Cloneable { */ @Override public void execute() { + if (groupExpression.isUnused()) { + return; + } + countJobExecutionTimesOfGroupExpressions(groupExpression); // Do init logic of root plan/groupExpr of `subplan`, only run once per task. if (curChildIndex == -1) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/DeriveStatsJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/DeriveStatsJob.java index 66638c5023..157d1e6067 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/DeriveStatsJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/DeriveStatsJob.java @@ -58,10 +58,10 @@ public class DeriveStatsJob extends Job { @Override public void execute() { - countJobExecutionTimesOfGroupExpressions(groupExpression); - if (groupExpression.isStatDerived()) { + if (groupExpression.isStatDerived() || groupExpression.isUnused()) { return; } + countJobExecutionTimesOfGroupExpressions(groupExpression); if (!deriveChildren && groupExpression.arity() > 0) { pushJob(new DeriveStatsJob(groupExpression, true, context)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java index afb77f4188..18e755709c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java @@ -24,6 +24,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalJoin; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; import org.apache.doris.nereids.util.TreeStringUtils; import org.apache.doris.nereids.util.Utils; import org.apache.doris.statistics.StatsDeriveResult; @@ -34,10 +35,11 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.util.ArrayList; -import java.util.HashSet; import java.util.IdentityHashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.function.Function; @@ -213,6 +215,23 @@ public class Group { lowestCostPlans.put(newProperty, pair); } + /** + * replace oldGroupExpression with newGroupExpression in lowestCostPlans. + */ + public void replaceBestPlanGroupExpr(GroupExpression oldGroupExpression, GroupExpression newGroupExpression) { + Map<PhysicalProperties, Pair<Double, GroupExpression>> needReplaceBestExpressions = Maps.newHashMap(); + for (Iterator<Entry<PhysicalProperties, Pair<Double, GroupExpression>>> iterator = + lowestCostPlans.entrySet().iterator(); iterator.hasNext(); ) { + Map.Entry<PhysicalProperties, Pair<Double, GroupExpression>> entry = iterator.next(); + Pair<Double, GroupExpression> pair = entry.getValue(); + if (pair.second.equals(oldGroupExpression)) { + needReplaceBestExpressions.put(entry.getKey(), Pair.of(pair.first, newGroupExpression)); + iterator.remove(); + } + } + lowestCostPlans.putAll(needReplaceBestExpressions); + } + public StatsDeriveResult getStatistics() { return statistics; } @@ -262,26 +281,54 @@ public class Group { * @param target the new owner group of expressions */ public void mergeTo(Group target) { - // move parentExpressions Ownership + // move parentExpressions Ownership parentExpressions.keySet().forEach(target::addParentExpression); + // PhysicalEnforcer isn't in groupExpressions, so mergeGroup() can't replace its children. + // So we need to manually replace the children of PhysicalEnforcer in here. + parentExpressions.keySet().stream().filter(ge -> ge.getPlan() instanceof PhysicalDistribute) + .forEach(ge -> ge.children().set(0, target)); parentExpressions.clear(); // move LogicalExpression PhysicalExpression Ownership - HashSet<GroupExpression> logicalSet = new HashSet<>(target.getLogicalExpressions()); - logicalExpressions.stream().filter(ge -> !logicalSet.contains(ge)).forEach(target::addLogicalExpression); + Map<GroupExpression, GroupExpression> logicalSet = target.getLogicalExpressions().stream() + .collect(Collectors.toMap(Function.identity(), Function.identity())); + for (GroupExpression logicalExpression : logicalExpressions) { + GroupExpression existGroupExpr = logicalSet.get(logicalExpression); + if (existGroupExpr != null) { + Preconditions.checkState(logicalExpression != existGroupExpr, "must not equals"); + // lowCostPlans must be physical GroupExpression, don't need to replaceBestPlanGroupExpr + logicalExpression.mergeToNotOwnerRemove(existGroupExpr); + } else { + target.addLogicalExpression(logicalExpression); + } + } logicalExpressions.clear(); // movePhysicalExpressionOwnership - HashSet<GroupExpression> physicalSet = new HashSet<>(target.getPhysicalExpressions()); - physicalExpressions.stream().filter(ge -> !physicalSet.contains(ge)).forEach(target::addGroupExpression); + Map<GroupExpression, GroupExpression> physicalSet = target.getPhysicalExpressions().stream() + .collect(Collectors.toMap(Function.identity(), Function.identity())); + for (GroupExpression physicalExpression : physicalExpressions) { + GroupExpression existGroupExpr = physicalSet.get(physicalExpression); + if (existGroupExpr != null) { + Preconditions.checkState(physicalExpression != existGroupExpr, "must not equals"); + physicalExpression.getOwnerGroup().replaceBestPlanGroupExpr(physicalExpression, existGroupExpr); + physicalExpression.mergeToNotOwnerRemove(existGroupExpr); + } else { + target.addPhysicalExpression(physicalExpression); + } + } physicalExpressions.clear(); - // moveLowestCostPlansOwnership + // Above we already replaceBestPlanGroupExpr, but we still need to moveLowestCostPlansOwnership. + // Because PhysicalEnforcer don't exist in physicalExpressions, so above `replaceBestPlanGroupExpr` can't + // move PhysicalEnforcer in lowestCostPlans. Following code can move PhysicalEnforcer in lowestCostPlans. lowestCostPlans.forEach((physicalProperties, costAndGroupExpr) -> { GroupExpression bestGroupExpression = costAndGroupExpr.second; - // change into target group. if (bestGroupExpression.getOwnerGroup() == this || bestGroupExpression.getOwnerGroup() == null) { + // move PhysicalEnforcer into target + Preconditions.checkState(bestGroupExpression.getPlan() instanceof PhysicalDistribute); bestGroupExpression.setOwnerGroup(target); } + // move lowestCostPlans Ownership if (!target.lowestCostPlans.containsKey(physicalProperties)) { target.lowestCostPlans.put(physicalProperties, costAndGroupExpr); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java index a6f3de8763..f1374e2d80 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java @@ -66,6 +66,9 @@ public class GroupExpression { // value is the request physical properties private final Map<PhysicalProperties, PhysicalProperties> requestPropertiesMap; + // After mergeGroup(), source Group was cleaned up, but it may be in the Job Stack. So use this to mark and skip it. + private boolean isUnused = false; + public GroupExpression(Plan plan) { this(plan, Lists.newArrayList()); } @@ -163,6 +166,22 @@ public class GroupExpression { this.statDerived = statDerived; } + /** + * Check this GroupExpression isUnused. See detail of `isUnused` in its comment. + */ + public boolean isUnused() { + if (isUnused) { + Preconditions.checkState(children.isEmpty() || ownerGroup == null); + return true; + } + Preconditions.checkState(ownerGroup != null); + return false; + } + + public void setUnused(boolean isUnused) { + this.isUnused = isUnused; + } + public Map<PhysicalProperties, Pair<Double, List<PhysicalProperties>>> getLowestCostTable() { return lowestCostTable; } @@ -175,6 +194,7 @@ public class GroupExpression { /** * Add a (outputProperties) -> (cost, childrenInputProperties) in lowestCostTable. * if the outputProperties exists, will be covered. + * * @return true if lowest cost table change. */ public boolean updateLowestCostTable(PhysicalProperties outputProperties, @@ -209,6 +229,32 @@ public class GroupExpression { this.requestPropertiesMap.put(requiredPropertySet, outputPropertySet); } + /** + * Merge GroupExpression. + */ + public void mergeTo(GroupExpression target) { + this.ownerGroup.removeGroupExpression(this); + this.mergeToNotOwnerRemove(target); + } + + /** + * Merge GroupExpression, but owner don't remove this GroupExpression. + */ + public void mergeToNotOwnerRemove(GroupExpression target) { + // LowestCostTable + this.getLowestCostTable() + .forEach((properties, pair) -> target.updateLowestCostTable(properties, pair.second, pair.first)); + // requestPropertiesMap + target.requestPropertiesMap.putAll(this.requestPropertiesMap); + // ruleMasks + target.ruleMasks.or(this.ruleMasks); + + // clear + this.children.forEach(child -> child.removeParentExpression(this)); + this.children.clear(); + this.ownerGroup = null; + } + public double getCost() { return cost; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java index 83ba68f88a..c4e7465a88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Memo.java @@ -33,6 +33,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.nereids.util.Utils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatsDeriveResult; @@ -444,29 +445,40 @@ public class Memo { } } GROUP_MERGE_TRACER.log(GroupMergeEvent.of(source, destination, needReplaceChild)); - for (GroupExpression groupExpression : needReplaceChild) { - // After change GroupExpression children, the hashcode will change, - // so need to reinsert into map. - groupExpressions.remove(groupExpression); - Utils.replaceList(groupExpression.children(), source, destination); - - GroupExpression that = groupExpressions.get(groupExpression); - if (that != null && that.getOwnerGroup() != null - && !that.getOwnerGroup().equals(groupExpression.getOwnerGroup())) { - // remove groupExpression from its owner group to avoid adding it to that.getOwnerGroup() - // that.getOwnerGroup() already has this groupExpression. - Group ownerGroup = groupExpression.getOwnerGroup(); - groupExpression.getOwnerGroup().removeGroupExpression(groupExpression); - mergeGroup(ownerGroup, that.getOwnerGroup()); + + Map<Group, Group> needMergeGroupPairs = Maps.newHashMap(); + for (GroupExpression reinsertGroupExpr : needReplaceChild) { + // After change GroupExpression children, hashcode will change, so need to reinsert into map. + groupExpressions.remove(reinsertGroupExpr); + Utils.replaceList(reinsertGroupExpr.children(), source, destination); + + GroupExpression existGroupExpr = groupExpressions.get(reinsertGroupExpr); + if (existGroupExpr != null) { + Preconditions.checkState(existGroupExpr.getOwnerGroup() != null); + // remove reinsertGroupExpr from its owner group to avoid adding it to existGroupExpr.getOwnerGroup() + // existGroupExpr.getOwnerGroup() already has this reinsertGroupExpr. + reinsertGroupExpr.setUnused(true); + if (existGroupExpr.getOwnerGroup().equals(reinsertGroupExpr.getOwnerGroup())) { + // reinsertGroupExpr & existGroupExpr are in same Group, so merge them. + if (reinsertGroupExpr.getPlan() instanceof PhysicalPlan) { + reinsertGroupExpr.getOwnerGroup().replaceBestPlanGroupExpr(reinsertGroupExpr, existGroupExpr); + } + // existingGroupExpression merge the state of reinsertGroupExpr + reinsertGroupExpr.mergeTo(existGroupExpr); + } else { + // reinsertGroupExpr & existGroupExpr aren't in same group, need to merge their OwnerGroup. + needMergeGroupPairs.put(reinsertGroupExpr.getOwnerGroup(), existGroupExpr.getOwnerGroup()); + } } else { - groupExpressions.put(groupExpression, groupExpression); + groupExpressions.put(reinsertGroupExpr, reinsertGroupExpr); } } if (!source.equals(destination)) { - // TODO: stats and other source.mergeTo(destination); groups.remove(source.getGroupId()); } + + needMergeGroupPairs.forEach(this::mergeGroup); return destination; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/memo/MemoTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/memo/MemoTest.java index 5853a7dc4a..7ddf8e9b1e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/memo/MemoTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/memo/MemoTest.java @@ -53,7 +53,6 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.ArrayList; -import java.util.Arrays; import java.util.Map; import java.util.Objects; @@ -68,33 +67,63 @@ class MemoTest implements PatternMatchSupported { private final LogicalJoin<LogicalJoin<LogicalOlapScan, LogicalOlapScan>, LogicalOlapScan> logicalJoinABC = new LogicalJoin<>( JoinType.INNER_JOIN, logicalJoinAB, PlanConstructor.newLogicalOlapScan(2, "C", 0)); + /* + * ┌─────────────────────────┐ ┌───────────┐ + * │ ┌─────┐ ┌─────┐ │ │ ┌─────┐ │ + * │ │0┌─┐ │ │1┌─┐ │ │ │ │1┌─┐ │ │ + * │ │ └┼┘ │ │ └┼┘ │ │ │ │ └┼┘ │ │ + * │ └──┼──┘ └──┼──┘ │ │ └──┼──┘ │ + * │Memo │ │ ├────►│Memo │ │ + * │ ┌──▼──┐ ┌──▼──┐ │ │ ┌──▼──┐ │ + * │ │ src │ │ dst │ │ │ │ dst │ │ + * │ │2 │ │3 │ │ │ │3 │ │ + * │ └─────┘ └─────┘ │ │ └─────┘ │ + * └─────────────────────────┘ └───────────┘ + */ @Test - void mergeGroup() { + void testMergeGroup() { + Group srcGroup = new Group(new GroupId(2), new GroupExpression(new FakePlan()), + new LogicalProperties(ArrayList::new)); + Group dstGroup = new Group(new GroupId(3), new GroupExpression(new FakePlan()), + new LogicalProperties(ArrayList::new)); + + FakePlan fakePlan = new FakePlan(); + GroupExpression srcParentExpression = new GroupExpression(fakePlan, Lists.newArrayList(srcGroup)); + Group srcParentGroup = new Group(new GroupId(0), srcParentExpression, new LogicalProperties(ArrayList::new)); + srcParentGroup.setBestPlan(srcParentExpression, Double.MIN_VALUE, PhysicalProperties.ANY); + GroupExpression dstParentExpression = new GroupExpression(fakePlan, Lists.newArrayList(dstGroup)); + Group dstParentGroup = new Group(new GroupId(1), dstParentExpression, new LogicalProperties(ArrayList::new)); + Memo memo = new Memo(); - GroupId gid2 = new GroupId(2); - Group srcGroup = new Group(gid2, new GroupExpression(new FakePlan()), new LogicalProperties(ArrayList::new)); - GroupId gid3 = new GroupId(3); - Group dstGroup = new Group(gid3, new GroupExpression(new FakePlan()), new LogicalProperties(ArrayList::new)); - FakePlan d = new FakePlan(); - GroupExpression ge1 = new GroupExpression(d, Arrays.asList(srcGroup)); - GroupId gid0 = new GroupId(0); - Group g1 = new Group(gid0, ge1, new LogicalProperties(ArrayList::new)); - g1.setBestPlan(ge1, Double.MIN_VALUE, PhysicalProperties.ANY); - GroupExpression ge2 = new GroupExpression(d, Arrays.asList(dstGroup)); - GroupId gid1 = new GroupId(1); - Group g2 = new Group(gid1, ge2, new LogicalProperties(ArrayList::new)); Map<GroupId, Group> groups = Deencapsulation.getField(memo, "groups"); - groups.put(gid2, srcGroup); - groups.put(gid3, dstGroup); - groups.put(gid0, g1); - groups.put(gid1, g2); + groups.put(srcGroup.getGroupId(), srcGroup); + groups.put(dstGroup.getGroupId(), dstGroup); + groups.put(srcParentGroup.getGroupId(), srcParentGroup); + groups.put(dstParentGroup.getGroupId(), dstParentGroup); Map<GroupExpression, GroupExpression> groupExpressions = Deencapsulation.getField(memo, "groupExpressions"); - groupExpressions.put(ge1, ge1); - groupExpressions.put(ge2, ge2); + groupExpressions.put(srcParentExpression, srcParentExpression); + groupExpressions.put(dstParentExpression, dstParentExpression); + memo.mergeGroup(srcGroup, dstGroup); - Assertions.assertNull(g1.getBestPlan(PhysicalProperties.ANY)); - Assertions.assertEquals(ge1.getOwnerGroup(), g2); + + // check + Assertions.assertEquals(0, srcGroup.getParentGroupExpressions().size()); + Assertions.assertEquals(0, srcGroup.getPhysicalExpressions().size()); + Assertions.assertEquals(0, srcGroup.getLogicalExpressions().size()); + + Assertions.assertEquals(0, srcParentGroup.getParentGroupExpressions().size()); + Assertions.assertEquals(0, srcParentGroup.getPhysicalExpressions().size()); + Assertions.assertEquals(0, srcParentGroup.getLogicalExpressions().size()); + + // TODO: add root test. + // Assertions.assertEquals(memo.getRoot(), dstParentGroup); + + Assertions.assertEquals(2, dstGroup.getPhysicalExpressions().size()); + Assertions.assertEquals(1, dstParentGroup.getPhysicalExpressions().size()); + + Assertions.assertNull(srcParentExpression.getOwnerGroup()); + Assertions.assertEquals(0, srcParentExpression.arity()); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org