This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 23cddd0e24a [refactor][profile] log enhancement and minor change of profile (#46564) 23cddd0e24a is described below commit 23cddd0e24ada257cb58b52907146fef728dc723 Author: zhiqiang <hezhiqi...@selectdb.com> AuthorDate: Mon Jan 13 09:48:26 2025 +0800 [refactor][profile] log enhancement and minor change of profile (#46564) --- be/src/runtime/query_context.cpp | 5 +- be/src/runtime/runtime_query_statistics_mgr.cpp | 6 +- .../apache/doris/common/proc/BackendsProcDir.java | 2 +- .../common/proc/CurrentQueryInfoProvider.java | 4 +- .../doris/common/{util => profile}/AggCounter.java | 2 +- .../doris/common/{util => profile}/Counter.java | 2 +- .../doris/common/profile/ExecutionProfile.java | 12 +- .../org/apache/doris/common/profile/Profile.java | 64 +++-- .../doris/common/profile/ProfileManager.java | 119 ++++++--- .../common/{util => profile}/RuntimeProfile.java | 4 +- .../doris/common/profile/SummaryProfile.java | 5 +- .../apache/doris/load/loadv2/BrokerLoadJob.java | 2 +- .../nereids/trees/plans/commands/LoadCommand.java | 3 +- .../java/org/apache/doris/qe/QeProcessorImpl.java | 34 ++- .../org/apache/doris/qe/QueryStatisticsItem.java | 2 +- .../java/org/apache/doris/qe/StmtExecutor.java | 4 +- .../doris/common/profile/ProfileManagerTest.java | 277 +++++++++++++++++++++ .../{util => profile}/ProfilePersistentTest.java | 8 +- .../{util => profile}/RuntimeProfileTest.java | 2 +- .../pipeline/cloud_p0/conf/be_custom.conf | 2 + .../pipeline/cloud_p0/conf/fe_custom.conf | 5 +- regression-test/pipeline/p0/conf/be.conf | 2 +- regression-test/pipeline/p0/conf/fe.conf | 5 + 23 files changed, 468 insertions(+), 103 deletions(-) diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index c777c8100ef..1ff08f7d9bb 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -351,8 +351,9 @@ void QueryContext::add_fragment_profile( #endif 
std::lock_guard<std::mutex> l(_profile_mutex); - LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline profile count {} ", - print_id(this->_query_id), fragment_id, pipeline_profiles.size()); + VLOG_ROW << fmt::format( + "Query add fragment profile, query {}, fragment {}, pipeline profile count {} ", + print_id(this->_query_id), fragment_id, pipeline_profiles.size()); _profile_map.insert(std::make_pair(fragment_id, pipeline_profiles)); diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp b/be/src/runtime/runtime_query_statistics_mgr.cpp index 93d5256cad7..ebcaf30eab1 100644 --- a/be/src/runtime/runtime_query_statistics_mgr.cpp +++ b/be/src/runtime/runtime_query_statistics_mgr.cpp @@ -65,6 +65,8 @@ static Status _do_report_exec_stats_rpc(const TNetworkAddress& coor_addr, return Status::RpcError("Client rpc client failed"); } + VLOG_DEBUG << "Sending profile"; + try { try { rpc_client->reportExecStatus(res, req); @@ -272,13 +274,13 @@ void RuntimeQueryStatisticsMgr::register_fragment_profile( void RuntimeQueryStatisticsMgr::_report_query_profiles_function() { decltype(_profile_map) profile_copy; decltype(_load_channel_profile_map) load_channel_profile_copy; - + VLOG_DEBUG << "Beging reporting profile"; { std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock); _profile_map.swap(profile_copy); _load_channel_profile_map.swap(load_channel_profile_copy); } - + VLOG_DEBUG << "After swap profile map"; // query_id -> {coordinator_addr, {fragment_id -> std::vectpr<pipeline_profile>}} for (auto& entry : profile_copy) { const auto& query_id = entry.first; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java index c5273304137..e3db2a3a716 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BackendsProcDir.java @@ -20,9 +20,9 @@ package 
org.apache.doris.common.proc; import org.apache.doris.catalog.Env; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Pair; +import org.apache.doris.common.profile.RuntimeProfile; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.ListComparator; -import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java index 2b6d8f6702e..de7247ab3ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CurrentQueryInfoProvider.java @@ -18,9 +18,9 @@ package org.apache.doris.common.proc; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.util.Counter; +import org.apache.doris.common.profile.Counter; +import org.apache.doris.common.profile.RuntimeProfile; import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.qe.QueryStatisticsItem; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TUniqueId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/AggCounter.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/AggCounter.java similarity index 98% rename from fe/fe-core/src/main/java/org/apache/doris/common/util/AggCounter.java rename to fe/fe-core/src/main/java/org/apache/doris/common/profile/AggCounter.java index 3f080b3752f..ab3b43bd3e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/AggCounter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/AggCounter.java @@ -15,7 +15,7 @@ // specific language governing permissions and 
limitations // under the License. -package org.apache.doris.common.util; +package org.apache.doris.common.profile; import java.util.LinkedList; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Counter.java similarity index 98% rename from fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java rename to fe/fe-core/src/main/java/org/apache/doris/common/profile/Counter.java index f6c06890047..f306d7c73fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Counter.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.common.util; +package org.apache.doris.common.profile; import org.apache.doris.common.io.Text; import org.apache.doris.persist.gson.GsonUtils; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java index d2300cd667d..fffcca49bf6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java @@ -20,7 +20,6 @@ package org.apache.doris.common.profile; import org.apache.doris.common.Pair; import org.apache.doris.common.Status; import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.planner.PlanFragmentId; import org.apache.doris.thrift.TDetailedReportParams; import org.apache.doris.thrift.TNetworkAddress; @@ -235,7 +234,9 @@ public class ExecutionProfile { String suffix = " (host=" + backendHBAddress + ")"; for (TDetailedReportParams pipelineProfile : fragmentProfile) { String name = ""; - if (pipelineProfile.isSetIsFragmentLevel() && pipelineProfile.is_fragment_level) { + boolean isFragmentLevel = 
(pipelineProfile.isSetIsFragmentLevel() && pipelineProfile.is_fragment_level); + if (isFragmentLevel) { + // Fragment Level profile is also represented by TDetailedReportParams. name = "Fragment Level Profile: " + suffix; } else { name = "Pipeline :" + pipelineIdx + " " + suffix; @@ -243,9 +244,9 @@ public class ExecutionProfile { } RuntimeProfile profileNode = new RuntimeProfile(name); - // The taskprofile is used to save the profile of the pipeline, without + // The taskProfile is used to save the profile of the pipeline, without // considering the FragmentLevel. - if (!(pipelineProfile.isSetIsFragmentLevel() && pipelineProfile.is_fragment_level)) { + if (!isFragmentLevel) { taskProfile.add(profileNode); } if (!pipelineProfile.isSetProfile()) { @@ -260,6 +261,9 @@ public class ExecutionProfile { setMultiBeProfile(fragmentId, backendHBAddress, taskProfile); } + LOG.info("Profile update finished query: {} fragments: {} isDone: {}", + DebugUtil.printId(getQueryId()), profile.getFragmentIdToProfile().size(), isDone); + if (profile.isSetLoadChannelProfiles()) { for (TRuntimeProfileTree loadChannelProfile : profile.getLoadChannelProfiles()) { this.loadChannelProfile.update(loadChannelProfile); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java index a5173247403..80a79ebffbf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java @@ -20,7 +20,6 @@ package org.apache.doris.common.profile; import org.apache.doris.common.Config; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.trees.plans.AbstractPlan; import org.apache.doris.nereids.trees.plans.Plan; @@ -81,10 +80,6 @@ public class Profile { // 
profile file name format: time_id private static final String SEPERATOR = "_"; - // id will be assigned to id of SummaryProfile. - // For broker load, its SummaryProfile id is a string representation of a long integer, - // for others, it is queryID - private String id = ""; // summaryProfile will be serialized to storage as JSON, and we can recover it from storage // recover of SummaryProfile is important, because it contains the meta information of the profile // we need it to construct memory index for profile retrieving. @@ -103,7 +98,7 @@ public class Profile { // when coordinator finishes, it will mark finish time. // we will wait for about 5 seconds to see if all profiles have been reported. // if not, we will store the profile to storage, and release the memory, - // futher report will be ignored. + // further report will be ignored. // why MAX_VALUE? So that we can use PriorityQueue to sort profile by finish time decreasing order. private long queryFinishTimestamp = Long.MAX_VALUE; private Map<Integer, String> planNodeMap = Maps.newHashMap(); @@ -193,7 +188,6 @@ public class Profile { DataInput dataInput = new DataInputStream(profileFileInputStream); Profile res = new Profile(); res.summaryProfile = SummaryProfile.read(dataInput); - res.setId(res.summaryProfile.getProfileId()); res.profileStoragePath = path; res.isQueryFinished = true; res.profileSize = fileSize; @@ -262,7 +256,7 @@ public class Profile { // For load task, the profile contains many execution profiles public void addExecutionProfile(ExecutionProfile executionProfile) { if (executionProfile == null) { - LOG.warn("try to set a null excecution profile, it is abnormal", new Exception()); + LOG.warn("try to set a null execution profile, it is abnormal", new Exception()); return; } executionProfile.setSummaryProfile(summaryProfile); @@ -298,18 +292,17 @@ public class Profile { } summaryProfile.update(summaryInfo); - this.setId(summaryProfile.getProfileId()); if (isFinished) { 
this.markQueryFinished(System.currentTimeMillis()); } - // Nerids native insert not set planner, so it is null + // Nereids native insert not set planner, so it is null if (planner != null) { this.planNodeMap = planner.getExplainStringMap(); } ProfileManager.getInstance().pushProfile(this); } catch (Throwable t) { - LOG.warn("update profile {} failed", id, t); + LOG.warn("update profile {} failed", getId(), t); throw t; } } @@ -352,7 +345,7 @@ public class Profile { } private RuntimeProfile composeRootProfile() { - RuntimeProfile rootProfile = new RuntimeProfile(id); + RuntimeProfile rootProfile = new RuntimeProfile(getId()); rootProfile.addChild(summaryProfile.getSummary()); rootProfile.addChild(summaryProfile.getExecutionSummary()); for (ExecutionProfile executionProfile : executionProfiles) { @@ -378,8 +371,7 @@ public class Profile { return; } - // Only generate merged profile for select, insert into select. - // Not support broker load now. + // For broker load, if it has more than one execution profile, we will not generate merged profile. 
RuntimeProfile mergedProfile = null; if (this.profileLevel == MergedProfileLevel && this.executionProfiles.size() == 1) { try { @@ -389,7 +381,7 @@ public class Profile { updateActualRowCountOnPhysicalPlan(physicalPlan); } } catch (Throwable aggProfileException) { - LOG.warn("build merged simple profile {} failed", this.id, aggProfileException); + LOG.warn("build merged simple profile {} failed", getId(), aggProfileException); } } @@ -433,10 +425,6 @@ public class Profile { return this.queryFinishTimestamp; } - public void setId(String id) { - this.id = id; - } - // For UT public void setSummaryProfile(SummaryProfile summaryProfile) { this.summaryProfile = summaryProfile; @@ -483,7 +471,7 @@ public class Profile { > (this.executionProfiles.size() * autoProfileDurationMs)) { if (LOG.isDebugEnabled()) { LOG.debug("Query/LoadJob {} costs {} ms, begin {} finish {}, need store its profile", - id, durationMs, summaryProfile.getQueryBeginTime(), this.queryFinishTimestamp); + getId(), durationMs, summaryProfile.getQueryBeginTime(), this.queryFinishTimestamp); } return true; } @@ -491,7 +479,7 @@ public class Profile { } if (this.queryFinishTimestamp == Long.MAX_VALUE) { - LOG.warn("Logical error, query {} has finished, but queryFinishTimestamp is not set,", id); + LOG.warn("Logical error, query {} has finished, but queryFinishTimestamp is not set,", getId()); return false; } @@ -501,7 +489,8 @@ public class Profile { > Config.profile_waiting_time_for_spill_seconds * 1000) { LOG.warn("Profile {} should be stored to storage without waiting for incoming profile," + " since it has been waiting for {} ms, current time {} query finished time: {}", - id, currentTimeMillis - this.queryFinishTimestamp, currentTimeMillis, this.queryFinishTimestamp); + getId(), currentTimeMillis - this.queryFinishTimestamp, currentTimeMillis, + this.queryFinishTimestamp); this.summaryProfile.setSystemMessage( "This profile is not complete, since its collection does not finish in time." 
@@ -526,7 +515,7 @@ public class Profile { public void markQueryFinished(long queryFinishTime) { try { if (this.profileHasBeenStored()) { - LOG.error("Logical error, profile {} has already been stored to storage", this.id); + LOG.error("Logical error, profile {} has already been stored to storage", getId()); return; } @@ -539,13 +528,13 @@ public class Profile { } public void writeToStorage(String systemProfileStorageDir) { - if (Strings.isNullOrEmpty(id)) { + if (Strings.isNullOrEmpty(getId())) { LOG.warn("store profile failed, name is empty"); return; } if (!Strings.isNullOrEmpty(profileStoragePath)) { - LOG.error("Logical error, profile {} has already been stored to storage", id); + LOG.error("Logical error, profile {} has already been stored to storage", getId()); return; } @@ -586,7 +575,7 @@ public class Profile { dataOutputStream.flush(); this.profileSize = profileFile.length(); } catch (Exception e) { - LOG.error("write {} summary profile failed", id, e); + LOG.error("write {} summary profile failed", getId(), e); return; } finally { try { @@ -690,7 +679,7 @@ public class Profile { return; } - LOG.info("Profile {} has been stored to storage, reading it from storage", id); + LOG.info("Profile {} has been stored to storage, reading it from storage", getId()); FileInputStream fileInputStream = null; @@ -728,4 +717,25 @@ public class Profile { return; } + + public String debugInfo() { + StringBuilder builder = new StringBuilder(); + builder.append("ProfileId:").append(getId()).append("|"); + builder.append("StoragePath:").append(profileStoragePath).append("|"); + builder.append("StartTimeStamp:").append(summaryProfile.getQueryBeginTime()).append("|"); + builder.append("IsFinished:").append(isQueryFinished).append("|"); + builder.append("FinishTimestamp:").append(queryFinishTimestamp).append("|"); + builder.append("AutoProfileDuration: ").append(autoProfileDurationMs).append("|"); + builder.append("ExecutionProfileCnt: 
").append(executionProfiles.size()).append("|"); + builder.append("ProfileOnStorageSize:").append(profileSize); + return builder.toString(); + } + + public void setQueryFinishTimestamp(long l) { + this.queryFinishTimestamp = l; + } + + public String getId() { + return summaryProfile.getProfileId(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java index ef9f2f7bbaf..f174b6b7dcc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ProfileManager.java @@ -65,7 +65,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; * All attributes can be seen from the above. * * why the element in the finished profile array is not RuntimeProfile, - * the purpose is let coordinator can destruct earlier(the fragment profile is in Coordinator) + * the purpose is let coordinator can destruct earlier (the fragment profile is in Coordinator) * */ public class ProfileManager extends MasterDaemon { @@ -83,7 +83,7 @@ public class ProfileManager extends MasterDaemon { this.profile = profile; } - private final Profile profile; + final Profile profile; public Map<String, String> infoStrings = Maps.newHashMap(); public String errMsg = ""; @@ -655,11 +655,6 @@ public class ProfileManager extends MasterDaemon { } } - if (LOG.isDebugEnabled()) { - LOG.debug("{} profiles size on storage: {}", profileDeque.size(), - DebugUtil.printByteWithUnit(totalProfileSize)); - } - final int maxSpilledProfileNum = Config.max_spilled_profile_num; final long spilledProfileLimitBytes = Config.spilled_profile_storage_limit_bytes; List<ProfileElement> queryIdToBeRemoved = Lists.newArrayList(); @@ -803,29 +798,57 @@ public class ProfileManager extends MasterDaemon { } } + // The init value of query finish time of profile is MAX_VALUE, + // So a more recent query will be on the 
top of the heap. + PriorityQueue<ProfileElement> getProfileOrderByQueryFinishTimeDesc() { + readLock.lock(); + try { + PriorityQueue<ProfileElement> queryIdDeque = new PriorityQueue<>(Comparator.comparingLong( + (ProfileElement profileElement) -> profileElement.profile.getQueryFinishTimestamp()).reversed()); + + queryIdToProfileMap.forEach((queryId, profileElement) -> { + queryIdDeque.add(profileElement); + }); + + return queryIdDeque; + } finally { + readLock.unlock(); + } + } + // The init value of query finish time of profile is MAX_VALUE - // So more recent query will be on the top of heap. - private PriorityQueue<ProfileElement> getProfileOrderByQueryFinishTimeDesc() { - PriorityQueue<ProfileElement> queryIdDeque = new PriorityQueue<>(Comparator.comparingLong( - (ProfileElement profileElement) -> profileElement.profile.getQueryFinishTimestamp()).reversed()); + // So query finished earlier will be on the top of heap + PriorityQueue<ProfileElement> getProfileOrderByQueryFinishTime() { + readLock.lock(); + try { + PriorityQueue<ProfileElement> queryIdDeque = new PriorityQueue<>(Comparator.comparingLong( + (ProfileElement profileElement) -> profileElement.profile.getQueryFinishTimestamp())); - queryIdToProfileMap.forEach((queryId, profileElement) -> { - queryIdDeque.add(profileElement); - }); + queryIdToProfileMap.forEach((queryId, profileElement) -> { + queryIdDeque.add(profileElement); + }); - return queryIdDeque; + return queryIdDeque; + } finally { + readLock.unlock(); + } } // Older query will be on the top of heap - private PriorityQueue<ProfileElement> getProfileOrderByQueryStartTime() { - PriorityQueue<ProfileElement> queryIdDeque = new PriorityQueue<>(Comparator.comparingLong( - (ProfileElement profileElement) -> profileElement.profile.getSummaryProfile().getQueryBeginTime())); + PriorityQueue<ProfileElement> getProfileOrderByQueryStartTime() { + readLock.lock(); + try { + PriorityQueue<ProfileElement> queryIdDeque = new 
PriorityQueue<>(Comparator.comparingLong( + (ProfileElement profileElement) -> profileElement.profile.getSummaryProfile().getQueryBeginTime())); - queryIdToProfileMap.forEach((queryId, profileElement) -> { - queryIdDeque.add(profileElement); - }); + queryIdToProfileMap.forEach((queryId, profileElement) -> { + queryIdDeque.add(profileElement); + }); - return queryIdDeque; + return queryIdDeque; + } finally { + readLock.unlock(); + } } // When the query is finished, the execution profile should be marked as finished @@ -885,7 +908,7 @@ public class ProfileManager extends MasterDaemon { writeLock.unlock(); if (stringBuilder.length() != 0) { LOG.warn("Remove expired execution profiles {}, current execution profile map size {}," - + "Config.max_query_profile_num{}, Config.profile_async_collect_expire_time_secs {}", + + "Config.max_query_profile_num {}, Config.profile_async_collect_expire_time_secs {}", stringBuilder.toString(), executionProfileNum, Config.max_query_profile_num, Config.profile_async_collect_expire_time_secs); } @@ -894,7 +917,7 @@ public class ProfileManager extends MasterDaemon { private void deleteOutdatedProfilesFromMemory() { StringBuilder stringBuilder = new StringBuilder(); - int profileNum = 0; + StringBuilder stringBuilderTTL = new StringBuilder(); writeLock.lock(); try { @@ -903,7 +926,13 @@ public class ProfileManager extends MasterDaemon { for (ProfileElement profileElement : this.queryIdToProfileMap.values()) { if (profileElement.profile.shouldBeRemoveFromMemory()) { - profilesToRemove.add(profileElement.profile.getSummaryProfile().getProfileId()); + String profileId = profileElement.profile.getSummaryProfile().getProfileId(); + profilesToRemove.add(profileId); + stringBuilder.append(profileId).append(","); + if (LOG.isDebugEnabled()) { + LOG.debug("Profile {} should be filtered from memory, information {}", profileId, + profileElement.profile.debugInfo()); + } } } @@ -913,34 +942,54 @@ public class ProfileManager extends MasterDaemon { for 
(ExecutionProfile executionProfile : profileElement.profile.getExecutionProfiles()) { queryIdToExecutionProfiles.remove(executionProfile.getQueryId()); } - stringBuilder.append(profileElement.profile.getSummaryProfile().getProfileId()).append(","); } if (this.queryIdToProfileMap.size() <= Config.max_query_profile_num) { return; } - PriorityQueue<ProfileElement> queueIdDeque = getProfileOrderByQueryStartTime(); + // profile is ordered by query finish time + // query finished earlier will be on the top of heap + // query finished time of unfinished query is INT_MAX, so they will be on the bottom of the heap. + PriorityQueue<ProfileElement> queueIdDeque = getProfileOrderByQueryFinishTime(); - while (queueIdDeque.size() > Config.max_query_profile_num) { + while (queueIdDeque.size() > Config.max_query_profile_num && !queueIdDeque.isEmpty()) { ProfileElement profileElement = queueIdDeque.poll(); - - queryIdToProfileMap.remove(profileElement.profile.getSummaryProfile().getProfileId()); + String profileId = profileElement.profile.getSummaryProfile().getProfileId(); + stringBuilderTTL.append(profileId).append(","); + queryIdToProfileMap.remove(profileId); for (ExecutionProfile executionProfile : profileElement.profile.getExecutionProfiles()) { queryIdToExecutionProfiles.remove(executionProfile.getQueryId()); } - stringBuilder.append(profileElement.profile.getSummaryProfile().getProfileId()).append(","); + if (LOG.isDebugEnabled()) { + LOG.debug("Remove profile {} since ttl from memory, info {}", profileId, + profileElement.profile.debugInfo()); + } } } finally { - profileNum = queryIdToProfileMap.size(); + int profileNum = queryIdToProfileMap.size(); writeLock.unlock(); - if (stringBuilder.length() != 0) { - LOG.info("Remove outdated profiles {} from memoy, current profile map size {}", - stringBuilder.toString(), profileNum); + if (stringBuilder.length() != 0 || stringBuilderTTL.length() != 0) { + LOG.info("Filtered profiles {}, outdated profiles {}, they are removed from 
memory," + + " current profile map size {}", + stringBuilder.toString(), stringBuilderTTL.toString(), profileNum); + } + } + } + + String getDebugInfo() { + StringBuilder stringBuilder = new StringBuilder(); + readLock.lock(); + try { + for (ProfileElement profileElement : queryIdToProfileMap.values()) { + stringBuilder.append(profileElement.profile.debugInfo()).append("\n"); } + } finally { + readLock.unlock(); } + return stringBuilder.toString(); } public List<List<String>> getProfileMetaWithType(ProfileType profileType, long limit) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java similarity index 99% rename from fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java rename to fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java index 19082959034..78ff0f58ed2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. 
-package org.apache.doris.common.util; +package org.apache.doris.common.profile; import org.apache.doris.common.Pair; import org.apache.doris.common.Reference; import org.apache.doris.common.io.Text; -import org.apache.doris.common.profile.SummaryProfile; +import org.apache.doris.common.util.DebugUtil; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.thrift.TCounter; import org.apache.doris.thrift.TRuntimeProfileNode; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java index 5b0d5ba3533..7215b8a9c65 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java @@ -19,7 +19,6 @@ package org.apache.doris.common.profile; import org.apache.doris.common.Config; import org.apache.doris.common.io.Text; -import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.thrift.TNetworkAddress; @@ -543,8 +542,8 @@ public class SummaryProfile { this.nereidsDistributeFinishTime = TimeUtils.getStartTimeMs(); } - public void setQueryBeginTime() { - this.queryBeginTime = TimeUtils.getStartTimeMs(); + public void setQueryBeginTime(long queryBeginTime) { + this.queryBeginTime = queryBeginTime; } public void setQueryAnalysisFinishTime() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index b2fd6746f2f..efb2fcc06d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -247,7 +247,7 @@ public class BrokerLoadJob extends BulkLoadJob { true, 
Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.PROFILE_LEVEL, "3")), Integer.valueOf(sessionVariables.getOrDefault(SessionVariable.AUTO_PROFILE_THRESHOLD_MS, "-1"))); - this.jobProfile.getSummaryProfile().setQueryBeginTime(); + this.jobProfile.getSummaryProfile().setQueryBeginTime(TimeUtils.getStartTimeMs()); // TODO: 怎么给这些 load job 设置 profile 记录时间 // this.jobProfile.setId("BrokerLoadJob " + id + ". " + label); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java index aed3cb4c1f2..69cbf762c2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java @@ -29,6 +29,7 @@ import org.apache.doris.common.NereidsException; import org.apache.doris.common.profile.Profile; import org.apache.doris.common.util.FileFormatConstants; import org.apache.doris.common.util.FileFormatUtils; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.job.base.JobExecuteType; import org.apache.doris.job.base.JobExecutionConfiguration; @@ -133,7 +134,7 @@ public class LoadCommand extends Command implements ForwardWithSync { ctx.getSessionVariable().enableProfile, ctx.getSessionVariable().profileLevel, ctx.getSessionVariable().getAutoProfileThresholdMs()); - profile.getSummaryProfile().setQueryBeginTime(); + profile.getSummaryProfile().setQueryBeginTime(TimeUtils.getStartTimeMs()); if (sourceInfos.size() == 1) { plans = ImmutableList.of(new InsertIntoTableCommand(completeQueryPlan(ctx, sourceInfos.get(0)), Optional.of(labelName), Optional.empty(), Optional.empty())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index 
d9b4583d71f..378d37d082f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -75,18 +75,25 @@ public final class QeProcessorImpl implements QeProcessor { private Status processQueryProfile(TQueryProfile profile, TNetworkAddress address, boolean isDone) { ExecutionProfile executionProfile = ProfileManager.getInstance().getExecutionProfile(profile.query_id); if (executionProfile == null) { - LOG.warn("Could not find execution profile with query id {}", DebugUtil.printId(profile.query_id)); + LOG.warn("Could not find execution profile, query {} be {}", + DebugUtil.printId(profile.query_id), address.toString()); return new Status(TStatusCode.NOT_FOUND, "Could not find execution profile with query id " + DebugUtil.printId(profile.query_id)); } - // Update profile may cost a lot of time, use a seperate pool to deal with it. - writeProfileExecutor.submit(new Runnable() { - @Override - public void run() { - executionProfile.updateProfile(profile, address, isDone); - } - }); + // Update profile may cost a lot of time, use a separate pool to deal with it. + try { + writeProfileExecutor.submit(new Runnable() { + @Override + public void run() { + executionProfile.updateProfile(profile, address, isDone); + } + }); + } catch (Exception e) { + LOG.warn("Failed to submit profile write task, query {} be {}", + DebugUtil.printId(profile.query_id), address.toString()); + return new Status(TStatusCode.INTERNAL_ERROR, "Failed to submit profile write task"); + } return Status.OK; } @@ -231,9 +238,16 @@ public final class QeProcessorImpl implements QeProcessor { // with profile in a single rpc, this will make FE ignore the exec status and may lead to bug in query // like insert into select. 
if (params.isSetBackendId() && params.isSetDone()) { + LOG.info("Receive profile {} report from {}, isDone {}, fragments {}", + DebugUtil.printId(params.getQueryProfile().getQueryId()), beAddr.toString(), + params.isDone(), params.getQueryProfile().fragment_id_to_profile.size()); + Backend backend = Env.getCurrentSystemInfo().getBackend(params.getBackendId()); - boolean isDone = params.isDone(); - if (backend != null) { + if (backend == null) { + LOG.warn("Invalid report profile req, backend {} not found, query id: {}", + params.getBackendId(), DebugUtil.printId(params.getQueryProfile().getQueryId())); + } else { + boolean isDone = params.isDone(); // the process status is ignored by design. // actually be does not care the process status of profile on fe. processQueryProfile(params.getQueryProfile(), backend.getHeartbeatAddress(), isDone); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java index 76b528464d6..c51ff24ca14 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryStatisticsItem.java @@ -17,7 +17,7 @@ package org.apache.doris.qe; -import org.apache.doris.common.util.RuntimeProfile; +import org.apache.doris.common.profile.RuntimeProfile; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TUniqueId; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index f693f5b82aa..8d712dda76a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -691,7 +691,7 @@ public class StmtExecutor { } context.setQueryId(queryId); context.setStartTime(); - profile.getSummaryProfile().setQueryBeginTime(); + profile.getSummaryProfile().setQueryBeginTime(TimeUtils.getStartTimeMs()); 
List<List<String>> changedSessionVar = VariableMgr.dumpChangedVars(context.getSessionVariable()); profile.setChangedSessionVar(DebugUtil.prettyPrintChangedSessionVar(changedSessionVar)); context.setStmtId(STMT_ID_GENERATOR.incrementAndGet()); @@ -995,7 +995,7 @@ public class StmtExecutor { public void executeByLegacy(TUniqueId queryId) throws Exception { context.setStartTime(); - profile.getSummaryProfile().setQueryBeginTime(); + profile.getSummaryProfile().setQueryBeginTime(TimeUtils.getStartTimeMs()); context.setStmtId(STMT_ID_GENERATOR.incrementAndGet()); context.setQueryId(queryId); diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfileManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfileManagerTest.java new file mode 100644 index 00000000000..d0d17505861 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfileManagerTest.java @@ -0,0 +1,277 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.common.profile; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +class ProfileManagerTest { + // We need a logger. + private static final Logger LOG = LogManager.getLogger(ProfilePersistentTest.class); + + private static ProfileManager profileManager; + + @BeforeAll + static void setUp() { + profileManager = new ProfileManager(); + } + + @BeforeEach + void cleanProfile() { + profileManager.cleanProfile(); + } + + @Test + void returnsEmptyQueueWhenNoProfiles() { + PriorityQueue<ProfileManager.ProfileElement> result = profileManager.getProfileOrderByQueryFinishTimeDesc(); + Assertions.assertTrue(result.isEmpty()); + result = profileManager.getProfileOrderByQueryFinishTime(); + Assertions.assertTrue(result.isEmpty()); + result = profileManager.getProfileOrderByQueryStartTime(); + Assertions.assertTrue(result.isEmpty()); + } + + static Profile constructProfile(String id) { + Profile profile = new Profile(); + SummaryProfile summaryProfile = new SummaryProfile(); + summaryProfile.getSummary().getInfoStrings().put(SummaryProfile.PROFILE_ID, id); + profile.setSummaryProfile(summaryProfile); + return profile; + } + + @Test + void getProfileByOrder() { + final int normalProfiles = 100; + for (int i = 0; i < normalProfiles; i++) { + Profile profile = constructProfile(String.valueOf(i)); + Random random = new Random(); + profile.setQueryFinishTimestamp(random.nextInt(200 - 101) + 101); + // set query start time in range of [0, 100) + profile.getSummaryProfile().setQueryBeginTime(random.nextInt(100)); + 
profileManager.pushProfile(profile); + + if (i == 10) { + LOG.info("Profile manager debug info: {}", profileManager.getDebugInfo()); + } + } + + // Insert two profiles with default value. + Profile profile1 = constructProfile("Default 1"); + profileManager.pushProfile(profile1); + Profile profile2 = constructProfile("Default 2"); + profileManager.pushProfile(profile2); + + profile1 = constructProfile("Default 3"); + profile1.setQueryFinishTimestamp(1000L); + profileManager.pushProfile(profile1); + profile1 = constructProfile("Default 4"); + profile1.setQueryFinishTimestamp(1000L); + profileManager.pushProfile(profile1); + + profile1 = constructProfile("Default 5"); + profile1.getSummaryProfile().setQueryBeginTime(1000L); + profileManager.pushProfile(profile1); + profile1 = constructProfile("Default 6"); + profile1.getSummaryProfile().setQueryBeginTime(1000L); + profileManager.pushProfile(profile1); + + + Set<String> profileThatHasQueryFinishTime1000 = new HashSet<>(); + profileThatHasQueryFinishTime1000.add("Default 3"); + profileThatHasQueryFinishTime1000.add("Default 4"); + Set<String> profileThatHasQueryStartTime1000 = new HashSet<>(); + profileThatHasQueryStartTime1000.add("Default 5"); + profileThatHasQueryStartTime1000.add("Default 6"); + Set<String> profileThatHasDefaultQueryFinishTime = new HashSet<>(); + profileThatHasDefaultQueryFinishTime.add("Default 1"); + profileThatHasDefaultQueryFinishTime.add("Default 2"); + profileThatHasDefaultQueryFinishTime.add("Default 5"); + profileThatHasDefaultQueryFinishTime.add("Default 6"); + Set<String> profileThatHasDefaultQueryStartTime = new HashSet<>(); + profileThatHasDefaultQueryStartTime.add("Default 1"); + profileThatHasDefaultQueryStartTime.add("Default 2"); + profileThatHasDefaultQueryStartTime.add("Default 3"); + profileThatHasDefaultQueryStartTime.add("Default 4"); + + + // Profile should be ordered by query finish time in descending order. 
+ // Means that the profile with the latest query finish time should be at the top of the queue. + PriorityQueue<ProfileManager.ProfileElement> orderedResults = profileManager.getProfileOrderByQueryFinishTimeDesc(); + assert orderedResults != null; + assert !orderedResults.isEmpty(); + Assertions.assertEquals(106, orderedResults.size(), profileManager.getDebugInfo()); + + for (int i = 0; i < profileThatHasDefaultQueryFinishTime.size(); i++) { + ProfileManager.ProfileElement result = orderedResults.poll(); + Assertions.assertNotEquals(result, null); + Assertions.assertTrue(profileThatHasDefaultQueryFinishTime.contains(result.profile.getId())); + } + for (int i = 0; i < profileThatHasQueryFinishTime1000.size(); i++) { + ProfileManager.ProfileElement result = orderedResults.poll(); + Assertions.assertNotEquals(result, null); + Assertions.assertTrue(profileThatHasQueryFinishTime1000.contains(result.profile.getId())); + } + + long prevQueryFinishTime = 1000L; + for (int i = 0; i < normalProfiles; i++) { + ProfileManager.ProfileElement result = orderedResults.poll(); + Assertions.assertNotEquals(result, null); + Assertions.assertTrue(result.profile.getQueryFinishTimestamp() <= prevQueryFinishTime); + prevQueryFinishTime = result.profile.getQueryFinishTimestamp(); + } + + orderedResults = profileManager.getProfileOrderByQueryFinishTime(); + Assertions.assertEquals(orderedResults.size(), 106); + // Profile should be ordered by query finish time in ascending order. 
+ prevQueryFinishTime = Long.MIN_VALUE; + for (int i = 0; i < normalProfiles; i++) { + ProfileManager.ProfileElement result = orderedResults.poll(); + Assertions.assertNotEquals(result, null); + Assertions.assertTrue(result.profile.getQueryFinishTimestamp() >= prevQueryFinishTime); + prevQueryFinishTime = result.profile.getQueryFinishTimestamp(); + } + for (int i = 0; i < profileThatHasQueryFinishTime1000.size(); i++) { + ProfileManager.ProfileElement result = orderedResults.poll(); + Assertions.assertNotEquals(result, null); + Assertions.assertTrue(profileThatHasQueryFinishTime1000.contains(result.profile.getId())); + } + for (int i = 0; i < profileThatHasDefaultQueryFinishTime.size(); i++) { + ProfileManager.ProfileElement result = orderedResults.poll(); + Assertions.assertNotEquals(result, null); + Assertions.assertTrue(profileThatHasDefaultQueryFinishTime.contains(result.profile.getId())); + } + + orderedResults = profileManager.getProfileOrderByQueryStartTime(); + Assertions.assertEquals(orderedResults.size(), 106); + // Profile should be ordered by query start time in ascending order. 
+ long prevQueryStartTime = -1; + for (int i = 0; i < profileThatHasDefaultQueryStartTime.size(); i++) { + ProfileManager.ProfileElement result = orderedResults.poll(); + Assertions.assertNotEquals(result, null); + Assertions.assertTrue(profileThatHasDefaultQueryStartTime.contains(result.profile.getId()), + result.profile.getId() + " " + result.profile.getSummaryProfile().getQueryBeginTime()); + } + + for (int i = 0; i < normalProfiles; i++) { + ProfileManager.ProfileElement result = orderedResults.poll(); + Assertions.assertNotEquals(result, null); + Assertions.assertTrue(result.profile.getSummaryProfile().getQueryBeginTime() >= prevQueryStartTime); + prevQueryStartTime = result.profile.getSummaryProfile().getQueryBeginTime(); + } + + for (int i = 0; i < profileThatHasQueryStartTime1000.size(); i++) { + ProfileManager.ProfileElement result = orderedResults.poll(); + Assertions.assertNotEquals(result, null); + Assertions.assertTrue(profileThatHasQueryStartTime1000.contains(result.profile.getId()), + result.profile.getId() + " " + result.profile.getSummaryProfile().getQueryBeginTime()); + } + } + + @Test + void getProfileByOrderParallel() throws InterruptedException { + // Test the parallel case. + // Create a thread pool with 3 threads. + final int threadNum = 3; + List<Thread> threads = new ArrayList<>(); + AtomicBoolean stopFlag = new AtomicBoolean(false); + + // These threads keep adding profiles to the profile manager. + // The profile they create has random name, random query finish time and random query start time. 
+ for (int i = 0; i < threadNum; i++) { + threads.add(new Thread(() -> { + Random random = new Random(); + for (int j = 0; j < 100; j++) { + Profile profile = constructProfile(String.valueOf(random.nextInt(1000))); + profile.getSummaryProfile().setQueryBeginTime(random.nextInt(1000)); + profile.setQueryFinishTimestamp(random.nextInt(2000) + 1000); + profileManager.pushProfile(profile); + } + })); + } + // Create another thread to get the profile by different order. + for (int i = 0; i < threadNum; i++) { + threads.add(new Thread(() -> { + while (!stopFlag.get()) { + PriorityQueue<ProfileManager.ProfileElement> orderedResults = profileManager.getProfileOrderByQueryFinishTimeDesc(); + long prevQueryFinishTime = Long.MAX_VALUE; + while (!orderedResults.isEmpty()) { + ProfileManager.ProfileElement result = orderedResults.poll(); + Assertions.assertTrue(result.profile.getQueryFinishTimestamp() <= prevQueryFinishTime); + prevQueryFinishTime = result.profile.getQueryFinishTimestamp(); + } + } + })); + } + + for (int i = 0; i < threadNum; i++) { + threads.add(new Thread(() -> { + while (!stopFlag.get()) { + PriorityQueue<ProfileManager.ProfileElement> orderedResults = profileManager.getProfileOrderByQueryStartTime(); + long prevQueryStartTime = -1; + while (!orderedResults.isEmpty()) { + ProfileManager.ProfileElement result = orderedResults.poll(); + Assertions.assertTrue(result.profile.getSummaryProfile().getQueryBeginTime() >= prevQueryStartTime); + prevQueryStartTime = result.profile.getSummaryProfile().getQueryBeginTime(); + } + } + })); + } + + for (int i = 0; i < threadNum; i++) { + threads.add(new Thread(() -> { + while (!stopFlag.get()) { + PriorityQueue<ProfileManager.ProfileElement> orderedResults = profileManager.getProfileOrderByQueryFinishTime(); + long prevQueryFinishTime = Long.MIN_VALUE; + while (!orderedResults.isEmpty()) { + ProfileManager.ProfileElement result = orderedResults.poll(); + Assertions.assertTrue(result.profile.getQueryFinishTimestamp() >= 
prevQueryFinishTime); + prevQueryFinishTime = result.profile.getQueryFinishTimestamp(); + } + } + })); + } + + for (Thread thread : threads) { + thread.start(); + } + + Thread.sleep(5000); + + stopFlag.set(true); + + for (Thread thread : threads) { + try { + thread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/ProfilePersistentTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfilePersistentTest.java similarity index 98% rename from fe/fe-core/src/test/java/org/apache/doris/common/util/ProfilePersistentTest.java rename to fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfilePersistentTest.java index c21c57ba1d4..cb1804e2d50 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/ProfilePersistentTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/profile/ProfilePersistentTest.java @@ -15,12 +15,11 @@ // specific language governing permissions and limitations // under the License. 
-package org.apache.doris.common.util; +package org.apache.doris.common.profile; -import org.apache.doris.common.profile.ExecutionProfile; -import org.apache.doris.common.profile.Profile; -import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.common.profile.SummaryProfile.SummaryBuilder; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.common.util.TimeUtils; import org.apache.doris.thrift.QueryState; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.thrift.TUnit; @@ -77,7 +76,6 @@ public class ProfilePersistentTest { SummaryProfile summaryProfile = constructRandomSummaryProfile(); String stringUniqueId = summaryProfile.getProfileId(); TUniqueId thriftUniqueId = DebugUtil.parseTUniqueIdFromString(stringUniqueId); - profile.setId(stringUniqueId); profile.setSummaryProfile(summaryProfile); for (int i = 0; i < executionProfileNum; i++) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/profile/RuntimeProfileTest.java similarity index 99% rename from fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java rename to fe/fe-core/src/test/java/org/apache/doris/common/profile/RuntimeProfileTest.java index 56ed66c0504..93cb5e35794 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/profile/RuntimeProfileTest.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-package org.apache.doris.common.util; +package org.apache.doris.common.profile; import org.apache.doris.thrift.TCounter; import org.apache.doris.thrift.TRuntimeProfileNode; diff --git a/regression-test/pipeline/cloud_p0/conf/be_custom.conf b/regression-test/pipeline/cloud_p0/conf/be_custom.conf index d201cad3fac..c35859cd6b4 100644 --- a/regression-test/pipeline/cloud_p0/conf/be_custom.conf +++ b/regression-test/pipeline/cloud_p0/conf/be_custom.conf @@ -39,3 +39,5 @@ pipeline_task_leakage_detect_period_sec=1 crash_in_memory_tracker_inaccurate = true enable_table_size_correctness_check=true enable_brpc_connection_check=true + +sys_log_verbose_modules=query_context,runtime_query_statistics_mgr \ No newline at end of file diff --git a/regression-test/pipeline/cloud_p0/conf/fe_custom.conf b/regression-test/pipeline/cloud_p0/conf/fe_custom.conf index 955409d9bb2..40d88d0f207 100644 --- a/regression-test/pipeline/cloud_p0/conf/fe_custom.conf +++ b/regression-test/pipeline/cloud_p0/conf/fe_custom.conf @@ -24,7 +24,10 @@ enable_debug_points = true disable_datev1=false disable_decimalv2=false -max_query_profile_num=1000 +sys_log_verbose_modules = org.apache.doris.common.profile,org.apache.doris.qe.QeProcessorImpl +# profile related +max_query_profile_num = 2000 +max_spilled_profile_num = 2000 statistics_sql_mem_limit_in_bytes=21474836480 cpu_resource_limit_per_analyze_task=-1 diff --git a/regression-test/pipeline/p0/conf/be.conf b/regression-test/pipeline/p0/conf/be.conf index 0b73375b3fb..be798738e68 100644 --- a/regression-test/pipeline/p0/conf/be.conf +++ b/regression-test/pipeline/p0/conf/be.conf @@ -35,7 +35,7 @@ JEMALLOC_PROF_PRFIX="jemalloc_heap_profile_" # INFO, WARNING, ERROR, FATAL sys_log_level = INFO - +sys_log_verbose_modules=query_context,runtime_query_statistics_mgr be_port = 9161 webserver_port = 8141 heartbeat_service_port = 9151 diff --git a/regression-test/pipeline/p0/conf/fe.conf b/regression-test/pipeline/p0/conf/fe.conf index 44688ff4adc..2f493ff1098 
100644 --- a/regression-test/pipeline/p0/conf/fe.conf +++ b/regression-test/pipeline/p0/conf/fe.conf @@ -34,6 +34,7 @@ JAVA_OPTS_FOR_JDK_17="-Djavax.security.auth.useSubjectCredsOnly=false -Xmx8192m sys_log_level = INFO sys_log_mode = NORMAL +sys_log_verbose_modules = org.apache.doris.common.profile,org.apache.doris.qe.QeProcessorImpl arrow_flight_sql_port = 8081 catalog_trash_expire_second=1 #enable ssl for test @@ -85,3 +86,7 @@ enable_deadlock_detection = true max_lock_hold_threshold_seconds = 10 force_olap_table_replication_allocation=tag.location.default:1 + +# profile related +max_query_profile_num = 2000 +max_spilled_profile_num = 2000 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org