This is an automated email from the ASF dual-hosted git repository.

zhangstar333 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9a40cba5f0b [feature](profile)Enable merging of incomplete profiles. 
(#39560) (#42953)
9a40cba5f0b is described below

commit 9a40cba5f0b28b7f0334a9f1bb1ce2ac3be88cf2
Author: Mryange <yanxuech...@selectdb.com>
AuthorDate: Wed Nov 13 16:39:45 2024 +0800

    [feature](profile)Enable merging of incomplete profiles. (#39560) (#42953)
    
    ## Proposed changes
    https://github.com/apache/doris/pull/39560
    ```
    java.lang.IndexOutOfBoundsException: Index 0 out of bounds for length 0
            at 
jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64) ~[?:?]
            at 
jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70) 
~[?:?]
            at 
jdk.internal.util.Preconditions.checkIndex(Preconditions.java:266) ~[?:?]
            at java.util.Objects.checkIndex(Objects.java:359) ~[?:?]
            at java.util.ArrayList.get(ArrayList.java:427) ~[?:?]
            at 
org.apache.doris.common.profile.ExecutionProfile.getPipelineAggregatedProfile(ExecutionProfile.java:142)
 ~[doris-fe.jar:1.2-SNAPSHOT]
    ```
    
    In the past, profiles had to be complete before they could be merged.
    Incomplete profiles can now be merged as well; any pipeline whose profile
    is missing is marked "(miss profile)" in the merged profile.
    ```
                                    -  ProjectionTime:  avg  0ns,  max  0ns,  
min  0ns
                                    -  RowsProduced:  sum  0,  avg  0,  max  0, 
 min  0
                                    -  
WaitForDependency[SORT_OPERATOR_DEPENDENCY]Time:  avg  15min2sec,  max  
15min2sec,  min  15min2sec
                      Pipeline  :  3(miss  profile):
                      Pipeline  :  4(instance_num=48):
                          LOCAL_EXCHANGE_SINK_OPERATOR  (PASSTHROUGH)  (id=-14):
                                -  CloseTime:  avg  0ns,  max  0ns,  min  0ns
                                -  ExecTime:  avg  29.410us,  max  43.336us,  
min
    ```
---
 .../doris/common/profile/ExecutionProfile.java     | 102 +++++++++++++++------
 1 file changed, 74 insertions(+), 28 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index a7a05ee12fd..7828a38e6eb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -41,7 +41,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * root is used to collect profile of a complete query plan(including query or 
load).
@@ -74,6 +74,7 @@ public class ExecutionProfile {
 
     // use to merge profile from multi be
     private Map<Integer, Map<TNetworkAddress, List<RuntimeProfile>>> 
multiBeProfile = null;
+    private ReentrantReadWriteLock multiBeProfileLock = new 
ReentrantReadWriteLock();
 
     // Not serialize this property, it is only used to get profile id.
     private SummaryProfile summaryProfile;
@@ -97,8 +98,7 @@ public class ExecutionProfile {
             RuntimeProfile runtimeProfile = new RuntimeProfile("Fragment " + 
i);
             fragmentProfiles.put(fragmentId, runtimeProfile);
             fragmentsProfile.addChild(runtimeProfile);
-            multiBeProfile.put(fragmentId,
-                    new ConcurrentHashMap<TNetworkAddress, 
List<RuntimeProfile>>());
+            multiBeProfile.put(fragmentId, Maps.newHashMap());
             fragmentIdBeNum.put(fragmentId, 0);
             seqNoToFragmentId.put(i, fragmentId);
             ++i;
@@ -108,24 +108,54 @@ public class ExecutionProfile {
     }
 
     private List<List<RuntimeProfile>> getMultiBeProfile(int fragmentId) {
-        Map<TNetworkAddress, List<RuntimeProfile>> multiPipeline = 
multiBeProfile.get(fragmentId);
-        List<List<RuntimeProfile>> allPipelines = Lists.newArrayList();
-        int pipelineSize = 0;
-        for (List<RuntimeProfile> profiles : multiPipeline.values()) {
-            pipelineSize = profiles.size();
-            break;
-        }
-        for (int pipelineIdx = 0; pipelineIdx < pipelineSize; pipelineIdx++) {
-            List<RuntimeProfile> allPipelineTask = new 
ArrayList<RuntimeProfile>();
-            for (List<RuntimeProfile> pipelines : multiPipeline.values()) {
-                RuntimeProfile pipeline = pipelines.get(pipelineIdx);
-                for (Pair<RuntimeProfile, Boolean> runtimeProfile : 
pipeline.getChildList()) {
-                    allPipelineTask.add(runtimeProfile.first);
+        multiBeProfileLock.readLock().lock();
+        try {
+            // A fragment in the BE contains multiple pipelines, and each 
pipeline contains
+            // multiple pipeline tasks.
+            Map<TNetworkAddress, List<RuntimeProfile>> multiPipeline = 
multiBeProfile.get(fragmentId);
+            List<List<RuntimeProfile>> allPipelines = Lists.newArrayList();
+            int pipelineSize = -1;
+            for (TNetworkAddress beAddress : multiPipeline.keySet()) {
+                List<RuntimeProfile> profileSingleBE = 
multiPipeline.get(beAddress);
+                // Check that within the same fragment across all BEs, there 
should be the same
+                // number of pipelines.
+                if (pipelineSize == -1) {
+                    pipelineSize = profileSingleBE.size();
+                } else {
+                    if (pipelineSize != profileSingleBE.size()) {
+                        LOG.warn("The profile sizes of the two BE are 
different, {} vs {}", pipelineSize,
+                                profileSingleBE.size());
+                        pipelineSize = Math.max(pipelineSize, 
profileSingleBE.size());
+                    }
+                }
+            }
+            for (int pipelineIdx = 0; pipelineIdx < pipelineSize; 
pipelineIdx++) {
+                List<RuntimeProfile> allPipelineTask = new 
ArrayList<RuntimeProfile>();
+                for (List<RuntimeProfile> profileSingleBE : 
multiPipeline.values()) {
+                    RuntimeProfile pipeline = profileSingleBE.get(pipelineIdx);
+                    for (Pair<RuntimeProfile, Boolean> pipelineTaskProfile : 
pipeline.getChildList()) {
+                        allPipelineTask.add(pipelineTaskProfile.first);
+                    }
                 }
+                if (allPipelineTask.isEmpty()) {
+                    LOG.warn("None of the BEs have pipeline task profiles in 
fragmentId:{}  , pipelineIdx:{}",
+                            fragmentId, pipelineIdx);
+                }
+                allPipelines.add(allPipelineTask);
             }
-            allPipelines.add(allPipelineTask);
+            return allPipelines;
+        } finally {
+            multiBeProfileLock.readLock().unlock();
+        }
+    }
+
+    void setMultiBeProfile(int fragmentId, TNetworkAddress backendHBAddress, 
List<RuntimeProfile> taskProfile) {
+        multiBeProfileLock.writeLock().lock();
+        try {
+            multiBeProfile.get(fragmentId).put(backendHBAddress, taskProfile);
+        } finally {
+            multiBeProfileLock.writeLock().unlock();
         }
-        return allPipelines;
     }
 
     private RuntimeProfile getPipelineAggregatedProfile(Map<Integer, String> 
planNodeMap) {
@@ -136,11 +166,20 @@ public class ExecutionProfile {
             List<List<RuntimeProfile>> allPipelines = 
getMultiBeProfile(seqNoToFragmentId.get(i));
             int pipelineIdx = 0;
             for (List<RuntimeProfile> allPipelineTask : allPipelines) {
-                RuntimeProfile mergedpipelineProfile = new RuntimeProfile(
-                        "Pipeline : " + pipelineIdx + "(instance_num="
-                                + allPipelineTask.size() + ")",
-                        allPipelineTask.get(0).nodeId());
-                RuntimeProfile.mergeProfiles(allPipelineTask, 
mergedpipelineProfile, planNodeMap);
+                RuntimeProfile mergedpipelineProfile = null;
+                if (allPipelineTask.isEmpty()) {
+                    // It is possible that the profile collection may be 
incomplete, so only part of
+                    // the profile will be merged here.
+                    mergedpipelineProfile = new RuntimeProfile(
+                            "Pipeline : " + pipelineIdx + "(miss profile)",
+                            -pipelineIdx);
+                } else {
+                    mergedpipelineProfile = new RuntimeProfile(
+                            "Pipeline : " + pipelineIdx + "(instance_num="
+                                    + allPipelineTask.size() + ")",
+                            allPipelineTask.get(0).nodeId());
+                    RuntimeProfile.mergeProfiles(allPipelineTask, 
mergedpipelineProfile, planNodeMap);
+                }
                 newFragmentProfile.addChild(mergedpipelineProfile);
                 pipelineIdx++;
                 
fragmentsProfile.rowsProducedMap.putAll(mergedpipelineProfile.rowsProducedMap);
@@ -208,7 +247,11 @@ public class ExecutionProfile {
                     pipelineIdx++;
                 }
                 RuntimeProfile profileNode = new RuntimeProfile(name);
-                taskProfile.add(profileNode);
+                // The taskprofile is used to save the profile of the 
pipeline, without
+                // considering the FragmentLevel.
+                if (!(pipelineProfile.isSetIsFragmentLevel() && 
pipelineProfile.is_fragment_level)) {
+                    taskProfile.add(profileNode);
+                }
                 if (!pipelineProfile.isSetProfile()) {
                     LOG.warn("Profile is not set, {}", 
DebugUtil.printId(profile.getQueryId()));
                     return new Status(TStatusCode.INVALID_ARGUMENT, "Profile 
is not set");
@@ -218,7 +261,7 @@ public class ExecutionProfile {
                 profileNode.setIsDone(isDone);
                 fragmentProfiles.get(fragmentId).addChild(profileNode);
             }
-            multiBeProfile.get(fragmentId).put(backendHBAddress, taskProfile);
+            setMultiBeProfile(fragmentId, backendHBAddress, taskProfile);
         }
 
         if (profile.isSetLoadChannelProfiles()) {
@@ -256,7 +299,11 @@ public class ExecutionProfile {
                 pipelineIdx++;
             }
             RuntimeProfile profile = new RuntimeProfile(name);
-            taskProfile.add(profile);
+            // The taskprofile is used to save the profile of the pipeline, 
without
+            // considering the FragmentLevel.
+            if (!(param.isSetIsFragmentLevel() && param.is_fragment_level)) {
+                taskProfile.add(profile);
+            }
             if (param.isSetProfile()) {
                 profile.update(param.profile);
             }
@@ -271,8 +318,7 @@ public class ExecutionProfile {
         if (params.isSetLoadChannelProfile()) {
             loadChannelProfile.update(params.loadChannelProfile);
         }
-
-        
multiBeProfile.get(params.fragment_id).put(backend.getHeartbeatAddress(), 
taskProfile);
+        setMultiBeProfile(params.fragment_id, backend.getHeartbeatAddress(), 
taskProfile);
     }
 
     public synchronized void addFragmentBackend(PlanFragmentId fragmentId, 
Long backendId) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to