This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch 2.1-tmp
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/2.1-tmp by this push:
     new 16f8afc4083 [refactor](coordinator) split profile logic and instance 
report logic (#32010)
16f8afc4083 is described below

commit 16f8afc4083d84f3ce4a4317e2a558b907ff535d
Author: yiguolei <676222...@qq.com>
AuthorDate: Wed Apr 3 18:48:19 2024 +0800

    [refactor](coordinator) split profile logic and instance report logic 
(#32010)
    
    Co-authored-by: yiguolei <yiguo...@gmail.com>
---
 .../main/java/org/apache/doris/common/Config.java  |   1 +
 .../main/java/org/apache/doris/common/Status.java  |   5 +
 .../doris/common/profile/ExecutionProfile.java     | 309 +++++------
 .../org/apache/doris/common/profile/Profile.java   |  80 ++-
 .../doris/common/profile/SummaryProfile.java       |  16 +-
 .../apache/doris/common/util/ProfileManager.java   |  82 ++-
 .../apache/doris/common/util/RuntimeProfile.java   |  34 +-
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |  21 +-
 .../apache/doris/load/loadv2/LoadLoadingTask.java  |  16 +-
 .../nereids/trees/plans/commands/LoadCommand.java  |   4 +-
 .../java/org/apache/doris/qe/CoordInterface.java   |   2 -
 .../main/java/org/apache/doris/qe/Coordinator.java | 603 +++++++++++----------
 .../java/org/apache/doris/qe/PointQueryExec.java   |   6 -
 .../main/java/org/apache/doris/qe/QeProcessor.java |   2 -
 .../java/org/apache/doris/qe/QeProcessorImpl.java  |  79 ++-
 .../java/org/apache/doris/qe/SessionVariable.java  |   9 +
 .../java/org/apache/doris/qe/StmtExecutor.java     |  51 +-
 .../org/apache/doris/rpc/BackendServiceClient.java |   3 +-
 .../org/apache/doris/rpc/BackendServiceProxy.java  |   7 +-
 .../doris/common/util/RuntimeProfileTest.java      |   2 +-
 .../java/org/apache/doris/qe/StmtExecutorTest.java |   4 +-
 21 files changed, 715 insertions(+), 621 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 5a6bdf1b6d5..a727c23bb74 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2520,6 +2520,7 @@ public class Config extends ConfigBase {
             "Whether to enable proxy protocol"
     })
     public static boolean enable_proxy_protocol = false;
+    public static int profile_async_collect_expire_time_secs = 5;
 
 
     
//==========================================================================
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
index 1961f9b8cc5..555a82751ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
@@ -128,4 +128,9 @@ public class Status {
             }
         }
     }
+
+    @Override
+    public String toString() {
+        return "Status [errorCode=" + errorCode + ", errorMsg=" + errorMsg + 
"]";
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index 24bd2355c56..f339da82924 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -17,33 +17,31 @@
 
 package org.apache.doris.common.profile;
 
-import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.Pair;
-import org.apache.doris.common.Status;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.thrift.TDetailedReportParams;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TReportExecStatusParams;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.thrift.TUnit;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
- * ExecutionProfile is used to collect profile of a complete query 
plan(including query or load).
+ * ExecutionProfile is used to collect the profile of a complete query plan (including query or load).
  * Need to call addToProfileAsChild() to add it to the root profile.
  * It has the following structure:
  *  Execution Profile:
@@ -59,53 +57,62 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 public class ExecutionProfile {
     private static final Logger LOG = 
LogManager.getLogger(ExecutionProfile.class);
 
+    private final TUniqueId queryId;
+    private boolean isFinished = false;
+    private long startTime = 0L;
+    private long queryFinishTime = 0L;
     // The root profile of this execution task
-    private RuntimeProfile executionProfile;
+    private RuntimeProfile root;
     // Profiles for each fragment. And the InstanceProfile is the child of 
fragment profile.
     // Which will be added to fragment profile when calling 
Coordinator::sendFragment()
-    private List<RuntimeProfile> fragmentProfiles;
+    // Cannot use an array list because fragment ids are not contiguous; the planner
+    // may cut fragments during planning.
+    private Map<Integer, RuntimeProfile> fragmentProfiles;
     // Profile for load channels. Only for load job.
     private RuntimeProfile loadChannelProfile;
-    // A countdown latch to mark the completion of each instance.
-    // use for old pipeline
-    // instance id -> dummy value
-    private MarkedCountDownLatch<TUniqueId, Long> profileDoneSignal;
+    // FragmentId -> InstanceId -> RuntimeProfile
+    private Map<PlanFragmentId, Map<TUniqueId, RuntimeProfile>> 
fragmentInstancesProfiles;
+    private boolean isPipelineXProfile = false;
 
-    // A countdown latch to mark the completion of each fragment. use for 
pipelineX
-    // fragmentId -> dummy value
-    private MarkedCountDownLatch<Integer, Long> profileFragmentDoneSignal;
-
-    // fragmentId -> The number of BE without 'done.
-    private Map<Integer, Integer> befragmentDone;
+    // use to merge profile from multi be
+    private Map<Integer, Map<TNetworkAddress, List<RuntimeProfile>>> 
multiBeProfile = null;
 
-    // lock befragmentDone
-    private ReadWriteLock lock;
+    // Do not serialize this property; it is only used to get the profile id.
+    private SummaryProfile summaryProfile;
 
-    // use to merge profile from multi be
-    private List<Map<TNetworkAddress, List<RuntimeProfile>>> multiBeProfile = 
null;
+    // BE only has the instance id, not the fragment id, so this map is needed to
+    // find the fragment id for an instance.
+    private Map<TUniqueId, PlanFragmentId> instanceIdToFragmentId;
+    private Map<Integer, Integer> fragmentIdBeNum;
+    private Map<Integer, Integer> seqNoToFragmentId;
 
-    public ExecutionProfile(TUniqueId queryId, int fragmentNum) {
-        executionProfile = new RuntimeProfile("Execution Profile " + 
DebugUtil.printId(queryId));
+    public ExecutionProfile(TUniqueId queryId, List<PlanFragment> fragments) {
+        this.queryId = queryId;
+        root = new RuntimeProfile("Execution Profile " + 
DebugUtil.printId(queryId));
         RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
-        executionProfile.addChild(fragmentsProfile);
-        fragmentProfiles = Lists.newArrayList();
-        multiBeProfile = Lists.newArrayList();
-        for (int i = 0; i < fragmentNum; i++) {
-            fragmentProfiles.add(new RuntimeProfile("Fragment " + i));
-            fragmentsProfile.addChild(fragmentProfiles.get(i));
-            multiBeProfile.add(new ConcurrentHashMap<TNetworkAddress, 
List<RuntimeProfile>>());
+        root.addChild(fragmentsProfile);
+        fragmentProfiles = Maps.newHashMap();
+        multiBeProfile = Maps.newHashMap();
+        fragmentIdBeNum = Maps.newHashMap();
+        seqNoToFragmentId = Maps.newHashMap();
+        int i = 0;
+        for (PlanFragment planFragment : fragments) {
+            RuntimeProfile runtimeProfile = new RuntimeProfile("Fragment " + 
i);
+            fragmentProfiles.put(planFragment.getFragmentId().asInt(), 
runtimeProfile);
+            fragmentsProfile.addChild(runtimeProfile);
+            multiBeProfile.put(planFragment.getFragmentId().asInt(),
+                    new ConcurrentHashMap<TNetworkAddress, 
List<RuntimeProfile>>());
+            fragmentIdBeNum.put(planFragment.getFragmentId().asInt(), 0);
+            seqNoToFragmentId.put(i, planFragment.getFragmentId().asInt());
+            ++i;
         }
         loadChannelProfile = new RuntimeProfile("LoadChannels");
-        executionProfile.addChild(loadChannelProfile);
+        root.addChild(loadChannelProfile);
+        fragmentInstancesProfiles = Maps.newHashMap();
+        instanceIdToFragmentId = Maps.newHashMap();
     }
 
-    public void addMultiBeProfileByPipelineX(int profileFragmentId, 
TNetworkAddress address,
-            List<RuntimeProfile> taskProfile) {
-        multiBeProfile.get(profileFragmentId).put(address, taskProfile);
-    }
-
-    private List<List<RuntimeProfile>> getMultiBeProfile(int 
profileFragmentId) {
-        Map<TNetworkAddress, List<RuntimeProfile>> multiPipeline = 
multiBeProfile.get(profileFragmentId);
+    private List<List<RuntimeProfile>> getMultiBeProfile(int fragmentId) {
+        Map<TNetworkAddress, List<RuntimeProfile>> multiPipeline = 
multiBeProfile.get(fragmentId);
         List<List<RuntimeProfile>> allPipelines = Lists.newArrayList();
         int pipelineSize = 0;
         for (List<RuntimeProfile> profiles : multiPipeline.values()) {
@@ -130,7 +137,7 @@ public class ExecutionProfile {
         for (int i = 0; i < fragmentProfiles.size(); ++i) {
             RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment " 
+ i);
             fragmentsProfile.addChild(newFragmentProfile);
-            List<List<RuntimeProfile>> allPipelines = getMultiBeProfile(i);
+            List<List<RuntimeProfile>> allPipelines = 
getMultiBeProfile(seqNoToFragmentId.get(i));
             int pipelineIdx = 0;
             for (List<RuntimeProfile> allPipelineTask : allPipelines) {
                 RuntimeProfile mergedpipelineProfile = new RuntimeProfile(
@@ -148,7 +155,7 @@ public class ExecutionProfile {
     private RuntimeProfile getNonPipelineXAggregatedProfile(Map<Integer, 
String> planNodeMap) {
         RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
         for (int i = 0; i < fragmentProfiles.size(); ++i) {
-            RuntimeProfile oldFragmentProfile = fragmentProfiles.get(i);
+            RuntimeProfile oldFragmentProfile = 
fragmentProfiles.get(seqNoToFragmentId.get(i));
             RuntimeProfile newFragmentProfile = new RuntimeProfile("Fragment " 
+ i);
             fragmentsProfile.addChild(newFragmentProfile);
             List<RuntimeProfile> allInstanceProfiles = new 
ArrayList<RuntimeProfile>();
@@ -164,7 +171,7 @@ public class ExecutionProfile {
     }
 
     public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String> 
planNodeMap) {
-        if (enablePipelineX()) {
+        if (isPipelineXProfile) {
             /*
              * Fragment 0
              * ---Pipeline 0
@@ -211,143 +218,143 @@ public class ExecutionProfile {
         }
     }
 
-    public RuntimeProfile getExecutionProfile() {
-        return executionProfile;
-    }
-
-    public RuntimeProfile getLoadChannelProfile() {
-        return loadChannelProfile;
-    }
-
-    public List<RuntimeProfile> getFragmentProfiles() {
-        return fragmentProfiles;
+    public RuntimeProfile getRoot() {
+        return root;
     }
 
-    public void addToProfileAsChild(RuntimeProfile rootProfile) {
-        rootProfile.addChild(executionProfile);
+    public void setPipelineX() {
+        this.isPipelineXProfile = true;
     }
 
-    public void markInstances(Set<TUniqueId> instanceIds) {
-        profileDoneSignal = new MarkedCountDownLatch<>(instanceIds.size());
-        for (TUniqueId instanceId : instanceIds) {
-            profileDoneSignal.addMark(instanceId, -1L /* value is meaningless 
*/);
+    // The execution profile is maintained in ProfileManager; once it is finished,
+    // it should be removed from the manager as soon as possible.
+    public void update(long startTime, boolean isFinished) {
+        if (this.isFinished) {
+            return;
         }
-    }
-
-    private boolean enablePipelineX() {
-        return profileFragmentDoneSignal != null;
-    }
-
-    public void markFragments(int fragments) {
-        profileFragmentDoneSignal = new MarkedCountDownLatch<>(fragments);
-        lock = new ReentrantReadWriteLock();
-        befragmentDone = new HashMap<>();
-        for (int fragmentId = 0; fragmentId < fragments; fragmentId++) {
-            profileFragmentDoneSignal.addMark(fragmentId, -1L /* value is 
meaningless */);
-            befragmentDone.put(fragmentId, 0);
+        this.isFinished = isFinished;
+        this.startTime = startTime;
+        if (startTime > 0) {
+            root.getCounterTotalTime().setValue(TUnit.TIME_MS, 
TimeUtils.getElapsedTimeMs(startTime));
         }
-    }
 
-    public void addFragments(int fragmentId) {
-        lock.writeLock().lock();
-        try {
-            befragmentDone.put(fragmentId, befragmentDone.get(fragmentId) + 1);
-        } finally {
-            lock.writeLock().unlock();
+        for (RuntimeProfile fragmentProfile : fragmentProfiles.values()) {
+            fragmentProfile.sortChildren();
         }
     }
 
-    public void update(long startTime, boolean isFinished) {
-        if (startTime > 0) {
-            executionProfile.getCounterTotalTime().setValue(TUnit.TIME_MS, 
TimeUtils.getElapsedTimeMs(startTime));
-        }
-        // Wait for all backends to finish reporting when writing profile last 
time.
-        if (isFinished && profileDoneSignal != null) {
-            try {
-                profileDoneSignal.await(2, TimeUnit.SECONDS);
-            } catch (InterruptedException e1) {
-                LOG.warn("signal await error", e1);
+    public void updateProfile(TReportExecStatusParams params, TNetworkAddress 
address) {
+        if (isPipelineXProfile) {
+            int pipelineIdx = 0;
+            List<RuntimeProfile> taskProfile = Lists.newArrayList();
+            for (TDetailedReportParams param : params.detailed_report) {
+                String name = "Pipeline :" + pipelineIdx + " "
+                        + " (host=" + address + ")";
+                RuntimeProfile profile = new RuntimeProfile(name);
+                taskProfile.add(profile);
+                if (param.isSetProfile()) {
+                    profile.update(param.profile);
+                }
+                if (params.done) {
+                    profile.setIsDone(true);
+                }
+                pipelineIdx++;
+                fragmentProfiles.get(params.fragment_id).addChild(profile);
             }
-        }
-
-        if (isFinished && profileFragmentDoneSignal != null) {
-            try {
-                profileFragmentDoneSignal.await(2, TimeUnit.SECONDS);
-            } catch (InterruptedException e1) {
-                LOG.warn("signal await error", e1);
+            // TODO ygl: is this right? There may be multiple backends; what does
+            // updating the load channel profile do?
+            if (params.isSetLoadChannelProfile()) {
+                loadChannelProfile.update(params.loadChannelProfile);
+            }
+            multiBeProfile.get(params.fragment_id).put(address, taskProfile);
+        } else {
+            PlanFragmentId fragmentId = 
instanceIdToFragmentId.get(params.fragment_instance_id);
+            if (fragmentId == null) {
+                LOG.warn("Could not find related fragment for instance {}",
+                        DebugUtil.printId(params.fragment_instance_id));
+                return;
+            }
+            // Do not use fragment id in params, because non-pipeline engine 
will set it to -1
+            Map<TUniqueId, RuntimeProfile> instanceProfiles = 
fragmentInstancesProfiles.get(fragmentId);
+            if (instanceProfiles == null) {
+                LOG.warn("Could not find related instances for fragment {}", 
fragmentId);
+                return;
+            }
+            RuntimeProfile instanceProfile = 
instanceProfiles.get(params.fragment_instance_id);
+            if (instanceProfile == null) {
+                LOG.warn("Could not find related instance {}", 
params.fragment_instance_id);
+                return;
+            }
+            if (params.isSetProfile()) {
+                instanceProfile.update(params.profile);
+            }
+            if (params.isSetDone() && params.isDone()) {
+                instanceProfile.setIsDone(true);
+            }
+            if (params.isSetLoadChannelProfile()) {
+                loadChannelProfile.update(params.loadChannelProfile);
             }
-        }
-
-        for (RuntimeProfile fragmentProfile : fragmentProfiles) {
-            fragmentProfile.sortChildren();
         }
     }
 
-    public void onCancel() {
-        if (profileDoneSignal != null) {
-            // count down to zero to notify all objects waiting for this
-            profileDoneSignal.countDownToZero(new Status());
+    // MultiInstances may update the profile concurrently
+    public synchronized void addInstanceProfile(PlanFragmentId fragmentId, 
TUniqueId instanceId,
+            RuntimeProfile instanceProfile) {
+        Map<TUniqueId, RuntimeProfile> instanceProfiles = 
fragmentInstancesProfiles.get(fragmentId);
+        if (instanceProfiles == null) {
+            instanceProfiles = Maps.newHashMap();
+            fragmentInstancesProfiles.put(fragmentId, instanceProfiles);
         }
-        if (profileFragmentDoneSignal != null) {
-            profileFragmentDoneSignal.countDownToZero(new Status());
+        RuntimeProfile existingInstanceProfile = 
instanceProfiles.get(instanceId);
+        if (existingInstanceProfile == null) {
+            instanceProfiles.put(instanceId, instanceProfile);
+            instanceIdToFragmentId.put(instanceId, fragmentId);
+            fragmentProfiles.get(fragmentId.asInt()).addChild(instanceProfile);
+            return;
         }
     }
 
-    public void markOneInstanceDone(TUniqueId fragmentInstanceId) {
-        if (profileDoneSignal != null) {
-            if (!profileDoneSignal.markedCountDown(fragmentInstanceId, -1L)) {
-                LOG.warn("Mark instance {} done failed", 
DebugUtil.printId(fragmentInstanceId));
-            }
-        }
+    public synchronized void addFragmentBackend(PlanFragmentId fragmentId, 
Long backendId) {
+        fragmentIdBeNum.put(fragmentId.asInt(), 
fragmentIdBeNum.get(fragmentId.asInt()) + 1);
     }
 
-    public void markOneFragmentDone(int fragmentId) {
-        if (profileFragmentDoneSignal != null) {
-            lock.writeLock().lock();
-            try {
-                befragmentDone.put(fragmentId, befragmentDone.get(fragmentId) 
- 1);
-                if (befragmentDone.get(fragmentId) == 0) {
-                    if (!profileFragmentDoneSignal.markedCountDown(fragmentId, 
-1L)) {
-                        LOG.warn("Mark fragment {} done failed", fragmentId);
-                    }
-                }
-            } finally {
-                lock.writeLock().unlock();
-            }
-        }
+    public TUniqueId getQueryId() {
+        return queryId;
     }
 
-    public boolean awaitAllInstancesDone(long waitTimeS) throws 
InterruptedException {
-        if (profileDoneSignal == null) {
-            return true;
+    // Check every fragment's children; if all of them have finished, then this
+    // execution profile is finished.
+    public boolean isCompleted() {
+        for (Entry<Integer, RuntimeProfile> element : 
fragmentProfiles.entrySet()) {
+            RuntimeProfile fragmentProfile = element.getValue();
+            // If any fragment is empty, it means BE does not report the 
profile, then the total
+            // execution profile is not completed.
+            if (fragmentProfile.isEmpty()
+                    || fragmentProfile.getChildList().size() < 
fragmentIdBeNum.get(element.getKey())) {
+                return false;
+            }
+            for (Pair<RuntimeProfile, Boolean> runtimeProfile : 
fragmentProfile.getChildList()) {
+                // If any child instance profile is not ready, then return 
false.
+                if (!(runtimeProfile.first.getIsDone() || 
runtimeProfile.first.getIsCancel())) {
+                    return false;
+                }
+            }
         }
-        return profileDoneSignal.await(waitTimeS, TimeUnit.SECONDS);
+        return true;
     }
 
-    public boolean awaitAllFragmentsDone(long waitTimeS) throws 
InterruptedException {
-        if (profileFragmentDoneSignal == null) {
-            return true;
-        }
-        return profileFragmentDoneSignal.await(waitTimeS, TimeUnit.SECONDS);
+    public long getQueryFinishTime() {
+        return queryFinishTime;
     }
 
-    public boolean isAllInstancesDone() {
-        if (profileDoneSignal == null) {
-            return true;
-        }
-        return profileDoneSignal.getCount() == 0;
+    public void setQueryFinishTime(long queryFinishTime) {
+        this.queryFinishTime = queryFinishTime;
     }
 
-    public boolean isAllFragmentsDone() {
-        if (profileFragmentDoneSignal == null) {
-            return true;
-        }
-        return profileFragmentDoneSignal.getCount() == 0;
+    public SummaryProfile getSummaryProfile() {
+        return summaryProfile;
     }
 
-    public void addInstanceProfile(int fragmentId, RuntimeProfile 
instanceProfile) {
-        Preconditions.checkArgument(fragmentId < fragmentProfiles.size(),
-                fragmentId + " vs. " + fragmentProfiles.size());
-        fragmentProfiles.get(fragmentId).addChild(instanceProfile);
+    public void setSummaryProfile(SummaryProfile summaryProfile) {
+        this.summaryProfile = summaryProfile;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
index 5f3ed601630..12ba687bfd1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
@@ -45,11 +45,16 @@ import java.util.Map;
  * ExecutionProfile1: Fragment 0: Fragment 1: ...
  * ExecutionProfile2: Fragment 0: Fragment 1: ...
  *
+ * ExecutionProfile: Fragment 0: Fragment 1: ...
+ * And also summary profile contains plan information, but execution profile 
is for
+ * be execution time.
+ * StmtExecutor(Profile) ---> Coordinator(ExecutionProfile)
  */
 public class Profile {
     private static final Logger LOG = LogManager.getLogger(Profile.class);
     private static final int MergedProfileLevel = 1;
-    private RuntimeProfile rootProfile;
+    private final String name;
+    private final boolean isPipelineX;
     private SummaryProfile summaryProfile;
     private List<ExecutionProfile> executionProfiles = Lists.newArrayList();
     private boolean isFinished;
@@ -57,51 +62,57 @@ public class Profile {
 
     private int profileLevel = 3;
 
-    public Profile(String name, boolean isEnable) {
-        this.rootProfile = new RuntimeProfile(name);
-        this.summaryProfile = new SummaryProfile(rootProfile);
+    public Profile(String name, boolean isEnable, int profileLevel, boolean 
isPipelineX) {
+        this.name = name;
+        this.isPipelineX = isPipelineX;
+        this.summaryProfile = new SummaryProfile();
         // if disabled, just set isFinished to true, so that update() will do 
nothing
         this.isFinished = !isEnable;
+        this.profileLevel = profileLevel;
     }
 
+    // For load task, the profile contains many execution profiles
     public void addExecutionProfile(ExecutionProfile executionProfile) {
         if (executionProfile == null) {
             LOG.warn("try to set a null excecution profile, it is abnormal", 
new Exception());
             return;
         }
+        if (this.isPipelineX) {
+            executionProfile.setPipelineX();
+        }
+        executionProfile.setSummaryProfile(summaryProfile);
         this.executionProfiles.add(executionProfile);
-        executionProfile.addToProfileAsChild(rootProfile);
     }
 
-    public synchronized void update(long startTime, Map<String, String> 
summaryInfo, boolean isFinished,
-            int profileLevel, Planner planner, boolean isPipelineX) {
+    public List<ExecutionProfile> getExecutionProfiles() {
+        return this.executionProfiles;
+    }
+
+    // This API will also add the profile to ProfileManager, so that we could 
get the profile from ProfileManager.
+    // isFinished ONLY means the coordinator or StmtExecutor is finished.
+    public synchronized void updateSummary(long startTime, Map<String, String> 
summaryInfo, boolean isFinished,
+            Planner planner) {
         try {
             if (this.isFinished) {
                 return;
             }
             summaryProfile.update(summaryInfo);
             for (ExecutionProfile executionProfile : executionProfiles) {
+                // Tell execution profile the start time
                 executionProfile.update(startTime, isFinished);
             }
-            rootProfile.computeTimeInProfile();
             // Nerids native insert not set planner, so it is null
             if (planner != null) {
                 this.planNodeMap = planner.getExplainStringMap();
             }
-            rootProfile.setIsPipelineX(isPipelineX);
             ProfileManager.getInstance().pushProfile(this);
             this.isFinished = isFinished;
-            this.profileLevel = profileLevel;
         } catch (Throwable t) {
             LOG.warn("update profile failed", t);
             throw t;
         }
     }
 
-    public RuntimeProfile getRootProfile() {
-        return this.rootProfile;
-    }
-
     public SummaryProfile getSummaryProfile() {
         return summaryProfile;
     }
@@ -110,7 +121,7 @@ public class Profile {
         StringBuilder builder = new StringBuilder();
         // add summary to builder
         summaryProfile.prettyPrint(builder);
-        LOG.info(builder.toString());
+        waitProfileCompleteIfNeeded();
         // Only generate merged profile for select, insert into select.
         // Not support broker load now.
         if (this.profileLevel == MergedProfileLevel && 
this.executionProfiles.size() == 1) {
@@ -125,7 +136,7 @@ public class Profile {
         try {
             for (ExecutionProfile executionProfile : executionProfiles) {
                 builder.append("\n");
-                executionProfile.getExecutionProfile().prettyPrint(builder, 
"");
+                executionProfile.getRoot().prettyPrint(builder, "");
             }
         } catch (Throwable aggProfileException) {
             LOG.warn("build profile failed", aggProfileException);
@@ -134,7 +145,44 @@ public class Profile {
         return builder.toString();
     }
 
+    // If the query is already finished and the user wants to get the profile, check
+    // whether the BEs have reported all profiles; if not, sleep 2s.
+    private void waitProfileCompleteIfNeeded() {
+        if (!this.isFinished) {
+            return;
+        }
+        boolean allCompleted = true;
+        for (ExecutionProfile executionProfile : executionProfiles) {
+            if (!executionProfile.isCompleted()) {
+                allCompleted = false;
+                break;
+            }
+        }
+        if (!allCompleted) {
+            try {
+                Thread.currentThread().sleep(2000);
+            } catch (InterruptedException e) {
+                // Do nothing
+            }
+        }
+    }
+
+    private RuntimeProfile composeRootProfile() {
+
+        RuntimeProfile rootProfile = new RuntimeProfile(name);
+        rootProfile.setIsPipelineX(isPipelineX);
+        rootProfile.addChild(summaryProfile.getSummary());
+        rootProfile.addChild(summaryProfile.getExecutionSummary());
+        for (ExecutionProfile executionProfile : executionProfiles) {
+            rootProfile.addChild(executionProfile.getRoot());
+        }
+        rootProfile.computeTimeInProfile();
+        return rootProfile;
+    }
+
     public String getProfileBrief() {
+        waitProfileCompleteIfNeeded();
+        RuntimeProfile rootProfile = composeRootProfile();
         Gson gson = new GsonBuilder().setPrettyPrinting().create();
         return gson.toJson(rootProfile.toBrief());
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index e9389b48b99..0d07e865d02 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -144,12 +144,22 @@ public class SummaryProfile {
     private long queryFetchResultConsumeTime = 0;
     private long queryWriteResultConsumeTime = 0;
 
-    public SummaryProfile(RuntimeProfile rootProfile) {
+    public SummaryProfile() {
         summaryProfile = new RuntimeProfile(SUMMARY_PROFILE_NAME);
         executionSummaryProfile = new 
RuntimeProfile(EXECUTION_SUMMARY_PROFILE_NAME);
         init();
-        rootProfile.addChild(summaryProfile);
-        rootProfile.addChild(executionSummaryProfile);
+    }
+
+    public String getProfileId() {
+        return this.summaryProfile.getInfoString(PROFILE_ID);
+    }
+
+    public RuntimeProfile getSummary() {
+        return summaryProfile;
+    }
+
+    public RuntimeProfile getExecutionSummary() {
+        return executionSummaryProfile;
     }
 
     private void init() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
index 8644fea6221..70bb21a27e9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
@@ -21,12 +21,14 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.AuthenticationException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.profile.ExecutionProfile;
 import org.apache.doris.common.profile.MultiProfileTreeBuilder;
 import org.apache.doris.common.profile.Profile;
 import org.apache.doris.common.profile.ProfileTreeBuilder;
 import org.apache.doris.common.profile.ProfileTreeNode;
 import org.apache.doris.common.profile.SummaryProfile;
 import org.apache.doris.nereids.stats.StatsErrorEstimator;
+import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
@@ -104,6 +106,9 @@ public class ProfileManager {
     // record the order of profiles by queryId
     private Deque<String> queryIdDeque;
     private Map<String, ProfileElement> queryIdToProfileMap; // from QueryId 
to RuntimeProfile
+    // Sometimes one Profile is related to multiple execution 
profiles (BrokerLoad), so an
+    // execution profile's query id is not related to the Profile's query id.
+    private Map<TUniqueId, ExecutionProfile> queryIdToExecutionProfiles;
 
     public static ProfileManager getInstance() {
         if (INSTANCE == null) {
@@ -122,23 +127,56 @@ public class ProfileManager {
         writeLock = lock.writeLock();
         queryIdDeque = new LinkedList<>();
         queryIdToProfileMap = new ConcurrentHashMap<>();
+        queryIdToExecutionProfiles = Maps.newHashMap();
     }
 
-    public ProfileElement createElement(Profile profile) {
+    private ProfileElement createElement(Profile profile) {
         ProfileElement element = new ProfileElement(profile);
         
element.infoStrings.putAll(profile.getSummaryProfile().getAsInfoStings());
-        MultiProfileTreeBuilder builder = new 
MultiProfileTreeBuilder(profile.getRootProfile());
+        // Do not init the builder any more; we will not maintain it since 2.1.0, 
because the structure
+        // assumes that the execution profile structure is already known 
before execution. But in
+        // the PipelineX engine, it will change during execution.
+        return element;
+    }
+
+    public void addExecutionProfile(ExecutionProfile executionProfile) {
+        if (executionProfile == null) {
+            return;
+        }
+        writeLock.lock();
         try {
-            builder.build();
-        } catch (Exception e) {
-            element.errMsg = e.getMessage();
+            if 
(queryIdToExecutionProfiles.containsKey(executionProfile.getQueryId())) {
+                return;
+            }
+            queryIdToExecutionProfiles.put(executionProfile.getQueryId(), 
executionProfile);
             if (LOG.isDebugEnabled()) {
-                LOG.debug("failed to build profile tree", e);
+                LOG.debug("Add execution profile {} to profile manager",
+                        DebugUtil.printId(executionProfile.getQueryId()));
             }
-            return element;
+            // Check if there are some query profiles that have not finished 
collecting; we should
+            // remove them to release memory.
+            if (queryIdToExecutionProfiles.size() > 2 * 
Config.max_query_profile_num) {
+                List<ExecutionProfile> finishOrExpireExecutionProfiles = 
Lists.newArrayList();
+                for (ExecutionProfile tmpProfile : 
queryIdToExecutionProfiles.values()) {
+                    if (System.currentTimeMillis() - 
tmpProfile.getQueryFinishTime()
+                            > Config.profile_async_collect_expire_time_secs * 
1000) {
+                        finishOrExpireExecutionProfiles.add(tmpProfile);
+                    }
+                }
+                for (ExecutionProfile tmp : finishOrExpireExecutionProfiles) {
+                    queryIdToExecutionProfiles.remove(tmp.getQueryId());
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Remove expired execution profile {}", 
DebugUtil.printId(tmp.getQueryId()));
+                    }
+                }
+            }
+        } finally {
+            writeLock.unlock();
         }
-        element.builder = builder;
-        return element;
+    }
+
+    public ExecutionProfile getExecutionProfile(TUniqueId queryId) {
+        return this.queryIdToExecutionProfiles.get(queryId);
     }
 
     public void pushProfile(Profile profile) {
@@ -148,14 +186,13 @@ public class ProfileManager {
 
         ProfileElement element = createElement(profile);
         // 'insert into' does have job_id, put all profiles key with query_id
-        String key = element.infoStrings.get(SummaryProfile.PROFILE_ID);
+        String key = element.profile.getSummaryProfile().getProfileId();
         // check when push in, which can ensure every element in the list has 
QUERY_ID column,
         // so there is no need to check when remove element from list.
         if (Strings.isNullOrEmpty(key)) {
             LOG.warn("the key or value of Map is null, "
                     + "may be forget to insert 'QUERY_ID' or 'JOB_ID' column 
into infoStrings");
         }
-
         writeLock.lock();
         // a profile may be updated multiple times in queryIdToProfileMap,
         // and only needs to be inserted into the queryIdDeque for the first 
time.
@@ -163,7 +200,13 @@ public class ProfileManager {
         try {
             if (!queryIdDeque.contains(key)) {
                 if (queryIdDeque.size() >= Config.max_query_profile_num) {
-                    queryIdToProfileMap.remove(queryIdDeque.getFirst());
+                    ProfileElement profileElementRemoved = 
queryIdToProfileMap.remove(queryIdDeque.getFirst());
+                    // If the Profile object is removed from manager, then 
related execution profile is also useless.
+                    if (profileElementRemoved != null) {
+                        for (ExecutionProfile executionProfile : 
profileElementRemoved.profile.getExecutionProfiles()) {
+                            
this.queryIdToExecutionProfiles.remove(executionProfile.getQueryId());
+                        }
+                    }
                     queryIdDeque.removeFirst();
                 }
                 queryIdDeque.addLast(key);
@@ -173,6 +216,21 @@ public class ProfileManager {
         }
     }
 
+    public void removeProfile(String profileId) {
+        writeLock.lock();
+        try {
+            ProfileElement profileElementRemoved = 
queryIdToProfileMap.remove(profileId);
+            // If the Profile object is removed from manager, then related 
execution profile is also useless.
+            if (profileElementRemoved != null) {
+                for (ExecutionProfile executionProfile : 
profileElementRemoved.profile.getExecutionProfiles()) {
+                    
this.queryIdToExecutionProfiles.remove(executionProfile.getQueryId());
+                }
+            }
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
     public List<List<String>> getAllQueries() {
         return getQueryWithType(null);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
index eab9b3b6734..372b84fa3f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
@@ -26,6 +26,7 @@ import org.apache.doris.thrift.TRuntimeProfileTree;
 import org.apache.doris.thrift.TUnit;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
@@ -69,7 +70,8 @@ public class RuntimeProfile {
     private ReentrantReadWriteLock childLock = new ReentrantReadWriteLock();
 
     private List<String> planNodeInfos = Lists.newArrayList();
-    private String name;
+    // name should not be changed.
+    private final String name;
 
     private Long timestamp = -1L;
 
@@ -85,22 +87,24 @@ public class RuntimeProfile {
     private int nodeid = -1;
 
     public RuntimeProfile(String name) {
-        this();
+        this.localTimePercent = 0;
+        if (Strings.isNullOrEmpty(name)) {
+            throw new RuntimeException("Profile name must not be null");
+        }
         this.name = name;
         this.counterTotalTime = new Counter(TUnit.TIME_NS, 0, 1);
+        this.counterMap.put("TotalTime", counterTotalTime);
     }
 
     public RuntimeProfile(String name, int nodeId) {
-        this();
+        this.localTimePercent = 0;
+        if (Strings.isNullOrEmpty(name)) {
+            throw new RuntimeException("Profile name must not be null");
+        }
         this.name = name;
         this.counterTotalTime = new Counter(TUnit.TIME_NS, 0, 3);
-        this.nodeid = nodeId;
-    }
-
-    public RuntimeProfile() {
-        this.counterTotalTime = new Counter(TUnit.TIME_NS, 0, 1);
-        this.localTimePercent = 0;
         this.counterMap.put("TotalTime", counterTotalTime);
+        this.nodeid = nodeId;
     }
 
     public void setIsCancel(Boolean isCancel) {
@@ -143,10 +147,6 @@ public class RuntimeProfile {
         this.isPipelineX = isPipelineX;
     }
 
-    public boolean getIsPipelineX() {
-        return this.isPipelineX;
-    }
-
     public Map<String, Counter> getCounterMap() {
         return counterMap;
     }
@@ -155,6 +155,10 @@ public class RuntimeProfile {
         return childList;
     }
 
+    public boolean isEmpty() {
+        return childList.isEmpty();
+    }
+
     public Map<String, RuntimeProfile> getChildMap() {
         return childMap;
     }
@@ -750,10 +754,6 @@ public class RuntimeProfile {
         }
     }
 
-    public void setName(String name) {
-        this.name = name;
-    }
-
     // Returns the value to which the specified key is mapped;
     // or null if this map contains no mapping for the key.
     public String getInfoString(String key) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 27503d4cc78..02f9bb0a3e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -199,7 +199,9 @@ public class BrokerLoadJob extends BulkLoadJob {
         // divide job into broker loading task by table
         List<LoadLoadingTask> newLoadingTasks = Lists.newArrayList();
         if (enableProfile) {
-            this.jobProfile = new Profile("BrokerLoadJob " + id + ". " + 
label, true);
+            this.jobProfile = new Profile("BrokerLoadJob " + id + ". " + 
label, true,
+                    
Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.PROFILE_LEVEL, 
"3")),
+                    false);
         }
         ProgressManager progressManager = Env.getCurrentProgressManager();
         progressManager.registerProgressSimple(String.valueOf(id));
@@ -329,16 +331,6 @@ public class BrokerLoadJob extends BulkLoadJob {
         }
     }
 
-    private void writeProfile() {
-        if (!enableProfile) {
-            return;
-        }
-        jobProfile.update(createTimestamp, getSummaryInfo(true), true,
-                
Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.PROFILE_LEVEL, 
"3")), null, false);
-        // jobProfile has been pushed into ProfileManager, remove reference in 
brokerLoadJob
-        jobProfile = null;
-    }
-
     private Map<String, String> getSummaryInfo(boolean isFinished) {
         long currentTimestamp = System.currentTimeMillis();
         SummaryBuilder builder = new SummaryBuilder();
@@ -410,7 +402,12 @@ public class BrokerLoadJob extends BulkLoadJob {
     @Override
     public void afterVisible(TransactionState txnState, boolean txnOperated) {
         super.afterVisible(txnState, txnOperated);
-        writeProfile();
+        if (!enableProfile) {
+            return;
+        }
+        jobProfile.updateSummary(createTimestamp, getSummaryInfo(true), true, 
null);
+        // jobProfile has been pushed into ProfileManager, remove reference in 
brokerLoadJob
+        jobProfile = null;
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 94fb49d6c85..ef7a07cb9a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -172,8 +172,11 @@ public class LoadLoadingTask extends LoadTask {
         }
 
         try {
-            QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator);
+            QeProcessorImpl.INSTANCE.registerQuery(loadId, new 
QeProcessorImpl.QueryInfo(curCoordinator));
             actualExecute(curCoordinator, timeoutS);
+            if (this.jobProfile != null) {
+                curCoordinator.getExecutionProfile().update(beginTime, true);
+            }
         } finally {
             QeProcessorImpl.INSTANCE.unregisterQuery(loadId);
         }
@@ -198,8 +201,6 @@ public class LoadLoadingTask extends LoadTask {
                         
ErrorTabletInfo.fromThrift(curCoordinator.getErrorTabletInfos()
                                 
.stream().limit(Config.max_error_tablet_of_broker_load).collect(Collectors.toList())));
                 curCoordinator.getErrorTabletInfos().clear();
-                // Create profile of this task and add to the job profile.
-                createProfile(curCoordinator);
             } else {
                 throw new LoadException(status.getErrorMsg());
             }
@@ -212,15 +213,6 @@ public class LoadLoadingTask extends LoadTask {
         return jobDeadlineMs - System.currentTimeMillis();
     }
 
-    private void createProfile(Coordinator coord) {
-        if (jobProfile == null) {
-            // No need to gather profile
-            return;
-        }
-        // Summary profile
-        coord.getExecutionProfile().update(beginTime, true);
-    }
-
     @Override
     public void updateRetryInfo() {
         super.updateRetryInfo();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
index cea0efc6fe7..45c96bd742a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
@@ -127,7 +127,9 @@ public class LoadCommand extends Command implements 
ForwardWithSync {
         if (!Config.enable_nereids_load) {
             throw new AnalysisException("Fallback to legacy planner 
temporary.");
         }
-        this.profile = new Profile("Query", 
ctx.getSessionVariable().enableProfile);
+        this.profile = new Profile("Query", 
ctx.getSessionVariable().enableProfile,
+                ctx.getSessionVariable().profileLevel,
+                ctx.getSessionVariable().getEnablePipelineXEngine());
         profile.getSummaryProfile().setQueryBeginTime();
         if (sourceInfos.size() == 1) {
             plans = ImmutableList.of(new 
InsertIntoTableCommand(completeQueryPlan(ctx, sourceInfos.get(0)),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
index 50a6bd6495b..5718e68c6b0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
@@ -25,8 +25,6 @@ public interface CoordInterface {
 
     public RowBatch getNext() throws Exception;
 
-    public int getInstanceTotalNum();
-
     public void cancel(Types.PPlanFragmentCancelReason cancelReason);
 
     // When call exec or get next data finished, should call this method to 
release
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 1b99da152f9..64b07e518a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -23,9 +23,11 @@ import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.Reference;
 import org.apache.doris.common.Status;
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.profile.ExecutionProfile;
 import org.apache.doris.common.util.DebugUtil;
@@ -80,7 +82,6 @@ import org.apache.doris.thrift.PaloInternalServiceVersion;
 import org.apache.doris.thrift.TBrokerScanRange;
 import org.apache.doris.thrift.TDataSinkType;
 import org.apache.doris.thrift.TDescriptorTable;
-import org.apache.doris.thrift.TDetailedReportParams;
 import org.apache.doris.thrift.TErrorTabletInfo;
 import org.apache.doris.thrift.TEsScanRange;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
@@ -122,6 +123,9 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.commons.lang3.tuple.ImmutableTriple;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.logging.log4j.LogManager;
@@ -142,13 +146,13 @@ import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 public class Coordinator implements CoordInterface {
     private static final Logger LOG = LogManager.getLogger(Coordinator.class);
@@ -158,6 +162,9 @@ public class Coordinator implements CoordInterface {
     // Random is used to shuffle instances of partitioned
     private static final Random instanceRandom = new SecureRandom();
 
+    private static ExecutorService backendRpcCallbackExecutor = 
ThreadPoolManager.newDaemonProfileThreadPool(32, 100,
+            "backend-rpc-callback", true);
+
     // Overall status of the entire query; set to the first reported fragment 
error
     // status or to CANCELLED, if Cancel() is called.
     Status queryStatus = new Status();
@@ -188,15 +195,12 @@ public class Coordinator implements CoordInterface {
     // coordinator still needs to wait for cleanup on remote fragments (e.g. 
queries
     // with limit)
     // Once this is set to true, errors from remote fragments are ignored.
-    private boolean returnedAllResults;
-
-    private List<RuntimeProfile> fragmentProfile;
+    private boolean returnedAllResults = false;
 
     // populated in computeFragmentExecParams()
     private final Map<PlanFragmentId, FragmentExecParams> 
fragmentExecParamsMap = Maps.newHashMap();
 
     private final List<PlanFragment> fragments;
-    private int instanceTotalNum;
 
     private Map<Long, BackendExecStates> beToExecStates = Maps.newHashMap();
     private Map<Long, PipelineExecContexts> beToPipelineExecCtxs = 
Maps.newHashMap();
@@ -267,6 +271,15 @@ public class Coordinator implements CoordInterface {
 
     private StatsErrorEstimator statsErrorEstimator;
 
+    // A countdown latch to mark the completion of each instance.
+    // used for the old pipeline engine
+    // instance id -> dummy value
+    private MarkedCountDownLatch<TUniqueId, Long> instancesDoneLatch = null;
+
+    // A countdown latch to mark the completion of each fragment. Used for 
pipelineX.
+    // fragmentid -> backendid
+    private MarkedCountDownLatch<Integer, Long> fragmentsDoneLatch = null;
+
     public void setTWorkloadGroups(List<TPipelineWorkloadGroup> 
tWorkloadGroups) {
         this.tWorkloadGroups = tWorkloadGroups;
     }
@@ -335,7 +348,7 @@ public class Coordinator implements CoordInterface {
         nextInstanceId.setHi(queryId.hi);
         nextInstanceId.setLo(queryId.lo + 1);
         this.assignedRuntimeFilters = planner.getRuntimeFilters();
-        this.executionProfile = new ExecutionProfile(queryId, 
fragments.size());
+        this.executionProfile = new ExecutionProfile(queryId, fragments);
 
     }
 
@@ -357,7 +370,7 @@ public class Coordinator implements CoordInterface {
         this.nextInstanceId = new TUniqueId();
         nextInstanceId.setHi(queryId.hi);
         nextInstanceId.setLo(queryId.lo + 1);
-        this.executionProfile = new ExecutionProfile(queryId, 
fragments.size());
+        this.executionProfile = new ExecutionProfile(queryId, fragments);
     }
 
     private void setFromUserProperty(ConnectContext connectContext) {
@@ -510,11 +523,6 @@ public class Coordinator implements CoordInterface {
         return result;
     }
 
-    @Override
-    public int getInstanceTotalNum() {
-        return instanceTotalNum;
-    }
-
     // Initialize
     private void prepare() {
         for (PlanFragment fragment : fragments) {
@@ -600,7 +608,6 @@ public class Coordinator implements CoordInterface {
         
Env.getCurrentEnv().getProgressManager().addTotalScanNums(String.valueOf(jobId),
 scanRangeNum);
         LOG.info("dispatch load job: {} to {}", DebugUtil.printId(queryId), 
addressToBackendID.keySet());
 
-        executionProfile.markInstances(instanceIds);
         List<TExecPlanFragmentParams> tExecPlanFragmentParams
                 = ((FragmentExecParams) 
this.fragmentExecParamsMap.values().toArray()[0]).toThrift(0);
         TExecPlanFragmentParams fragmentParams = 
tExecPlanFragmentParams.get(0);
@@ -708,11 +715,6 @@ public class Coordinator implements CoordInterface {
             
Env.getCurrentEnv().getProgressManager().addTotalScanNums(String.valueOf(jobId),
 scanRangeNum);
             LOG.info("dispatch load job: {} to {}", 
DebugUtil.printId(queryId), addressToBackendID.keySet());
         }
-        if (enablePipelineXEngine) {
-            executionProfile.markFragments(fragments.size());
-        } else {
-            executionProfile.markInstances(instanceIds);
-        }
 
         if (enablePipelineEngine) {
             sendPipelineCtx();
@@ -775,7 +777,6 @@ public class Coordinator implements CoordInterface {
                 // 1. set up exec states
                 int instanceNum = params.instanceExecParams.size();
                 Preconditions.checkState(instanceNum > 0);
-                instanceTotalNum += instanceNum;
                 List<TExecPlanFragmentParams> tParams = 
params.toThrift(backendIdx);
 
                 // 2. update memory limit for colocate join
@@ -802,8 +803,7 @@ public class Coordinator implements CoordInterface {
                 for (TExecPlanFragmentParams tParam : tParams) {
                     BackendExecState execState =
                             new BackendExecState(fragment.getFragmentId(), 
instanceId++,
-                                    profileFragmentId, tParam, 
this.addressToBackendID,
-                                    executionProfile.getLoadChannelProfile());
+                                    tParam, this.addressToBackendID, 
executionProfile);
                     // Each tParam will set the total number of Fragments that 
need to be executed on the same BE,
                     // and the BE will determine whether all Fragments have 
been executed based on this information.
                     // Notice. load fragment has a small probability that 
FragmentNumOnHost is 0, for unknown reasons.
@@ -870,9 +870,6 @@ public class Coordinator implements CoordInterface {
                 }
                 waitRpc(futures, this.timeoutDeadline - 
System.currentTimeMillis(), "send execution start");
             }
-            if (context != null && 
context.getSessionVariable().enableProfile()) {
-                attachInstanceProfileToFragmentProfile();
-            }
         } finally {
             unlock();
         }
@@ -891,8 +888,9 @@ public class Coordinator implements CoordInterface {
             int backendIdx = 0;
             int profileFragmentId = 0;
             beToPipelineExecCtxs.clear();
-            // If #fragments > 1 and BE amount is bigger than 1, use 
twoPhaseExecution with
-            // exec_plan_fragments_prepare and exec_plan_fragments_start,
+            // fragment:backend
+            List<Pair<PlanFragmentId, Long>> backendFragments = 
Lists.newArrayList();
+            // If #fragments >=2, use twoPhaseExecution with 
exec_plan_fragments_prepare and exec_plan_fragments_start,
             // else use exec_plan_fragments directly.
             // we choose #fragments > 1 because in some cases
             // we need ensure that A fragment is already prepared to receive 
data before B fragment sends data.
@@ -918,12 +916,10 @@ public class Coordinator implements CoordInterface {
                     needCheckBackendState = true;
                 }
 
-                Map<TUniqueId, RuntimeProfile> fragmentInstancesMap = new 
HashMap<TUniqueId, RuntimeProfile>();
+                Map<TUniqueId, Boolean> fragmentInstancesMap = new 
HashMap<TUniqueId, Boolean>();
                 for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry 
: tParams.entrySet()) {
                     for (TPipelineInstanceParams instanceParam : 
entry.getValue().local_params) {
-                        String name = "Instance " + 
DebugUtil.printId(instanceParam.fragment_instance_id)
-                                + " (host=" + entry.getKey() + ")";
-                        
fragmentInstancesMap.put(instanceParam.fragment_instance_id, new 
RuntimeProfile(name));
+                        
fragmentInstancesMap.put(instanceParam.fragment_instance_id, false);
                     }
                 }
 
@@ -932,10 +928,10 @@ public class Coordinator implements CoordInterface {
                 // So that we can use one RPC to send all fragment instances 
of a BE.
                 for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry 
: tParams.entrySet()) {
                     Long backendId = 
this.addressToBackendID.get(entry.getKey());
+                    backendFragments.add(Pair.of(fragment.getFragmentId(), 
backendId));
                     PipelineExecContext pipelineExecContext = new 
PipelineExecContext(fragment.getFragmentId(),
-                            profileFragmentId, entry.getValue(), backendId, 
fragmentInstancesMap,
-                            executionProfile.getLoadChannelProfile(), 
this.enablePipelineXEngine,
-                            this.executionProfile);
+                            entry.getValue(), backendId, fragmentInstancesMap,
+                            this.enablePipelineXEngine, executionProfile);
                     // Each tParam will set the total number of Fragments that 
need to be executed on the same BE,
                     // and the BE will determine whether all Fragments have 
been executed based on this information.
                     // Notice. load fragment has a small probability that 
FragmentNumOnHost is 0, for unknown reasons.
@@ -988,6 +984,14 @@ public class Coordinator implements CoordInterface {
                 profileFragmentId += 1;
             } // end for fragments
 
+            // Init the mark-done latch in order to track the finished state of the 
query
+            if (this.enablePipelineXEngine) {
+                fragmentsDoneLatch = new 
MarkedCountDownLatch<>(backendFragments.size());
+                for (Pair<PlanFragmentId, Long> pair : backendFragments) {
+                    fragmentsDoneLatch.addMark(pair.first.asInt(), 
pair.second);
+                }
+            }
+
             // 4. send and wait fragments rpc
             List<Triple<PipelineExecContexts, BackendServiceProxy, 
Future<InternalService.PExecPlanFragmentResult>>>
                     futures = Lists.newArrayList();
@@ -1019,9 +1023,6 @@ public class Coordinator implements CoordInterface {
                 }
                 waitPipelineRpc(futures, this.timeoutDeadline - 
System.currentTimeMillis(), "send execution start");
             }
-            if (context != null && 
context.getSessionVariable().enableProfile()) {
-                attachInstanceProfileToFragmentProfile();
-            }
         } finally {
             unlock();
         }
@@ -1446,8 +1447,10 @@ public class Coordinator implements CoordInterface {
         lock();
         try {
             if (!queryStatus.ok()) {
-                // we can't cancel twice
-                return;
+                // Print an error stack here to know why send cancel again.
+                LOG.warn("Query {} already in abnormal status {}, but received 
cancel again,"
+                        + "so that send cancel to BE again",
+                        DebugUtil.printId(queryId), queryStatus.toString(), 
new Exception());
             } else {
                 queryStatus.setStatus(Status.CANCELLED);
             }
@@ -1459,6 +1462,15 @@ public class Coordinator implements CoordInterface {
         }
     }
 
+    private void cancelLatch() {
+        if (instancesDoneLatch != null) {
+            instancesDoneLatch.countDownToZero(new Status());
+        }
+        if (fragmentsDoneLatch != null) {
+            fragmentsDoneLatch.countDownToZero(new Status());
+        }
+    }
+
     private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason) {
         if (null != receiver) {
             receiver.cancel(cancelReason.toString());
@@ -1468,7 +1480,7 @@ public class Coordinator implements CoordInterface {
             return;
         }
         cancelRemoteFragmentsAsync(cancelReason);
-        executionProfile.onCancel();
+        cancelLatch();
     }
 
     private void cancelRemoteFragmentsAsync(Types.PPlanFragmentCancelReason 
cancelReason) {
@@ -1507,6 +1519,12 @@ public class Coordinator implements CoordInterface {
             }
         }
 
+        // Init instancesDoneLatch; it will be used to track if the instances 
have finished for insert stmt
+        instancesDoneLatch = new MarkedCountDownLatch<>(instanceIds.size());
+        for (TUniqueId instanceId : instanceIds) {
+            instancesDoneLatch.addMark(instanceId, -1L /* value is meaningless 
*/);
+        }
+
         // compute multi cast fragment params
         computeMultiCastFragmentParams();
 
@@ -2437,31 +2455,30 @@ public class Coordinator implements CoordInterface {
     public void updateFragmentExecStatus(TReportExecStatusParams params) {
         if (enablePipelineXEngine) {
             PipelineExecContext ctx = 
pipelineExecContexts.get(Pair.of(params.getFragmentId(), 
params.getBackendId()));
-            if (!ctx.updateProfile(params)) {
+            if (ctx == null || !ctx.updatePipelineStatus(params)) {
                 return;
             }
 
-            // print fragment instance profile
-            if (LOG.isDebugEnabled()) {
-                StringBuilder builder = new StringBuilder();
-                ctx.printProfile(builder);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("profile for query_id={} fragment_id={}\n{}",
-                            DebugUtil.printId(queryId),
-                            params.getFragmentId(),
-                            builder.toString());
-                }
-            }
-
             Status status = new Status(params.status);
             // for now, abort the query if we see any error except if the 
error is cancelled
             // and returned_all_results_ is true.
             // (UpdateStatus() initiates cancellation, if it hasn't already 
been initiated)
-            if (!(returnedAllResults && status.isCancelled()) && !status.ok()) 
{
-                LOG.warn("one instance report fail, query_id={} 
instance_id={}, error message: {}",
-                        DebugUtil.printId(queryId), 
DebugUtil.printId(params.getFragmentInstanceId()),
-                        status.getErrorMsg());
-                updateStatus(status);
+            if (!status.ok()) {
+                if (returnedAllResults && status.isCancelled()) {
+                    LOG.warn("Query {} has returned all results, 
fragment_id={} instance_id={}, be={}"
+                            + " is reporting failed status {}",
+                            DebugUtil.printId(queryId), params.getFragmentId(),
+                            DebugUtil.printId(params.getFragmentInstanceId()),
+                            params.getBackendId(),
+                            status.toString());
+                } else {
+                    LOG.warn("one instance report fail, query_id={} 
fragment_id={} instance_id={}, be={},"
+                                    + " error message: {}",
+                            DebugUtil.printId(queryId), params.getFragmentId(),
+                            DebugUtil.printId(params.getFragmentInstanceId()),
+                            params.getBackendId(), status.toString());
+                    updateStatus(status);
+                }
             }
             if (params.isSetDeltaUrls()) {
                 updateDeltas(params.getDeltaUrls());
@@ -2489,39 +2506,36 @@ public class Coordinator implements CoordInterface {
             if (ctx.done) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Query {} fragment {} is marked done",
-                            DebugUtil.printId(queryId), ctx.profileFragmentId);
+                            DebugUtil.printId(queryId), ctx.fragmentId);
                 }
-                executionProfile.markOneFragmentDone(ctx.profileFragmentId);
+                fragmentsDoneLatch.markedCountDown(params.getFragmentId(), 
params.getBackendId());
             }
         } else if (enablePipelineEngine) {
             PipelineExecContext ctx = 
pipelineExecContexts.get(Pair.of(params.getFragmentId(), 
params.getBackendId()));
-            if (!ctx.updateProfile(params)) {
+            if (ctx == null || !ctx.updatePipelineStatus(params)) {
                 return;
             }
 
-            // print fragment instance profile
-            if (LOG.isDebugEnabled()) {
-                StringBuilder builder = new StringBuilder();
-                ctx.printProfile(builder);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("profile for query_id={} instance_id={}\n{}",
-                            DebugUtil.printId(queryId),
-                            DebugUtil.printId(params.getFragmentInstanceId()),
-                            builder.toString());
-                }
-            }
-
             Status status = new Status(params.status);
             // for now, abort the query if we see any error except if the 
error is cancelled
             // and returned_all_results_ is true.
             // (UpdateStatus() initiates cancellation, if it hasn't already 
been initiated)
-            if (!(returnedAllResults && status.isCancelled()) && !status.ok()) 
{
-                LOG.warn("one instance report fail, query_id={} fragment_id={} 
instance_id={}, be={},"
-                                + " error message: {}",
-                        DebugUtil.printId(queryId), params.getFragmentId(),
-                        DebugUtil.printId(params.getFragmentInstanceId()),
-                        params.getBackendId(), status.getErrorMsg());
-                updateStatus(status);
+            if (!status.ok()) {
+                if (returnedAllResults && status.isCancelled()) {
+                    LOG.warn("Query {} has returned all results, 
fragment_id={} instance_id={}, be={}"
+                            + " is reporting failed status {}",
+                            DebugUtil.printId(queryId), params.getFragmentId(),
+                            DebugUtil.printId(params.getFragmentInstanceId()),
+                            params.getBackendId(),
+                            status.toString());
+                } else {
+                    LOG.warn("one instance report fail, query_id={} 
fragment_id={} instance_id={}, be={},"
+                                    + " error message: {}",
+                            DebugUtil.printId(queryId), params.getFragmentId(),
+                            DebugUtil.printId(params.getFragmentInstanceId()),
+                            params.getBackendId(), status.toString());
+                    updateStatus(status);
+                }
             }
 
             // params.isDone() should be promised.
@@ -2530,7 +2544,7 @@ public class Coordinator implements CoordInterface {
             // The last report causes the counter to decrease to zero,
             // but it is possible that the report without commit-info 
triggered the commit operation,
             // resulting in the data not being published.
-            if 
(ctx.fragmentInstancesMap.get(params.fragment_instance_id).getIsDone() && 
params.isDone()) {
+            if (ctx.fragmentInstancesMap.get(params.fragment_instance_id) && 
params.isDone()) {
                 if (params.isSetDeltaUrls()) {
                     updateDeltas(params.getDeltaUrls());
                 }
@@ -2556,7 +2570,7 @@ public class Coordinator implements CoordInterface {
                     LOG.debug("Query {} instance {} is marked done",
                             DebugUtil.printId(queryId), 
DebugUtil.printId(params.getFragmentInstanceId()));
                 }
-                
executionProfile.markOneInstanceDone(params.getFragmentInstanceId());
+                
instancesDoneLatch.markedCountDown(params.getFragmentInstanceId(), -1L);
             } else {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Query {} instance {} is not marked done",
@@ -2571,22 +2585,14 @@ public class Coordinator implements CoordInterface {
                 return;
             }
             BackendExecState execState = 
backendExecStates.get(params.backend_num);
-            if (!execState.updateProfile(params)) {
+            if (!execState.updateInstanceStatus(params)) {
+                // Has to return here, to avoid out of order report messages. 
For example,
+                // the first message is done, then we update commit messages, 
but the new
+                // message is running, then we will also update commit 
messages. It will
+                // lead to data corrupt.
                 return;
             }
 
-            // print fragment instance profile
-            if (LOG.isDebugEnabled()) {
-                StringBuilder builder = new StringBuilder();
-                execState.printProfile(builder);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("profile for query_id={} instance_id={}\n{}",
-                            DebugUtil.printId(queryId),
-                            DebugUtil.printId(params.getFragmentInstanceId()),
-                            builder.toString());
-                }
-            }
-
             Status status = new Status(params.status);
             // for now, abort the query if we see any error except if the 
error is cancelled
             // and returned_all_results_ is true.
@@ -2595,11 +2601,11 @@ public class Coordinator implements CoordInterface {
                 if (status.isCancelled() && returnedAllResults) {
                     LOG.warn("Query {} has returned all results, its instance 
{} is reporting failed status {}",
                             DebugUtil.printId(queryId), 
DebugUtil.printId(params.getFragmentInstanceId()),
-                            status.getErrorMsg());
+                            status.toString());
                 } else {
                     LOG.warn("Instance {} of query {} report failed status, 
error msg: {}",
                             DebugUtil.printId(queryId), 
DebugUtil.printId(params.getFragmentInstanceId()),
-                            status.getErrorMsg());
+                            status.toString());
                     updateStatus(status);
                 }
             }
@@ -2632,7 +2638,7 @@ public class Coordinator implements CoordInterface {
                 if (params.isSetHivePartitionUpdates()) {
                     
updateHivePartitionUpdates(params.getHivePartitionUpdates());
                 }
-                
executionProfile.markOneInstanceDone(params.getFragmentInstanceId());
+                
instancesDoneLatch.markedCountDown(params.getFragmentInstanceId(), -1L);
             }
         }
 
@@ -2667,10 +2673,10 @@ public class Coordinator implements CoordInterface {
             long waitTime = Math.min(leftTimeoutS, fixedMaxWaitTime);
             boolean awaitRes = false;
             try {
-                if (enablePipelineXEngine) {
-                    awaitRes = 
executionProfile.awaitAllFragmentsDone(waitTime);
+                if (fragmentsDoneLatch != null) {
+                    awaitRes = fragmentsDoneLatch.await(waitTime, 
TimeUnit.SECONDS);
                 } else {
-                    awaitRes = 
executionProfile.awaitAllInstancesDone(waitTime);
+                    awaitRes = instancesDoneLatch.await(waitTime, 
TimeUnit.SECONDS);
                 }
             } catch (InterruptedException e) {
                 // Do nothing
@@ -2714,10 +2720,10 @@ public class Coordinator implements CoordInterface {
     }
 
     public boolean isDone() {
-        if (enablePipelineXEngine) {
-            return executionProfile.isAllFragmentsDone();
+        if (fragmentsDoneLatch != null) {
+            return fragmentsDoneLatch.getCount() == 0;
         } else {
-            return executionProfile.isAllInstancesDone();
+            return instancesDoneLatch.getCount() == 0;
         }
     }
 
@@ -3024,20 +3030,17 @@ public class Coordinator implements CoordInterface {
         PlanFragmentId fragmentId;
         boolean initiated;
         volatile boolean done;
-        boolean hasCanceled;
-        int profileFragmentId;
-        RuntimeProfile instanceProfile;
-        RuntimeProfile loadChannelProfile;
         TNetworkAddress brpcAddress;
         TNetworkAddress address;
         Backend backend;
         long lastMissingHeartbeatTime = -1;
         TUniqueId instanceId;
+        private boolean hasCancelled = false;
+        private boolean cancelInProcess = false;
 
-        public BackendExecState(PlanFragmentId fragmentId, int instanceId, int 
profileFragmentId,
+        public BackendExecState(PlanFragmentId fragmentId, int instanceId,
                                 TExecPlanFragmentParams rpcParams, 
Map<TNetworkAddress, Long> addressToBackendID,
-                                RuntimeProfile loadChannelProfile) {
-            this.profileFragmentId = profileFragmentId;
+                                ExecutionProfile executionProfile) {
             this.fragmentId = fragmentId;
             this.rpcParams = rpcParams;
             this.initiated = false;
@@ -3047,12 +3050,10 @@ public class Coordinator implements CoordInterface {
             this.address = fi.host;
             this.backend = idToBackend.get(addressToBackendID.get(address));
             this.brpcAddress = new TNetworkAddress(backend.getHost(), 
backend.getBrpcPort());
-
-            String name = "Instance " + DebugUtil.printId(fi.instanceId) + " 
(host=" + address + ")";
-            this.loadChannelProfile = loadChannelProfile;
-            this.instanceProfile = new RuntimeProfile(name);
-            this.hasCanceled = false;
             this.lastMissingHeartbeatTime = 
backend.getLastMissingHeartbeatTime();
+            String profileName = "Instance " + 
DebugUtil.printId(fi.instanceId) + " (host=" + address + ")";
+            RuntimeProfile instanceProfile = new RuntimeProfile(profileName);
+            executionProfile.addInstanceProfile(fragmentId, fi.instanceId, 
instanceProfile);
         }
 
         /**
@@ -3070,19 +3071,12 @@ public class Coordinator implements CoordInterface {
             this.rpcParams.setIsSimplifiedParam(true);
         }
 
-        // update profile.
-        // return true if profile is updated. Otherwise, return false.
-        public synchronized boolean updateProfile(TReportExecStatusParams 
params) {
+        // update the instance status, if it is already finished, then not 
update any more.
+        public synchronized boolean 
updateInstanceStatus(TReportExecStatusParams params) {
             if (this.done) {
                 // duplicate packet
                 return false;
             }
-            if (params.isSetProfile()) {
-                instanceProfile.update(params.profile);
-            }
-            if (params.isSetLoadChannelProfile()) {
-                loadChannelProfile.update(params.loadChannelProfile);
-            }
             this.done = params.done;
             if (statsErrorEstimator != null) {
                 statsErrorEstimator.updateExactReturnedRows(params);
@@ -3090,54 +3084,75 @@ public class Coordinator implements CoordInterface {
             return true;
         }
 
-        public synchronized void printProfile(StringBuilder builder) {
-            this.instanceProfile.computeTimeInProfile();
-            this.instanceProfile.prettyPrint(builder, "");
-        }
-
         // cancel the fragment instance.
         // return true if cancel success. Otherwise, return false
-        public synchronized boolean 
cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) {
-            LOG.warn("cancelRemoteFragments initiated={} done={} 
hasCanceled={} backend: {},"
+        public synchronized void 
cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) {
+            LOG.warn("cancelRemoteFragments initiated={} done={} backend: {},"
                             + " fragment instance id={}, reason: {}",
-                    this.initiated, this.done, this.hasCanceled, 
backend.getId(),
+                    this.initiated, this.done, backend.getId(),
                     DebugUtil.printId(fragmentInstanceId()), 
cancelReason.name());
             try {
                 if (!this.initiated) {
-                    return false;
+                    return;
                 }
                 // don't cancel if it is already finished
                 if (this.done) {
-                    return false;
+                    return;
                 }
-                if (this.hasCanceled) {
-                    return false;
+                if (this.hasCancelled || this.cancelInProcess) {
+                    LOG.info("Fragment instance has already been cancelled {} 
or under cancel {}."
+                            + " initiated={} done={} backend: {},"
+                            + "fragment instance id={}, reason: {}",
+                            this.hasCancelled, this.cancelInProcess,
+                            this.initiated, this.done, backend.getId(),
+                            DebugUtil.printId(fragmentInstanceId()), 
cancelReason.name());
+                    return;
                 }
-
                 try {
-                    
BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress,
-                            fragmentInstanceId(), cancelReason);
+                    
ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelResult =
+                            
BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress,
+                                    fragmentInstanceId(), cancelReason);
+                    Futures.addCallback(cancelResult, new 
FutureCallback<InternalService.PCancelPlanFragmentResult>() {
+                        public void 
onSuccess(InternalService.PCancelPlanFragmentResult result) {
+                            cancelInProcess = false;
+                            if (result.hasStatus()) {
+                                Status status = new Status();
+                                status.setPstatus(result.getStatus());
+                                if (status.getErrorCode() == TStatusCode.OK) {
+                                    hasCancelled = true;
+                                } else {
+                                    LOG.warn("Failed to cancel query {} 
instance initiated={} done={} backend: {},"
+                                            + "fragment instance id={}, 
reason: {}",
+                                            DebugUtil.printId(queryId), 
initiated, done, backend.getId(),
+                                            
DebugUtil.printId(fragmentInstanceId()), status.toString());
+                                }
+                            }
+                            LOG.warn("Failed to cancel query {} instance 
initiated={} done={} backend: {},"
+                                    + "fragment instance id={}, reason: {}",
+                                    DebugUtil.printId(queryId), initiated, 
done, backend.getId(),
+                                    DebugUtil.printId(fragmentInstanceId()), 
"without status");
+                        }
+
+                        public void onFailure(Throwable t) {
+                            cancelInProcess = false;
+                            LOG.warn("Failed to cancel query {} instance 
initiated={} done={} backend: {},"
+                                    + "fragment instance id={}, reason: {}",
+                                    DebugUtil.printId(queryId), initiated, 
done, backend.getId(),
+                                    DebugUtil.printId(fragmentInstanceId()), 
cancelReason.name(), t);
+                        }
+                    }, backendRpcCallbackExecutor);
+                    cancelInProcess = true;
                 } catch (RpcException e) {
                     LOG.warn("cancel plan fragment get a exception, 
address={}:{}", brpcAddress.getHostname(),
                             brpcAddress.getPort());
                     
SimpleScheduler.addToBlacklist(addressToBackendID.get(brpcAddress), 
e.getMessage());
                 }
 
-                this.hasCanceled = true;
             } catch (Exception e) {
                 LOG.warn("catch a exception", e);
-                return false;
-            }
-            return true;
-        }
-
-        public synchronized boolean computeTimeInProfile(int maxFragmentId) {
-            if (this.profileFragmentId < 0 || this.profileFragmentId > 
maxFragmentId) {
-                LOG.warn("profileFragmentId {} should be in [0, {})", 
profileFragmentId, maxFragmentId);
-                return false;
+                return;
             }
-            instanceProfile.computeTimeInProfile();
-            return true;
+            return;
         }
 
         public boolean isBackendStateHealthy() {
@@ -3163,16 +3178,12 @@ public class Coordinator implements CoordInterface {
         TPipelineFragmentParams rpcParams;
         PlanFragmentId fragmentId;
         boolean initiated;
-        volatile boolean done;
-        boolean hasCanceled;
+        boolean done;
         // use for pipeline
-        Map<TUniqueId, RuntimeProfile> fragmentInstancesMap;
+        Map<TUniqueId, Boolean> fragmentInstancesMap;
         // use for pipelineX
-        List<RuntimeProfile> taskProfile;
 
         boolean enablePipelineX;
-        RuntimeProfile loadChannelProfile;
-        int profileFragmentId;
         TNetworkAddress brpcAddress;
         TNetworkAddress address;
         Backend backend;
@@ -3180,19 +3191,17 @@ public class Coordinator implements CoordInterface {
         long profileReportProgress = 0;
         long beProcessEpoch = 0;
         private final int numInstances;
-        final ExecutionProfile executionProfile;
+        private boolean hasCancelled = false;
+        private boolean cancelInProcess = false;
 
-        public PipelineExecContext(PlanFragmentId fragmentId, int 
profileFragmentId,
+        public PipelineExecContext(PlanFragmentId fragmentId,
                 TPipelineFragmentParams rpcParams, Long backendId,
-                Map<TUniqueId, RuntimeProfile> fragmentInstancesMap,
-                RuntimeProfile loadChannelProfile, boolean enablePipelineX, 
final ExecutionProfile executionProfile) {
-            this.profileFragmentId = profileFragmentId;
+                Map<TUniqueId, Boolean> fragmentInstancesMap,
+                boolean enablePipelineX, ExecutionProfile executionProfile) {
             this.fragmentId = fragmentId;
             this.rpcParams = rpcParams;
             this.numInstances = rpcParams.local_params.size();
             this.fragmentInstancesMap = fragmentInstancesMap;
-            this.taskProfile = new ArrayList<RuntimeProfile>();
-            this.loadChannelProfile = loadChannelProfile;
 
             this.initiated = false;
             this.done = false;
@@ -3202,27 +3211,18 @@ public class Coordinator implements CoordInterface {
             this.brpcAddress = new TNetworkAddress(backend.getHost(), 
backend.getBrpcPort());
             this.beProcessEpoch = backend.getProcessEpoch();
 
-            this.hasCanceled = false;
             this.lastMissingHeartbeatTime = 
backend.getLastMissingHeartbeatTime();
             this.enablePipelineX = enablePipelineX;
-            this.executionProfile = executionProfile;
-            if (enablePipelineX) {
-                executionProfile.addFragments(profileFragmentId);
-            }
-        }
-
-        public Stream<RuntimeProfile> profileStream() {
-            if (enablePipelineX) {
-                return taskProfile.stream();
+            if (this.enablePipelineX) {
+                executionProfile.addFragmentBackend(fragmentId, backendId);
+            } else {
+                for (TPipelineInstanceParams instanceParam : 
rpcParams.local_params) {
+                    String profileName = "Instance " + 
DebugUtil.printId(instanceParam.fragment_instance_id)
+                            + " (host=" + address + ")";
+                    executionProfile.addInstanceProfile(fragmentId, 
instanceParam.fragment_instance_id,
+                        new RuntimeProfile(profileName));
+                }
             }
-            return fragmentInstancesMap.values().stream();
-        }
-
-        public void attachPipelineProfileToFragmentProfile() {
-            profileStream()
-                    .forEach(p -> 
executionProfile.addInstanceProfile(this.profileFragmentId, p));
-            executionProfile.addMultiBeProfileByPipelineX(profileFragmentId, 
address,
-                    taskProfile);
         }
 
         /**
@@ -3243,46 +3243,33 @@ public class Coordinator implements CoordInterface {
 
         // update profile.
         // return true if profile is updated. Otherwise, return false.
-        public synchronized boolean updateProfile(TReportExecStatusParams 
params) {
+        // Has to use synchronized to ensure there are not concurrent update 
threads. Or the done
+        // state maybe update wrong and will lose data. see 
https://github.com/apache/doris/pull/29802/files.
+        public synchronized boolean 
updatePipelineStatus(TReportExecStatusParams params) {
+            // The fragment or instance is not finished, not need update
+            if (!params.done) {
+                return false;
+            }
             if (enablePipelineX) {
-                taskProfile.clear();
-                int pipelineIdx = 0;
-                for (TDetailedReportParams param : params.detailed_report) {
-                    String name = "Pipeline :" + pipelineIdx + " "
-                            + " (host=" + address + ")";
-                    RuntimeProfile profile = new RuntimeProfile(name);
-                    taskProfile.add(profile);
-                    if (param.isSetProfile()) {
-                        profile.update(param.profile);
-                    }
-                    if (params.done) {
-                        profile.setIsDone(true);
-                    }
-                    pipelineIdx++;
-                }
-                if (params.isSetLoadChannelProfile()) {
-                    loadChannelProfile.update(params.loadChannelProfile);
-                }
-                this.done = params.done;
-                attachPipelineProfileToFragmentProfile();
-                return this.done;
-            } else {
-                RuntimeProfile profile = 
fragmentInstancesMap.get(params.fragment_instance_id);
-                if (params.done && profile.getIsDone()) {
+                if (this.done) {
                     // duplicate packet
                     return false;
                 }
-
-                if (params.isSetProfile()) {
-                    profile.update(params.profile);
-                }
-                if (params.isSetLoadChannelProfile()) {
-                    loadChannelProfile.update(params.loadChannelProfile);
+                this.done = true;
+                return true;
+            } else {
+                // could not find the related instances, not update and return 
false, to indicate
+                // that the caller should not update any more.
+                if 
(!fragmentInstancesMap.containsKey(params.fragment_instance_id)) {
+                    return false;
                 }
-                if (params.done) {
-                    profile.setIsDone(true);
-                    profileReportProgress++;
+                Boolean instanceDone = 
fragmentInstancesMap.get(params.fragment_instance_id);
+                if (instanceDone) {
+                    // duplicate packet
+                    return false;
                 }
+                fragmentInstancesMap.put(params.fragment_instance_id, true);
+                profileReportProgress++;
                 if (profileReportProgress == numInstances) {
                     this.done = true;
                 }
@@ -3290,32 +3277,57 @@ public class Coordinator implements CoordInterface {
             }
         }
 
-        public synchronized void printProfile(StringBuilder builder) {
-            this.profileStream().forEach(p -> {
-                p.computeTimeInProfile();
-                p.prettyPrint(builder, "");
-            });
-        }
-
-        // cancel all fragment instances.
-        // return true if cancel success. Otherwise, return false
-
-        private synchronized boolean 
cancelFragment(Types.PPlanFragmentCancelReason cancelReason) {
-            for (RuntimeProfile profile : taskProfile) {
-                profile.setIsCancel(true);
-            }
+        // Just send the cancel message to BE, not care about the result, 
because there is no retry
+        // logic in upper logic.
+        private synchronized void 
cancelFragment(Types.PPlanFragmentCancelReason cancelReason) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("cancelRemoteFragments initiated={} done={} 
hasCanceled={} backend: {},"
+                LOG.debug("cancelRemoteFragments initiated={} done={} backend: 
{},"
                         + " fragment id={} query={}, reason: {}",
-                        this.initiated, this.done, this.hasCanceled, 
backend.getId(),
-                        this.profileFragmentId,
+                        this.initiated, this.done, backend.getId(),
+                        this.fragmentId,
                         DebugUtil.printId(queryId), cancelReason.name());
             }
+
+            if (this.hasCancelled || this.cancelInProcess) {
+                LOG.info("Frangment has already been cancelled. Query {} 
backend: {}, fragment id={}",
+                        DebugUtil.printId(queryId), backend.getId(), 
this.fragmentId);
+                return;
+            }
             try {
                 try {
-                    
BackendServiceProxy.getInstance().cancelPipelineXPlanFragmentAsync(brpcAddress,
+                    
ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelResult =
+                            
BackendServiceProxy.getInstance().cancelPipelineXPlanFragmentAsync(brpcAddress,
                             this.fragmentId, queryId, cancelReason);
-                    this.hasCanceled = true;
+                    Futures.addCallback(cancelResult, new 
FutureCallback<InternalService.PCancelPlanFragmentResult>() {
+                        public void 
onSuccess(InternalService.PCancelPlanFragmentResult result) {
+                            cancelInProcess = false;
+                            if (result.hasStatus()) {
+                                Status status = new Status();
+                                status.setPstatus(result.getStatus());
+                                if (status.getErrorCode() == TStatusCode.OK) {
+                                    hasCancelled = true;
+                                } else {
+                                    LOG.warn("Failed to cancel query {} 
instance initiated={} done={} backend: {},"
+                                            + "fragment id={}, reason: {}",
+                                            DebugUtil.printId(queryId), 
initiated, done, backend.getId(),
+                                            fragmentId, status.toString());
+                                }
+                            }
+                            LOG.warn("Failed to cancel query {} instance 
initiated={} done={} backend: {},"
+                                    + "fragment id={}, reason: {}",
+                                    DebugUtil.printId(queryId), initiated, 
done, backend.getId(),
+                                    fragmentId, "without status");
+                        }
+
+                        public void onFailure(Throwable t) {
+                            cancelInProcess = false;
+                            LOG.warn("Failed to cancel query {} instance 
initiated={} done={} backend: {},"
+                                    + "fragment id={}, reason: {}",
+                                    DebugUtil.printId(queryId), initiated, 
done, backend.getId(),
+                                    fragmentId, cancelReason.name(), t);
+                        }
+                    }, backendRpcCallbackExecutor);
+                    cancelInProcess = true;
                 } catch (RpcException e) {
                     LOG.warn("cancel plan fragment get a exception, 
address={}:{}", brpcAddress.getHostname(),
                             brpcAddress.getPort());
@@ -3323,78 +3335,91 @@ public class Coordinator implements CoordInterface {
                 }
             } catch (Exception e) {
                 LOG.warn("catch a exception", e);
-                return false;
+                return;
             }
-            return true;
+            return;
         }
 
-        private synchronized boolean 
cancelInstance(Types.PPlanFragmentCancelReason cancelReason) {
+        // Just send the cancel logic to BE, not care about the result, and 
there is no retry logic
+        // in upper logic.
+        private synchronized void 
cancelInstance(Types.PPlanFragmentCancelReason cancelReason) {
             for (TPipelineInstanceParams localParam : rpcParams.local_params) {
-                LOG.warn("cancelRemoteFragments initiated={} done={} 
hasCanceled={} backend:{},"
+                LOG.warn("cancelRemoteFragments initiated={} done={} 
backend:{},"
                         + " fragment instance id={} query={}, reason: {}",
-                        this.initiated, this.done, this.hasCanceled, 
backend.getId(),
+                        this.initiated, this.done, backend.getId(),
                         DebugUtil.printId(localParam.fragment_instance_id),
                         DebugUtil.printId(queryId), cancelReason.name());
-
-                RuntimeProfile profile = 
fragmentInstancesMap.get(localParam.fragment_instance_id);
-                if (profile.getIsDone() || profile.getIsCancel()) {
-                    continue;
+                if (this.hasCancelled || this.cancelInProcess) {
+                    LOG.info("fragment instance has already been cancelled {} 
or in process {}. "
+                            + "initiated={} done={} backend:{},"
+                            + " fragment instance id={} query={}, reason: {}",
+                            this.hasCancelled, this.cancelInProcess,
+                            this.initiated, this.done, backend.getId(),
+                            DebugUtil.printId(localParam.fragment_instance_id),
+                            DebugUtil.printId(queryId), cancelReason.name());
+                    return;
                 }
-
-                this.hasCanceled = true;
                 try {
-                    try {
-                        
BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress,
-                                localParam.fragment_instance_id, cancelReason);
-                    } catch (RpcException e) {
-                        LOG.warn("cancel plan fragment get a exception, 
address={}:{}", brpcAddress.getHostname(),
-                                brpcAddress.getPort());
-                        
SimpleScheduler.addToBlacklist(addressToBackendID.get(brpcAddress), 
e.getMessage());
-                    }
+                    
ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelResult =
+                            
BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress,
+                                    localParam.fragment_instance_id, 
cancelReason);
+                    Futures.addCallback(cancelResult, new 
FutureCallback<InternalService.PCancelPlanFragmentResult>() {
+                        public void 
onSuccess(InternalService.PCancelPlanFragmentResult result) {
+                            cancelInProcess = false;
+                            if (result.hasStatus()) {
+                                Status status = new Status();
+                                status.setPstatus(result.getStatus());
+                                if (status.getErrorCode() == TStatusCode.OK) {
+                                    hasCancelled = true;
+                                } else {
+                                    LOG.warn("Failed to cancel query {} 
instance initiated={} done={} backend: {},"
+                                            + "fragment instance id={}, 
reason: {}",
+                                            DebugUtil.printId(queryId), 
initiated, done, backend.getId(),
+                                            
DebugUtil.printId(localParam.fragment_instance_id), status.toString());
+                                }
+                            }
+                            LOG.warn("Failed to cancel query {} instance 
initiated={} done={} backend: {},"
+                                    + "fragment instance id={}, reason: {}",
+                                    DebugUtil.printId(queryId), initiated, 
done, backend.getId(),
+                                    
DebugUtil.printId(localParam.fragment_instance_id), "without status");
+                        }
+
+                        public void onFailure(Throwable t) {
+                            cancelInProcess = false;
+                            LOG.warn("Failed to cancel query {} instance 
initiated={} done={} backend: {},"
+                                    + "fragment instance id={}, reason: {}",
+                                    DebugUtil.printId(queryId), initiated, 
done, backend.getId(),
+                                    
DebugUtil.printId(localParam.fragment_instance_id), cancelReason.name(), t);
+                        }
+                    }, backendRpcCallbackExecutor);
+                    cancelInProcess = true;
                 } catch (Exception e) {
                     LOG.warn("catch a exception", e);
-                    return false;
+                    return;
                 }
             }
-            if (!this.hasCanceled) {
-                return false;
-            }
-            for (int i = 0; i < this.numInstances; i++) {
-                
fragmentInstancesMap.get(rpcParams.local_params.get(i).fragment_instance_id).setIsCancel(true);
-            }
-            return true;
+            return;
         }
 
         /// TODO: refactor rpcParams
-        public synchronized boolean 
cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) {
+        public synchronized void 
cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) {
             if (!this.initiated) {
                 LOG.warn("Query {}, ccancel before initiated", 
DebugUtil.printId(queryId));
-                return false;
+                return;
             }
             // don't cancel if it is already finished
             if (this.done) {
                 LOG.warn("Query {}, cancel after finished", 
DebugUtil.printId(queryId));
-                return false;
-            }
-            if (this.hasCanceled) {
-                LOG.warn("Query {}, cancel after cancelled", 
DebugUtil.printId(queryId));
-                return false;
+                return;
             }
 
             if (this.enablePipelineX) {
-                return cancelFragment(cancelReason);
+                cancelFragment(cancelReason);
+                return;
             } else {
-                return cancelInstance(cancelReason);
-            }
-        }
-
-        public synchronized boolean computeTimeInProfile(int maxFragmentId) {
-            if (this.profileFragmentId < 0 || this.profileFragmentId > 
maxFragmentId) {
-                LOG.warn("profileFragmentId {} should be in [0, {})", 
profileFragmentId, maxFragmentId);
-                return false;
+                cancelInstance(cancelReason);
+                return;
             }
-            // profile.computeTimeInProfile();
-            return true;
         }
 
         public boolean isBackendStateHealthy() {
@@ -4020,22 +4045,6 @@ public class Coordinator implements CoordInterface {
         return result;
     }
 
-    private void attachInstanceProfileToFragmentProfile() {
-        if (enablePipelineEngine) {
-            for (PipelineExecContext ctx : pipelineExecContexts.values()) {
-                if (!enablePipelineXEngine) {
-                    ctx.profileStream()
-                            .forEach(p -> 
executionProfile.addInstanceProfile(ctx.profileFragmentId, p));
-                }
-            }
-        } else {
-            for (BackendExecState backendExecState : backendExecStates) {
-                
executionProfile.addInstanceProfile(backendExecState.profileFragmentId,
-                        backendExecState.instanceProfile);
-            }
-        }
-    }
-
     // Runtime filter target fragment instance param
     static class FRuntimeFilterTargetParam {
         public TUniqueId targetFragmentInstanceId;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
index 4b0302cc403..f41e9a4d896 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
@@ -168,12 +168,6 @@ public class PointQueryExec implements CoordInterface {
         requestBuilder.addKeyTuples(kBuilder);
     }
 
-    @Override
-    public int getInstanceTotalNum() {
-        // TODO
-        return 1;
-    }
-
     @Override
     public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
         // Do nothing
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java
index 44999ecef64..c5aff2c9d5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessor.java
@@ -31,8 +31,6 @@ public interface QeProcessor {
 
     TReportExecStatusResult reportExecStatus(TReportExecStatusParams params, 
TNetworkAddress beAddr);
 
-    void registerQuery(TUniqueId queryId, Coordinator coord) throws 
UserException;
-
     void registerQuery(TUniqueId queryId, QeProcessorImpl.QueryInfo info) 
throws UserException;
 
     void registerInstances(TUniqueId queryId, Integer instancesNum) throws 
UserException;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index 03144fc797c..a62f1b66f08 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.profile.ExecutionProfile;
 import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.common.util.ProfileManager;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.resource.workloadgroup.QueueToken.TokenState;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -53,6 +54,7 @@ public final class QeProcessorImpl implements QeProcessor {
 
     private Map<TUniqueId, Integer> queryToInstancesNum;
     private Map<String, AtomicInteger> userToInstancesCount;
+    private ExecutorService writeProfileExecutor;
 
     public static final QeProcessor INSTANCE;
 
@@ -60,15 +62,13 @@ public final class QeProcessorImpl implements QeProcessor {
         INSTANCE = new QeProcessorImpl();
     }
 
-    private ExecutorService writeProfileExecutor;
-
     private QeProcessorImpl() {
         coordinatorMap = new ConcurrentHashMap<>();
-        // write profile to ProfileManager when query is running.
-        writeProfileExecutor = ThreadPoolManager.newDaemonProfileThreadPool(1, 
100,
-                "profile-write-pool", true);
         queryToInstancesNum = new ConcurrentHashMap<>();
         userToInstancesCount = new ConcurrentHashMap<>();
+        // write profile to ProfileManager when query is running.
+        writeProfileExecutor = ThreadPoolManager.newDaemonProfileThreadPool(3, 
100,
+                "profile-write-pool", true);
     }
 
     @Override
@@ -90,11 +90,6 @@ public final class QeProcessorImpl implements QeProcessor {
         return res;
     }
 
-    @Override
-    public void registerQuery(TUniqueId queryId, Coordinator coord) throws 
UserException {
-        registerQuery(queryId, new QueryInfo(coord));
-    }
-
     @Override
     public void registerQuery(TUniqueId queryId, QueryInfo info) throws 
UserException {
         if (LOG.isDebugEnabled()) {
@@ -104,6 +99,10 @@ public final class QeProcessorImpl implements QeProcessor {
         if (result != null) {
             throw new UserException("queryId " + queryId + " already exists");
         }
+
+        // Should add the execution profile to profile manager, BE will report 
the profile to FE and FE
+        // will update it in ProfileManager
+        
ProfileManager.getInstance().addExecutionProfile(info.getCoord().getExecutionProfile());
     }
 
     @Override
@@ -145,7 +144,18 @@ public final class QeProcessorImpl implements QeProcessor {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Deregister query id {}", 
DebugUtil.printId(queryId));
             }
-
+            ExecutionProfile executionProfile = 
ProfileManager.getInstance().getExecutionProfile(queryId);
+            if (executionProfile != null) {
+                
executionProfile.setQueryFinishTime(System.currentTimeMillis());
+                if (queryInfo.connectContext != null) {
+                    long autoProfileThresholdMs = queryInfo.connectContext
+                            .getSessionVariable().getAutoProfileThresholdMs();
+                    if (autoProfileThresholdMs > 0 && 
System.currentTimeMillis() - queryInfo.getStartExecTime()
+                            < autoProfileThresholdMs) {
+                        
ProfileManager.getInstance().removeProfile(executionProfile.getSummaryProfile().getProfileId());
+                    }
+                }
+            }
             if (queryInfo.getConnectContext() != null
                     && 
!Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser())
             ) {
@@ -187,7 +197,7 @@ public final class QeProcessorImpl implements QeProcessor {
                     
.connId(String.valueOf(context.getConnectionId())).db(context.getDatabase())
                     .catalog(context.getDefaultCatalog())
                     
.fragmentInstanceInfos(info.getCoord().getFragmentInstanceInfos())
-                    
.profile(info.getCoord().getExecutionProfile().getExecutionProfile())
+                    .profile(info.getCoord().getExecutionProfile().getRoot())
                     
.isReportSucc(context.getSessionVariable().enableProfile()).build();
             querySet.put(queryIdStr, item);
         }
@@ -196,13 +206,25 @@ public final class QeProcessorImpl implements QeProcessor 
{
 
     @Override
     public TReportExecStatusResult reportExecStatus(TReportExecStatusParams 
params, TNetworkAddress beAddr) {
-        if (params.isSetProfile()) {
+        if (params.isSetProfile() || params.isSetLoadChannelProfile()) {
             LOG.info("ReportExecStatus(): fragment_instance_id={}, query 
id={}, backend num: {}, ip: {}",
                     DebugUtil.printId(params.fragment_instance_id), 
DebugUtil.printId(params.query_id),
                     params.backend_num, beAddr);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("params: {}", params);
             }
+            ExecutionProfile executionProfile = 
ProfileManager.getInstance().getExecutionProfile(params.query_id);
+            if (executionProfile != null) {
+                // Update profile may cost a lot of time, use a seperate pool 
to deal with it.
+                writeProfileExecutor.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        executionProfile.updateProfile(params, beAddr);
+                    }
+                });
+            } else {
+                LOG.info("Could not find execution profile with query id {}", 
DebugUtil.printId(params.query_id));
+            }
         }
         final TReportExecStatusResult result = new TReportExecStatusResult();
 
@@ -229,12 +251,9 @@ public final class QeProcessorImpl implements QeProcessor {
         }
         try {
             info.getCoord().updateFragmentExecStatus(params);
-            if (params.isSetProfile()) {
-                writeProfileExecutor.submit(new WriteProfileTask(params, 
info));
-            }
         } catch (Exception e) {
             LOG.warn("Exception during handle report, response: {}, query: {}, 
instance: {}", result.toString(),
-                    DebugUtil.printId(params.query_id), 
DebugUtil.printId(params.fragment_instance_id));
+                    DebugUtil.printId(params.query_id), 
DebugUtil.printId(params.fragment_instance_id), e);
             return result;
         }
         result.setStatus(new TStatus(TStatusCode.OK));
@@ -266,6 +285,7 @@ public final class QeProcessorImpl implements QeProcessor {
         private final ConnectContext connectContext;
         private final Coordinator coord;
         private final String sql;
+        private long registerTimeMs = 0L;
 
         // from Export, Pull load, Insert
         public QueryInfo(Coordinator coord) {
@@ -277,6 +297,7 @@ public final class QeProcessorImpl implements QeProcessor {
             this.connectContext = connectContext;
             this.coord = coord;
             this.sql = sql;
+            this.registerTimeMs = System.currentTimeMillis();
         }
 
         public ConnectContext getConnectContext() {
@@ -295,7 +316,7 @@ public final class QeProcessorImpl implements QeProcessor {
             if (coord.getQueueToken() != null) {
                 return coord.getQueueToken().getQueueEndTime();
             }
-            return -1;
+            return registerTimeMs;
         }
 
         public long getQueueStartTime() {
@@ -319,26 +340,4 @@ public final class QeProcessorImpl implements QeProcessor {
             return null;
         }
     }
-
-    private class WriteProfileTask implements Runnable {
-        private TReportExecStatusParams params;
-
-        private QueryInfo queryInfo;
-
-        WriteProfileTask(TReportExecStatusParams params, QueryInfo queryInfo) {
-            this.params = params;
-            this.queryInfo = queryInfo;
-        }
-
-        @Override
-        public void run() {
-            QueryInfo info = coordinatorMap.get(params.query_id);
-            if (info == null) {
-                return;
-            }
-
-            ExecutionProfile executionProfile = 
info.getCoord().getExecutionProfile();
-            executionProfile.update(-1, false);
-        }
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 7ae0b9c6301..20a9ad42bd1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -84,6 +84,7 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String MAX_EXECUTION_TIME = "max_execution_time";
     public static final String INSERT_TIMEOUT = "insert_timeout";
     public static final String ENABLE_PROFILE = "enable_profile";
+    public static final String AUTO_PROFILE_THRESHOLD_MS = 
"auto_profile_threshold_ms";
     public static final String SQL_MODE = "sql_mode";
     public static final String WORKLOAD_VARIABLE = "workload_group";
     public static final String RESOURCE_VARIABLE = "resource_group";
@@ -629,6 +630,10 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true)
     public boolean enableProfile = false;
 
+    // if true, need report to coordinator when plan fragment execute 
successfully.
+    @VariableMgr.VarAttr(name = AUTO_PROFILE_THRESHOLD_MS, needForward = true)
+    public int autoProfileThresholdMs = -1;
+
     @VariableMgr.VarAttr(name = "runtime_filter_prune_for_external")
     public boolean runtimeFilterPruneForExternal = true;
 
@@ -1969,6 +1974,10 @@ public class SessionVariable implements Serializable, 
Writable {
         return enableProfile;
     }
 
+    public int getAutoProfileThresholdMs() {
+        return this.autoProfileThresholdMs;
+    }
+
     public boolean enableSingleDistinctColumnOpt() {
         return enableSingleDistinctColumnOpt;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index f31fe76bca4..6236009a7ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -220,7 +220,6 @@ public class StmtExecutor {
     private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0);
     public static final int MAX_DATA_TO_SEND_FOR_TXN = 100;
     public static final String NULL_VALUE_FOR_LOAD = "\\N";
-    private final Object writeProfileLock = new Object();
     private ConnectContext context;
     private final StatementContext statementContext;
     private MysqlSerializer serializer;
@@ -260,7 +259,9 @@ public class StmtExecutor {
         this.isProxy = isProxy;
         this.statementContext = new StatementContext(context, originStmt);
         this.context.setStatementContext(statementContext);
-        this.profile = new Profile("Query", 
this.context.getSessionVariable().enableProfile);
+        this.profile = new Profile("Query", 
this.context.getSessionVariable().enableProfile,
+                this.context.getSessionVariable().profileLevel,
+                this.context.getSessionVariable().getEnablePipelineXEngine());
     }
 
     // for test
@@ -290,7 +291,8 @@ public class StmtExecutor {
             this.statementContext.setParsedStatement(parsedStmt);
         }
         this.context.setStatementContext(statementContext);
-        this.profile = new Profile("Query", 
context.getSessionVariable().enableProfile());
+        this.profile = new Profile("Query", 
context.getSessionVariable().enableProfile(),
+                context.getSessionVariable().profileLevel, 
context.getSessionVariable().getEnablePipelineXEngine());
     }
 
     public static InternalService.PDataRow getRowStringValue(List<Expr> cols) 
throws UserException {
@@ -993,9 +995,7 @@ public class StmtExecutor {
         // and ensure the sql is finished normally. For example, if update 
profile
         // failed, the insert stmt should be success
         try {
-            profile.update(context.startTime, getSummaryInfo(isFinished), 
isFinished,
-                    context.getSessionVariable().profileLevel, this.planner,
-                    context.getSessionVariable().getEnablePipelineXEngine());
+            profile.updateSummary(context.startTime, 
getSummaryInfo(isFinished), isFinished, this.planner);
         } catch (Throwable t) {
             LOG.warn("failed to update profile, ingore this error", t);
         }
@@ -1600,9 +1600,9 @@ public class StmtExecutor {
                     
context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
         } else {
             coord = new Coordinator(context, analyzer, planner, 
context.getStatsErrorEstimator());
+            profile.addExecutionProfile(coord.getExecutionProfile());
             QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
                     new QeProcessorImpl.QueryInfo(context, 
originStmt.originStmt, coord));
-            profile.addExecutionProfile(coord.getExecutionProfile());
             coordBase = coord;
         }
 
@@ -1610,35 +1610,10 @@ public class StmtExecutor {
             coordBase.exec();
             profile.getSummaryProfile().setQueryScheduleFinishTime();
             updateProfile(false);
-            if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
-                try {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Start to execute fragment. user: {}, db: 
{}, sql: {}, fragment instance num: {}",
-                                context.getQualifiedUser(), 
context.getDatabase(),
-                                
parsedStmt.getOrigStmt().originStmt.replace("\n", " "),
-                                coordBase.getInstanceTotalNum());
-                    }
-                } catch (Exception e) {
-                    LOG.warn("Fail to print fragment concurrency for Query.", 
e);
-                }
-            }
 
             if (context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) 
{
                 Preconditions.checkState(!context.isReturnResultFromLocal());
                 profile.getSummaryProfile().setTempStartTime();
-                if (coordBase.getInstanceTotalNum() > 1 && 
LOG.isDebugEnabled()) {
-                    try {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Finish to execute fragment. user: {}, 
db: {}, sql: {}, "
-                                            + "fragment instance num: {}",
-                                    context.getQualifiedUser(), 
context.getDatabase(),
-                                    
parsedStmt.getOrigStmt().originStmt.replace("\n", " "),
-                                    coordBase.getInstanceTotalNum());
-                        }
-                    } catch (Exception e) {
-                        LOG.warn("Fail to print fragment concurrency for 
Query.", e);
-                    }
-                }
                 return;
             }
 
@@ -1723,18 +1698,6 @@ public class StmtExecutor {
             throw e;
         } finally {
             coordBase.close();
-            if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
-                try {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Finish to execute fragment. user: {}, db: 
{}, sql: {}, fragment instance num: {}",
-                                context.getQualifiedUser(), 
context.getDatabase(),
-                                
parsedStmt.getOrigStmt().originStmt.replace("\n", " "),
-                                coordBase.getInstanceTotalNum());
-                    }
-                } catch (Exception e) {
-                    LOG.warn("Fail to print fragment concurrency for Query.", 
e);
-                }
-            }
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index 50c24a42330..3f4bb846767 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -22,6 +22,7 @@ import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.PBackendServiceGrpc;
 import org.apache.doris.thrift.TNetworkAddress;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import io.grpc.ConnectivityState;
 import io.grpc.ManagedChannel;
 import io.grpc.netty.NettyChannelBuilder;
@@ -82,7 +83,7 @@ public class BackendServiceClient {
                 .execPlanFragmentStart(request);
     }
 
-    public Future<InternalService.PCancelPlanFragmentResult> 
cancelPlanFragmentAsync(
+    public ListenableFuture<InternalService.PCancelPlanFragmentResult> 
cancelPlanFragmentAsync(
             InternalService.PCancelPlanFragmentRequest request) {
         return stub.cancelPlanFragment(request);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java 
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index af21194263f..5a89614bab7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -36,6 +36,7 @@ import org.apache.doris.thrift.TPipelineFragmentParamsList;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.ByteString;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -225,7 +226,7 @@ public class BackendServiceProxy {
         }
     }
 
-    public Future<InternalService.PCancelPlanFragmentResult> 
cancelPlanFragmentAsync(TNetworkAddress address,
+    public ListenableFuture<InternalService.PCancelPlanFragmentResult> 
cancelPlanFragmentAsync(TNetworkAddress address,
             TUniqueId finstId, Types.PPlanFragmentCancelReason cancelReason) 
throws RpcException {
         final InternalService.PCancelPlanFragmentRequest pRequest =
                 InternalService.PCancelPlanFragmentRequest.newBuilder()
@@ -241,8 +242,8 @@ public class BackendServiceProxy {
         }
     }
 
-    public Future<InternalService.PCancelPlanFragmentResult> 
cancelPipelineXPlanFragmentAsync(TNetworkAddress address,
-            PlanFragmentId fragmentId, TUniqueId queryId,
+    public ListenableFuture<InternalService.PCancelPlanFragmentResult> 
cancelPipelineXPlanFragmentAsync(
+            TNetworkAddress address, PlanFragmentId fragmentId, TUniqueId 
queryId,
             Types.PPlanFragmentCancelReason cancelReason) throws RpcException {
         final InternalService.PCancelPlanFragmentRequest pRequest = 
InternalService.PCancelPlanFragmentRequest
                 .newBuilder()
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java
index 15b4175759c..56ed66c0504 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java
@@ -98,7 +98,7 @@ public class RuntimeProfileTest {
 
     @Test
     public void testCounter() {
-        RuntimeProfile profile = new RuntimeProfile();
+        RuntimeProfile profile = new RuntimeProfile("test counter");
         profile.addCounter("key", TUnit.UNIT, "");
         Assert.assertNotNull(profile.getCounterMap().get("key"));
         Assert.assertNull(profile.getCounterMap().get("key2"));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
index 29c0adae124..cd9e0cb048a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java
@@ -33,6 +33,7 @@ import org.apache.doris.analysis.UseStmt;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.common.profile.Profile;
 import org.apache.doris.datasource.InternalCatalog;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.MysqlChannel;
@@ -172,7 +173,8 @@ public class StmtExecutorTest {
     public void testSelect(@Mocked QueryStmt queryStmt,
                            @Mocked SqlParser parser,
                            @Mocked OriginalPlanner planner,
-                           @Mocked Coordinator coordinator) throws Exception {
+                           @Mocked Coordinator coordinator,
+                           @Mocked Profile profile) throws Exception {
         Env env = Env.getCurrentEnv();
         Deencapsulation.setField(env, "canRead", new AtomicBoolean(true));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to