This is an automated email from the ASF dual-hosted git repository.

englefly pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new f1bf64ef81f [feat](nereids)set actual row count in physical plan according to merged profile (branch-3.0) (#42254) f1bf64ef81f is described below commit f1bf64ef81ffc2a2b0627354324382e6a9c30cdd Author: minghong <engle...@gmail.com> AuthorDate: Tue Oct 22 19:01:49 2024 +0800 [feat](nereids)set actual row count in physical plan according to merged profile (branch-3.0) (#42254) ## Proposed changes pick #40361 Issue Number: close #xxx <!--Describe your changes.--> --- .../doris/common/profile/ExecutionProfile.java | 1 + .../org/apache/doris/common/profile/Profile.java | 78 +++++++++++++++++----- .../doris/common/profile/SummaryProfile.java | 2 - .../apache/doris/common/util/RuntimeProfile.java | 15 +++++ .../doris/nereids/trees/plans/AbstractPlan.java | 4 ++ .../trees/plans/physical/AbstractPhysicalJoin.java | 3 +- .../trees/plans/physical/PhysicalCTEProducer.java | 1 + .../plans/physical/PhysicalHashAggregate.java | 2 +- .../trees/plans/physical/PhysicalQuickSort.java | 4 +- .../nereids/trees/plans/physical/PhysicalTopN.java | 1 + .../trees/plans/physical/PhysicalUnion.java | 4 +- .../trees/plans/physical/PhysicalWindow.java | 3 +- .../java/org/apache/doris/qe/StmtExecutor.java | 4 ++ .../org/apache/doris/statistics/Statistics.java | 44 +++++++----- 14 files changed, 126 insertions(+), 40 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java index d4a00939fe7..a7a05ee12fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java @@ -143,6 +143,7 @@ public class ExecutionProfile { RuntimeProfile.mergeProfiles(allPipelineTask, mergedpipelineProfile, planNodeMap); newFragmentProfile.addChild(mergedpipelineProfile); pipelineIdx++; + 
fragmentsProfile.rowsProducedMap.putAll(mergedpipelineProfile.rowsProducedMap); } } return fragmentsProfile; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java index 25c6979be7f..1fbc0fdb1d6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java @@ -23,8 +23,11 @@ import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.ProfileManager; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.trees.plans.AbstractPlan; +import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan; import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping; +import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; import org.apache.doris.planner.Planner; @@ -46,6 +49,8 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.zip.Deflater; @@ -108,6 +113,10 @@ public class Profile { // Profile size is the size of profile file private long profileSize = 0; + private PhysicalPlan physicalPlan; + public Map<String, Long> rowsProducedMap = new HashMap<>(); + private List<PhysicalRelation> physicalRelations = new ArrayList<>(); + private String changedSessionVarCache = ""; // Need default constructor for read from storage @@ -276,20 +285,8 @@ public class Profile { if (planner instanceof NereidsPlanner) { NereidsPlanner nereidsPlanner = ((NereidsPlanner) planner); - StringBuilder builder = new StringBuilder(); - builder.append("\n"); - 
builder.append(nereidsPlanner.getPhysicalPlan() - .treeString()); - builder.append("\n"); - for (PhysicalRelation relation : nereidsPlanner.getPhysicalRelations()) { - if (relation.getStats() != null) { - builder.append(relation).append("\n") - .append(relation.getStats().printColumnStats()); - } - } - summaryInfo.put(SummaryProfile.PHYSICAL_PLAN, - builder.toString().replace("\n", "\n ")); - + physicalPlan = nereidsPlanner.getPhysicalPlan(); + physicalRelations.addAll(nereidsPlanner.getPhysicalRelations()); FragmentIdMapping<DistributedPlan> distributedPlans = nereidsPlanner.getDistributedPlans(); if (distributedPlans != null) { summaryInfo.put(SummaryProfile.DISTRIBUTED_PLAN, @@ -382,15 +379,43 @@ public class Profile { // Only generate merged profile for select, insert into select. // Not support broker load now. + RuntimeProfile mergedProfile = null; if (this.profileLevel == MergedProfileLevel && this.executionProfiles.size() == 1) { try { - builder.append("\n MergedProfile \n"); - this.executionProfiles.get(0).getAggregatedFragmentsProfile(planNodeMap).prettyPrint(builder, " "); + mergedProfile = this.executionProfiles.get(0).getAggregatedFragmentsProfile(planNodeMap); + this.rowsProducedMap.putAll(mergedProfile.rowsProducedMap); + if (physicalPlan != null) { + updateActualRowCountOnPhysicalPlan(physicalPlan); + } } catch (Throwable aggProfileException) { LOG.warn("build merged simple profile {} failed", this.id, aggProfileException); + } + } + + if (physicalPlan != null) { + builder.append("\nPhysical Plan \n"); + StringBuilder physcialPlanBuilder = new StringBuilder(); + physcialPlanBuilder.append(physicalPlan.treeString()); + physcialPlanBuilder.append("\n"); + for (PhysicalRelation relation : physicalRelations) { + if (relation.getStats() != null) { + physcialPlanBuilder.append(relation).append("\n") + .append(relation.getStats().printColumnStats()); + } + } + builder.append( + physcialPlanBuilder.toString().replace("\n", "\n ")); + } + + if 
(this.profileLevel == MergedProfileLevel && this.executionProfiles.size() == 1) { + builder.append("\nMergedProfile \n"); + if (mergedProfile != null) { + mergedProfile.prettyPrint(builder, " "); + } else { builder.append("build merged simple profile failed"); } } + try { // For load task, they will have multiple execution_profiles. for (ExecutionProfile executionProfile : executionProfiles) { @@ -681,4 +706,25 @@ public class Profile { return; } + + public PhysicalPlan getPhysicalPlan() { + return physicalPlan; + } + + public void setPhysicalPlan(PhysicalPlan physicalPlan) { + this.physicalPlan = physicalPlan; + } + + private void updateActualRowCountOnPhysicalPlan(Plan plan) { + if (plan == null || rowsProducedMap.isEmpty()) { + return; + } + Long actualRowCount = rowsProducedMap.get(String.valueOf(((AbstractPlan) plan).getId())); + if (actualRowCount != null) { + ((AbstractPlan) plan).updateActualRowCount(actualRowCount); + } + for (Plan child : plan.children()) { + updateActualRowCountOnPhysicalPlan(child); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java index 5728f79f1e6..8356bc34a13 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java @@ -66,7 +66,6 @@ public class SummaryProfile { public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE = "Parallel Fragment Exec Instance Num"; public static final String TRACE_ID = "Trace ID"; public static final String WORKLOAD_GROUP = "Workload Group"; - public static final String PHYSICAL_PLAN = "Physical Plan"; public static final String DISTRIBUTED_PLAN = "Distributed Plan"; public static final String SYSTEM_MESSAGE = "System Message"; public static final String EXECUTED_BY_FRONTEND = "Executed By Frontend"; @@ -129,7 +128,6 @@ public class SummaryProfile { START_TIME, 
END_TIME, TOTAL_TIME, TASK_STATE, USER, DEFAULT_CATALOG, DEFAULT_DB, SQL_STATEMENT); public static final ImmutableList<String> SUMMARY_KEYS = new ImmutableList.Builder<String>() .addAll(SUMMARY_CAPTIONS) - .add(PHYSICAL_PLAN) .add(DISTRIBUTED_PLAN) .build(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java index 60207b49172..3ffc303a6db 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java @@ -40,12 +40,15 @@ import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.Formatter; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * It is accessed by two kinds of thread, one is to create this RuntimeProfile @@ -100,6 +103,8 @@ public class RuntimeProfile { @SerializedName(value = "nodeid") private int nodeid = -1; + public Map<String, Long> rowsProducedMap = new HashMap<>(); + public RuntimeProfile() { init(); } @@ -494,6 +499,7 @@ public class RuntimeProfile { // RuntimeProfile has at least one counter named TotalTime, should exclude it. 
if (newCreatedMergedChildProfile.counterMap.size() > 1) { simpleProfile.addChildWithCheck(newCreatedMergedChildProfile, planNodeMap); + simpleProfile.rowsProducedMap.putAll(newCreatedMergedChildProfile.rowsProducedMap); } } } @@ -504,6 +510,12 @@ public class RuntimeProfile { return; } RuntimeProfile templateProfile = profiles.get(0); + Pattern pattern = Pattern.compile("nereids_id=(\\d+)"); + Matcher matcher = pattern.matcher(templateProfile.getName()); + String nereidsId = null; + if (matcher.find()) { + nereidsId = matcher.group(1); + } Set<String> childCounterSet = templateProfile.childCounterMap.get(parentCounterName); if (childCounterSet == null) { return; @@ -517,6 +529,9 @@ public class RuntimeProfile { Counter orgCounter = profile.counterMap.get(childCounterName); aggCounter.addCounter(orgCounter); } + if (nereidsId != null && childCounterName.equals("RowsProduced")) { + simpleProfile.rowsProducedMap.put(nereidsId, aggCounter.sum.getValue()); + } if (simpleProfile.counterMap.containsKey(parentCounterName)) { simpleProfile.addCounter(childCounterName, aggCounter, parentCounterName); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java index 9dfca3195d6..eb65048050f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java @@ -226,4 +226,8 @@ public abstract class AbstractPlan extends AbstractTreeNode<Plan> implements Pla } return ancestors; } + + public void updateActualRowCount(long actualRowCount) { + statistics.setActualRowCount(actualRowCount); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java index 5f4bb5aa5f4..194f6356045 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java @@ -267,8 +267,9 @@ public abstract class AbstractPhysicalJoin< @Override public String toString() { - List<Object> args = Lists.newArrayList("type", joinType, + List<Object> args = Lists.newArrayList( "stats", statistics, + "type", joinType, "hashCondition", hashJoinConjuncts, "otherCondition", otherJoinConjuncts, "markCondition", markJoinConjuncts); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCTEProducer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCTEProducer.java index 53ff3e30257..568b8e6660a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCTEProducer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalCTEProducer.java @@ -89,6 +89,7 @@ public class PhysicalCTEProducer<CHILD_TYPE extends Plan> extends PhysicalUnary< @Override public String toString() { return Utils.toSqlString("PhysicalCTEProducer[" + id.asInt() + "]", + "stats", statistics, "cteId", cteId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java index 72c69a18bee..404c30fe379 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashAggregate.java @@ -199,9 +199,9 @@ public class PhysicalHashAggregate<CHILD_TYPE extends Plan> extends PhysicalUnar TopnPushInfo topnPushInfo = (TopnPushInfo) getMutableState( MutableState.KEY_PUSH_TOPN_TO_AGG).orElseGet(() -> null); return Utils.toSqlString("PhysicalHashAggregate[" + id.asInt() + "]" + 
getGroupIdWithPrefix(), + "stats", statistics, "aggPhase", aggregateParam.aggPhase, "aggMode", aggregateParam.aggMode, - "stats", statistics, "maybeUseStreaming", maybeUsingStream, "groupByExpr", groupByExpressions, "outputExpr", outputExpressions, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalQuickSort.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalQuickSort.java index c1973668c7d..0e377b46d23 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalQuickSort.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalQuickSort.java @@ -107,8 +107,8 @@ public class PhysicalQuickSort<CHILD_TYPE extends Plan> extends AbstractPhysical @Override public String toString() { return Utils.toSqlString("PhysicalQuickSort[" + id.asInt() + "]" + getGroupIdWithPrefix(), - "orderKeys", orderKeys, - "phase", phase.toString(), "stats", statistics + "stats", statistics, "orderKeys", orderKeys, + "phase", phase.toString() ); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java index 96dc709bbde..c387a58dd0c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java @@ -143,6 +143,7 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort< @Override public String toString() { return Utils.toSqlString("PhysicalTopN[" + id.asInt() + "]" + getGroupIdWithPrefix(), + "stats", statistics, "limit", limit, "offset", offset, "orderKeys", orderKeys, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalUnion.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalUnion.java 
index ba20c926705..2a81698812a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalUnion.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalUnion.java @@ -89,11 +89,11 @@ public class PhysicalUnion extends PhysicalSetOperation implements Union { @Override public String toString() { return Utils.toSqlString("PhysicalUnion" + "[" + id.asInt() + "]" + getGroupIdWithPrefix(), + "stats", statistics, "qualifier", qualifier, "outputs", outputs, "regularChildrenOutputs", regularChildrenOutputs, - "constantExprsList", constantExprsList, - "stats", statistics); + "constantExprsList", constantExprsList); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalWindow.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalWindow.java index b1703f47496..7e6fd48f02d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalWindow.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalWindow.java @@ -105,8 +105,9 @@ public class PhysicalWindow<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD @Override public String toString() { return Utils.toSqlString("PhysicalWindow[" + id.asInt() + "]" + getGroupIdWithPrefix(), + "stats", statistics, "windowFrameGroup", windowFrameGroup, - "requiredProperties", requireProperties, "stats", statistics + "requiredProperties", requireProperties ); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 70da91bc66d..16600536aac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1223,6 +1223,10 @@ public class StmtExecutor { // failed, the insert stmt should be success try { profile.updateSummary(getSummaryInfo(isFinished), isFinished, 
this.planner); + if (planner instanceof NereidsPlanner) { + NereidsPlanner nereidsPlanner = ((NereidsPlanner) planner); + profile.setPhysicalPlan(nereidsPlanner.getPhysicalPlan()); + } } catch (Throwable t) { LOG.warn("failed to update profile, ignore this error", t); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java index 162dab5d136..f4552a2560d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/Statistics.java @@ -46,12 +46,19 @@ public class Statistics { private double deltaRowCount = 0.0; + private long actualRowCount = -1L; + + public Statistics(double rowCount, Map<Expression, ColumnStatistic> expressionToColumnStats) { + this(rowCount, 1, expressionToColumnStats); + } + public Statistics(Statistics another) { this.rowCount = another.rowCount; this.widthInJoinCluster = another.widthInJoinCluster; this.expressionToColumnStats = new HashMap<>(another.expressionToColumnStats); this.tupleSize = another.tupleSize; this.deltaRowCount = another.getDeltaRowCount(); + this.actualRowCount = another.actualRowCount; } public Statistics(double rowCount, int widthInJoinCluster, @@ -62,10 +69,6 @@ public class Statistics { this.deltaRowCount = deltaRowCount; } - public Statistics(double rowCount, Map<Expression, ColumnStatistic> expressionToColumnStats) { - this(rowCount, 1, expressionToColumnStats, 0); - } - public Statistics(double rowCount, int widthInJoinCluster, Map<Expression, ColumnStatistic> expressionToColumnStats) { this(rowCount, widthInJoinCluster, expressionToColumnStats, 0); @@ -193,21 +196,24 @@ public class Statistics { @Override public String toString() { + StringBuilder builder = new StringBuilder(); if (Double.isNaN(rowCount)) { - return "NaN"; - } - if (Double.POSITIVE_INFINITY == rowCount) { - return "Infinite"; - } - if (Double.NEGATIVE_INFINITY == rowCount) { 
- return "-Infinite"; + builder.append("NaN"); + } else if (Double.POSITIVE_INFINITY == rowCount) { + builder.append("Infinite"); + } else if (Double.NEGATIVE_INFINITY == rowCount) { + builder.append("-Infinite"); + } else { + DecimalFormat format = new DecimalFormat("#,###.##"); + builder.append(format.format(rowCount)); } - DecimalFormat format = new DecimalFormat("#,###.##"); - String rows = format.format(rowCount); if (deltaRowCount > 0) { - rows = rows + "(" + format.format(deltaRowCount) + ")"; + builder.append("(").append((long) deltaRowCount).append(")"); } - return rows; + if (actualRowCount != -1) { + builder.append(" actualRows=").append(actualRowCount); + } + return builder.toString(); } public String printColumnStats() { @@ -292,4 +298,12 @@ public class Statistics { public void setDeltaRowCount(double deltaRowCount) { this.deltaRowCount = deltaRowCount; } + + public long getActualRowCount() { + return actualRowCount; + } + + public void setActualRowCount(long actualRowCount) { + this.actualRowCount = actualRowCount; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org