This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 23cddd0e24a [refactor][profile] log enhancement and minor change of 
profile (#46564)
23cddd0e24a is described below

commit 23cddd0e24ada257cb58b52907146fef728dc723
Author: zhiqiang <hezhiqi...@selectdb.com>
AuthorDate: Mon Jan 13 09:48:26 2025 +0800

    [refactor][profile] log enhancement and minor change of profile (#46564)
---
 be/src/runtime/query_context.cpp                   |   5 +-
 be/src/runtime/runtime_query_statistics_mgr.cpp    |   6 +-
 .../apache/doris/common/proc/BackendsProcDir.java  |   2 +-
 .../common/proc/CurrentQueryInfoProvider.java      |   4 +-
 .../doris/common/{util => profile}/AggCounter.java |   2 +-
 .../doris/common/{util => profile}/Counter.java    |   2 +-
 .../doris/common/profile/ExecutionProfile.java     |  12 +-
 .../org/apache/doris/common/profile/Profile.java   |  64 +++--
 .../doris/common/profile/ProfileManager.java       | 119 ++++++---
 .../common/{util => profile}/RuntimeProfile.java   |   4 +-
 .../doris/common/profile/SummaryProfile.java       |   5 +-
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |   2 +-
 .../nereids/trees/plans/commands/LoadCommand.java  |   3 +-
 .../java/org/apache/doris/qe/QeProcessorImpl.java  |  34 ++-
 .../org/apache/doris/qe/QueryStatisticsItem.java   |   2 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |   4 +-
 .../doris/common/profile/ProfileManagerTest.java   | 277 +++++++++++++++++++++
 .../{util => profile}/ProfilePersistentTest.java   |   8 +-
 .../{util => profile}/RuntimeProfileTest.java      |   2 +-
 .../pipeline/cloud_p0/conf/be_custom.conf          |   2 +
 .../pipeline/cloud_p0/conf/fe_custom.conf          |   5 +-
 regression-test/pipeline/p0/conf/be.conf           |   2 +-
 regression-test/pipeline/p0/conf/fe.conf           |   5 +
 23 files changed, 468 insertions(+), 103 deletions(-)

diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index c777c8100ef..1ff08f7d9bb 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -351,8 +351,9 @@ void QueryContext::add_fragment_profile(
 #endif
 
     std::lock_guard<std::mutex> l(_profile_mutex);
-    LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline 
profile count {} ",
-             print_id(this->_query_id), fragment_id, pipeline_profiles.size());
+    VLOG_ROW << fmt::format(
+            "Query add fragment profile, query {}, fragment {}, pipeline 
profile count {} ",
+            print_id(this->_query_id), fragment_id, pipeline_profiles.size());
 
     _profile_map.insert(std::make_pair(fragment_id, pipeline_profiles));
 
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp 
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 93d5256cad7..ebcaf30eab1 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -65,6 +65,8 @@ static Status _do_report_exec_stats_rpc(const 
TNetworkAddress& coor_addr,
         return Status::RpcError("Client rpc client failed");
     }
 
+    VLOG_DEBUG << "Sending profile";
+
     try {
         try {
             rpc_client->reportExecStatus(res, req);
@@ -272,13 +274,13 @@ void RuntimeQueryStatisticsMgr::register_fragment_profile(
 void RuntimeQueryStatisticsMgr::_report_query_profiles_function() {
     decltype(_profile_map) profile_copy;
     decltype(_load_channel_profile_map) load_channel_profile_copy;
-
+    VLOG_DEBUG << "Beging reporting profile";
     {
         std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
         _profile_map.swap(profile_copy);
         _load_channel_profile_map.swap(load_channel_profile_copy);
     }
-
+    VLOG_DEBUG << "After swap profile map";
     // query_id -> {coordinator_addr, {fragment_id -> 
std::vectpr<pipeline_profile>}}
     for (auto& entry : profile_copy) {
         const auto& query_id = entry.first;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java
index c5273304137..e3db2a3a716 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java
@@ -20,9 +20,9 @@ package org.apache.doris.common.proc;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.profile.RuntimeProfile;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.ListComparator;
-import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
index 2b6d8f6702e..de7247ab3ab 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java
@@ -18,9 +18,9 @@
 package org.apache.doris.common.proc;
 
 import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.util.Counter;
+import org.apache.doris.common.profile.Counter;
+import org.apache.doris.common.profile.RuntimeProfile;
 import org.apache.doris.common.util.DebugUtil;
-import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.qe.QueryStatisticsItem;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TUniqueId;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/AggCounter.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/AggCounter.java
similarity index 98%
rename from 
fe/fe-core/src/main/java/org/apache/doris/common/util/AggCounter.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/common/profile/AggCounter.java
index 3f080b3752f..ab3b43bd3e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/AggCounter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/AggCounter.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.common.util;
+package org.apache.doris.common.profile;
 
 import java.util.LinkedList;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Counter.java
similarity index 98%
rename from fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java
rename to fe/fe-core/src/main/java/org/apache/doris/common/profile/Counter.java
index f6c06890047..f306d7c73fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Counter.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.common.util;
+package org.apache.doris.common.profile;
 
 import org.apache.doris.common.io.Text;
 import org.apache.doris.persist.gson.GsonUtils;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index d2300cd667d..fffcca49bf6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -20,7 +20,6 @@ package org.apache.doris.common.profile;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.Status;
 import org.apache.doris.common.util.DebugUtil;
-import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.thrift.TDetailedReportParams;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -235,7 +234,9 @@ public class ExecutionProfile {
             String suffix = " (host=" + backendHBAddress + ")";
             for (TDetailedReportParams pipelineProfile : fragmentProfile) {
                 String name = "";
-                if (pipelineProfile.isSetIsFragmentLevel() && 
pipelineProfile.is_fragment_level) {
+                boolean isFragmentLevel = 
(pipelineProfile.isSetIsFragmentLevel() && pipelineProfile.is_fragment_level);
+                if (isFragmentLevel) {
+                    // Fragment Level profile is also represented by 
TDetailedReportParams.
                     name = "Fragment Level Profile: " + suffix;
                 } else {
                     name = "Pipeline :" + pipelineIdx + " " + suffix;
@@ -243,9 +244,9 @@ public class ExecutionProfile {
                 }
 
                 RuntimeProfile profileNode = new RuntimeProfile(name);
-                // The taskprofile is used to save the profile of the 
pipeline, without
+                // The taskProfile is used to save the profile of the 
pipeline, without
                 // considering the FragmentLevel.
-                if (!(pipelineProfile.isSetIsFragmentLevel() && 
pipelineProfile.is_fragment_level)) {
+                if (!isFragmentLevel) {
                     taskProfile.add(profileNode);
                 }
                 if (!pipelineProfile.isSetProfile()) {
@@ -260,6 +261,9 @@ public class ExecutionProfile {
             setMultiBeProfile(fragmentId, backendHBAddress, taskProfile);
         }
 
+        LOG.info("Profile update finished query: {} fragments: {} isDone: {}",
+                DebugUtil.printId(getQueryId()), 
profile.getFragmentIdToProfile().size(), isDone);
+
         if (profile.isSetLoadChannelProfiles()) {
             for (TRuntimeProfileTree loadChannelProfile : 
profile.getLoadChannelProfiles()) {
                 this.loadChannelProfile.update(loadChannelProfile);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
index a5173247403..80a79ebffbf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
@@ -20,7 +20,6 @@ package org.apache.doris.common.profile;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.DebugUtil;
-import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.nereids.NereidsPlanner;
 import org.apache.doris.nereids.trees.plans.AbstractPlan;
 import org.apache.doris.nereids.trees.plans.Plan;
@@ -81,10 +80,6 @@ public class Profile {
     // profile file name format: time_id
     private static final String SEPERATOR = "_";
 
-    // id will be assigned to id of SummaryProfile.
-    // For broker load, its SummaryProfile id is a string representation of a 
long integer,
-    // for others, it is queryID
-    private String id = "";
     // summaryProfile will be serialized to storage as JSON, and we can 
recover it from storage
     // recover of SummaryProfile is important, because it contains the meta 
information of the profile
     // we need it to construct memory index for profile retrieving.
@@ -103,7 +98,7 @@ public class Profile {
     // when coordinator finishes, it will mark finish time.
     // we will wait for about 5 seconds to see if all profiles have been 
reported.
     // if not, we will store the profile to storage, and release the memory,
-    // futher report will be ignored.
+    // further report will be ignored.
     // why MAX_VALUE? So that we can use PriorityQueue to sort profile by 
finish time decreasing order.
     private long queryFinishTimestamp = Long.MAX_VALUE;
     private Map<Integer, String> planNodeMap = Maps.newHashMap();
@@ -193,7 +188,6 @@ public class Profile {
             DataInput dataInput = new DataInputStream(profileFileInputStream);
             Profile res = new Profile();
             res.summaryProfile = SummaryProfile.read(dataInput);
-            res.setId(res.summaryProfile.getProfileId());
             res.profileStoragePath = path;
             res.isQueryFinished = true;
             res.profileSize = fileSize;
@@ -262,7 +256,7 @@ public class Profile {
     // For load task, the profile contains many execution profiles
     public void addExecutionProfile(ExecutionProfile executionProfile) {
         if (executionProfile == null) {
-            LOG.warn("try to set a null excecution profile, it is abnormal", 
new Exception());
+            LOG.warn("try to set a null execution profile, it is abnormal", 
new Exception());
             return;
         }
         executionProfile.setSummaryProfile(summaryProfile);
@@ -298,18 +292,17 @@ public class Profile {
             }
 
             summaryProfile.update(summaryInfo);
-            this.setId(summaryProfile.getProfileId());
 
             if (isFinished) {
                 this.markQueryFinished(System.currentTimeMillis());
             }
-            // Nerids native insert not set planner, so it is null
+            // Nereids native insert not set planner, so it is null
             if (planner != null) {
                 this.planNodeMap = planner.getExplainStringMap();
             }
             ProfileManager.getInstance().pushProfile(this);
         } catch (Throwable t) {
-            LOG.warn("update profile {} failed", id, t);
+            LOG.warn("update profile {} failed", getId(), t);
             throw t;
         }
     }
@@ -352,7 +345,7 @@ public class Profile {
     }
 
     private RuntimeProfile composeRootProfile() {
-        RuntimeProfile rootProfile = new RuntimeProfile(id);
+        RuntimeProfile rootProfile = new RuntimeProfile(getId());
         rootProfile.addChild(summaryProfile.getSummary());
         rootProfile.addChild(summaryProfile.getExecutionSummary());
         for (ExecutionProfile executionProfile : executionProfiles) {
@@ -378,8 +371,7 @@ public class Profile {
             return;
         }
 
-        // Only generate merged profile for select, insert into select.
-        // Not support broker load now.
+        // For broker load, if it has more than one execution profile, we will 
not generate merged profile.
         RuntimeProfile mergedProfile = null;
         if (this.profileLevel == MergedProfileLevel && 
this.executionProfiles.size() == 1) {
             try {
@@ -389,7 +381,7 @@ public class Profile {
                     updateActualRowCountOnPhysicalPlan(physicalPlan);
                 }
             } catch (Throwable aggProfileException) {
-                LOG.warn("build merged simple profile {} failed", this.id, 
aggProfileException);
+                LOG.warn("build merged simple profile {} failed", getId(), 
aggProfileException);
             }
         }
 
@@ -433,10 +425,6 @@ public class Profile {
         return this.queryFinishTimestamp;
     }
 
-    public void setId(String id) {
-        this.id = id;
-    }
-
     // For UT
     public void setSummaryProfile(SummaryProfile summaryProfile) {
         this.summaryProfile = summaryProfile;
@@ -483,7 +471,7 @@ public class Profile {
                     > (this.executionProfiles.size() * autoProfileDurationMs)) 
{
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Query/LoadJob {} costs {} ms, begin {} finish 
{}, need store its profile",
-                            id, durationMs, 
summaryProfile.getQueryBeginTime(), this.queryFinishTimestamp);
+                            getId(), durationMs, 
summaryProfile.getQueryBeginTime(), this.queryFinishTimestamp);
                 }
                 return true;
             }
@@ -491,7 +479,7 @@ public class Profile {
         }
 
         if (this.queryFinishTimestamp == Long.MAX_VALUE) {
-            LOG.warn("Logical error, query {} has finished, but 
queryFinishTimestamp is not set,", id);
+            LOG.warn("Logical error, query {} has finished, but 
queryFinishTimestamp is not set,", getId());
             return false;
         }
 
@@ -501,7 +489,8 @@ public class Profile {
                         > Config.profile_waiting_time_for_spill_seconds * 
1000) {
             LOG.warn("Profile {} should be stored to storage without waiting 
for incoming profile,"
                     + " since it has been waiting for {} ms, current time {} 
query finished time: {}",
-                    id, currentTimeMillis - this.queryFinishTimestamp, 
currentTimeMillis, this.queryFinishTimestamp);
+                    getId(), currentTimeMillis - this.queryFinishTimestamp, 
currentTimeMillis,
+                    this.queryFinishTimestamp);
 
             this.summaryProfile.setSystemMessage(
                             "This profile is not complete, since its 
collection does not finish in time."
@@ -526,7 +515,7 @@ public class Profile {
     public void markQueryFinished(long queryFinishTime) {
         try {
             if (this.profileHasBeenStored()) {
-                LOG.error("Logical error, profile {} has already been stored 
to storage", this.id);
+                LOG.error("Logical error, profile {} has already been stored 
to storage", getId());
                 return;
             }
 
@@ -539,13 +528,13 @@ public class Profile {
     }
 
     public void writeToStorage(String systemProfileStorageDir) {
-        if (Strings.isNullOrEmpty(id)) {
+        if (Strings.isNullOrEmpty(getId())) {
             LOG.warn("store profile failed, name is empty");
             return;
         }
 
         if (!Strings.isNullOrEmpty(profileStoragePath)) {
-            LOG.error("Logical error, profile {} has already been stored to 
storage", id);
+            LOG.error("Logical error, profile {} has already been stored to 
storage", getId());
             return;
         }
 
@@ -586,7 +575,7 @@ public class Profile {
             dataOutputStream.flush();
             this.profileSize = profileFile.length();
         } catch (Exception e) {
-            LOG.error("write {} summary profile failed", id, e);
+            LOG.error("write {} summary profile failed", getId(), e);
             return;
         } finally {
             try {
@@ -690,7 +679,7 @@ public class Profile {
             return;
         }
 
-        LOG.info("Profile {} has been stored to storage, reading it from 
storage", id);
+        LOG.info("Profile {} has been stored to storage, reading it from 
storage", getId());
 
         FileInputStream fileInputStream = null;
 
@@ -728,4 +717,25 @@ public class Profile {
 
         return;
     }
+
+    public String debugInfo() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("ProfileId:").append(getId()).append("|");
+        builder.append("StoragePath:").append(profileStoragePath).append("|");
+        
builder.append("StartTimeStamp:").append(summaryProfile.getQueryBeginTime()).append("|");
+        builder.append("IsFinished:").append(isQueryFinished).append("|");
+        
builder.append("FinishTimestamp:").append(queryFinishTimestamp).append("|");
+        builder.append("AutoProfileDuration: 
").append(autoProfileDurationMs).append("|");
+        builder.append("ExecutionProfileCnt: 
").append(executionProfiles.size()).append("|");
+        builder.append("ProfileOnStorageSize:").append(profileSize);
+        return builder.toString();
+    }
+
+    public void setQueryFinishTimestamp(long l) {
+        this.queryFinishTimestamp = l;
+    }
+
+    public String getId() {
+        return summaryProfile.getProfileId();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
index ef9f2f7bbaf..f174b6b7dcc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java
@@ -65,7 +65,7 @@ import 
java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
  * All attributes can be seen from the above.
  *
  * why the element in the finished profile array is not RuntimeProfile,
- * the purpose is let coordinator can destruct earlier(the fragment profile is 
in Coordinator)
+ * the purpose is let coordinator can destruct earlier (the fragment profile 
is in Coordinator)
  *
  */
 public class ProfileManager extends MasterDaemon {
@@ -83,7 +83,7 @@ public class ProfileManager extends MasterDaemon {
             this.profile = profile;
         }
 
-        private final Profile profile;
+        final Profile profile;
         public Map<String, String> infoStrings = Maps.newHashMap();
         public String errMsg = "";
 
@@ -655,11 +655,6 @@ public class ProfileManager extends MasterDaemon {
             }
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("{} profiles size on storage: {}", profileDeque.size(),
-                        DebugUtil.printByteWithUnit(totalProfileSize));
-        }
-
         final int maxSpilledProfileNum = Config.max_spilled_profile_num;
         final long spilledProfileLimitBytes = 
Config.spilled_profile_storage_limit_bytes;
         List<ProfileElement> queryIdToBeRemoved = Lists.newArrayList();
@@ -803,29 +798,57 @@ public class ProfileManager extends MasterDaemon {
         }
     }
 
+    // The init value of query finish time of profile is MAX_VALUE,
+    // So a more recent query will be on the top of the heap.
+    PriorityQueue<ProfileElement> getProfileOrderByQueryFinishTimeDesc() {
+        readLock.lock();
+        try {
+            PriorityQueue<ProfileElement> queryIdDeque = new 
PriorityQueue<>(Comparator.comparingLong(
+                    (ProfileElement profileElement) -> 
profileElement.profile.getQueryFinishTimestamp()).reversed());
+
+            queryIdToProfileMap.forEach((queryId, profileElement) -> {
+                queryIdDeque.add(profileElement);
+            });
+
+            return queryIdDeque;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
     // The init value of query finish time of profile is MAX_VALUE
-    // So more recent query will be on the top of heap.
-    private PriorityQueue<ProfileElement> 
getProfileOrderByQueryFinishTimeDesc() {
-        PriorityQueue<ProfileElement> queryIdDeque = new 
PriorityQueue<>(Comparator.comparingLong(
-                (ProfileElement profileElement) -> 
profileElement.profile.getQueryFinishTimestamp()).reversed());
+    // So a query that finished earlier will be on the top of the heap.
+    PriorityQueue<ProfileElement> getProfileOrderByQueryFinishTime() {
+        readLock.lock();
+        try {
+            PriorityQueue<ProfileElement> queryIdDeque = new 
PriorityQueue<>(Comparator.comparingLong(
+                    (ProfileElement profileElement) -> 
profileElement.profile.getQueryFinishTimestamp()));
 
-        queryIdToProfileMap.forEach((queryId, profileElement) -> {
-            queryIdDeque.add(profileElement);
-        });
+            queryIdToProfileMap.forEach((queryId, profileElement) -> {
+                queryIdDeque.add(profileElement);
+            });
 
-        return queryIdDeque;
+            return queryIdDeque;
+        } finally {
+            readLock.unlock();
+        }
     }
 
     // Older query will be on the top of heap
-    private PriorityQueue<ProfileElement> getProfileOrderByQueryStartTime() {
-        PriorityQueue<ProfileElement> queryIdDeque = new 
PriorityQueue<>(Comparator.comparingLong(
-                (ProfileElement profileElement) -> 
profileElement.profile.getSummaryProfile().getQueryBeginTime()));
+    PriorityQueue<ProfileElement> getProfileOrderByQueryStartTime() {
+        readLock.lock();
+        try {
+            PriorityQueue<ProfileElement> queryIdDeque = new 
PriorityQueue<>(Comparator.comparingLong(
+                    (ProfileElement profileElement) -> 
profileElement.profile.getSummaryProfile().getQueryBeginTime()));
 
-        queryIdToProfileMap.forEach((queryId, profileElement) -> {
-            queryIdDeque.add(profileElement);
-        });
+            queryIdToProfileMap.forEach((queryId, profileElement) -> {
+                queryIdDeque.add(profileElement);
+            });
 
-        return queryIdDeque;
+            return queryIdDeque;
+        } finally {
+            readLock.unlock();
+        }
     }
 
     // When the query is finished, the execution profile should be marked as 
finished
@@ -885,7 +908,7 @@ public class ProfileManager extends MasterDaemon {
             writeLock.unlock();
             if (stringBuilder.length() != 0) {
                 LOG.warn("Remove expired execution profiles {}, current 
execution profile map size {},"
-                        + "Config.max_query_profile_num{}, 
Config.profile_async_collect_expire_time_secs {}",
+                        + "Config.max_query_profile_num {}, 
Config.profile_async_collect_expire_time_secs {}",
                         stringBuilder.toString(), executionProfileNum,
                         Config.max_query_profile_num, 
Config.profile_async_collect_expire_time_secs);
             }
@@ -894,7 +917,7 @@ public class ProfileManager extends MasterDaemon {
 
     private void deleteOutdatedProfilesFromMemory() {
         StringBuilder stringBuilder = new StringBuilder();
-        int profileNum = 0;
+        StringBuilder stringBuilderTTL = new StringBuilder();
         writeLock.lock();
 
         try {
@@ -903,7 +926,13 @@ public class ProfileManager extends MasterDaemon {
 
             for (ProfileElement profileElement : 
this.queryIdToProfileMap.values()) {
                 if (profileElement.profile.shouldBeRemoveFromMemory()) {
-                    
profilesToRemove.add(profileElement.profile.getSummaryProfile().getProfileId());
+                    String profileId = 
profileElement.profile.getSummaryProfile().getProfileId();
+                    profilesToRemove.add(profileId);
+                    stringBuilder.append(profileId).append(",");
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Profile {} should be filtered from memory, 
information {}", profileId,
+                                profileElement.profile.debugInfo());
+                    }
                 }
             }
 
@@ -913,34 +942,54 @@ public class ProfileManager extends MasterDaemon {
                 for (ExecutionProfile executionProfile : 
profileElement.profile.getExecutionProfiles()) {
                     
queryIdToExecutionProfiles.remove(executionProfile.getQueryId());
                 }
-                
stringBuilder.append(profileElement.profile.getSummaryProfile().getProfileId()).append(",");
             }
 
             if (this.queryIdToProfileMap.size() <= 
Config.max_query_profile_num) {
                 return;
             }
 
-            PriorityQueue<ProfileElement> queueIdDeque = 
getProfileOrderByQueryStartTime();
+            // Profiles are ordered by query finish time:
+            // a query that finished earlier will be on the top of the heap.
+            // The finish time of an unfinished query is Long.MAX_VALUE, so 
unfinished queries will be on the bottom of the heap.
+            PriorityQueue<ProfileElement> queueIdDeque = 
getProfileOrderByQueryFinishTime();
 
-            while (queueIdDeque.size() > Config.max_query_profile_num) {
+            while (queueIdDeque.size() > Config.max_query_profile_num && 
!queueIdDeque.isEmpty()) {
                 ProfileElement profileElement = queueIdDeque.poll();
-
-                
queryIdToProfileMap.remove(profileElement.profile.getSummaryProfile().getProfileId());
+                String profileId = 
profileElement.profile.getSummaryProfile().getProfileId();
+                stringBuilderTTL.append(profileId).append(",");
+                queryIdToProfileMap.remove(profileId);
                 for (ExecutionProfile executionProfile : 
profileElement.profile.getExecutionProfiles()) {
                     
queryIdToExecutionProfiles.remove(executionProfile.getQueryId());
                 }
 
-                
stringBuilder.append(profileElement.profile.getSummaryProfile().getProfileId()).append(",");
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Remove profile {} since ttl from memory, info 
{}", profileId,
+                                        profileElement.profile.debugInfo());
+                }
             }
         } finally {
-            profileNum = queryIdToProfileMap.size();
+            int profileNum = queryIdToProfileMap.size();
             writeLock.unlock();
 
-            if (stringBuilder.length() != 0) {
-                LOG.info("Remove outdated profiles {} from memoy, current 
profile map size {}",
-                        stringBuilder.toString(), profileNum);
+            if (stringBuilder.length() != 0 || stringBuilderTTL.length() != 0) 
{
+                LOG.info("Filtered profiles {}, outdated profiles {}, they are 
removed from memory,"
+                                + " current profile map size {}",
+                        stringBuilder.toString(), stringBuilderTTL.toString(), 
profileNum);
+            }
+        }
+    }
+
+    String getDebugInfo() {
+        StringBuilder stringBuilder = new StringBuilder();
+        readLock.lock();
+        try {
+            for (ProfileElement profileElement : queryIdToProfileMap.values()) 
{
+                
stringBuilder.append(profileElement.profile.debugInfo()).append("\n");
             }
+        } finally {
+            readLock.unlock();
         }
+        return stringBuilder.toString();
     }
 
     public List<List<String>> getProfileMetaWithType(ProfileType profileType, 
long limit) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
similarity index 99%
rename from 
fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
index 19082959034..78ff0f58ed2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
@@ -15,12 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.common.util;
+package org.apache.doris.common.profile;
 
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.Reference;
 import org.apache.doris.common.io.Text;
-import org.apache.doris.common.profile.SummaryProfile;
+import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TCounter;
 import org.apache.doris.thrift.TRuntimeProfileNode;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index 5b0d5ba3533..7215b8a9c65 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -19,7 +19,6 @@ package org.apache.doris.common.profile;
 
 import org.apache.doris.common.Config;
 import org.apache.doris.common.io.Text;
-import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -543,8 +542,8 @@ public class SummaryProfile {
         this.nereidsDistributeFinishTime = TimeUtils.getStartTimeMs();
     }
 
-    public void setQueryBeginTime() {
-        this.queryBeginTime = TimeUtils.getStartTimeMs();
+    public void setQueryBeginTime(long queryBeginTime) {
+        this.queryBeginTime = queryBeginTime;
     }
 
     public void setQueryAnalysisFinishTime() {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index b2fd6746f2f..efb2fcc06d4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -247,7 +247,7 @@ public class BrokerLoadJob extends BulkLoadJob {
                     true,
                     
Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.PROFILE_LEVEL, 
"3")),
                     
Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.AUTO_PROFILE_THRESHOLD_MS,
 "-1")));
-            this.jobProfile.getSummaryProfile().setQueryBeginTime();
+            
this.jobProfile.getSummaryProfile().setQueryBeginTime(TimeUtils.getStartTimeMs());
             // TODO: 怎么给这些 load job 设置 profile 记录时间
             // this.jobProfile.setId("BrokerLoadJob " + id + ". " + label);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
index aed3cb4c1f2..69cbf762c2a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
@@ -29,6 +29,7 @@ import org.apache.doris.common.NereidsException;
 import org.apache.doris.common.profile.Profile;
 import org.apache.doris.common.util.FileFormatConstants;
 import org.apache.doris.common.util.FileFormatUtils;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.property.constants.S3Properties;
 import org.apache.doris.job.base.JobExecuteType;
 import org.apache.doris.job.base.JobExecutionConfiguration;
@@ -133,7 +134,7 @@ public class LoadCommand extends Command implements 
ForwardWithSync {
                 ctx.getSessionVariable().enableProfile,
                 ctx.getSessionVariable().profileLevel,
                 ctx.getSessionVariable().getAutoProfileThresholdMs());
-        profile.getSummaryProfile().setQueryBeginTime();
+        
profile.getSummaryProfile().setQueryBeginTime(TimeUtils.getStartTimeMs());
         if (sourceInfos.size() == 1) {
             plans = ImmutableList.of(new 
InsertIntoTableCommand(completeQueryPlan(ctx, sourceInfos.get(0)),
                     Optional.of(labelName), Optional.empty(), 
Optional.empty()));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index d9b4583d71f..378d37d082f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -75,18 +75,25 @@ public final class QeProcessorImpl implements QeProcessor {
     private Status processQueryProfile(TQueryProfile profile, TNetworkAddress 
address, boolean isDone) {
         ExecutionProfile executionProfile = 
ProfileManager.getInstance().getExecutionProfile(profile.query_id);
         if (executionProfile == null) {
-            LOG.warn("Could not find execution profile with query id {}", 
DebugUtil.printId(profile.query_id));
+            LOG.warn("Could not find execution profile, query {} be {}",
+                                DebugUtil.printId(profile.query_id), 
address.toString());
             return new Status(TStatusCode.NOT_FOUND, "Could not find execution 
profile with query id "
                     + DebugUtil.printId(profile.query_id));
         }
 
-        // Update profile may cost a lot of time, use a seperate pool to deal 
with it.
-        writeProfileExecutor.submit(new Runnable() {
-            @Override
-            public void run() {
-                executionProfile.updateProfile(profile, address, isDone);
-            }
-        });
+        // Update profile may cost a lot of time, use a separate pool to deal 
with it.
+        try {
+            writeProfileExecutor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    executionProfile.updateProfile(profile, address, isDone);
+                }
+            });
+        } catch (Exception e) {
+            LOG.warn("Failed to submit profile write task, query {} be {}",
+                                DebugUtil.printId(profile.query_id), 
address.toString());
+            return new Status(TStatusCode.INTERNAL_ERROR, "Failed to submit 
profile write task");
+        }
 
         return Status.OK;
     }
@@ -231,9 +238,16 @@ public final class QeProcessorImpl implements QeProcessor {
             // with profile in a single rpc, this will make FE ignore the exec 
status and may lead to bug in query
             // like insert into select.
             if (params.isSetBackendId() && params.isSetDone()) {
+                LOG.info("Receive profile {} report from {}, isDone {}, 
fragments {}",
+                        
DebugUtil.printId(params.getQueryProfile().getQueryId()), beAddr.toString(),
+                        params.isDone(), 
params.getQueryProfile().fragment_id_to_profile.size());
+
                 Backend backend = 
Env.getCurrentSystemInfo().getBackend(params.getBackendId());
-                boolean isDone = params.isDone();
-                if (backend != null) {
+                if (backend == null) {
+                    LOG.warn("Invalid report profile req, backend {} not 
found, query id: {}",
+                            params.getBackendId(), 
DebugUtil.printId(params.getQueryProfile().getQueryId()));
+                } else {
+                    boolean isDone = params.isDone();
                     // the process status is ignored by design.
                     // actually be does not care the process status of profile 
on fe.
                     processQueryProfile(params.getQueryProfile(), 
backend.getHeartbeatAddress(), isDone);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
index 76b528464d6..c51ff24ca14 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java
@@ -17,7 +17,7 @@
 
 package org.apache.doris.qe;
 
-import org.apache.doris.common.util.RuntimeProfile;
+import org.apache.doris.common.profile.RuntimeProfile;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TUniqueId;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index f693f5b82aa..8d712dda76a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -691,7 +691,7 @@ public class StmtExecutor {
         }
         context.setQueryId(queryId);
         context.setStartTime();
-        profile.getSummaryProfile().setQueryBeginTime();
+        
profile.getSummaryProfile().setQueryBeginTime(TimeUtils.getStartTimeMs());
         List<List<String>> changedSessionVar = 
VariableMgr.dumpChangedVars(context.getSessionVariable());
         
profile.setChangedSessionVar(DebugUtil.prettyPrintChangedSessionVar(changedSessionVar));
         context.setStmtId(STMT_ID_GENERATOR.incrementAndGet());
@@ -995,7 +995,7 @@ public class StmtExecutor {
     public void executeByLegacy(TUniqueId queryId) throws Exception {
         context.setStartTime();
 
-        profile.getSummaryProfile().setQueryBeginTime();
+        
profile.getSummaryProfile().setQueryBeginTime(TimeUtils.getStartTimeMs());
         context.setStmtId(STMT_ID_GENERATOR.incrementAndGet());
         context.setQueryId(queryId);
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfileManagerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfileManagerTest.java
new file mode 100644
index 00000000000..d0d17505861
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfileManagerTest.java
@@ -0,0 +1,277 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.profile;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+class ProfileManagerTest {
+    // We need a logger.
+    private static final Logger LOG = 
LogManager.getLogger(ProfilePersistentTest.class);
+
+    private static ProfileManager profileManager;
+
+    @BeforeAll
+    static void setUp() {
+        profileManager = new ProfileManager();
+    }
+
+    @BeforeEach
+    void cleanProfile() {
+        profileManager.cleanProfile();
+    }
+
+    @Test
+    void returnsEmptyQueueWhenNoProfiles() {
+        PriorityQueue<ProfileManager.ProfileElement> result = 
profileManager.getProfileOrderByQueryFinishTimeDesc();
+        Assertions.assertTrue(result.isEmpty());
+        result = profileManager.getProfileOrderByQueryFinishTime();
+        Assertions.assertTrue(result.isEmpty());
+        result = profileManager.getProfileOrderByQueryStartTime();
+        Assertions.assertTrue(result.isEmpty());
+    }
+
+    static Profile constructProfile(String id) {
+        Profile profile = new Profile();
+        SummaryProfile summaryProfile = new SummaryProfile();
+        
summaryProfile.getSummary().getInfoStrings().put(SummaryProfile.PROFILE_ID, id);
+        profile.setSummaryProfile(summaryProfile);
+        return profile;
+    }
+
+    @Test
+    void getProfileByOrder() {
+        final int normalProfiles = 100;
+        for (int i = 0; i < normalProfiles; i++) {
+            Profile profile = constructProfile(String.valueOf(i));
+            Random random = new Random();
+            profile.setQueryFinishTimestamp(random.nextInt(200 - 101) + 101);
+            // set query start time in range of [0, 100)
+            profile.getSummaryProfile().setQueryBeginTime(random.nextInt(100));
+            profileManager.pushProfile(profile);
+
+            if (i == 10) {
+                LOG.info("Profile manager debug info: {}", 
profileManager.getDebugInfo());
+            }
+        }
+
+        // Insert two profiles with default value.
+        Profile profile1 = constructProfile("Default 1");
+        profileManager.pushProfile(profile1);
+        Profile profile2 = constructProfile("Default 2");
+        profileManager.pushProfile(profile2);
+
+        profile1 = constructProfile("Default 3");
+        profile1.setQueryFinishTimestamp(1000L);
+        profileManager.pushProfile(profile1);
+        profile1 = constructProfile("Default 4");
+        profile1.setQueryFinishTimestamp(1000L);
+        profileManager.pushProfile(profile1);
+
+        profile1 = constructProfile("Default 5");
+        profile1.getSummaryProfile().setQueryBeginTime(1000L);
+        profileManager.pushProfile(profile1);
+        profile1 = constructProfile("Default 6");
+        profile1.getSummaryProfile().setQueryBeginTime(1000L);
+        profileManager.pushProfile(profile1);
+
+
+        Set<String> profileThatHasQueryFinishTime1000 = new HashSet<>();
+        profileThatHasQueryFinishTime1000.add("Default 3");
+        profileThatHasQueryFinishTime1000.add("Default 4");
+        Set<String> profileThatHasQueryStartTime1000 = new HashSet<>();
+        profileThatHasQueryStartTime1000.add("Default 5");
+        profileThatHasQueryStartTime1000.add("Default 6");
+        Set<String> profileThatHasDefaultQueryFinishTime = new HashSet<>();
+        profileThatHasDefaultQueryFinishTime.add("Default 1");
+        profileThatHasDefaultQueryFinishTime.add("Default 2");
+        profileThatHasDefaultQueryFinishTime.add("Default 5");
+        profileThatHasDefaultQueryFinishTime.add("Default 6");
+        Set<String> profileThatHasDefaultQueryStartTime = new HashSet<>();
+        profileThatHasDefaultQueryStartTime.add("Default 1");
+        profileThatHasDefaultQueryStartTime.add("Default 2");
+        profileThatHasDefaultQueryStartTime.add("Default 3");
+        profileThatHasDefaultQueryStartTime.add("Default 4");
+
+
+        // Profile should be ordered by query finish time in descending order.
+        // Means that the profile with the latest query finish time should be 
at the top of the queue.
+        PriorityQueue<ProfileManager.ProfileElement> orderedResults = 
profileManager.getProfileOrderByQueryFinishTimeDesc();
+        assert orderedResults != null;
+        assert !orderedResults.isEmpty();
+        Assertions.assertEquals(106, orderedResults.size(), 
profileManager.getDebugInfo());
+
+        for (int i = 0; i < profileThatHasDefaultQueryFinishTime.size(); i++) {
+            ProfileManager.ProfileElement result = orderedResults.poll();
+            Assertions.assertNotEquals(result, null);
+            
Assertions.assertTrue(profileThatHasDefaultQueryFinishTime.contains(result.profile.getId()));
+        }
+        for (int i = 0; i < profileThatHasQueryFinishTime1000.size(); i++) {
+            ProfileManager.ProfileElement result = orderedResults.poll();
+            Assertions.assertNotEquals(result, null);
+            
Assertions.assertTrue(profileThatHasQueryFinishTime1000.contains(result.profile.getId()));
+        }
+
+        long prevQueryFinishTime = 1000L;
+        for (int i = 0; i < normalProfiles; i++) {
+            ProfileManager.ProfileElement result = orderedResults.poll();
+            Assertions.assertNotEquals(result, null);
+            Assertions.assertTrue(result.profile.getQueryFinishTimestamp() <= 
prevQueryFinishTime);
+            prevQueryFinishTime = result.profile.getQueryFinishTimestamp();
+        }
+
+        orderedResults = profileManager.getProfileOrderByQueryFinishTime();
+        Assertions.assertEquals(orderedResults.size(), 106);
+        // Profile should be ordered by query finish time in ascending order.
+        prevQueryFinishTime = Long.MIN_VALUE;
+        for (int i = 0; i < normalProfiles; i++) {
+            ProfileManager.ProfileElement result = orderedResults.poll();
+            Assertions.assertNotEquals(result, null);
+            Assertions.assertTrue(result.profile.getQueryFinishTimestamp() >= 
prevQueryFinishTime);
+            prevQueryFinishTime = result.profile.getQueryFinishTimestamp();
+        }
+        for (int i = 0; i < profileThatHasQueryFinishTime1000.size(); i++) {
+            ProfileManager.ProfileElement result = orderedResults.poll();
+            Assertions.assertNotEquals(result, null);
+            
Assertions.assertTrue(profileThatHasQueryFinishTime1000.contains(result.profile.getId()));
+        }
+        for (int i = 0; i < profileThatHasDefaultQueryFinishTime.size(); i++) {
+            ProfileManager.ProfileElement result = orderedResults.poll();
+            Assertions.assertNotEquals(result, null);
+            
Assertions.assertTrue(profileThatHasDefaultQueryFinishTime.contains(result.profile.getId()));
+        }
+
+        orderedResults = profileManager.getProfileOrderByQueryStartTime();
+        Assertions.assertEquals(orderedResults.size(), 106);
+        // Profile should be ordered by query start time in ascending order.
+        long prevQueryStartTime = -1;
+        for (int i = 0; i < profileThatHasDefaultQueryStartTime.size(); i++) {
+            ProfileManager.ProfileElement result = orderedResults.poll();
+            Assertions.assertNotEquals(result, null);
+            
Assertions.assertTrue(profileThatHasDefaultQueryStartTime.contains(result.profile.getId()),
+                                result.profile.getId() + " " + 
result.profile.getSummaryProfile().getQueryBeginTime());
+        }
+
+        for (int i = 0; i < normalProfiles; i++) {
+            ProfileManager.ProfileElement result = orderedResults.poll();
+            Assertions.assertNotEquals(result, null);
+            
Assertions.assertTrue(result.profile.getSummaryProfile().getQueryBeginTime() >= 
prevQueryStartTime);
+            prevQueryStartTime = 
result.profile.getSummaryProfile().getQueryBeginTime();
+        }
+
+        for (int i = 0; i < profileThatHasQueryStartTime1000.size(); i++) {
+            ProfileManager.ProfileElement result = orderedResults.poll();
+            Assertions.assertNotEquals(result, null);
+            
Assertions.assertTrue(profileThatHasQueryStartTime1000.contains(result.profile.getId()),
+                                result.profile.getId() + " " + 
result.profile.getSummaryProfile().getQueryBeginTime());
+        }
+    }
+
+    @Test
+    void getProfileByOrderParallel() throws InterruptedException {
+        // Test the parallel case.
+        // Start threadNum plain threads per role: one writer group and three reader groups.
+        final int threadNum = 3;
+        List<Thread> threads = new ArrayList<>();
+        AtomicBoolean stopFlag = new AtomicBoolean(false);
+
+        // These threads keep adding profiles to the profile manager.
+        // The profile they create has random name, random query finish time 
and random query start time.
+        for (int i = 0; i < threadNum; i++) {
+            threads.add(new Thread(() -> {
+                Random random = new Random();
+                for (int j = 0; j < 100; j++) {
+                    Profile profile = 
constructProfile(String.valueOf(random.nextInt(1000)));
+                    
profile.getSummaryProfile().setQueryBeginTime(random.nextInt(1000));
+                    profile.setQueryFinishTimestamp(random.nextInt(2000) + 
1000);
+                    profileManager.pushProfile(profile);
+                }
+            }));
+        }
+        // Reader threads that repeatedly fetch profiles ordered by query finish time, descending.
+        for (int i = 0; i < threadNum; i++) {
+            threads.add(new Thread(() -> {
+                while (!stopFlag.get()) {
+                    PriorityQueue<ProfileManager.ProfileElement> 
orderedResults = profileManager.getProfileOrderByQueryFinishTimeDesc();
+                    long prevQueryFinishTime = Long.MAX_VALUE;
+                    while (!orderedResults.isEmpty()) {
+                        ProfileManager.ProfileElement result = 
orderedResults.poll();
+                        
Assertions.assertTrue(result.profile.getQueryFinishTimestamp() <= 
prevQueryFinishTime);
+                        prevQueryFinishTime = 
result.profile.getQueryFinishTimestamp();
+                    }
+                }
+            }));
+        }
+
+        for (int i = 0; i < threadNum; i++) {
+            threads.add(new Thread(() -> {
+                while (!stopFlag.get()) {
+                    PriorityQueue<ProfileManager.ProfileElement> 
orderedResults = profileManager.getProfileOrderByQueryStartTime();
+                    long prevQueryStartTime = -1;
+                    while (!orderedResults.isEmpty()) {
+                        ProfileManager.ProfileElement result = 
orderedResults.poll();
+                        
Assertions.assertTrue(result.profile.getSummaryProfile().getQueryBeginTime() >= 
prevQueryStartTime);
+                        prevQueryStartTime = 
result.profile.getSummaryProfile().getQueryBeginTime();
+                    }
+                }
+            }));
+        }
+
+        for (int i = 0; i < threadNum; i++) {
+            threads.add(new Thread(() -> {
+                while (!stopFlag.get()) {
+                    PriorityQueue<ProfileManager.ProfileElement> 
orderedResults = profileManager.getProfileOrderByQueryFinishTime();
+                    long prevQueryFinishTime = Long.MIN_VALUE;
+                    while (!orderedResults.isEmpty()) {
+                        ProfileManager.ProfileElement result = 
orderedResults.poll();
+                        
Assertions.assertTrue(result.profile.getQueryFinishTimestamp() >= 
prevQueryFinishTime);
+                        prevQueryFinishTime = 
result.profile.getQueryFinishTimestamp();
+                    }
+                }
+            }));
+        }
+
+        for (Thread thread : threads) {
+            thread.start();
+        }
+
+        Thread.sleep(5000);
+
+        stopFlag.set(true);
+
+        for (Thread thread : threads) {
+            try {
+                thread.join();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/util/ProfilePersistentTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfilePersistentTest.java
similarity index 98%
rename from 
fe/fe-core/src/test/java/org/apache/doris/common/util/ProfilePersistentTest.java
rename to 
fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfilePersistentTest.java
index c21c57ba1d4..cb1804e2d50 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/common/util/ProfilePersistentTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfilePersistentTest.java
@@ -15,12 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.common.util;
+package org.apache.doris.common.profile;
 
-import org.apache.doris.common.profile.ExecutionProfile;
-import org.apache.doris.common.profile.Profile;
-import org.apache.doris.common.profile.SummaryProfile;
 import org.apache.doris.common.profile.SummaryProfile.SummaryBuilder;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.thrift.QueryState;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.thrift.TUnit;
@@ -77,7 +76,6 @@ public class ProfilePersistentTest {
         SummaryProfile summaryProfile = constructRandomSummaryProfile();
         String stringUniqueId = summaryProfile.getProfileId();
         TUniqueId thriftUniqueId = 
DebugUtil.parseTUniqueIdFromString(stringUniqueId);
-        profile.setId(stringUniqueId);
         profile.setSummaryProfile(summaryProfile);
 
         for (int i = 0; i < executionProfileNum; i++) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/common/profile/RuntimeProfileTest.java
similarity index 99%
rename from 
fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java
rename to 
fe/fe-core/src/test/java/org/apache/doris/common/profile/RuntimeProfileTest.java
index 56ed66c0504..93cb5e35794 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/common/profile/RuntimeProfileTest.java
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.common.util;
+package org.apache.doris.common.profile;
 
 import org.apache.doris.thrift.TCounter;
 import org.apache.doris.thrift.TRuntimeProfileNode;
diff --git a/regression-test/pipeline/cloud_p0/conf/be_custom.conf 
b/regression-test/pipeline/cloud_p0/conf/be_custom.conf
index d201cad3fac..c35859cd6b4 100644
--- a/regression-test/pipeline/cloud_p0/conf/be_custom.conf
+++ b/regression-test/pipeline/cloud_p0/conf/be_custom.conf
@@ -39,3 +39,5 @@ pipeline_task_leakage_detect_period_sec=1
 crash_in_memory_tracker_inaccurate = true
 enable_table_size_correctness_check=true
 enable_brpc_connection_check=true
+
+sys_log_verbose_modules=query_context,runtime_query_statistics_mgr
\ No newline at end of file
diff --git a/regression-test/pipeline/cloud_p0/conf/fe_custom.conf 
b/regression-test/pipeline/cloud_p0/conf/fe_custom.conf
index 955409d9bb2..40d88d0f207 100644
--- a/regression-test/pipeline/cloud_p0/conf/fe_custom.conf
+++ b/regression-test/pipeline/cloud_p0/conf/fe_custom.conf
@@ -24,7 +24,10 @@ enable_debug_points = true
 disable_datev1=false
 
 disable_decimalv2=false
-max_query_profile_num=1000
+sys_log_verbose_modules = 
org.apache.doris.common.profile,org.apache.doris.qe.QeProcessorImpl
+# profile related
+max_query_profile_num = 2000
+max_spilled_profile_num = 2000
 
 statistics_sql_mem_limit_in_bytes=21474836480
 cpu_resource_limit_per_analyze_task=-1
diff --git a/regression-test/pipeline/p0/conf/be.conf 
b/regression-test/pipeline/p0/conf/be.conf
index 0b73375b3fb..be798738e68 100644
--- a/regression-test/pipeline/p0/conf/be.conf
+++ b/regression-test/pipeline/p0/conf/be.conf
@@ -35,7 +35,7 @@ JEMALLOC_PROF_PRFIX="jemalloc_heap_profile_"
 
 # INFO, WARNING, ERROR, FATAL
 sys_log_level = INFO
-
+sys_log_verbose_modules=query_context,runtime_query_statistics_mgr
 be_port = 9161
 webserver_port = 8141
 heartbeat_service_port = 9151
diff --git a/regression-test/pipeline/p0/conf/fe.conf 
b/regression-test/pipeline/p0/conf/fe.conf
index 44688ff4adc..2f493ff1098 100644
--- a/regression-test/pipeline/p0/conf/fe.conf
+++ b/regression-test/pipeline/p0/conf/fe.conf
@@ -34,6 +34,7 @@ 
JAVA_OPTS_FOR_JDK_17="-Djavax.security.auth.useSubjectCredsOnly=false -Xmx8192m
 
 sys_log_level = INFO
 sys_log_mode = NORMAL
+sys_log_verbose_modules = 
org.apache.doris.common.profile,org.apache.doris.qe.QeProcessorImpl
 arrow_flight_sql_port = 8081
 catalog_trash_expire_second=1
 #enable ssl for test
@@ -85,3 +86,7 @@ enable_deadlock_detection = true
 max_lock_hold_threshold_seconds = 10
 
 force_olap_table_replication_allocation=tag.location.default:1
+
+# profile related
+max_query_profile_num = 2000
+max_spilled_profile_num = 2000


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to