This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 7f55c73336e [fix](profile) Make auto_profile_threshold_ms work on FE 
(#39448)
7f55c73336e is described below

commit 7f55c73336e5e11e2daf6056ff21ab424ec690bd
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Mon Aug 19 19:01:14 2024 +0800

    [fix](profile) Make auto_profile_threshold_ms work on FE (#39448)
    
    The `auto_profile_threshold_ms` parameter indicates that a profile will
    only be generated if the query time exceeds this threshold. For load
    jobs, since each load task has a profile, this parameter applies to each
    load task, indicating that a profile will be generated only if the load
    task's execution time exceeds this threshold.
    
    The default value of `auto_profile_threshold_ms` is -1, which means that
    as long as the user has enabled profiling, the profile will be
    collected. This matches the behavior of previous versions.
    
    When the user sets `auto_profile_threshold_ms` to 4 seconds:
    
    1. On the BE (Backend) side, the execution time is checked when the
    profile is reported; if it is less than 4 seconds, the profile is not
    reported (#38548).
    2. On the FE (Frontend) side, the decision is based on the execution
    time observed by the coordinator:
       a. If the query finishes in less than 4 seconds, the profile is
       deleted directly, without being saved and without waiting for the
       report to finish. The associated execution profiles are deleted as
       well.
       b. If the query takes 4 seconds or more, the profile is not deleted
       and must be saved. It will also be persisted when memory is
       insufficient and the profile needs to be evicted.
---
 .../main/java/org/apache/doris/common/Config.java  |   8 +-
 .../org/apache/doris/common/profile/Profile.java   |  25 +++-
 .../apache/doris/common/util/ProfileManager.java   | 132 +++++++++++++++++----
 .../java/org/apache/doris/qe/QeProcessorImpl.java  |   7 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |   2 +-
 5 files changed, 140 insertions(+), 34 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index c324ce37e2e..88be23b1af9 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1910,7 +1910,7 @@ public class Config extends ConfigBase {
      * Max query profile num.
      */
     @ConfField(mutable = true, masterOnly = false)
-    public static int max_query_profile_num = 100;
+    public static int max_query_profile_num = 500;
 
     /**
      * Set to true to disable backend black list, so that even if we failed to 
send task to a backend,
@@ -2693,6 +2693,12 @@ public class Config extends ConfigBase {
             "Whether to enable proxy protocol"
     })
     public static boolean enable_proxy_protocol = false;
+
+    @ConfField(description = {
+            "Profile 异步收集过期时间,在 query 完成后,如果在该参数指定的时长内 profile 没有收集完成,则未完成的 
profile 会被放弃。",
+            "Profile async collect expire time, after the query is completed, 
if the profile is not collected within "
+                    + " the time specified by this parameter, the uncompleted 
profile will be abandoned."
+    })
     public static int profile_async_collect_expire_time_secs = 5;
 
     // Used to check compatibility when upgrading.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
index aab87a5114b..76414677d0a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
@@ -103,7 +103,7 @@ public class Profile {
     private long queryFinishTimestamp = Long.MAX_VALUE;
     private Map<Integer, String> planNodeMap = Maps.newHashMap();
     private int profileLevel = MergedProfileLevel;
-    private long autoProfileDurationMs = 500;
+    private long autoProfileDurationMs = -1;
     // Profile size is the size of profile file
     private long profileSize = 0;
 
@@ -419,7 +419,7 @@ public class Profile {
                 builder.append("\n MergedProfile \n");
                 
this.executionProfiles.get(0).getAggregatedFragmentsProfile(planNodeMap).prettyPrint(builder,
 "     ");
             } catch (Throwable aggProfileException) {
-                LOG.warn("build merged simple profile failed", 
aggProfileException);
+                LOG.warn("build merged simple profile {} failed", this.id, 
aggProfileException);
                 builder.append("build merged simple profile failed");
             }
         }
@@ -464,6 +464,11 @@ public class Profile {
         // below is the case where query has finished
         boolean hasReportingProfile = false;
 
+        if (this.executionProfiles.isEmpty()) {
+            LOG.warn("Profile {} has no execution profile, it is abnormal", 
id);
+            return false;
+        }
+
         for (ExecutionProfile executionProfile : executionProfiles) {
             if (!executionProfile.isCompleted()) {
                 hasReportingProfile = true;
@@ -625,4 +630,20 @@ public class Profile {
     public long getProfileSize() {
         return this.profileSize;
     }
+
+    public boolean shouldBeRemoveFromMemory() {
+        if (!this.isQueryFinished) {
+            return false;
+        }
+
+        if (this.profileHasBeenStored()) {
+            return false;
+        }
+
+        if (this.queryFinishTimestamp - 
this.summaryProfile.getQueryBeginTime() >= autoProfileDurationMs) {
+            return false;
+        }
+
+        return true;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
index 306b5f1b4e6..eff741e4080 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
@@ -193,28 +193,6 @@ public class ProfileManager extends MasterDaemon {
                 LOG.debug("Add execution profile {} to profile manager",
                         DebugUtil.printId(executionProfile.getQueryId()));
             }
-            // This branch has two purposes:
-            // 1. discard profile collecting if its collection not finished in 
5 seconds after query finished.
-            // 2. prevent execution profile from leakage. If we have too many 
execution profiles in memory,
-            // we will remove execution profiles of query that has finished in 
5 seconds ago.
-            if (queryIdToExecutionProfiles.size() > 2 * 
Config.max_query_profile_num) {
-                List<ExecutionProfile> finishOrExpireExecutionProfiles = 
Lists.newArrayList();
-                for (ExecutionProfile tmpProfile : 
queryIdToExecutionProfiles.values()) {
-                    boolean queryFinishedLongEnough = 
tmpProfile.getQueryFinishTime() > 0
-                            && System.currentTimeMillis() - 
tmpProfile.getQueryFinishTime()
-                            > Config.profile_async_collect_expire_time_secs * 
1000;
-
-                    if (queryFinishedLongEnough) {
-                        finishOrExpireExecutionProfiles.add(tmpProfile);
-                    }
-                }
-                StringBuilder stringBuilder = new StringBuilder();
-                for (ExecutionProfile tmp : finishOrExpireExecutionProfiles) {
-                    
stringBuilder.append(DebugUtil.printId(tmp.getQueryId())).append(",");
-                    queryIdToExecutionProfiles.remove(tmp.getQueryId());
-                }
-                LOG.warn("Remove expired execution profiles {}", 
stringBuilder.toString());
-            }
         } finally {
             writeLock.unlock();
         }
@@ -253,7 +231,7 @@ public class ProfileManager extends MasterDaemon {
         List<List<String>> result = Lists.newArrayList();
         readLock.lock();
         try {
-            PriorityQueue<ProfileElement> queueIdDeque = 
getProfileOrderByQueryFinishTime();
+            PriorityQueue<ProfileElement> queueIdDeque = 
getProfileOrderByQueryFinishTimeDesc();
             while (!queueIdDeque.isEmpty()) {
                 ProfileElement profileElement = queueIdDeque.poll();
                 Map<String, String> infoStrings = profileElement.infoStrings;
@@ -556,7 +534,9 @@ public class ProfileManager extends MasterDaemon {
         loadProfilesFromStorageIfFirstTime();
         writeProfileToStorage();
         deleteBrokenProfiles();
+        deleteOutdatedProfilesFromMemory();
         deleteOutdatedProfilesFromStorage();
+        preventExecutionProfileLeakage();
     }
 
     // List PROFILE_STORAGE_PATH and return all dir names
@@ -882,7 +862,7 @@ public class ProfileManager extends MasterDaemon {
 
     // The init value of query finish time of profile is MAX_VALUE
     // So more recent query will be on the top of heap.
-    private PriorityQueue<ProfileElement> getProfileOrderByQueryFinishTime() {
+    private PriorityQueue<ProfileElement> 
getProfileOrderByQueryFinishTimeDesc() {
         PriorityQueue<ProfileElement> queryIdDeque = new 
PriorityQueue<>(Comparator.comparingLong(
                 (ProfileElement profileElement) -> 
profileElement.profile.getQueryFinishTimestamp()).reversed());
 
@@ -893,6 +873,18 @@ public class ProfileManager extends MasterDaemon {
         return queryIdDeque;
     }
 
+    // Older query will be on the top of heap
+    private PriorityQueue<ProfileElement> getProfileOrderByQueryStartTime() {
+        PriorityQueue<ProfileElement> queryIdDeque = new 
PriorityQueue<>(Comparator.comparingLong(
+                (ProfileElement profileElement) -> 
profileElement.profile.getSummaryProfile().getQueryBeginTime()));
+
+        queryIdToProfileMap.forEach((queryId, profileElement) -> {
+            queryIdDeque.add(profileElement);
+        });
+
+        return queryIdDeque;
+    }
+
     // When the query is finished, the execution profile should be marked as 
finished
     // For load task, one of its execution profile is finished.
     public void markExecutionProfileFinished(TUniqueId queryId) {
@@ -913,8 +905,98 @@ public class ProfileManager extends MasterDaemon {
     }
 
     public String getLastProfileId() {
-        PriorityQueue<ProfileElement> queueIdDeque = 
getProfileOrderByQueryFinishTime();
+        PriorityQueue<ProfileElement> queueIdDeque = 
getProfileOrderByQueryFinishTimeDesc();
         ProfileElement profileElement = queueIdDeque.poll();
         return profileElement.profile.getSummaryProfile().getProfileId();
     }
+
+    private void preventExecutionProfileLeakage() {
+        StringBuilder stringBuilder = new StringBuilder();
+        int executionProfileNum = 0;
+        writeLock.lock();
+        try {
+            // This branch has two purposes:
+            // 1. discard profile collecting if its collection not finished in 
5 seconds after query finished.
+            // 2. prevent execution profile from leakage. If we have too many 
execution profiles in memory,
+            // we will remove execution profiles of query that has finished in 
5 seconds ago.
+            if (queryIdToExecutionProfiles.size() > 2 * 
Config.max_query_profile_num) {
+                List<ExecutionProfile> finishOrExpireExecutionProfiles = 
Lists.newArrayList();
+                for (ExecutionProfile tmpProfile : 
queryIdToExecutionProfiles.values()) {
+                    boolean queryFinishedLongEnough = 
tmpProfile.getQueryFinishTime() > 0
+                            && System.currentTimeMillis() - 
tmpProfile.getQueryFinishTime()
+                            > Config.profile_async_collect_expire_time_secs * 
1000;
+
+                    if (queryFinishedLongEnough) {
+                        finishOrExpireExecutionProfiles.add(tmpProfile);
+                    }
+                }
+
+                for (ExecutionProfile tmp : finishOrExpireExecutionProfiles) {
+                    
stringBuilder.append(DebugUtil.printId(tmp.getQueryId())).append(",");
+                    queryIdToExecutionProfiles.remove(tmp.getQueryId());
+                }
+
+                executionProfileNum = queryIdToExecutionProfiles.size();
+            }
+        } finally {
+            writeLock.unlock();
+            if (stringBuilder.length() != 0) {
+                LOG.warn("Remove expired execution profiles {}, current 
execution profile map size {},"
+                        + "Config.max_query_profile_num{}, 
Config.profile_async_collect_expire_time_secs {}",
+                        stringBuilder.toString(), executionProfileNum,
+                        Config.max_query_profile_num, 
Config.profile_async_collect_expire_time_secs);
+            }
+        }
+    }
+
+    private void deleteOutdatedProfilesFromMemory() {
+        StringBuilder stringBuilder = new StringBuilder();
+        int profileNum = 0;
+        writeLock.lock();
+
+        try {
+            // Remove profiles that costs less than auto_profile_threshold_ms
+            List<String> profilesToRemove = Lists.newArrayList();
+
+            for (ProfileElement profileElement : 
this.queryIdToProfileMap.values()) {
+                if (profileElement.profile.shouldBeRemoveFromMemory()) {
+                    
profilesToRemove.add(profileElement.profile.getSummaryProfile().getProfileId());
+                }
+            }
+
+            for (String profileId : profilesToRemove) {
+                ProfileElement profileElement = 
queryIdToProfileMap.get(profileId);
+                queryIdToProfileMap.remove(profileId);
+                for (ExecutionProfile executionProfile : 
profileElement.profile.getExecutionProfiles()) {
+                    
queryIdToExecutionProfiles.remove(executionProfile.getQueryId());
+                }
+                
stringBuilder.append(profileElement.profile.getSummaryProfile().getProfileId()).append(",");
+            }
+
+            if (this.queryIdToProfileMap.size() <= 
Config.max_query_profile_num) {
+                return;
+            }
+
+            PriorityQueue<ProfileElement> queueIdDeque = 
getProfileOrderByQueryStartTime();
+
+            while (queueIdDeque.size() > Config.max_query_profile_num) {
+                ProfileElement profileElement = queueIdDeque.poll();
+
+                
queryIdToProfileMap.remove(profileElement.profile.getSummaryProfile().getProfileId());
+                for (ExecutionProfile executionProfile : 
profileElement.profile.getExecutionProfiles()) {
+                    
queryIdToExecutionProfiles.remove(executionProfile.getQueryId());
+                }
+
+                
stringBuilder.append(profileElement.profile.getSummaryProfile().getProfileId()).append(",");
+            }
+        } finally {
+            profileNum = queryIdToProfileMap.size();
+            writeLock.unlock();
+
+            if (stringBuilder.length() != 0) {
+                LOG.info("Remove outdated profiles {} from memoy, current 
profile map size {}",
+                        stringBuilder.toString(), profileNum);
+            }
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index 2be8a8bcd2c..7d501fb5c11 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -73,8 +73,6 @@ public final class QeProcessorImpl implements QeProcessor {
     }
 
     private Status processQueryProfile(TQueryProfile profile, TNetworkAddress 
address, boolean isDone) {
-        LOG.info("New profile processing API, query {}", 
DebugUtil.printId(profile.query_id));
-
         ExecutionProfile executionProfile = 
ProfileManager.getInstance().getExecutionProfile(profile.query_id);
         if (executionProfile == null) {
             LOG.warn("Could not find execution profile with query id {}", 
DebugUtil.printId(profile.query_id));
@@ -247,9 +245,8 @@ public final class QeProcessorImpl implements QeProcessor {
         }
 
         if (params.isSetProfile() || params.isSetLoadChannelProfile()) {
-            LOG.info("ReportExecStatus(): fragment_instance_id={}, query 
id={}, backend num: {}, ip: {}",
-                    DebugUtil.printId(params.fragment_instance_id), 
DebugUtil.printId(params.query_id),
-                    params.backend_num, beAddr);
+            LOG.info("Reporting profile, query_id={}, fragment {} backend num: 
{}, ip: {}",
+                    DebugUtil.printId(params.query_id), 
params.getFragmentId(), params.backend_num, beAddr);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("params: {}", params);
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 31032a4291b..ce726463b0e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -729,7 +729,7 @@ public class SessionVariable implements Serializable, 
Writable {
     // When enable_profile is true, profile of queries that costs more than 
autoProfileThresholdMs
     // will be stored to disk.
     @VariableMgr.VarAttr(name = AUTO_PROFILE_THRESHOLD_MS, needForward = true)
-    public int autoProfileThresholdMs = 500;
+    public int autoProfileThresholdMs = -1;
 
     @VariableMgr.VarAttr(name = "runtime_filter_prune_for_external")
     public boolean runtimeFilterPruneForExternal = true;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to