This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dc2734a7dda [enhancement](profile) Store profile on disk so that we 
can hold more profile in memory (#33690)
dc2734a7dda is described below

commit dc2734a7dda83845a3247ecb98d3220c15f4ce3f
Author: zhiqiang <seuhezhiqi...@163.com>
AuthorDate: Fri Jul 26 10:31:10 2024 +0800

    [enhancement](profile) Store profile on disk so that we can hold more 
profile in memory (#33690)
    
    Step 3 of https://github.com/apache/doris/issues/33744
    
    Store profile on disk of FE, in the same path with audit log.
    1. ProfileManager will have a daemon thread that checks whether a
    profile should be stored on disk
    2. Which profile can be stored?
         a. the query has finished.
         b. collection of the profile has finished, or the query itself finished
    a long time ago (e.g. 5 seconds)
    3. Profile structure on disk:
    ```
    * Integer: n(size of summary profile)
    * String: json of summary profile
    * Integer: m(size of compressed execution profile)
    * String: compressed binary of execution profile
    ```
    4. IO thread of ProfileManager will also remove garbage from disk.
    5. Once a profile is stored to disk, its detailed content will be released
    from memory, so that we can hold more profiles in memory; further accesses
    to the profile will read from disk directly.
    6. Basic but necessary UT for profile serialization.
    7. Refine constructor of ExecutionProfile.
    
    Further work:
    1. Abstract classes ProfileReader/ProfileWriter that define a common
    interface for profile I/O, so we can implement
    ProfileDiskReader/ProfileDiskWriter and also
    ProfileS3Reader/ProfileS3Writer to support profile I/O on object storage.
    2. Finer granularity for profile creation. Currently for ddl like
    `create/drop table`, profile is also created, this is unnecessary.
    3. More regression test.
    4. More reasonable default value of max_profile_on_disk.
    5. A system table for profile storage, so that we can figure out how much
    storage is consumed by profiles.
    
    ---------
    
    Co-authored-by: yiguolei <676222...@qq.com>
    Co-authored-by: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
---
 .../main/java/org/apache/doris/common/Config.java  |  13 +
 .../doris/common/profile/ExecutionProfile.java     |  39 +-
 .../org/apache/doris/common/profile/Profile.java   | 496 +++++++++++++++++++--
 .../doris/common/profile/SummaryProfile.java       | 115 ++++-
 .../java/org/apache/doris/common/util/Counter.java |  35 ++
 .../apache/doris/common/util/ProfileManager.java   | 487 ++++++++++++++++----
 .../apache/doris/common/util/RuntimeProfile.java   |  66 ++-
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |  13 +-
 .../apache/doris/load/loadv2/LoadLoadingTask.java  |   3 -
 .../nereids/trees/plans/commands/LoadCommand.java  |   6 +-
 .../commands/insert/AbstractInsertExecutor.java    |   1 +
 .../main/java/org/apache/doris/qe/Coordinator.java |  14 +-
 .../java/org/apache/doris/qe/QeProcessorImpl.java  |  17 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |   5 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |  29 +-
 .../doris/common/util/ProfilePersistentTest.java   | 314 +++++++++++++
 16 files changed, 1430 insertions(+), 223 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index f3629537b2c..eb158bf2a57 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.common;
 
+import java.io.File;
+
 public class Config extends ConfigBase {
 
     @ConfField(description = {"用户自定义配置文件的路径,用于存放 fe_custom.conf。该文件中的配置会覆盖 
fe.conf 中的配置",
@@ -2701,6 +2703,17 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static boolean enable_cooldown_replica_affinity = true;
 
+    @ConfField
+    public static String spilled_profile_storage_path = 
System.getenv("LOG_DIR") + File.separator + "profile";
+
+    // The max number of profiles that can be stored to storage.
+    @ConfField
+    public static int max_spilled_profile_num = 500;
+
+    // The total size of profiles that can be stored to storage.
+    @ConfField
+    public static long spilled_profile_storage_limit_bytes = 1 * 1024 * 1024 * 
1024; // 1GB
+
     
//==========================================================================
     //                    begin of cloud config
     
//==========================================================================
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
index 7e61fd04ad7..d4a00939fe7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java
@@ -22,8 +22,6 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.Status;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.RuntimeProfile;
-import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TDetailedReportParams;
@@ -33,7 +31,6 @@ import org.apache.doris.thrift.TReportExecStatusParams;
 import org.apache.doris.thrift.TRuntimeProfileTree;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TUniqueId;
-import org.apache.doris.thrift.TUnit;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -64,8 +61,6 @@ public class ExecutionProfile {
     private static final Logger LOG = 
LogManager.getLogger(ExecutionProfile.class);
 
     private final TUniqueId queryId;
-    private boolean isFinished = false;
-    private long startTime = 0L;
     private long queryFinishTime = 0L;
     // The root profile of this execution task
     private RuntimeProfile root;
@@ -86,7 +81,9 @@ public class ExecutionProfile {
     private Map<Integer, Integer> fragmentIdBeNum;
     private Map<Integer, Integer> seqNoToFragmentId;
 
-    public ExecutionProfile(TUniqueId queryId, List<PlanFragment> fragments) {
+    // Constructor does not need list<PlanFragment>, use List<FragmentId> is 
enough
+    // and will be convenient for the test.
+    public ExecutionProfile(TUniqueId queryId, List<Integer> fragmentIds) {
         this.queryId = queryId;
         root = new RuntimeProfile("Execution Profile " + 
DebugUtil.printId(queryId));
         RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments");
@@ -96,14 +93,14 @@ public class ExecutionProfile {
         fragmentIdBeNum = Maps.newHashMap();
         seqNoToFragmentId = Maps.newHashMap();
         int i = 0;
-        for (PlanFragment planFragment : fragments) {
+        for (int fragmentId : fragmentIds) {
             RuntimeProfile runtimeProfile = new RuntimeProfile("Fragment " + 
i);
-            fragmentProfiles.put(planFragment.getFragmentId().asInt(), 
runtimeProfile);
+            fragmentProfiles.put(fragmentId, runtimeProfile);
             fragmentsProfile.addChild(runtimeProfile);
-            multiBeProfile.put(planFragment.getFragmentId().asInt(),
+            multiBeProfile.put(fragmentId,
                     new ConcurrentHashMap<TNetworkAddress, 
List<RuntimeProfile>>());
-            fragmentIdBeNum.put(planFragment.getFragmentId().asInt(), 0);
-            seqNoToFragmentId.put(i, planFragment.getFragmentId().asInt());
+            fragmentIdBeNum.put(fragmentId, 0);
+            seqNoToFragmentId.put(i, fragmentId);
             ++i;
         }
         loadChannelProfile = new RuntimeProfile("LoadChannels");
@@ -152,6 +149,9 @@ public class ExecutionProfile {
     }
 
     public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String> 
planNodeMap) {
+        for (RuntimeProfile fragmentProfile : fragmentProfiles.values()) {
+            fragmentProfile.sortChildren();
+        }
         /*
             * Fragment 0
             * ---Pipeline 0
@@ -179,23 +179,6 @@ public class ExecutionProfile {
         return root;
     }
 
-    // The execution profile is maintained in ProfileManager, if it is 
finished, then should
-    // remove it from it as soon as possible.
-    public void update(long startTime, boolean isFinished) {
-        if (this.isFinished) {
-            return;
-        }
-        this.isFinished = isFinished;
-        this.startTime = startTime;
-        if (startTime > 0) {
-            root.getCounterTotalTime().setValue(TUnit.TIME_MS, 
TimeUtils.getElapsedTimeMs(startTime));
-        }
-
-        for (RuntimeProfile fragmentProfile : fragmentProfiles.values()) {
-            fragmentProfile.sortChildren();
-        }
-    }
-
     public Status updateProfile(TQueryProfile profile, TNetworkAddress 
backendHBAddress, boolean isDone) {
         if (!profile.isSetQueryId()) {
             LOG.warn("QueryId is not set");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
index b45527f60bd..14b186a683f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.common.profile;
 
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.ProfileManager;
 import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.nereids.NereidsPlanner;
@@ -24,15 +26,30 @@ import 
org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
 import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
 import org.apache.doris.planner.Planner;
+import org.apache.doris.thrift.TUniqueId;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import org.apache.commons.io.FileUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
 
 /**
  * Profile is a class to record the execution time of a query. It has the
@@ -57,20 +74,176 @@ import java.util.Map;
 public class Profile {
     private static final Logger LOG = LogManager.getLogger(Profile.class);
     private static final int MergedProfileLevel = 1;
-    private final String name;
-    private SummaryProfile summaryProfile;
+    // profile file name format: time_id
+    private static final String SEPERATOR = "_";
+
+    // id will be assgined to id of SummaryProfile.
+    // For broker load, its SummaryPRofile id is a string representation of a 
long integer,
+    // for others, it is queryID
+    private String id = "";
+    // summaryProfile will be serialized to storage as JSON, and we can 
recover it from storage
+    // recover of SummaryProfile is important, because it contains the meta 
information of the profile
+    // we need it to construct memory index for profile retrieving.
+    private SummaryProfile summaryProfile = new SummaryProfile();
+    // executionProfiles will be stored to storage as text, when geting 
profile content, we will read
+    // from storage directly.
     private List<ExecutionProfile> executionProfiles = Lists.newArrayList();
-    private boolean isFinished;
-    private Map<Integer, String> planNodeMap;
+    // profileStoragePath will only be assigned when:
+    // 1. profile is stored to storage
+    // 2. or profile is loaded from storage
+    private String profileStoragePath = "";
+    // isQueryFinished means the coordinator or stmtexecutor is finished.
+    // does not mean the profile report has finished, since the report is 
async.
+    // finish of collection of profile is marked by isCompleted of 
ExecutionProfiles.
+    private boolean isQueryFinished = false;
+    // when coordinator finishes, it will mark finish time.
+    // we will wait for about 5 seconds to see if all profiles have been 
reported.
+    // if not, we will store the profile to storage, and release the memory,
+    // futher report will be ignored.
+    // why MAX_VALUE? So that we can use PriorityQueue to sort profile by 
finish time decreasing order.
+    private long queryFinishTimestamp = Long.MAX_VALUE;
+    private Map<Integer, String> planNodeMap = Maps.newHashMap();
+    private int profileLevel = MergedProfileLevel;
+    private long autoProfileDurationMs = 500;
+    // Profile size is the size of profile file
+    private long profileSize = 0;
 
-    private int profileLevel = 3;
+    // Need default constructor for read from storage
+    public Profile() {}
 
-    public Profile(String name, boolean isEnable, int profileLevel) {
-        this.name = name;
+    public Profile(boolean isEnable, int profileLevel, long 
autoProfileDurationMs) {
         this.summaryProfile = new SummaryProfile();
         // if disabled, just set isFinished to true, so that update() will do 
nothing
-        this.isFinished = !isEnable;
+        this.isQueryFinished = !isEnable;
         this.profileLevel = profileLevel;
+        this.autoProfileDurationMs = autoProfileDurationMs;
+    }
+
+    // check if the profile file is valid and create a file input stream
+    // user need to close the file stream.
+    private static FileInputStream createPorfileFileInputStream(String path) {
+        File profileFile = new File(path);
+        if (!profileFile.isFile()) {
+            LOG.warn("Profile storage path {} is invalid, its not a file.", 
profileFile.getAbsolutePath());
+            return null;
+        }
+
+        String[] parts = path.split(File.separator);
+        if (parts.length < 1) {
+            LOG.warn("Profile storage path {} is invalid", 
profileFile.getAbsolutePath());
+            return null;
+        }
+
+        // Profile could be a load task with multiple queries, so we call it 
id.
+        if (parseProfileFileName(parts[parts.length - 1]) == null) {
+            LOG.warn("{} is not a valid profile file", 
profileFile.getAbsolutePath());
+            return null;
+        }
+
+        FileInputStream profileMetaFileInputStream = null;
+        try {
+            profileMetaFileInputStream = new FileInputStream(path);
+        } catch (Exception e) {
+            LOG.warn("open profile file {} failed", path, e);
+        }
+
+        return profileMetaFileInputStream;
+    }
+
+    // For normal profile, the profile id is a TUniqueId, but for broker load, 
the profile id is a long.
+    public static String[] parseProfileFileName(String profileFileName) {
+        String [] timeAndID = profileFileName.split(SEPERATOR);
+        if (timeAndID.length != 2) {
+            return null;
+        }
+        TUniqueId thriftId = DebugUtil.parseTUniqueIdFromString(timeAndID[1]);
+        if (thriftId == null) {
+            if (Long.valueOf(timeAndID[1]) == null) {
+                return null;
+            }
+        }
+        return timeAndID;
+    }
+
+    // read method will only read summary profile, and return a Profile object
+    public static Profile read(String path) {
+        FileInputStream profileFileInputStream = null;
+        try {
+            profileFileInputStream = createPorfileFileInputStream(path);
+            // Maybe profile path is invalid
+            if (profileFileInputStream == null) {
+                return null;
+            }
+            File profileFile = new File(path);
+            long fileSize = profileFile.length();
+            // read method will move the cursor to the end of the summary 
profile
+            DataInput dataInput = new DataInputStream(profileFileInputStream);
+            Profile res = new Profile();
+            res.summaryProfile = SummaryProfile.read(dataInput);
+            res.setId(res.summaryProfile.getProfileId());
+            res.profileStoragePath = path;
+            res.isQueryFinished = true;
+            res.profileSize = fileSize;
+            String[] parts = path.split(File.separator);
+            String queryFinishTimeStr = 
parseProfileFileName(parts[parts.length - 1])[0];
+            // queryFinishTime is used for sorting profile by finish time.
+            res.queryFinishTimestamp = Long.valueOf(queryFinishTimeStr);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Read profile from storage: {}", 
res.summaryProfile.getProfileId());
+            }
+            return res;
+        } catch (Exception exception) {
+            LOG.error("read profile failed", exception);
+            return null;
+        } finally {
+            if (profileFileInputStream != null) {
+                try {
+                    profileFileInputStream.close();
+                } catch (Exception e) {
+                    LOG.warn("close profile file {} failed", path, e);
+                }
+            }
+        }
+    }
+
+    // Method to compress a string using Deflater
+    public static byte[] compressExecutionProfile(String str) throws 
IOException {
+        byte[] data = str.getBytes(StandardCharsets.UTF_8);
+        Deflater deflater = new Deflater();
+        deflater.setInput(data);
+        deflater.finish();
+
+        ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream(data.length);
+        byte[] buffer = new byte[1024];
+        while (!deflater.finished()) {
+            int count = deflater.deflate(buffer);
+            outputStream.write(buffer, 0, count);
+        }
+        deflater.end();
+        outputStream.close();
+        return outputStream.toByteArray();
+    }
+
+    // Method to decompress a byte array using Inflater
+    public static String decompressExecutionProfile(byte[] data) throws 
IOException {
+        Inflater inflater = new Inflater();
+        inflater.setInput(data, 0, data.length);
+
+        ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream(data.length);
+        byte[] buffer = new byte[1024];
+        try {
+            while (!inflater.finished()) {
+                int count = inflater.inflate(buffer);
+                outputStream.write(buffer, 0, count);
+            }
+            inflater.end();
+        } catch (Exception e) {
+            throw new IOException("Failed to decompress data", e);
+        } finally {
+            outputStream.close();
+        }
+
+        return new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
     }
 
     // For load task, the profile contains many execution profiles
@@ -89,12 +262,13 @@ public class Profile {
 
     // This API will also add the profile to ProfileManager, so that we could 
get the profile from ProfileManager.
     // isFinished ONLY means the coordinator or stmtexecutor is finished.
-    public synchronized void updateSummary(long startTime, Map<String, String> 
summaryInfo, boolean isFinished,
+    public synchronized void updateSummary(Map<String, String> summaryInfo, 
boolean isFinished,
             Planner planner) {
         try {
-            if (this.isFinished) {
+            if (this.isQueryFinished) {
                 return;
             }
+
             if (planner instanceof NereidsPlanner) {
                 NereidsPlanner nereidsPlanner = ((NereidsPlanner) planner);
                 StringBuilder builder = new StringBuilder();
@@ -119,20 +293,20 @@ public class Profile {
                     );
                 }
             }
+
             summaryProfile.update(summaryInfo);
-            for (ExecutionProfile executionProfile : executionProfiles) {
-                // Tell execution profile the start time
-                executionProfile.update(startTime, isFinished);
-            }
+            this.setId(summaryProfile.getProfileId());
 
+            if (isFinished) {
+                this.markQueryFinished(System.currentTimeMillis());
+            }
             // Nerids native insert not set planner, so it is null
             if (planner != null) {
                 this.planNodeMap = planner.getExplainStringMap();
             }
             ProfileManager.getInstance().pushProfile(this);
-            this.isFinished = isFinished;
         } catch (Throwable t) {
-            LOG.warn("update profile failed", t);
+            LOG.warn("update profile {} failed", id, t);
             throw t;
         }
     }
@@ -145,35 +319,16 @@ public class Profile {
         StringBuilder builder = new StringBuilder();
         // add summary to builder
         summaryProfile.prettyPrint(builder);
-        waitProfileCompleteIfNeeded();
-        // Only generate merged profile for select, insert into select.
-        // Not support broker load now.
-        if (this.profileLevel == MergedProfileLevel && 
this.executionProfiles.size() == 1) {
-            try {
-                builder.append("\n MergedProfile \n");
-                
this.executionProfiles.get(0).getAggregatedFragmentsProfile(planNodeMap).prettyPrint(builder,
 "     ");
-            } catch (Throwable aggProfileException) {
-                LOG.warn("build merged simple profile failed", 
aggProfileException);
-                builder.append("build merged simple profile failed");
-            }
-        }
-        try {
-            // For load task, they will have multiple execution_profiles.
-            for (ExecutionProfile executionProfile : executionProfiles) {
-                builder.append("\n");
-                executionProfile.getRoot().prettyPrint(builder, "");
-            }
-        } catch (Throwable aggProfileException) {
-            LOG.warn("build profile failed", aggProfileException);
-            builder.append("build  profile failed");
-        }
+        // read execution profile from storage or generate it from memory 
(during query execution)
+        getExecutionProfileContent(builder);
+
         return builder.toString();
     }
 
     // If the query is already finished, and user wants to get the profile, we 
should check
     // if BE has reported all profiles, if not, sleep 2s.
     private void waitProfileCompleteIfNeeded() {
-        if (!this.isFinished) {
+        if (!this.isQueryFinished) {
             return;
         }
         boolean allCompleted = true;
@@ -193,7 +348,7 @@ public class Profile {
     }
 
     private RuntimeProfile composeRootProfile() {
-        RuntimeProfile rootProfile = new RuntimeProfile(name);
+        RuntimeProfile rootProfile = new RuntimeProfile(id);
         rootProfile.addChild(summaryProfile.getSummary());
         rootProfile.addChild(summaryProfile.getExecutionSummary());
         for (ExecutionProfile executionProfile : executionProfiles) {
@@ -204,9 +359,268 @@ public class Profile {
     }
 
     public String getProfileBrief() {
-        waitProfileCompleteIfNeeded();
         RuntimeProfile rootProfile = composeRootProfile();
         Gson gson = new GsonBuilder().setPrettyPrinting().create();
         return gson.toJson(rootProfile.toBrief());
     }
+
+    // Read file if profile has been stored to storage.
+    public void getExecutionProfileContent(StringBuilder builder) {
+        if (builder == null) {
+            builder = new StringBuilder();
+        }
+
+        if (profileHasBeenStored()) {
+            LOG.info("Profile {} has been stored to storage, reading it from 
storage", id);
+
+            FileInputStream fileInputStream = null;
+
+            try {
+                fileInputStream = 
createPorfileFileInputStream(profileStoragePath);
+                if (fileInputStream == null) {
+                    builder.append("Failed to read execution profile from " + 
profileStoragePath);
+                    return;
+                }
+
+                DataInputStream dataInput = new 
DataInputStream(fileInputStream);
+                // skip summary profile
+                Text.readString(dataInput);
+                // read compressed execution profile
+                int binarySize = dataInput.readInt();
+                byte[] binaryExecutionProfile = new byte[binarySize];
+                dataInput.readFully(binaryExecutionProfile, 0, binarySize);
+                // decompress binary execution profile
+                String textExecutionProfile = 
decompressExecutionProfile(binaryExecutionProfile);
+                builder.append(textExecutionProfile);
+                return;
+            } catch (Exception e) {
+                LOG.error("An error occurred while reading execution profile 
from storage, profile storage path: {}",
+                        profileStoragePath, e);
+                builder.append("Failed to read execution profile from " + 
profileStoragePath);
+            } finally {
+                if (fileInputStream != null) {
+                    try {
+                        fileInputStream.close();
+                    } catch (Exception e) {
+                        LOG.warn("Close profile {} failed", 
profileStoragePath, e);
+                    }
+                }
+            }
+
+            return;
+        }
+
+        // Only generate merged profile for select, insert into select.
+        // Not support broker load now.
+        if (this.profileLevel == MergedProfileLevel && 
this.executionProfiles.size() == 1) {
+            try {
+                builder.append("\n MergedProfile \n");
+                
this.executionProfiles.get(0).getAggregatedFragmentsProfile(planNodeMap).prettyPrint(builder,
 "     ");
+            } catch (Throwable aggProfileException) {
+                LOG.warn("build merged simple profile failed", 
aggProfileException);
+                builder.append("build merged simple profile failed");
+            }
+        }
+        try {
+            // For load task, they will have multiple execution_profiles.
+            for (ExecutionProfile executionProfile : executionProfiles) {
+                builder.append("\n");
+                executionProfile.getRoot().prettyPrint(builder, "");
+            }
+        } catch (Throwable aggProfileException) {
+            LOG.warn("build profile failed", aggProfileException);
+            builder.append("build  profile failed");
+        }
+    }
+
+    public long getQueryFinishTimestamp() {
+        return this.queryFinishTimestamp;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    // For UT
+    public void setSummaryProfile(SummaryProfile summaryProfile) {
+        this.summaryProfile = summaryProfile;
+    }
+
+    public void releaseExecutionProfile() {
+        this.executionProfiles.clear();
+    }
+
+    public boolean shouldStoreToStorage() {
+        if (profileHasBeenStored()) {
+            return false;
+        }
+
+        if (!isQueryFinished) {
+            return false;
+        }
+
+        // below is the case where query has finished
+        boolean hasReportingProfile = false;
+
+        for (ExecutionProfile executionProfile : executionProfiles) {
+            if (!executionProfile.isCompleted()) {
+                hasReportingProfile = true;
+                break;
+            }
+        }
+
+        if (!hasReportingProfile) {
+            // query finished and no flying profile
+            // I do want to use TotalTime in summary profile, but it is an 
encoded string,
+            // it is hard to write a parse function.
+            long durationMs = this.queryFinishTimestamp - 
summaryProfile.getQueryBeginTime();
+            // time cost of this query is large enough.
+            if (this.queryFinishTimestamp != Long.MAX_VALUE && durationMs
+                    > (this.executionProfiles.size() * autoProfileDurationMs)) 
{
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Query/LoadJob {} costs {} ms, begin {} finish 
{}, need store its profile",
+                            id, durationMs, 
summaryProfile.getQueryBeginTime(), this.queryFinishTimestamp);
+                }
+                return true;
+            }
+            return false;
+        }
+
+        if (this.queryFinishTimestamp == Long.MAX_VALUE) {
+            LOG.warn("Logical error, query {} has finished, but 
queryFinishTimestamp is not set,", id);
+            return false;
+        }
+
+        if (this.queryFinishTimestamp != Long.MAX_VALUE
+                    && (System.currentTimeMillis() - 
this.queryFinishTimestamp) > autoProfileDurationMs) {
+            LOG.warn("Profile {} should be stored to storage without waiting 
for incoming profile,"
+                    + " since it has been waiting for {} ms, query finished 
time: {}", id,
+                    System.currentTimeMillis() - this.queryFinishTimestamp, 
this.queryFinishTimestamp);
+
+            this.summaryProfile.setSystemMessage(
+                            "This profile is not complete, since its 
collection does not finish in time."
+                            + " Maybe increase auto_profile_threshold_ms 
current val: "
+                            + String.valueOf(autoProfileDurationMs));
+            return true;
+        }
+
+        // query finished, wait a while for reporting profile
+        return false;
+    }
+
+    public String getProfileStoragePath() {
+        return this.profileStoragePath;
+    }
+
+    public boolean profileHasBeenStored() {
+        return !Strings.isNullOrEmpty(profileStoragePath);
+    }
+
+    // Profile IO threads races with Coordinator threads.
+    public void markQueryFinished(long queryFinishTime) {
+        try {
+            if (this.profileHasBeenStored()) {
+                LOG.error("Logical error, profile {} has already been stored 
to storage", this.id);
+                return;
+            }
+
+            this.isQueryFinished = true;
+            this.queryFinishTimestamp = System.currentTimeMillis();
+        } catch (Throwable t) {
+            LOG.warn("mark query finished failed", t);
+            throw t;
+        }
+    }
+
+    public void writeToStorage(String systemProfileStorageDir) {
+        if (Strings.isNullOrEmpty(id)) {
+            LOG.warn("store profile failed, name is empty");
+            return;
+        }
+
+        if (!Strings.isNullOrEmpty(profileStoragePath)) {
+            LOG.error("Logical error, profile {} has already been stored to 
storage", id);
+            return;
+        }
+
+        final String profileId = this.summaryProfile.getProfileId();
+
+        // queryFinishTimeStamp_ProfileId
+        final String profileFilePath = systemProfileStorageDir + File.separator
+                                    + String.valueOf(this.queryFinishTimestamp)
+                                    + SEPERATOR + profileId;
+
+        File profileFile = new File(profileFilePath);
+        if (profileFile.exists()) {
+            LOG.warn("profile directory {} already exists, remove it", 
profileFile.getAbsolutePath());
+            profileFile.delete();
+        }
+
+        // File structure of profile:
+        /*
+         * Integer: n(size of summary profile)
+         * String: json of summary profile
+         * Integer: m(size of compressed execution profile)
+         * String: compressed binary of execution profile
+        */
+        FileOutputStream fileOutputStream = null;
+        try {
+            fileOutputStream = new FileOutputStream(profileFilePath);
+            DataOutputStream dataOutputStream = new 
DataOutputStream(fileOutputStream);
+            this.summaryProfile.write(dataOutputStream);
+
+            // store execution profiles as string
+            StringBuilder build = new StringBuilder();
+            getExecutionProfileContent(build);
+            byte[] buf = compressExecutionProfile(build.toString());
+            dataOutputStream.writeInt(buf.length);
+            dataOutputStream.write(buf);
+            build = null;
+            dataOutputStream.flush();
+            this.profileSize = profileFile.length();
+        } catch (Exception e) {
+            LOG.error("write {} summary profile failed", id, e);
+            return;
+        } finally {
+            try {
+                if (fileOutputStream != null) {
+                    fileOutputStream.close();
+                }
+            } catch (Exception e) {
+                LOG.warn("close profile file {} failed", profileFilePath, e);
+            }
+        }
+
+        this.profileStoragePath = profileFilePath;
+    }
+
+    // remove profile from storage
+    public void deleteFromStorage() {
+        if (!profileHasBeenStored()) {
+            return;
+        }
+
+        String storagePath = getProfileStoragePath();
+        if (Strings.isNullOrEmpty(storagePath)) {
+            LOG.warn("remove profile failed, storage path is empty");
+            return;
+        }
+
+        File profileFile = new File(storagePath);
+        if (!profileFile.exists()) {
+            LOG.warn("Profile {} does not exist", 
profileFile.getAbsolutePath());
+            return;
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Remove profile: {}", getProfileStoragePath());
+        }
+
+        if (!FileUtils.deleteQuietly(profileFile)) {
+            LOG.warn("remove profile {} failed", 
profileFile.getAbsolutePath());
+        }
+    }
+
+    public long getProfileSize() {
+        return this.profileSize;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index f827e130b74..ed1f0caa027 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -18,8 +18,10 @@
 package org.apache.doris.common.profile;
 
 import org.apache.doris.common.Config;
+import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TUnit;
 import org.apache.doris.transaction.TransactionType;
@@ -29,7 +31,11 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
+import com.google.gson.annotations.SerializedName;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -62,6 +68,7 @@ public class SummaryProfile {
     public static final String WORKLOAD_GROUP = "Workload Group";
     public static final String PHYSICAL_PLAN = "Physical Plan";
     public static final String DISTRIBUTED_PLAN = "Distributed Plan";
+    public static final String SYSTEM_MESSAGE = "System Message";
     // Execution Summary
     public static final String EXECUTION_SUMMARY_PROFILE_NAME = "Execution 
Summary";
     public static final String ANALYSIS_TIME = "Analysis Time";
@@ -162,12 +169,13 @@ public class SummaryProfile {
             WRITE_RESULT_TIME,
             DORIS_VERSION,
             IS_NEREIDS,
-                    IS_CACHED,
+            IS_CACHED,
             TOTAL_INSTANCES_NUM,
             INSTANCES_NUM_PER_BE,
             PARALLEL_FRAGMENT_EXEC_INSTANCE,
             TRACE_ID,
-            TRANSACTION_COMMIT_TIME
+            TRANSACTION_COMMIT_TIME,
+            SYSTEM_MESSAGE
     );
 
     // Ident of each item. Default is 0, which doesn't need to present in this 
Map.
@@ -205,67 +213,116 @@ public class SummaryProfile {
             .put(HMS_UPDATE_PARTITION_CNT, 2)
             .build();
 
-    private RuntimeProfile summaryProfile;
-    private RuntimeProfile executionSummaryProfile;
-
+    @SerializedName(value = "summaryProfile")
+    private RuntimeProfile summaryProfile = new 
RuntimeProfile(SUMMARY_PROFILE_NAME);
+    @SerializedName(value = "executionSummaryProfile")
+    private RuntimeProfile executionSummaryProfile = new 
RuntimeProfile(EXECUTION_SUMMARY_PROFILE_NAME);
+    @SerializedName(value = "parseSqlStartTime")
     private long parseSqlStartTime = -1;
+    @SerializedName(value = "parseSqlFinishTime")
     private long parseSqlFinishTime = -1;
+    @SerializedName(value = "nereidsAnalysisFinishTime")
     private long nereidsAnalysisFinishTime = -1;
+    @SerializedName(value = "nereidsRewriteFinishTime")
     private long nereidsRewriteFinishTime = -1;
+    @SerializedName(value = "nereidsOptimizeFinishTime")
     private long nereidsOptimizeFinishTime = -1;
+    @SerializedName(value = "nereidsTranslateFinishTime")
     private long nereidsTranslateFinishTime = -1;
     private long nereidsDistributeFinishTime = -1;
     // timestamp of query begin
+    @SerializedName(value = "queryBeginTime")
     private long queryBeginTime = -1;
     // Analysis end time
+    @SerializedName(value = "queryAnalysisFinishTime")
     private long queryAnalysisFinishTime = -1;
     // Join reorder end time
+    @SerializedName(value = "queryJoinReorderFinishTime")
     private long queryJoinReorderFinishTime = -1;
     // Create single node plan end time
+    @SerializedName(value = "queryCreateSingleNodeFinishTime")
     private long queryCreateSingleNodeFinishTime = -1;
     // Create distribute plan end time
+    @SerializedName(value = "queryDistributedFinishTime")
     private long queryDistributedFinishTime = -1;
+    @SerializedName(value = "initScanNodeStartTime")
     private long initScanNodeStartTime = -1;
+    @SerializedName(value = "initScanNodeFinishTime")
     private long initScanNodeFinishTime = -1;
+    @SerializedName(value = "finalizeScanNodeStartTime")
     private long finalizeScanNodeStartTime = -1;
+    @SerializedName(value = "finalizeScanNodeFinishTime")
     private long finalizeScanNodeFinishTime = -1;
+    @SerializedName(value = "getSplitsStartTime")
     private long getSplitsStartTime = -1;
+    @SerializedName(value = "getPartitionsFinishTime")
     private long getPartitionsFinishTime = -1;
+    @SerializedName(value = "getPartitionFilesFinishTime")
     private long getPartitionFilesFinishTime = -1;
+    @SerializedName(value = "getSplitsFinishTime")
     private long getSplitsFinishTime = -1;
+    @SerializedName(value = "createScanRangeFinishTime")
     private long createScanRangeFinishTime = -1;
     // Plan end time
+    @SerializedName(value = "queryPlanFinishTime")
     private long queryPlanFinishTime = -1;
+    @SerializedName(value = "assignFragmentTime")
     private long assignFragmentTime = -1;
+    @SerializedName(value = "fragmentSerializeTime")
     private long fragmentSerializeTime = -1;
+    @SerializedName(value = "fragmentSendPhase1Time")
     private long fragmentSendPhase1Time = -1;
+    @SerializedName(value = "fragmentSendPhase2Time")
     private long fragmentSendPhase2Time = -1;
+    @SerializedName(value = "fragmentCompressedSize")
     private long fragmentCompressedSize = 0;
+    @SerializedName(value = "fragmentRpcCount")
     private long fragmentRpcCount = 0;
     // Fragment schedule and send end time
+    @SerializedName(value = "queryScheduleFinishTime")
     private long queryScheduleFinishTime = -1;
     // Query result fetch end time
+    @SerializedName(value = "queryFetchResultFinishTime")
     private long queryFetchResultFinishTime = -1;
+    @SerializedName(value = "tempStarTime")
     private long tempStarTime = -1;
+    @SerializedName(value = "queryFetchResultConsumeTime")
     private long queryFetchResultConsumeTime = 0;
+    @SerializedName(value = "queryWriteResultConsumeTime")
     private long queryWriteResultConsumeTime = 0;
+    @SerializedName(value = "getPartitionVersionTime")
     private long getPartitionVersionTime = 0;
+    @SerializedName(value = "getPartitionVersionCount")
     private long getPartitionVersionCount = 0;
+    @SerializedName(value = "getPartitionVersionByHasDataCount")
     private long getPartitionVersionByHasDataCount = 0;
+    @SerializedName(value = "getTableVersionTime")
     private long getTableVersionTime = 0;
+    @SerializedName(value = "getTableVersionCount")
     private long getTableVersionCount = 0;
+    @SerializedName(value = "transactionCommitBeginTime")
     private long transactionCommitBeginTime = -1;
+    @SerializedName(value = "transactionCommitEndTime")
     private long transactionCommitEndTime = -1;
+    @SerializedName(value = "filesystemOptTime")
     private long filesystemOptTime = -1;
+    @SerializedName(value = "hmsAddPartitionTime")
     private long hmsAddPartitionTime = -1;
+    @SerializedName(value = "hmsAddPartitionCnt")
     private long hmsAddPartitionCnt = 0;
+    @SerializedName(value = "hmsUpdatePartitionTime")
     private long hmsUpdatePartitionTime = -1;
+    @SerializedName(value = "hmsUpdatePartitionCnt")
     private long hmsUpdatePartitionCnt = 0;
+    @SerializedName(value = "filesystemRenameFileCnt")
     private long filesystemRenameFileCnt = 0;
+    @SerializedName(value = "filesystemRenameDirCnt")
     private long filesystemRenameDirCnt = 0;
-
-    private long filesystemDeleteFileCnt = 0;
+    @SerializedName(value = "filesystemDeleteDirCnt")
     private long filesystemDeleteDirCnt = 0;
+    @SerializedName(value = "filesystemDeleteFileCnt")
+    private long filesystemDeleteFileCnt = 0;
+    @SerializedName(value = "transactionType")
     private TransactionType transactionType = TransactionType.UNKNOWN;
 
     // BE -> (RPC latency from FE to BE, Execution latency on bthread, 
Duration of doing work, RPC latency from BE
@@ -274,11 +331,34 @@ public class SummaryProfile {
     private Map<TNetworkAddress, List<Long>> rpcPhase2Latency;
 
     public SummaryProfile() {
-        summaryProfile = new RuntimeProfile(SUMMARY_PROFILE_NAME);
-        executionSummaryProfile = new 
RuntimeProfile(EXECUTION_SUMMARY_PROFILE_NAME);
         init();
     }
 
+    private void init() {
+        for (String key : SUMMARY_KEYS) {
+            summaryProfile.addInfoString(key, "N/A");
+        }
+        for (String key : EXECUTION_SUMMARY_KEYS) {
+            executionSummaryProfile.addInfoString(key, "N/A");
+        }
+    }
+
+    // For UT usage
+    public void fuzzyInit() {
+        for (String key : SUMMARY_KEYS) {
+            String randomId = String.valueOf(TimeUtils.getStartTimeMs());
+            summaryProfile.addInfoString(key, randomId);
+        }
+        for (String key : EXECUTION_SUMMARY_KEYS) {
+            String randomId = String.valueOf(TimeUtils.getStartTimeMs());
+            executionSummaryProfile.addInfoString(key, randomId);
+        }
+    }
+
+    public static SummaryProfile read(DataInput input) throws IOException {
+        return GsonUtils.GSON.fromJson(Text.readString(input), 
SummaryProfile.class);
+    }
+
     public String getProfileId() {
         return this.summaryProfile.getInfoString(PROFILE_ID);
     }
@@ -291,15 +371,6 @@ public class SummaryProfile {
         return executionSummaryProfile;
     }
 
-    private void init() {
-        for (String key : SUMMARY_KEYS) {
-            summaryProfile.addInfoString(key, "N/A");
-        }
-        for (String key : EXECUTION_SUMMARY_KEYS) {
-            executionSummaryProfile.addInfoString(key, "N/A");
-        }
-    }
-
     public void prettyPrint(StringBuilder builder) {
         summaryProfile.prettyPrint(builder, "");
         executionSummaryProfile.prettyPrint(builder, "");
@@ -829,4 +900,12 @@ public class SummaryProfile {
         }
         return new Gson().toJson(jsonObject);
     }
+
+    public void setSystemMessage(String msg) {
+        summaryProfile.addInfoString(SYSTEM_MESSAGE, msg);
+    }
+
+    public void write(DataOutput output) throws IOException {
+        Text.writeString(output, GsonUtils.GSON.toJson(this));
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java
index af9cd1e6cd6..f6c06890047 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java
@@ -17,14 +17,29 @@
 
 package org.apache.doris.common.util;
 
+import org.apache.doris.common.io.Text;
+import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TUnit;
 
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
 // Counter means indicators field. The counter's name is key, the counter 
itself is value.
 public class Counter {
+    @SerializedName(value = "value")
     private volatile long value;
+    @SerializedName(value = "type")
     private volatile int type;
+    @SerializedName(value = "level")
     private volatile long level;
 
+    public static Counter read(DataInput input) throws IOException {
+        return GsonUtils.GSON.fromJson(Text.readString(input), Counter.class);
+    }
+
     public long getValue() {
         return value;
     }
@@ -103,4 +118,24 @@ public class Counter {
         return RuntimeProfile.printCounter(value, getType());
     }
 
+    public String toString() {
+        return print();
+    }
+
+    public void write(DataOutput output) throws IOException {
+        Text.writeString(output, GsonUtils.GSON.toJson(this));
+    }
+
+    public boolean equals(Object rhs) {
+        if (this == rhs) {
+            return true;
+        }
+        if (rhs == null || getClass() != rhs.getClass()) {
+            return false;
+        }
+
+        Counter other = (Counter) rhs;
+        return other.value == value && other.type == type && other.level == 
level;
+    }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
index 4c1d6b30859..9f4fa151709 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java
@@ -44,18 +44,19 @@ import org.apache.doris.thrift.TUniqueId;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.commons.lang3.tuple.Triple;
+import org.apache.commons.io.FileUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 
-import java.util.Deque;
-import java.util.Iterator;
-import java.util.LinkedList;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -72,9 +73,10 @@ import 
java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
  * the purpose is let coordinator can destruct earlier(the fragment profile is 
in Coordinator)
  *
  */
-public class ProfileManager {
+public class ProfileManager extends MasterDaemon {
     private static final Logger LOG = 
LogManager.getLogger(ProfileManager.class);
     private static volatile ProfileManager INSTANCE = null;
+    private static final String PROFILE_STORAGE_PATH = 
Config.spilled_profile_storage_path;
 
     public enum ProfileType {
         QUERY,
@@ -112,42 +114,60 @@ public class ProfileManager {
         public void setStatsErrorEstimator(StatsErrorEstimator 
statsErrorEstimator) {
             this.statsErrorEstimator = statsErrorEstimator;
         }
+
+        // Store profile to path
+        public void writeToStorage(String profileStoragePath) {
+            profile.writeToStorage(profileStoragePath);
+        }
+
+        // Remove profile from storage
+        public void deleteFromStorage() {
+            profile.deleteFromStorage();
+        }
     }
 
+    // this variable is assigned to true the first time the profile is loaded 
from storage
+    // no further write operation, so no data race
+    boolean isProfileLoaded = false;
+
     // only protect queryIdDeque; queryIdToProfileMap is concurrent, no need 
to protect
     private ReentrantReadWriteLock lock;
     private ReadLock readLock;
     private WriteLock writeLock;
 
-    // record the order of profiles by queryId
-    private Deque<String> queryIdDeque;
-    private Map<String, ProfileElement> queryIdToProfileMap; // from QueryId 
to RuntimeProfile
+    // profile id is a long string for broker load
+    // is TUniqueId for others.
+    private Map<String, ProfileElement> queryIdToProfileMap;
     // Sometimes one Profile is related with multiple execution 
profiles(Brokerload), so that
     // execution profile's query id is not related with Profile's query id.
     private Map<TUniqueId, ExecutionProfile> queryIdToExecutionProfiles;
 
     private final ExecutorService fetchRealTimeProfileExecutor;
+    private final ExecutorService profileIOExecutor;
 
     public static ProfileManager getInstance() {
         if (INSTANCE == null) {
             synchronized (ProfileManager.class) {
                 if (INSTANCE == null) {
                     INSTANCE = new ProfileManager();
+                    INSTANCE.start();
                 }
             }
         }
         return INSTANCE;
     }
 
-    private ProfileManager() {
+    // The visibility of ProfileManager() is package level, so that we can 
write UT for it.
+    ProfileManager() {
         lock = new ReentrantReadWriteLock(true);
         readLock = lock.readLock();
         writeLock = lock.writeLock();
-        queryIdDeque = new LinkedList<>();
-        queryIdToProfileMap = new ConcurrentHashMap<>();
+        queryIdToProfileMap = Maps.newHashMap();
         queryIdToExecutionProfiles = Maps.newHashMap();
         fetchRealTimeProfileExecutor = 
ThreadPoolManager.newDaemonFixedThreadPool(
                 10, 100, "fetch-realtime-profile-pool", true);
+        profileIOExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
+                20, 100, "profile-io-thread-pool", true);
     }
 
     private ProfileElement createElement(Profile profile) {
@@ -211,79 +231,32 @@ public class ProfileManager {
 
         ProfileElement element = createElement(profile);
         // 'insert into' does have job_id, put all profiles key with query_id
-        String key = element.profile.getSummaryProfile().getProfileId();
+        String key = profile.getSummaryProfile().getProfileId();
         // check when push in, which can ensure every element in the list has 
QUERY_ID column,
         // so there is no need to check when remove element from list.
         if (Strings.isNullOrEmpty(key)) {
             LOG.warn("the key or value of Map is null, "
                     + "may be forget to insert 'QUERY_ID' or 'JOB_ID' column 
into infoStrings");
         }
-        writeLock.lock();
-        // a profile may be updated multiple times in queryIdToProfileMap,
-        // and only needs to be inserted into the queryIdDeque for the first 
time.
-        queryIdToProfileMap.put(key, element);
-        try {
-            if (!queryIdDeque.contains(key)) {
-                if (queryIdDeque.size() >= Config.max_query_profile_num) {
-                    ProfileElement profileElementRemoved = 
queryIdToProfileMap.remove(queryIdDeque.getFirst());
-                    // If the Profile object is removed from manager, then 
related execution profile is also useless.
-                    if (profileElementRemoved != null) {
-                        StringBuilder sb = new StringBuilder();
-                        for (ExecutionProfile executionProfile : 
profileElementRemoved.profile.getExecutionProfiles()) {
-                            
sb.append(executionProfile.getQueryId()).append(",");
-                            
this.queryIdToExecutionProfiles.remove(executionProfile.getQueryId());
-                        }
-                        LOG.warn("Remove expired profile {}, execution 
profiles {},"
-                                    + " queryIdDeque size {}, profile count 
{},"
-                                    + " execution profile count {} 
max_query_profile_num {}",
-                                    
profileElementRemoved.profile.getSummaryProfile().getProfileId(),
-                                    sb.toString(), queryIdDeque.size(), 
queryIdToProfileMap.size(),
-                                    queryIdToExecutionProfiles.size(), 
Config.max_query_profile_num);
-                    }
-                    queryIdDeque.removeFirst();
-                }
-                queryIdDeque.addLast(key);
-            }
-        } finally {
-            writeLock.unlock();
-        }
-    }
 
-    public void removeProfile(String profileId) {
         writeLock.lock();
         try {
-            ProfileElement profileElementRemoved = 
queryIdToProfileMap.remove(profileId);
-            // If the Profile object is removed from manager, then related 
execution profile is also useless.
-            if (profileElementRemoved != null) {
-                for (ExecutionProfile executionProfile : 
profileElementRemoved.profile.getExecutionProfiles()) {
-                    
this.queryIdToExecutionProfiles.remove(executionProfile.getQueryId());
-                }
-            }
+            // a profile may be updated multiple times in queryIdToProfileMap,
+            // and only needs to be inserted into the queryIdDeque for the 
first time.
+            queryIdToProfileMap.put(key, element);
         } finally {
             writeLock.unlock();
         }
     }
 
     public List<List<String>> getAllQueries() {
-        return getQueryWithType(null);
-    }
-
-    public List<List<String>> getQueryWithType(ProfileType type) {
         List<List<String>> result = Lists.newArrayList();
         readLock.lock();
         try {
-            Iterator reverse = queryIdDeque.descendingIterator();
-            while (reverse.hasNext()) {
-                String queryId = (String) reverse.next();
-                ProfileElement profileElement = 
queryIdToProfileMap.get(queryId);
-                if (profileElement == null) {
-                    continue;
-                }
+            PriorityQueue<ProfileElement> queueIdDeque = 
getProfileOrderByQueryFinishTime();
+            while (!queueIdDeque.isEmpty()) {
+                ProfileElement profileElement = queueIdDeque.poll();
                 Map<String, String> infoStrings = profileElement.infoStrings;
-                if (type != null && 
!infoStrings.get(SummaryProfile.TASK_TYPE).equalsIgnoreCase(type.name())) {
-                    continue;
-                }
-
                 List<String> row = Lists.newArrayList();
                 for (String str : SummaryProfile.SUMMARY_KEYS) {
                     row.add(infoStrings.get(str));
@@ -506,25 +479,6 @@ public class ProfileManager {
         return builder.getFragmentTreeRoot(executionId);
     }
 
-    public List<Triple<String, String, Long>> getFragmentInstanceList(String 
queryID,
-            String executionId, String fragmentId)
-            throws AnalysisException {
-        MultiProfileTreeBuilder builder;
-        readLock.lock();
-        try {
-            ProfileElement element = queryIdToProfileMap.get(queryID);
-            if (element == null || element.builder == null) {
-                throw new AnalysisException("failed to get instance list. err: 
"
-                        + (element == null ? "not found" : element.errMsg));
-            }
-            builder = element.builder;
-        } finally {
-            readLock.unlock();
-        }
-
-        return builder.getInstanceList(executionId, fragmentId);
-    }
-
     public ProfileTreeNode getInstanceProfileTree(String queryID, String 
executionId,
             String fragmentId, String instanceId)
             throws AnalysisException {
@@ -595,9 +549,370 @@ public class ProfileManager {
         writeLock.lock();
         try {
             queryIdToProfileMap.clear();
-            queryIdDeque.clear();
+            queryIdToExecutionProfiles.clear();
         } finally {
             writeLock.unlock();
         }
     }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        loadProfilesFromStorageIfFirstTime();
+        writeProfileToStorage();
+        deleteBrokenProfiles();
+        deleteOutdatedProfilesFromStorage();
+    }
+
+    // List PROFILE_STORAGE_PATH and return all dir names
+    // string will contain profile id and its storage timestamp
+    private List<String> getOnStorageProfileInfos() {
+        List<String> res = Lists.newArrayList();
+        try {
+            File profileDir = new File(PROFILE_STORAGE_PATH);
+            if (!profileDir.exists()) {
+                LOG.warn("Profile storage directory {} does not exist", 
PROFILE_STORAGE_PATH);
+                return res;
+            }
+
+            File[] files = profileDir.listFiles();
+            for (File file : files) {
+                if (file.isFile()) {
+                    res.add(file.getAbsolutePath());
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to get profile meta from storage", e);
+        }
+
+        return res;
+    }
+
+    // read profile file on storage
+    // deserialize to an object Profile
+    // push them to memory structure of ProfileManager for index
+    private void loadProfilesFromStorageIfFirstTime() {
+        if (this.isProfileLoaded) {
+            return;
+        }
+
+        try {
+            LOG.info("Reading profile from {}", PROFILE_STORAGE_PATH);
+            List<String> profileDirAbsPaths = getOnStorageProfileInfos();
+            // Thread safe list
+            List<Profile> profiles = Collections.synchronizedList(new 
ArrayList<>());
+            // List of profile io futures
+            List<Future<?>> profileIOfutures = Lists.newArrayList();
+            // Create and add tasks to the executor
+            for (String profileDirAbsPath : profileDirAbsPaths) {
+                Thread thread = new Thread(() -> {
+                    Profile profile = Profile.read(profileDirAbsPath);
+                    if (profile != null) {
+                        profiles.add(profile);
+                    }
+                });
+                profileIOfutures.add(profileIOExecutor.submit(thread));
+            }
+
+            // Wait for all submitted futures to complete
+            for (Future<?> future : profileIOfutures) {
+                try {
+                    future.get();
+                } catch (Exception e) {
+                    LOG.warn("Failed to read profile from storage", e);
+                }
+            }
+
+            LOG.info("There are {} profiles loaded into memory", 
profiles.size());
+
+            // there may already be some queries running before this thread 
runs
+            // so we should not clear current memory structure
+
+            for (Profile profile : profiles) {
+                this.pushProfile(profile);
+            }
+
+            this.isProfileLoaded = true;
+        } catch (Exception e) {
+            LOG.error("Failed to load query profile from storage", e);
+        }
+    }
+
+    private void createProfileStorageDirIfNecessary() {
+        File profileDir = new File(PROFILE_STORAGE_PATH);
+        if (profileDir.exists()) {
+            return;
+        }
+
+        // create query_id directory
+        if (!profileDir.mkdir()) {
+            LOG.warn("create profile directory {} failed", 
profileDir.getAbsolutePath());
+        } else {
+            LOG.info("Create profile storage {} succeed", 
PROFILE_STORAGE_PATH);
+        }
+    }
+
+    private List<ProfileElement> getProfilesNeedStore() {
+        List<ProfileElement> profilesToBeStored = Lists.newArrayList();
+
+        queryIdToProfileMap.forEach((queryId, profileElement) -> {
+            if (profileElement.profile.shouldStoreToStorage()) {
+                profilesToBeStored.add(profileElement);
+            }
+        });
+
+        return profilesToBeStored;
+    }
+
+    // Collect profiles that need to be stored to storage
+    // Store them to storage
+    // Release the memory
+    private void writeProfileToStorage() {
+        try {
+            if (Strings.isNullOrEmpty(PROFILE_STORAGE_PATH)) {
+                LOG.error("Logical error, PROFILE_STORAGE_PATH is empty");
+                return;
+            }
+
+            createProfileStorageDirIfNecessary();
+            List<ProfileElement> profilesToBeStored = Lists.newArrayList();
+
+            readLock.lock();
+            try {
+                profilesToBeStored = getProfilesNeedStore();
+            } finally {
+                readLock.unlock();
+            }
+
+            // Store profile to storage in parallel
+            List<Future<?>> profileWriteFutures = Lists.newArrayList();
+
+            for (ProfileElement profileElement : profilesToBeStored) {
+                Thread thread = new Thread(() -> {
+                    profileElement.writeToStorage(PROFILE_STORAGE_PATH);
+                });
+                profileWriteFutures.add(profileIOExecutor.submit(thread));
+            }
+
+            for (Future<?> future : profileWriteFutures) {
+                try {
+                    future.get();
+                } catch (Exception e) {
+                    LOG.warn("Failed to write profile to storage", e);
+                }
+            }
+
+            // After profile is stored to storage, the execution profile must 
be ejected from memory
+            // or the memory will be exhausted
+
+            writeLock.lock();
+            try {
+                for (ProfileElement profileElement : profilesToBeStored) {
+                    for (ExecutionProfile executionProfile : 
profileElement.profile.getExecutionProfiles()) {
+                        
this.queryIdToExecutionProfiles.remove(executionProfile.getQueryId());
+                    }
+                    profileElement.profile.releaseExecutionProfile();
+                }
+            } finally {
+                writeLock.unlock();
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to remove query profile", e);
+        }
+    }
+
+    private List<ProfileElement> getProfilesToBeRemoved() {
+        // By order of query finish timestamp
+        // The profile with the least storage timestamp will be on the top of 
heap
+        PriorityQueue<ProfileElement> profileDeque = new 
PriorityQueue<>(Comparator.comparingLong(
+                (ProfileElement profileElement) -> 
profileElement.profile.getQueryFinishTimestamp()));
+
+        long totalProfileSize = 0;
+
+        // Collect all profiles that has been stored to storage
+        for (ProfileElement profileElement : queryIdToProfileMap.values()) {
+            if (profileElement.profile.profileHasBeenStored()) {
+                totalProfileSize += profileElement.profile.getProfileSize();
+                profileDeque.add(profileElement);
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{} profiles size on storage: {}", profileDeque.size(),
+                        DebugUtil.printByteWithUnit(totalProfileSize));
+        }
+
+        final int maxSpilledProfileNum = Config.max_spilled_profile_num;
+        final long spilledProfileLimitBytes = 
Config.spilled_profile_storage_limit_bytes;
+        List<ProfileElement> queryIdToBeRemoved = Lists.newArrayList();
+
+        while (profileDeque.size() > maxSpilledProfileNum || totalProfileSize 
>= spilledProfileLimitBytes) {
+            // First profile is the oldest profile
+            ProfileElement profileElement = profileDeque.poll();
+            totalProfileSize -= profileElement.profile.getProfileSize();
+            queryIdToBeRemoved.add(profileElement);
+        }
+
+        return queryIdToBeRemoved;
+    }
+
+    // We can not store all profiles on storage, because the storage space is 
limited
+    // So we need to remove the outdated profiles
+    private void deleteOutdatedProfilesFromStorage() {
+        try {
+            List<ProfileElement> queryIdToBeRemoved = Lists.newArrayList();
+            readLock.lock();
+            try {
+                queryIdToBeRemoved = getProfilesToBeRemoved();
+            } finally {
+                readLock.unlock();
+            }
+
+            List<Thread> iothreads = Lists.newArrayList();
+
+            for (ProfileElement profileElement : queryIdToBeRemoved) {
+                Thread thread = new Thread(() -> {
+                    profileElement.deleteFromStorage();
+                });
+                thread.start();
+                iothreads.add(thread);
+            }
+
+            try {
+                for (Thread thread : iothreads) {
+                    thread.join();
+                }
+            } catch (InterruptedException e) {
+                LOG.error("Failed to remove outdated query profile", e);
+            }
+
+            writeLock.lock();
+            try {
+                for (ProfileElement profileElement : queryIdToBeRemoved) {
+                    
queryIdToProfileMap.remove(profileElement.profile.getSummaryProfile().getProfileId());
+                    TUniqueId thriftQueryId = 
DebugUtil.parseTUniqueIdFromString(
+                            
profileElement.profile.getSummaryProfile().getProfileId());
+                    queryIdToExecutionProfiles.remove(thriftQueryId);
+                }
+            } finally {
+                writeLock.unlock();
+            }
+
+            if (queryIdToBeRemoved.size() != 0 && LOG.isDebugEnabled()) {
+                StringBuilder builder = new StringBuilder();
+                for (ProfileElement profileElement : queryIdToBeRemoved) {
+                    
builder.append(profileElement.profile.getSummaryProfile().getProfileId()).append(",");
+                }
+                LOG.debug("Remove outdated profile: {}", builder.toString());
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to remove outdated query profile", e);
+        }
+    }
+
+    private List<String> getBrokenProfiles() {
+        List<String> profilesOnStorage = getOnStorageProfileInfos();
+        List<String> brokenProfiles = Lists.newArrayList();
+
+        for (String profileDirAbsPath : profilesOnStorage) {
+            int separatorIdx = profileDirAbsPath.lastIndexOf(File.separator);
+            if (separatorIdx == -1) {
+                LOG.warn("Invalid profile path {}", profileDirAbsPath);
+                brokenProfiles.add(profileDirAbsPath);
+                continue;
+            }
+
+            String profileId = "";
+
+            try {
+                String timeStampAndId = 
profileDirAbsPath.substring(separatorIdx + 1);
+                String[] parsed = Profile.parseProfileFileName(timeStampAndId);
+                if (parsed == null) {
+                    LOG.warn("Invalid profile directory path: {}", 
profileDirAbsPath);
+                    brokenProfiles.add(profileDirAbsPath);
+                    continue;
+                } else {
+                    profileId = parsed[1];
+                }
+            } catch (Exception e) {
+                LOG.error("Failed to get profile id from path: {}", 
profileDirAbsPath, e);
+                brokenProfiles.add(profileDirAbsPath);
+                continue;
+            }
+
+            readLock.lock();
+            try {
+                if (!queryIdToProfileMap.containsKey(profileId)) {
+                    LOG.debug("Wild profile {}, need to be removed.", 
profileDirAbsPath);
+                    brokenProfiles.add(profileDirAbsPath);
+                }
+            } finally {
+                readLock.unlock();
+            }
+        }
+
+        return brokenProfiles;
+    }
+
+    private void deleteBrokenProfiles() {
+        List<String> brokenProfiles = getBrokenProfiles();
+        List<Future<?>> profileDeleteFutures = Lists.newArrayList();
+
+        for (String brokenProfile : brokenProfiles) {
+            Thread iothread = new Thread(() -> {
+                try {
+                    File profileFile = new File(brokenProfile);
+                    if (!profileFile.isFile()) {
+                        LOG.warn("Profile path {} is not a file, can not 
delete.", brokenProfile);
+                        return;
+                    }
+
+                    FileUtils.deleteQuietly(profileFile);
+                    LOG.debug("Delete broken profile: {}", brokenProfile);
+                } catch (Exception e) {
+                    LOG.error("Failed to delete broken profile: {}", 
brokenProfile, e);
+                }
+            });
+            profileDeleteFutures.add(profileIOExecutor.submit(iothread));
+        }
+
+        for (Future<?> future : profileDeleteFutures) {
+            try {
+                future.get();
+            } catch (Exception e) {
+                LOG.error("Failed to remove broken profile", e);
+            }
+        }
+    }
+
+    // The init value of query finish time of profile is MAX_VALUE
+    // So more recent query will be on the top of heap.
+    private PriorityQueue<ProfileElement> getProfileOrderByQueryFinishTime() {
+        PriorityQueue<ProfileElement> queryIdDeque = new 
PriorityQueue<>(Comparator.comparingLong(
+                (ProfileElement profileElement) -> 
profileElement.profile.getQueryFinishTimestamp()).reversed());
+
+        queryIdToProfileMap.forEach((queryId, profileElement) -> {
+            queryIdDeque.add(profileElement);
+        });
+
+        return queryIdDeque;
+    }
+
    // When the query is finished, the execution profile should be marked as finished
    // For load task, one of its execution profile is finished.
    public void markExecutionProfileFinished(TUniqueId queryId) {
        // Read lock is sufficient: only the ExecutionProfile object is mutated,
        // not the queryIdToExecutionProfiles map itself.
        readLock.lock();
        try {
            ExecutionProfile execProfile = queryIdToExecutionProfiles.get(queryId);
            if (execProfile == null) {
                // Missing entry is expected when profiling is disabled or the
                // profile was already evicted; not an error.
                LOG.debug("Profile {} does not exist, already finished or does not enable profile",
                        DebugUtil.printId(queryId));
                return;
            }
            execProfile.setQueryFinishTime(System.currentTimeMillis());
        } catch (Exception e) {
            LOG.error("Failed to mark query {} finished", DebugUtil.printId(queryId), e);
        } finally {
            readLock.unlock();
        }
    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
index 0ff3ea80a41..60207b49172 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
@@ -19,7 +19,9 @@ package org.apache.doris.common.util;
 
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.Reference;
+import org.apache.doris.common.io.Text;
 import org.apache.doris.common.profile.SummaryProfile;
+import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TCounter;
 import org.apache.doris.thrift.TRuntimeProfileNode;
 import org.apache.doris.thrift.TRuntimeProfileTree;
@@ -29,9 +31,13 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Formatter;
 import java.util.LinkedList;
@@ -53,54 +59,80 @@ public class RuntimeProfile {
     public static String MIN_TIME_PRE = "min ";
     public static String AVG_TIME_PRE = "avg ";
     public static String SUM_TIME_PRE = "sum ";
+    @SerializedName(value = "counterTotalTime")
     private Counter counterTotalTime;
-    private double localTimePercent;
-
+    @SerializedName(value = "localTimePercent")
+    private double localTimePercent = 0;
+    @SerializedName(value = "infoStrings")
     private Map<String, String> infoStrings = Maps.newHashMap();
+    @SerializedName(value = "infoStringsDisplayOrder")
     private List<String> infoStringsDisplayOrder = Lists.newArrayList();
-    private ReentrantReadWriteLock infoStringsLock = new 
ReentrantReadWriteLock();
+    private transient ReentrantReadWriteLock infoStringsLock = new 
ReentrantReadWriteLock();
 
+    @SerializedName(value = "counterMap")
     private Map<String, Counter> counterMap = Maps.newConcurrentMap();
+    @SerializedName(value = "childCounterMap")
     private Map<String, TreeSet<String>> childCounterMap = 
Maps.newConcurrentMap();
     // protect TreeSet in ChildCounterMap
-    private ReentrantReadWriteLock counterLock = new ReentrantReadWriteLock();
-
+    private transient ReentrantReadWriteLock counterLock = new 
ReentrantReadWriteLock();
+    @SerializedName(value = "childMap")
     private Map<String, RuntimeProfile> childMap = Maps.newConcurrentMap();
+    @SerializedName(value = "childList")
     private LinkedList<Pair<RuntimeProfile, Boolean>> childList = 
Lists.newLinkedList();
-    private ReentrantReadWriteLock childLock = new ReentrantReadWriteLock();
-
+    private transient ReentrantReadWriteLock childLock = new 
ReentrantReadWriteLock();
+    @SerializedName(value = "planNodeInfos")
     private List<String> planNodeInfos = Lists.newArrayList();
-    // name should not changed.
-    private final String name;
 
+    @SerializedName(value = "name")
+    private String name = "";
+    @SerializedName(value = "timestamp")
     private Long timestamp = -1L;
-
+    @SerializedName(value = "isDone")
     private Boolean isDone = false;
+    @SerializedName(value = "isCancel")
     private Boolean isCancel = false;
-
+    // In pipelineX, we have explicitly split the Operator into sink and 
operator,
+    // and we can distinguish them using tags.
+    // In the old pipeline, we can only differentiate them based on their 
position
+    // in the profile, which is quite tricky and only transitional.
+    @SerializedName(value = "isSinkOperator")
     private Boolean isSinkOperator = false;
-
+    @SerializedName(value = "nodeid")
     private int nodeid = -1;
 
+    public RuntimeProfile() {
+        init();
+    }
+
     public RuntimeProfile(String name) {
-        this.localTimePercent = 0;
         if (Strings.isNullOrEmpty(name)) {
             throw new RuntimeException("Profile name must not be null");
         }
         this.name = name;
         this.counterTotalTime = new Counter(TUnit.TIME_NS, 0, 1);
         this.counterMap.put("TotalTime", counterTotalTime);
+        init();
     }
 
     public RuntimeProfile(String name, int nodeId) {
-        this.localTimePercent = 0;
         if (Strings.isNullOrEmpty(name)) {
             throw new RuntimeException("Profile name must not be null");
         }
         this.name = name;
+        this.nodeid = nodeId;
         this.counterTotalTime = new Counter(TUnit.TIME_NS, 0, 3);
         this.counterMap.put("TotalTime", counterTotalTime);
-        this.nodeid = nodeId;
+        init();
+    }
+
+    private void init() {
+        this.infoStringsLock = new ReentrantReadWriteLock();
+        this.childLock = new ReentrantReadWriteLock();
+        this.counterLock = new ReentrantReadWriteLock();
+    }
+
+    public static RuntimeProfile read(DataInput input) throws IOException {
+        return GsonUtils.GSON.fromJson(Text.readString(input), 
RuntimeProfile.class);
     }
 
     public void setIsCancel(Boolean isCancel) {
@@ -744,4 +776,8 @@ public class RuntimeProfile {
     public Map<String, String> getInfoStrings() {
         return infoStrings;
     }
+
+    public void write(DataOutput output) throws IOException {
+        Text.writeString(output, GsonUtils.GSON.toJson(this));
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 110c98c3027..fb5f06fced5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -242,10 +242,12 @@ public class BrokerLoadJob extends BulkLoadJob {
         // divide job into broker loading task by table
         List<LoadLoadingTask> newLoadingTasks = Lists.newArrayList();
         if (enableProfile) {
-            this.jobProfile = new Profile("BrokerLoadJob " + id + ". " + 
label, true,
-                    
Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.PROFILE_LEVEL, 
"3")));
-            // profile is registered in ProfileManager, so that we can get 
realtime profile
-            jobProfile.updateSummary(loadStartTimestamp, 
getSummaryInfo(false), false, null);
+            this.jobProfile = new Profile(
+                    true,
+                    
Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.PROFILE_LEVEL, 
"3")),
+                    
Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.AUTO_PROFILE_THRESHOLD_MS,
 "500")));
+            // TODO: figure out how to set the profile recording time for these load jobs
+            // this.jobProfile.setId("BrokerLoadJob " + id + ". " + label);
         }
         ProgressManager progressManager = Env.getCurrentProgressManager();
         progressManager.registerProgressSimple(String.valueOf(id));
@@ -387,6 +389,7 @@ public class BrokerLoadJob extends BulkLoadJob {
     private Map<String, String> getSummaryInfo(boolean isFinished) {
         long currentTimestamp = System.currentTimeMillis();
         SummaryBuilder builder = new SummaryBuilder();
+        // Id of summary profile will be shown as the profile id in the web 
page
         builder.profileId(String.valueOf(id));
         if (Version.DORIS_BUILD_VERSION_MAJOR == 0) {
             builder.dorisVersion(Version.DORIS_BUILD_SHORT_HASH);
@@ -459,7 +462,7 @@ public class BrokerLoadJob extends BulkLoadJob {
         if (!enableProfile) {
             return;
         }
-        jobProfile.updateSummary(createTimestamp, getSummaryInfo(true), true, 
null);
+        jobProfile.updateSummary(getSummaryInfo(true), true, null);
         // jobProfile has been pushed into ProfileManager, remove reference in 
brokerLoadJob
         jobProfile = null;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index df66dfdd232..d30a5a53ed6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -182,9 +182,6 @@ public class LoadLoadingTask extends LoadTask {
         try {
             QeProcessorImpl.INSTANCE.registerQuery(loadId, new 
QeProcessorImpl.QueryInfo(curCoordinator));
             actualExecute(curCoordinator, timeoutS);
-            if (this.jobProfile != null) {
-                curCoordinator.getExecutionProfile().update(beginTime, true);
-            }
         } finally {
             QeProcessorImpl.INSTANCE.unregisterQuery(loadId);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
index 53ee7263b00..fbe3e45bbaf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java
@@ -128,8 +128,10 @@ public class LoadCommand extends Command implements 
ForwardWithSync {
             ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce();
             throw new AnalysisException("Fallback to legacy planner 
temporary.");
         }
-        this.profile = new Profile("Query", 
ctx.getSessionVariable().enableProfile,
-                ctx.getSessionVariable().profileLevel);
+        this.profile = new Profile(
+                ctx.getSessionVariable().enableProfile,
+                ctx.getSessionVariable().profileLevel,
+                ctx.getSessionVariable().getAutoProfileThresholdMs());
         profile.getSummaryProfile().setQueryBeginTime();
         if (sourceInfos.size() == 1) {
             plans = ImmutableList.of(new 
InsertIntoTableCommand(completeQueryPlan(ctx, sourceInfos.get(0)),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index 11751115517..defcd6c6e99 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -188,6 +188,7 @@ public abstract class AbstractInsertExecutor {
     public void executeSingleInsert(StmtExecutor executor, long jobId) throws 
Exception {
         beforeExec();
         try {
+            executor.updateProfile(false);
             execImpl(executor, jobId);
             checkStrictModeAndFilterRatio();
             int retryTimes = 0;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 70857c9be5b..b07e2d7304c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -366,7 +366,12 @@ public class Coordinator implements CoordInterface {
         }
         this.assignedRuntimeFilters = planner.getRuntimeFilters();
         this.topnFilters = planner.getTopnFilters();
-        this.executionProfile = new ExecutionProfile(queryId, fragments);
+
+        List<Integer> fragmentIds = new ArrayList<>();
+        for (PlanFragment fragment : fragments) {
+            fragmentIds.add(fragment.getFragmentId().asInt());
+        }
+        this.executionProfile = new ExecutionProfile(queryId, fragmentIds);
     }
 
     // Used for broker load task/export task/update coordinator
@@ -386,7 +391,12 @@ public class Coordinator implements CoordInterface {
         this.queryGlobals.setTimeZone(timezone);
         this.queryGlobals.setLoadZeroTolerance(loadZeroTolerance);
         this.queryOptions.setBeExecVersion(Config.be_exec_version);
-        this.executionProfile = new ExecutionProfile(queryId, fragments);
+
+        List<Integer> fragmentIds = new ArrayList<>();
+        for (PlanFragment fragment : fragments) {
+            fragmentIds.add(fragment.getFragmentId().asInt());
+        }
+        this.executionProfile = new ExecutionProfile(queryId, fragmentIds);
     }
 
     private void setFromUserProperty(ConnectContext connectContext) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index 4f9f51a951e..2be8a8bcd2c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -167,18 +167,13 @@ public final class QeProcessorImpl implements QeProcessor 
{
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Deregister query id {}", 
DebugUtil.printId(queryId));
             }
-            ExecutionProfile executionProfile = 
ProfileManager.getInstance().getExecutionProfile(queryId);
-            if (executionProfile != null) {
-                
executionProfile.setQueryFinishTime(System.currentTimeMillis());
-                if (queryInfo.connectContext != null) {
-                    long autoProfileThresholdMs = queryInfo.connectContext
-                            .getSessionVariable().getAutoProfileThresholdMs();
-                    if (autoProfileThresholdMs > 0 && 
System.currentTimeMillis() - queryInfo.getStartExecTime()
-                            < autoProfileThresholdMs) {
-                        
ProfileManager.getInstance().removeProfile(executionProfile.getSummaryProfile().getProfileId());
-                    }
-                }
+
+            // Here we should use the query option instead of ConnectContext,
+            // because the coordinator of a load task does not have a ConnectContext.
+            if (queryInfo.getCoord().getQueryOptions().enable_profile) {
+                
ProfileManager.getInstance().markExecutionProfileFinished(queryId);
             }
+
             if (queryInfo.getConnectContext() != null
                     && 
!Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser())
             ) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 106e2f3c1be..46fb386fc30 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -728,9 +728,10 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true)
     public boolean enableProfile = false;
 
-    // if true, need report to coordinator when plan fragment execute 
successfully.
+    // When enable_profile is true, profile of queries that costs more than 
autoProfileThresholdMs
+    // will be stored to disk.
     @VariableMgr.VarAttr(name = AUTO_PROFILE_THRESHOLD_MS, needForward = true)
-    public int autoProfileThresholdMs = -1;
+    public int autoProfileThresholdMs = 500;
 
     @VariableMgr.VarAttr(name = "runtime_filter_prune_for_external")
     public boolean runtimeFilterPruneForExternal = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 207e75b5065..9d7c971ca7b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -310,8 +310,10 @@ public class StmtExecutor {
         this.isProxy = isProxy;
         this.statementContext = new StatementContext(context, originStmt);
         this.context.setStatementContext(statementContext);
-        this.profile = new Profile("Query", 
this.context.getSessionVariable().enableProfile,
-                this.context.getSessionVariable().profileLevel);
+        this.profile = new Profile(
+                this.context.getSessionVariable().enableProfile(),
+                this.context.getSessionVariable().getProfileLevel(),
+                this.context.getSessionVariable().getAutoProfileThresholdMs());
     }
 
     // for test
@@ -341,8 +343,10 @@ public class StmtExecutor {
             this.statementContext.setParsedStatement(parsedStmt);
         }
         this.context.setStatementContext(statementContext);
-        this.profile = new Profile("Query", 
context.getSessionVariable().enableProfile(),
-                context.getSessionVariable().profileLevel);
+        this.profile = new Profile(
+                            context.getSessionVariable().enableProfile(),
+                            context.getSessionVariable().getProfileLevel(),
+                            
context.getSessionVariable().getAutoProfileThresholdMs());
     }
 
     public static InternalService.PDataRow getRowStringValue(List<Expr> cols,
@@ -387,6 +391,10 @@ public class StmtExecutor {
         }
         builder.taskType(profileType.name());
         builder.startTime(TimeUtils.longToTimeString(context.getStartTime()));
+        // TODO: Never use a custom data format when delivering information between two systems.
+        // The UI cannot order profiles by TOTAL_TIME since it's not a sortable string (2h1m3s > 2h1s?).
+        // To get the raw value, the UI has to decode it first, which means other systems
+        // must reference the implementation of DebugUtil.getPrettyStringMs to figure out the format.
         if (isFinished) {
             builder.endTime(TimeUtils.longToTimeString(currentTimestamp));
             builder.totalTime(DebugUtil.getPrettyStringMs(currentTimestamp - 
context.getStartTime()));
@@ -1200,7 +1208,7 @@ public class StmtExecutor {
         // and ensure the sql is finished normally. For example, if update 
profile
         // failed, the insert stmt should be success
         try {
-            profile.updateSummary(context.startTime, 
getSummaryInfo(isFinished), isFinished, this.planner);
+            profile.updateSummary(getSummaryInfo(isFinished), isFinished, 
this.planner);
         } catch (Throwable t) {
             LOG.warn("failed to update profile, ignore this error", t);
         }
@@ -3415,7 +3423,10 @@ public class StmtExecutor {
                 throw new RuntimeException("Failed to execute internal SQL. " 
+ Util.getRootCauseMessage(e), e);
             }
             RowBatch batch;
-            coord =  EnvFactory.getInstance().createCoordinator(context, 
analyzer,
+            if (Config.enable_collect_internal_query_profile) {
+                context.getSessionVariable().enableProfile = true;
+            }
+            coord = EnvFactory.getInstance().createCoordinator(context, 
analyzer,
                     planner, context.getStatsErrorEstimator());
             profile.addExecutionProfile(coord.getExecutionProfile());
             try {
@@ -3424,7 +3435,7 @@ public class StmtExecutor {
             } catch (UserException e) {
                 throw new RuntimeException("Failed to execute internal SQL. " 
+ Util.getRootCauseMessage(e), e);
             }
-
+            updateProfile(false);
             try {
                 coord.exec();
             } catch (Exception e) {
@@ -3454,10 +3465,8 @@ public class StmtExecutor {
             }
             AuditLogHelper.logAuditLog(context, originStmt.originStmt, 
parsedStmt, getQueryStatisticsForAuditLog(),
                     true);
-            if (Config.enable_collect_internal_query_profile) {
-                updateProfile(true);
-            }
             QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
+            updateProfile(true);
         }
     }
 
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/util/ProfilePersistentTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/common/util/ProfilePersistentTest.java
new file mode 100644
index 00000000000..c21c57ba1d4
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/common/util/ProfilePersistentTest.java
@@ -0,0 +1,314 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.common.profile.ExecutionProfile;
+import org.apache.doris.common.profile.Profile;
+import org.apache.doris.common.profile.SummaryProfile;
+import org.apache.doris.common.profile.SummaryProfile.SummaryBuilder;
+import org.apache.doris.thrift.QueryState;
+import org.apache.doris.thrift.TUniqueId;
+import org.apache.doris.thrift.TUnit;
+
+import com.google.common.base.Strings;
+import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+public class ProfilePersistentTest {
+    private static final Logger LOG = 
LogManager.getLogger(ProfilePersistentTest.class);
+
+    public static SummaryProfile constructRandomSummaryProfile() {
+        TUniqueId qUniqueId = new TUniqueId();
+        UUID uuid = UUID.randomUUID();
+        qUniqueId.setHi(uuid.getMostSignificantBits());
+        qUniqueId.setLo(uuid.getLeastSignificantBits());
+        // Construct a summary profile
+        SummaryBuilder builder = new SummaryBuilder();
+        builder.profileId(DebugUtil.printId(qUniqueId));
+        builder.taskType(System.currentTimeMillis() % 2 == 0 ? "QUERY" : 
"LOAD");
+        long currentTimestampSeconds = System.currentTimeMillis() / 1000;
+        builder.startTime(TimeUtils.longToTimeString(currentTimestampSeconds));
+        builder.endTime(TimeUtils.longToTimeString(currentTimestampSeconds + 
10));
+        builder.totalTime(DebugUtil.getPrettyStringMs(10));
+        builder.taskState(QueryState.RUNNING.toString());
+        builder.user(DebugUtil.printId(qUniqueId) + "-user");
+        builder.defaultDb(DebugUtil.printId(qUniqueId) + "-db");
+
+        SummaryProfile summaryProfile = new SummaryProfile();
+        summaryProfile.fuzzyInit();
+        summaryProfile.update(builder.build());
+
+        return summaryProfile;
+    }
+
    /**
     * Builds a Profile with a random summary profile and {@code executionProfileNum}
     * execution profiles, one fragment id per execution profile.
     */
    public static Profile constructRandomProfile(int executionProfileNum) {
        Profile profile = new Profile();
        SummaryProfile summaryProfile = constructRandomSummaryProfile();
        String stringUniqueId = summaryProfile.getProfileId();
        TUniqueId thriftUniqueId = DebugUtil.parseTUniqueIdFromString(stringUniqueId);
        profile.setId(stringUniqueId);
        profile.setSummaryProfile(summaryProfile);

        for (int i = 0; i < executionProfileNum; i++) {
            // Build a chain of nested counters: 0 <- 1 <- 2 <- 3.
            // NOTE(review): runtimeProfile is constructed but never attached to the
            // executionProfile below — confirm whether this is intentional.
            RuntimeProfile runtimeProfile = new RuntimeProfile("profile-" + i);
            runtimeProfile.addCounter(String.valueOf(0), TUnit.BYTES, RuntimeProfile.ROOT_COUNTER);
            runtimeProfile.addCounter(String.valueOf(1), TUnit.BYTES, String.valueOf(0));
            runtimeProfile.addCounter(String.valueOf(2), TUnit.BYTES, String.valueOf(1));
            runtimeProfile.addCounter(String.valueOf(3), TUnit.BYTES, String.valueOf(2));
            List<Integer> fragmentIds = new ArrayList<>();
            fragmentIds.add(i);

            ExecutionProfile executionProfile = new ExecutionProfile(thriftUniqueId, fragmentIds);
            profile.addExecutionProfile(executionProfile);
        }

        return profile;
    }
+
+    @Test
+    public void compressBasicTest() {
+        // Initialize StringBuilder for faster string construction
+        StringBuilder executionProfileTextBuilder = new StringBuilder(1024 * 
1024);
+        // Populate the StringBuilder with random characters
+        for (int i = 0; i < 1024 * 1024; i++) {
+            executionProfileTextBuilder.append((char) (Math.random() * 26 + 
'a'));
+        }
+        // Convert StringBuilder to String
+        String executionProfileText = executionProfileTextBuilder.toString();
+
+        byte[] compressed = null;
+        try {
+            compressed = 
Profile.compressExecutionProfile(executionProfileText);
+        } catch (IOException e) {
+            LOG.error("Failed to compress execution profile: {}", 
e.getMessage(), e);
+            Assert.fail();
+        }
+        String executionProfileTextDecompressed = null;
+        try {
+            executionProfileTextDecompressed = 
Profile.decompressExecutionProfile(compressed);
+        } catch (IOException e) {
+            LOG.error("Failed to decompress execution profile: {}", 
e.getMessage(), e);
+            Assert.fail();
+        }
+        Assert.assertEquals(executionProfileText, 
executionProfileTextDecompressed);
+    }
+
+    @Test
+    public void counterBasicTest() {
+        TUnit thriftType = TUnit.TIME_NS;
+        long value = 1000;
+        Counter counter = new Counter(thriftType, value);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutput output = new DataOutputStream(baos);
+        boolean writeFailed = false;
+
+        try {
+            counter.write(output);
+        } catch (Exception e) {
+            writeFailed = true;
+        }
+
+        Assert.assertFalse(writeFailed);
+
+        byte[] data = baos.toByteArray();
+        ByteArrayInputStream bais = new ByteArrayInputStream(data);
+        DataInput input = new DataInputStream(bais);
+
+        boolean readFailed = false;
+        Counter deserializedCounter = null;
+        try {
+            deserializedCounter = Counter.read(input);
+        } catch (Exception e) {
+            readFailed = true;
+        }
+
+        Assert.assertFalse(readFailed);
+        Assert.assertEquals(deserializedCounter.getValue(), 
counter.getValue());
+        Assert.assertEquals(deserializedCounter.getType(), counter.getType());
+        Assert.assertEquals(deserializedCounter.toString(), 
counter.toString());
+    }
+
+    @Test
+    public void runtimeProfileBasicTest() {
+        RuntimeProfile profile = new RuntimeProfile("profile");
+        for (int i = 0; i < 5; i++) {
+            if (i == 0) {
+                profile.addCounter(String.valueOf(i), TUnit.BYTES, 
RuntimeProfile.ROOT_COUNTER);
+            } else {
+                profile.addCounter(String.valueOf(i), TUnit.BYTES, 
String.valueOf(i - 1));
+            }
+        }
+
+        // 1 second
+        profile.getCounterTotalTime().setValue(1000 * 1000 * 1000);
+        profile.computeTimeInProfile();
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutput output = new DataOutputStream(baos);
+        boolean writeFailed = false;
+
+        try {
+            profile.write(output);
+        } catch (Exception e) {
+            writeFailed = true;
+        }
+
+        Assert.assertFalse(writeFailed);
+
+        byte[] data = baos.toByteArray();
+        ByteArrayInputStream bais = new ByteArrayInputStream(data);
+        DataInput input = new DataInputStream(bais);
+
+        boolean readFailed = false;
+        RuntimeProfile deserializedProfile = null;
+        try {
+            deserializedProfile = RuntimeProfile.read(input);
+        } catch (Exception e) {
+            readFailed = true;
+        }
+
+        Assert.assertFalse(readFailed);
+        Assert.assertEquals(profile.getName(), deserializedProfile.getName());
+        Assert.assertEquals(profile.getCounterTotalTime(), 
deserializedProfile.getCounterTotalTime());
+
+        for (Entry<String, Counter> entry : 
profile.getCounterMap().entrySet()) {
+            String key = entry.getKey();
+            Counter counter = entry.getValue();
+            Counter deserializedCounter = 
deserializedProfile.getCounterMap().get(key);
+            Assert.assertEquals(counter, deserializedCounter);
+        }
+
+        StringBuilder builder1 = new StringBuilder();
+        profile.prettyPrint(builder1, "");
+        StringBuilder builder2 = new StringBuilder();
+        deserializedProfile.prettyPrint(builder2, "");
+        Assert.assertEquals(builder1.toString(), builder2.toString());
+    }
+
+    @Test
+    public void summaryProfileBasicTest() {
+        SummaryProfile summaryProfile = new SummaryProfile();
+        summaryProfile.fuzzyInit();
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutput output = new DataOutputStream(baos);
+        boolean writeFailed = false;
+
+        try {
+            summaryProfile.write(output);
+        } catch (Exception e) {
+            writeFailed = true;
+        }
+
+        Assert.assertFalse(writeFailed);
+
+        byte[] data = baos.toByteArray();
+        ByteArrayInputStream bais = new ByteArrayInputStream(data);
+        DataInput input = new DataInputStream(bais);
+
+        boolean readFailed = false;
+        SummaryProfile deserializedSummaryProfile = null;
+        try {
+            deserializedSummaryProfile = SummaryProfile.read(input);
+        } catch (Exception e) {
+            LOG.info("read failed: {}", e.getMessage(), e);
+            readFailed = true;
+        }
+        Assert.assertFalse(readFailed);
+
+        StringBuilder builder1 = new StringBuilder();
+        summaryProfile.prettyPrint(builder1);
+        StringBuilder builder2 = new StringBuilder();
+        deserializedSummaryProfile.prettyPrint(builder2);
+
+        Assert.assertNotEquals("", builder1.toString());
+        Assert.assertEquals(builder1.toString(), builder2.toString());
+
+        for (Entry<String, String> entry : 
summaryProfile.getAsInfoStings().entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+            String deserializedValue = 
deserializedSummaryProfile.getAsInfoStings().get(key);
+            Assert.assertEquals(value, deserializedValue);
+        }
+    }
+
+    @Test
+    public void profileBasicTest() {
+        final int executionProfileNum = 5;
+        Profile profile = constructRandomProfile(executionProfileNum);
+
+        // after profile is stored to disk, futher read will be from disk
+        // so we store the original answer to a string
+        String profileContentString = profile.getProfileByLevel();
+        String currentBinaryWorkingDir = System.getProperty("user.dir");
+        String profileStoragePath = currentBinaryWorkingDir + File.separator + 
"doris-feut-profile";
+        File profileDir = new File(profileStoragePath);
+        if (!profileDir.exists()) {
+            // create query_id directory
+            if (!profileDir.mkdir()) {
+                LOG.warn("create profile directory {} failed", 
profileDir.getAbsolutePath());
+                Assert.fail();
+                return;
+            }
+        }
+
+        try {
+            profile.writeToStorage(profileStoragePath);
+
+            String profileStoragePathTmp = profile.getProfileStoragePath();
+            Assert.assertFalse(Strings.isNullOrEmpty(profileStoragePathTmp));
+
+            LOG.info("Profile storage path: {}", profileStoragePathTmp);
+
+            Profile deserializedProfile = Profile.read(profileStoragePathTmp);
+            Assert.assertNotNull(deserializedProfile);
+            Assert.assertEquals(profileContentString, 
profile.getProfileByLevel());
+            Assert.assertEquals(profile.getProfileByLevel(), 
deserializedProfile.getProfileByLevel());
+
+            // make sure file is removed
+            profile.deleteFromStorage();
+            File tmpFile = new File(profileStoragePathTmp);
+            Assert.assertFalse(tmpFile.exists());
+            FileUtils.deleteQuietly(profileDir);
+        } finally {
+            try {
+                FileUtils.deleteDirectory(profileDir);
+            } catch (Exception e) {
+                LOG.warn("delete profile directory {} failed", 
profileDir.getAbsolutePath());
+                Assert.fail();
+            }
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to