This is an automated email from the ASF dual-hosted git repository. zhangstar333 pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 9a40cba5f0b [feature](profile)Enable merging of incomplete profiles. (#39560) (#42953) 9a40cba5f0b is described below commit 9a40cba5f0b28b7f0334a9f1bb1ce2ac3be88cf2 Author: Mryange <yanxuech...@selectdb.com> AuthorDate: Wed Nov 13 16:39:45 2024 +0800 [feature](profile)Enable merging of incomplete profiles. (#39560) (#42953) ## Proposed changes https://github.com/apache/doris/pull/39560 ``` java.lang.IndexOutOfBoundsException: Index 0 out of bounds for length 0 at jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64) ~[?:?] at jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70) ~[?:?] at jdk.internal.util.Preconditions.checkIndex(Preconditions.java:266) ~[?:?] at java.util.Objects.checkIndex(Objects.java:359) ~[?:?] at java.util.ArrayList.get(ArrayList.java:427) ~[?:?] at org.apache.doris.common.profile.ExecutionProfile.getPipelineAggregatedProfile(ExecutionProfile.java:142) ~[doris-fe.jar:1.2-SNAPSHOT] ``` Previously, profiles had to be complete before they could be merged; merging an incomplete profile failed in getPipelineAggregatedProfile with the IndexOutOfBoundsException above. This change allows incomplete profiles to be merged, and each pipeline whose profile is missing is marked as "(miss profile)" in the merged profile. 
``` - ProjectionTime: avg 0ns, max 0ns, min 0ns - RowsProduced: sum 0, avg 0, max 0, min 0 - WaitForDependency[SORT_OPERATOR_DEPENDENCY]Time: avg 15min2sec, max 15min2sec, min 15min2sec Pipeline : 3(miss profile): Pipeline : 4(instance_num=48): LOCAL_EXCHANGE_SINK_OPERATOR (PASSTHROUGH) (id=-14): - CloseTime: avg 0ns, max 0ns, min 0ns - ExecTime: avg 29.410us, max 43.336us, min ``` --- .../doris/common/profile/ExecutionProfile.java | 102 +++++++++++++++------ 1 file changed, 74 insertions(+), 28 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java index a7a05ee12fd..7828a38e6eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java @@ -41,7 +41,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * root is used to collect profile of a complete query plan(including query or load). @@ -74,6 +74,7 @@ public class ExecutionProfile { // use to merge profile from multi be private Map<Integer, Map<TNetworkAddress, List<RuntimeProfile>>> multiBeProfile = null; + private ReentrantReadWriteLock multiBeProfileLock = new ReentrantReadWriteLock(); // Not serialize this property, it is only used to get profile id. 
private SummaryProfile summaryProfile; @@ -97,8 +98,7 @@ public class ExecutionProfile { RuntimeProfile runtimeProfile = new RuntimeProfile("Fragment " + i); fragmentProfiles.put(fragmentId, runtimeProfile); fragmentsProfile.addChild(runtimeProfile); - multiBeProfile.put(fragmentId, - new ConcurrentHashMap<TNetworkAddress, List<RuntimeProfile>>()); + multiBeProfile.put(fragmentId, Maps.newHashMap()); fragmentIdBeNum.put(fragmentId, 0); seqNoToFragmentId.put(i, fragmentId); ++i; @@ -108,24 +108,54 @@ public class ExecutionProfile { } private List<List<RuntimeProfile>> getMultiBeProfile(int fragmentId) { - Map<TNetworkAddress, List<RuntimeProfile>> multiPipeline = multiBeProfile.get(fragmentId); - List<List<RuntimeProfile>> allPipelines = Lists.newArrayList(); - int pipelineSize = 0; - for (List<RuntimeProfile> profiles : multiPipeline.values()) { - pipelineSize = profiles.size(); - break; - } - for (int pipelineIdx = 0; pipelineIdx < pipelineSize; pipelineIdx++) { - List<RuntimeProfile> allPipelineTask = new ArrayList<RuntimeProfile>(); - for (List<RuntimeProfile> pipelines : multiPipeline.values()) { - RuntimeProfile pipeline = pipelines.get(pipelineIdx); - for (Pair<RuntimeProfile, Boolean> runtimeProfile : pipeline.getChildList()) { - allPipelineTask.add(runtimeProfile.first); + multiBeProfileLock.readLock().lock(); + try { + // A fragment in the BE contains multiple pipelines, and each pipeline contains + // multiple pipeline tasks. + Map<TNetworkAddress, List<RuntimeProfile>> multiPipeline = multiBeProfile.get(fragmentId); + List<List<RuntimeProfile>> allPipelines = Lists.newArrayList(); + int pipelineSize = -1; + for (TNetworkAddress beAddress : multiPipeline.keySet()) { + List<RuntimeProfile> profileSingleBE = multiPipeline.get(beAddress); + // Check that within the same fragment across all BEs, there should be the same + // number of pipelines. 
+ if (pipelineSize == -1) { + pipelineSize = profileSingleBE.size(); + } else { + if (pipelineSize != profileSingleBE.size()) { + LOG.warn("The profile sizes of the two BE are different, {} vs {}", pipelineSize, + profileSingleBE.size()); + pipelineSize = Math.max(pipelineSize, profileSingleBE.size()); + } + } + } + for (int pipelineIdx = 0; pipelineIdx < pipelineSize; pipelineIdx++) { + List<RuntimeProfile> allPipelineTask = new ArrayList<RuntimeProfile>(); + for (List<RuntimeProfile> profileSingleBE : multiPipeline.values()) { + RuntimeProfile pipeline = profileSingleBE.get(pipelineIdx); + for (Pair<RuntimeProfile, Boolean> pipelineTaskProfile : pipeline.getChildList()) { + allPipelineTask.add(pipelineTaskProfile.first); + } } + if (allPipelineTask.isEmpty()) { + LOG.warn("None of the BEs have pipeline task profiles in fragmentId:{} , pipelineIdx:{}", + fragmentId, pipelineIdx); + } + allPipelines.add(allPipelineTask); } - allPipelines.add(allPipelineTask); + return allPipelines; + } finally { + multiBeProfileLock.readLock().unlock(); + } + } + + void setMultiBeProfile(int fragmentId, TNetworkAddress backendHBAddress, List<RuntimeProfile> taskProfile) { + multiBeProfileLock.writeLock().lock(); + try { + multiBeProfile.get(fragmentId).put(backendHBAddress, taskProfile); + } finally { + multiBeProfileLock.writeLock().unlock(); } - return allPipelines; } private RuntimeProfile getPipelineAggregatedProfile(Map<Integer, String> planNodeMap) { @@ -136,11 +166,20 @@ public class ExecutionProfile { List<List<RuntimeProfile>> allPipelines = getMultiBeProfile(seqNoToFragmentId.get(i)); int pipelineIdx = 0; for (List<RuntimeProfile> allPipelineTask : allPipelines) { - RuntimeProfile mergedpipelineProfile = new RuntimeProfile( - "Pipeline : " + pipelineIdx + "(instance_num=" - + allPipelineTask.size() + ")", - allPipelineTask.get(0).nodeId()); - RuntimeProfile.mergeProfiles(allPipelineTask, mergedpipelineProfile, planNodeMap); + RuntimeProfile mergedpipelineProfile = null; + 
if (allPipelineTask.isEmpty()) { + // It is possible that the profile collection may be incomplete, so only part of + // the profile will be merged here. + mergedpipelineProfile = new RuntimeProfile( + "Pipeline : " + pipelineIdx + "(miss profile)", + -pipelineIdx); + } else { + mergedpipelineProfile = new RuntimeProfile( + "Pipeline : " + pipelineIdx + "(instance_num=" + + allPipelineTask.size() + ")", + allPipelineTask.get(0).nodeId()); + RuntimeProfile.mergeProfiles(allPipelineTask, mergedpipelineProfile, planNodeMap); + } newFragmentProfile.addChild(mergedpipelineProfile); pipelineIdx++; fragmentsProfile.rowsProducedMap.putAll(mergedpipelineProfile.rowsProducedMap); @@ -208,7 +247,11 @@ public class ExecutionProfile { pipelineIdx++; } RuntimeProfile profileNode = new RuntimeProfile(name); - taskProfile.add(profileNode); + // The taskprofile is used to save the profile of the pipeline, without + // considering the FragmentLevel. + if (!(pipelineProfile.isSetIsFragmentLevel() && pipelineProfile.is_fragment_level)) { + taskProfile.add(profileNode); + } if (!pipelineProfile.isSetProfile()) { LOG.warn("Profile is not set, {}", DebugUtil.printId(profile.getQueryId())); return new Status(TStatusCode.INVALID_ARGUMENT, "Profile is not set"); @@ -218,7 +261,7 @@ public class ExecutionProfile { profileNode.setIsDone(isDone); fragmentProfiles.get(fragmentId).addChild(profileNode); } - multiBeProfile.get(fragmentId).put(backendHBAddress, taskProfile); + setMultiBeProfile(fragmentId, backendHBAddress, taskProfile); } if (profile.isSetLoadChannelProfiles()) { @@ -256,7 +299,11 @@ public class ExecutionProfile { pipelineIdx++; } RuntimeProfile profile = new RuntimeProfile(name); - taskProfile.add(profile); + // The taskprofile is used to save the profile of the pipeline, without + // considering the FragmentLevel. 
+ if (!(param.isSetIsFragmentLevel() && param.is_fragment_level)) { + taskProfile.add(profile); + } if (param.isSetProfile()) { profile.update(param.profile); } @@ -271,8 +318,7 @@ public class ExecutionProfile { if (params.isSetLoadChannelProfile()) { loadChannelProfile.update(params.loadChannelProfile); } - - multiBeProfile.get(params.fragment_id).put(backend.getHeartbeatAddress(), taskProfile); + setMultiBeProfile(params.fragment_id, backend.getHeartbeatAddress(), taskProfile); } public synchronized void addFragmentBackend(PlanFragmentId fragmentId, Long backendId) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org