This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 42bac3343d [Refactor](StmtExecutor)(step-1) Extract profile logic from StmtExecutor and Coordinator (#19219)
42bac3343d is described below

commit 42bac3343d568062a554702f20e33a8c28e7d149
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Sat May 6 09:01:51 2023 +0800

    [Refactor](StmtExecutor)(step-1) Extract profile logic from StmtExecutor and Coordinator (#19219)

    Previously we used the RuntimeProfile class directly. Because a profile has multiple levels,
    several RuntimeProfile instances had to be maintained by hand. This commit introduces several
    new classes for profiles:

    - Profile: the root profile of an execution task (query or load).
    - SummaryProfile: the profile that holds the summary info of an execution task, such as
      start time, end time, query id, etc.
    - ExecutionProfile: the profile of a single Coordinator. Each Coordinator has one ExecutionProfile.

    The profile structure is as follows:

    Profile:
        SummaryProfile:
        ExecutionProfile 1:
            Fragment 0:
                Instance 0:
                Instance 1:
                ...
            Fragment 1:
                ...
        ExecutionProfile 2:
            ...

    Each Profile has one SummaryProfile and one or more ExecutionProfiles. For most kinds of
    jobs, such as query/insert, there is only one ExecutionProfile. A broker load job may have
    more than one ExecutionProfile, one for each sub task of the load job.

    How to use

    For query/insert, etc.:
    - Each StmtExecutor has a Profile instance, and each Coordinator has an ExecutionProfile instance.
    - StmtExecutor is responsible for the SummaryProfile and updates it during execution.
    - Coordinator is responsible for the ExecutionProfile: it first adds the ExecutionProfile as a
      child of the Profile, then updates it periodically during execution.

    For Load/Export, etc.:
    - Each job has a Profile instance.
    - Each Coordinator of the job adds its ExecutionProfile to the children of the job's Profile.

    Behavior Change

    The columns of show load profile/show query profile and the QueryProfile Web UI have changed to:

    | Profile ID | Task Type | Start Time | End Time | Total | Task State | User | Default Db | Sql Statement | Is Cached | Total Instances Num | Instances Num Per BE | Parallel Fragment Exec Instance Num | Trace ID |

    The Query Id and Job Id columns are removed; Profile ID is used instead. For a load job the
    profile id is the job id; for query/insert it is the query id.
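As a rough illustration of the call sequence described above (not part of this commit's diff): the sketch below shows how the new classes fit together. ProfileUsageSketch, runWithProfile, queryId, coordinator and startTime are hypothetical placeholders; the real wiring lives in StmtExecutor, Coordinator and BrokerLoadJob in the patch that follows.

    import java.util.Map;

    import org.apache.doris.common.profile.Profile;
    import org.apache.doris.common.profile.SummaryProfile;
    import org.apache.doris.common.util.ProfileManager.ProfileType;
    import org.apache.doris.common.util.TimeUtils;
    import org.apache.doris.qe.Coordinator;

    // Hypothetical helper class, for illustration only.
    class ProfileUsageSketch {
        // startTime is a System.currentTimeMillis() timestamp taken when the task began.
        static void runWithProfile(String queryId, Coordinator coordinator, long startTime) {
            // Root profile of the execution task; passing isEnable = false makes update() a no-op.
            Profile profile = new Profile("Query_" + queryId, /* isEnable */ true);

            // The Coordinator owns the ExecutionProfile; attach it as a child of the root Profile.
            profile.addExecutionProfile(coordinator.getExecutionProfile());

            // Summary key/value pairs are built with SummaryProfile.SummaryBuilder.
            Map<String, String> summaryInfo = new SummaryProfile.SummaryBuilder()
                    .profileId(queryId)
                    .taskType(ProfileType.QUERY.name())
                    .startTime(TimeUtils.longToTimeString(startTime))
                    .taskState("RUNNING")
                    .user("root")
                    .defaultDb("test_db")
                    .sqlStatement("select 1")
                    .build();

            // During execution: update periodically; isFinished = false keeps the profile open.
            profile.update(startTime, summaryInfo, /* isFinished */ false);

            // ... run the fragments via the coordinator ...

            // Last update: isFinished = true waits for instance reports and marks the Profile
            // finished, so any later update() calls are ignored.
            profile.update(startTime, summaryInfo, /* isFinished */ true);
        }
    }

Each update() call pushes the assembled RuntimeProfile tree to ProfileManager, which is what show query profile and the QueryProfile Web UI read, as the diff below shows.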
--- build-for-release.sh | 2 +- .../doris/analysis/ShowQueryProfileStmt.java | 30 +-- .../doris/common/profile/ExecutionProfile.java | 150 ++++++++++++ .../org/apache/doris/common/profile/Profile.java | 80 +++++++ .../doris/common/profile/SummaryProfile.java | 264 +++++++++++++++++++++ .../java/org/apache/doris/common/util/Counter.java | 5 +- .../apache/doris/common/util/ProfileManager.java | 56 +---- .../apache/doris/common/util/ProfileWriter.java | 24 -- .../doris/common/util/QueryPlannerProfile.java | 130 ---------- .../apache/doris/common/util/RuntimeProfile.java | 12 +- .../org/apache/doris/common/util/TimeUtils.java | 8 +- .../httpv2/controller/QueryProfileController.java | 40 +--- .../main/java/org/apache/doris/load/ExportJob.java | 8 - .../apache/doris/load/loadv2/BrokerLoadJob.java | 46 ++-- .../org/apache/doris/load/loadv2/BulkLoadJob.java | 7 +- .../apache/doris/load/loadv2/LoadLoadingTask.java | 19 +- .../org/apache/doris/nereids/NereidsPlanner.java | 2 +- .../java/org/apache/doris/qe/ConnectProcessor.java | 2 +- .../main/java/org/apache/doris/qe/Coordinator.java | 135 +++-------- .../java/org/apache/doris/qe/QeProcessorImpl.java | 22 +- .../java/org/apache/doris/qe/StmtExecutor.java | 177 +++++--------- .../org/apache/doris/task/ExportExportingTask.java | 43 ---- .../doris/common/util/RuntimeProfileTest.java | 8 +- .../apache/doris/common/util/TimeUtilsTest.java | 4 +- .../doris/load/loadv2/BrokerLoadJobTest.java | 10 +- .../org/apache/doris/qe/SessionVariablesTest.java | 9 - .../java/org/apache/doris/qe/StmtExecutorTest.java | 8 - .../suites/query_profile/test_profile.groovy | 14 +- 28 files changed, 689 insertions(+), 626 deletions(-) diff --git a/build-for-release.sh b/build-for-release.sh index 2c27cf6c42..e8bd282c0a 100755 --- a/build-for-release.sh +++ b/build-for-release.sh @@ -125,7 +125,7 @@ if [[ "${_USE_AVX2}" == "0" && "${ARCH}" == "x86_64" ]]; then OUTPUT_BE="${OUTPUT_BE}-noavx2" fi -echo "Pakage Name:" +echo "Package Name:" echo "FE: ${OUTPUT_FE}" echo "BE: ${OUTPUT_BE}" echo "JAR: ${OUTPUT_DEPS}" diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowQueryProfileStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowQueryProfileStmt.java index 50409eea0d..7477d1673b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowQueryProfileStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowQueryProfileStmt.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.ScalarType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; +import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.qe.ShowResultSetMetaData; import com.google.common.base.Strings; @@ -32,26 +33,7 @@ import com.google.common.base.Strings; // show query profile "/e0f7390f5363419e-b416a2a79996083e/0/e0f7390f5363419e-b416a2a799960906" # show instance's graph public class ShowQueryProfileStmt extends ShowStmt { // This should be same as ProfileManager.PROFILE_HEADERS - public static final ShowResultSetMetaData META_DATA_QUERY_IDS = - ShowResultSetMetaData.builder() - .addColumn(new Column("JobId", ScalarType.createVarchar(128))) - .addColumn(new Column("QueryId", ScalarType.createVarchar(128))) - .addColumn(new Column("User", ScalarType.createVarchar(128))) - .addColumn(new Column("DefaultDb", ScalarType.createVarchar(128))) - .addColumn(new Column("SQL", ScalarType.createVarchar(65535))) - .addColumn(new Column("QueryType", 
ScalarType.createVarchar(128))) - .addColumn(new Column("StartTime", ScalarType.createVarchar(128))) - .addColumn(new Column("EndTime", ScalarType.createVarchar(128))) - .addColumn(new Column("TotalTime", ScalarType.createVarchar(128))) - .addColumn(new Column("QueryState", ScalarType.createVarchar(128))) - .addColumn(new Column("TraceId", ScalarType.createVarchar(128))) - .addColumn(new Column("AnalysisTime", ScalarType.createVarchar(128))) - .addColumn(new Column("PlanTime", ScalarType.createVarchar(128))) - .addColumn(new Column("ScheduleTime", ScalarType.createVarchar(128))) - .addColumn(new Column("FetchResultTime", ScalarType.createVarchar(128))) - .addColumn(new Column("WriteResultTime", ScalarType.createVarchar(128))) - .addColumn(new Column("WaitAndFetchResultTime", ScalarType.createVarchar(128))) - .build(); + public static final ShowResultSetMetaData META_DATA_QUERY_IDS; public static final ShowResultSetMetaData META_DATA_FRAGMENTS = ShowResultSetMetaData.builder() @@ -68,6 +50,14 @@ public class ShowQueryProfileStmt extends ShowStmt { .addColumn(new Column("Instance", ScalarType.createVarchar(65535))) .build(); + static { + ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder(); + for (String key : SummaryProfile.SUMMARY_KEYS) { + builder.addColumn(new Column(key, ScalarType.createStringType())); + } + META_DATA_QUERY_IDS = builder.build(); + } + public enum PathType { QUERY_IDS, FRAGMENTS, diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java new file mode 100644 index 0000000000..e4c6c7c48d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/ExecutionProfile.java @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.profile; + +import org.apache.doris.common.MarkedCountDownLatch; +import org.apache.doris.common.Status; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.common.util.RuntimeProfile; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.thrift.TUnit; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + + +/** + * ExecutionProfile is used to collect profile of a complete query plan(including query or load). + * Need to call addToProfileAsChild() to add it to the root profile. + * It has the following structure: + * Execution Profile: + * Fragment 0: + * Instance 0: + * ... + * Fragment 1: + * Instance 0: + * ... + * ... 
+ * LoadChannels: // only for load job + */ +public class ExecutionProfile { + private static final Logger LOG = LogManager.getLogger(ExecutionProfile.class); + + // The root profile of this execution task + private RuntimeProfile executionProfile; + // Profiles for each fragment. And the InstanceProfile is the child of fragment profile. + // Which will be added to fragment profile when calling Coordinator::sendFragment() + private List<RuntimeProfile> fragmentProfiles; + // Profile for load channels. Only for load job. + private RuntimeProfile loadChannelProfile; + // A countdown latch to mark the completion of each instance. + // instance id -> dummy value + private MarkedCountDownLatch<TUniqueId, Long> profileDoneSignal; + + public ExecutionProfile(TUniqueId queryId, int fragmentNum) { + executionProfile = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId)); + RuntimeProfile fragmentsProfile = new RuntimeProfile("Fragments"); + executionProfile.addChild(fragmentsProfile); + fragmentProfiles = Lists.newArrayList(); + for (int i = 0; i < fragmentNum; i++) { + fragmentProfiles.add(new RuntimeProfile("Fragment " + i)); + fragmentsProfile.addChild(fragmentProfiles.get(i)); + } + loadChannelProfile = new RuntimeProfile("LoadChannels"); + executionProfile.addChild(loadChannelProfile); + } + + public RuntimeProfile getExecutionProfile() { + return executionProfile; + } + + public RuntimeProfile getLoadChannelProfile() { + return loadChannelProfile; + } + + public void addToProfileAsChild(RuntimeProfile rootProfile) { + rootProfile.addChild(executionProfile); + } + + public void markInstances(Set<TUniqueId> instanceIds) { + profileDoneSignal = new MarkedCountDownLatch<>(instanceIds.size()); + for (TUniqueId instanceId : instanceIds) { + profileDoneSignal.addMark(instanceId, -1L /* value is meaningless */); + } + } + + public void update(long startTime, boolean isFinished) { + if (startTime > 0) { + executionProfile.getCounterTotalTime().setValue(TUnit.TIME_MS, TimeUtils.getElapsedTimeMs(startTime)); + } + // Wait for all backends to finish reporting when writing profile last time. + if (isFinished && profileDoneSignal != null) { + try { + profileDoneSignal.await(2, TimeUnit.SECONDS); + } catch (InterruptedException e1) { + LOG.warn("signal await error", e1); + } + } + + for (RuntimeProfile fragmentProfile : fragmentProfiles) { + fragmentProfile.sortChildren(); + } + } + + public void onCancel() { + if (profileDoneSignal != null) { + // count down to zero to notify all objects waiting for this + profileDoneSignal.countDownToZero(new Status()); + LOG.info("unfinished instance: {}", profileDoneSignal.getLeftMarks() + .stream().map(e -> DebugUtil.printId(e.getKey())).toArray()); + } + } + + public void markOneInstanceDone(TUniqueId fragmentInstanceId) { + if (profileDoneSignal != null) { + profileDoneSignal.markedCountDown(fragmentInstanceId, -1L); + } + } + + public boolean awaitAllInstancesDone(long waitTimeS) throws InterruptedException { + if (profileDoneSignal == null) { + return true; + } + return profileDoneSignal.await(waitTimeS, TimeUnit.SECONDS); + } + + public boolean isAllInstancesDone() { + if (profileDoneSignal == null) { + return true; + } + return profileDoneSignal.getCount() == 0; + } + + public void addInstanceProfile(int instanceIdx, RuntimeProfile instanceProfile) { + Preconditions.checkArgument(instanceIdx < fragmentProfiles.size(), + instanceIdx + " vs. 
" + fragmentProfiles.size()); + fragmentProfiles.get(instanceIdx).addChild(instanceProfile); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java new file mode 100644 index 0000000000..e1336ce5af --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.profile; + +import org.apache.doris.common.util.ProfileManager; +import org.apache.doris.common.util.RuntimeProfile; + +import com.google.common.collect.Lists; + +import java.util.List; +import java.util.Map; + +/** + * Profile is a class to record the execution time of a query. + * It has the following structure: + * root profile: + * // summary of this profile, such as start time, end time, query id, etc. + * [SummaryProfile] + * // each execution profile is a complete execution of a query, a job may contain multiple queries. + * [List<ExecutionProfile>] + * + * SummaryProfile: + * Summary: + * Execution Summary: + * + * ExecutionProfile: + * Fragment 0: + * Fragment 1: + * ... 
+ */ +public class Profile { + private RuntimeProfile rootProfile; + private SummaryProfile summaryProfile; + private List<ExecutionProfile> executionProfiles = Lists.newArrayList(); + private boolean isFinished; + + public Profile(String name, boolean isEnable) { + this.rootProfile = new RuntimeProfile(name); + this.summaryProfile = new SummaryProfile(rootProfile); + // if disabled, just set isFinished to true, so that update() will do nothing + this.isFinished = !isEnable; + } + + public void addExecutionProfile(ExecutionProfile executionProfile) { + this.executionProfiles.add(executionProfile); + executionProfile.addToProfileAsChild(rootProfile); + } + + public synchronized void update(long startTime, Map<String, String> summaryInfo, boolean isFinished) { + if (this.isFinished) { + return; + } + summaryProfile.update(summaryInfo); + for (ExecutionProfile executionProfile : executionProfiles) { + executionProfile.update(startTime, isFinished); + } + rootProfile.computeTimeInProfile(); + ProfileManager.getInstance().pushProfile(rootProfile); + this.isFinished = isFinished; + } + + public SummaryProfile getSummaryProfile() { + return summaryProfile; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java new file mode 100644 index 0000000000..e3e2586307 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java @@ -0,0 +1,264 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.profile; + +import org.apache.doris.common.util.RuntimeProfile; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.thrift.TUnit; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; + +import java.util.Map; + +/** + * SummaryProfile is part of a query profile. + * It contains the summary information of a query. 
+ */ +public class SummaryProfile { + // Summary + public static final String PROFILE_ID = "Profile ID"; + public static final String TASK_TYPE = "Task Type"; + public static final String START_TIME = "Start Time"; + public static final String END_TIME = "End Time"; + public static final String TOTAL_TIME = "Total"; + public static final String TASK_STATE = "Task State"; + public static final String USER = "User"; + public static final String DEFAULT_DB = "Default Db"; + public static final String SQL_STATEMENT = "Sql Statement"; + public static final String IS_CACHED = "Is Cached"; + public static final String TOTAL_INSTANCES_NUM = "Total Instances Num"; + public static final String INSTANCES_NUM_PER_BE = "Instances Num Per BE"; + public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE = "Parallel Fragment Exec Instance Num"; + public static final String TRACE_ID = "Trace ID"; + + // Execution Summary + public static final String ANALYSIS_TIME = "Analysis Time"; + public static final String PLAN_TIME = "Plan Time"; + public static final String SCHEDULE_TIME = "Schedule Time"; + public static final String FETCH_RESULT_TIME = "Fetch Result Time"; + public static final String WRITE_RESULT_TIME = "Write Result Time"; + public static final String WAIT_FETCH_RESULT_TIME = "Wait and Fetch Result Time"; + + public static final ImmutableList<String> SUMMARY_KEYS = ImmutableList.of(PROFILE_ID, TASK_TYPE, + START_TIME, END_TIME, TOTAL_TIME, TASK_STATE, USER, DEFAULT_DB, SQL_STATEMENT, IS_CACHED, + TOTAL_INSTANCES_NUM, INSTANCES_NUM_PER_BE, PARALLEL_FRAGMENT_EXEC_INSTANCE, TRACE_ID); + + public static final ImmutableList<String> EXECUTION_SUMMARY_KEYS = ImmutableList.of(ANALYSIS_TIME, PLAN_TIME, + SCHEDULE_TIME, FETCH_RESULT_TIME, WRITE_RESULT_TIME, WAIT_FETCH_RESULT_TIME); + + private RuntimeProfile summaryProfile; + private RuntimeProfile executionSummaryProfile; + + // timestamp of query begin + private long queryBeginTime = -1; + // Analysis end time + private long queryAnalysisFinishTime = -1; + // Plan end time + private long queryPlanFinishTime = -1; + // Fragment schedule and send end time + private long queryScheduleFinishTime = -1; + // Query result fetch end time + private long queryFetchResultFinishTime = -1; + private long tempStarTime = -1; + private long queryFetchResultConsumeTime = 0; + private long queryWriteResultConsumeTime = 0; + + public SummaryProfile(RuntimeProfile rootProfile) { + summaryProfile = new RuntimeProfile("Summary"); + executionSummaryProfile = new RuntimeProfile("Execution Summary"); + init(); + rootProfile.addChild(summaryProfile); + rootProfile.addChild(executionSummaryProfile); + } + + private void init() { + for (String key : SUMMARY_KEYS) { + summaryProfile.addInfoString(key, "N/A"); + } + for (String key : EXECUTION_SUMMARY_KEYS) { + executionSummaryProfile.addInfoString(key, "N/A"); + } + } + + public void update(Map<String, String> summaryInfo) { + updateSummaryProfile(summaryInfo); + updateExecutionSummaryProfile(); + } + + private void updateSummaryProfile(Map<String, String> infos) { + for (String key : infos.keySet()) { + if (SUMMARY_KEYS.contains(key)) { + summaryProfile.addInfoString(key, infos.get(key)); + } + } + } + + private void updateExecutionSummaryProfile() { + executionSummaryProfile.addInfoString(ANALYSIS_TIME, getPrettyQueryAnalysisFinishTime()); + executionSummaryProfile.addInfoString(PLAN_TIME, getPrettyQueryPlanFinishTime()); + executionSummaryProfile.addInfoString(SCHEDULE_TIME, getPrettyQueryScheduleFinishTime()); + 
executionSummaryProfile.addInfoString(FETCH_RESULT_TIME, + RuntimeProfile.printCounter(queryFetchResultConsumeTime, TUnit.TIME_MS)); + executionSummaryProfile.addInfoString(WRITE_RESULT_TIME, + RuntimeProfile.printCounter(queryWriteResultConsumeTime, TUnit.TIME_MS)); + executionSummaryProfile.addInfoString(WAIT_FETCH_RESULT_TIME, getPrettyQueryFetchResultFinishTime()); + } + + public void setQueryBeginTime() { + this.queryBeginTime = TimeUtils.getStartTimeMs(); + } + + public void setQueryAnalysisFinishTime() { + this.queryAnalysisFinishTime = TimeUtils.getStartTimeMs(); + } + + public void setQueryPlanFinishTime() { + this.queryPlanFinishTime = TimeUtils.getStartTimeMs(); + } + + public void setQueryScheduleFinishTime() { + this.queryScheduleFinishTime = TimeUtils.getStartTimeMs(); + } + + public void setQueryFetchResultFinishTime() { + this.queryFetchResultFinishTime = TimeUtils.getStartTimeMs(); + } + + public void setTempStartTime() { + this.tempStarTime = TimeUtils.getStartTimeMs(); + } + + public void freshFetchResultConsumeTime() { + this.queryFetchResultConsumeTime += TimeUtils.getStartTimeMs() - tempStarTime; + } + + public void freshWriteResultConsumeTime() { + this.queryWriteResultConsumeTime += TimeUtils.getStartTimeMs() - tempStarTime; + } + + public long getQueryBeginTime() { + return queryBeginTime; + } + + public static class SummaryBuilder { + private Map<String, String> map = Maps.newHashMap(); + + public SummaryBuilder profileId(String val) { + map.put(PROFILE_ID, val); + return this; + } + + public SummaryBuilder taskType(String val) { + map.put(TASK_TYPE, val); + return this; + } + + public SummaryBuilder startTime(String val) { + map.put(START_TIME, val); + return this; + } + + public SummaryBuilder endTime(String val) { + map.put(END_TIME, val); + return this; + } + + public SummaryBuilder totalTime(String val) { + map.put(TOTAL_TIME, val); + return this; + } + + public SummaryBuilder taskState(String val) { + map.put(TASK_STATE, val); + return this; + } + + public SummaryBuilder user(String val) { + map.put(USER, val); + return this; + } + + public SummaryBuilder defaultDb(String val) { + map.put(DEFAULT_DB, val); + return this; + } + + public SummaryBuilder sqlStatement(String val) { + map.put(SQL_STATEMENT, val); + return this; + } + + public SummaryBuilder isCached(String val) { + map.put(IS_CACHED, val); + return this; + } + + public SummaryBuilder totalInstancesNum(String val) { + map.put(TOTAL_INSTANCES_NUM, val); + return this; + } + + public SummaryBuilder instancesNumPerBe(String val) { + map.put(INSTANCES_NUM_PER_BE, val); + return this; + } + + public SummaryBuilder parallelFragmentExecInstance(String val) { + map.put(PARALLEL_FRAGMENT_EXEC_INSTANCE, val); + return this; + } + + public SummaryBuilder traceId(String val) { + map.put(TRACE_ID, val); + return this; + } + + public Map<String, String> build() { + return map; + } + } + + private String getPrettyQueryAnalysisFinishTime() { + if (queryBeginTime == -1 || queryAnalysisFinishTime == -1) { + return "N/A"; + } + return RuntimeProfile.printCounter(queryAnalysisFinishTime - queryBeginTime, TUnit.TIME_MS); + } + + private String getPrettyQueryPlanFinishTime() { + if (queryAnalysisFinishTime == -1 || queryPlanFinishTime == -1) { + return "N/A"; + } + return RuntimeProfile.printCounter(queryPlanFinishTime - queryAnalysisFinishTime, TUnit.TIME_MS); + } + + private String getPrettyQueryScheduleFinishTime() { + if (queryPlanFinishTime == -1 || queryScheduleFinishTime == -1) { + return "N/A"; + } + return 
RuntimeProfile.printCounter(queryScheduleFinishTime - queryPlanFinishTime, TUnit.TIME_MS); + } + + private String getPrettyQueryFetchResultFinishTime() { + if (queryScheduleFinishTime == -1 || queryFetchResultFinishTime == -1) { + return "N/A"; + } + return RuntimeProfile.printCounter(queryFetchResultFinishTime - queryScheduleFinishTime, TUnit.TIME_MS); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java index 8ec969944f..cbd5e88c79 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java @@ -28,8 +28,9 @@ public class Counter { return value; } - public void setValue(long newValue) { - value = newValue; + public void setValue(TUnit type, long value) { + this.type = type.getValue(); + this.value = value; } public TUnit getType() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java index 6d0f575d38..dbfb7e83f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileManager.java @@ -25,6 +25,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.profile.MultiProfileTreeBuilder; import org.apache.doris.common.profile.ProfileTreeBuilder; import org.apache.doris.common.profile.ProfileTreeNode; +import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.nereids.stats.StatsErrorEstimator; import com.google.common.base.Strings; @@ -34,8 +35,6 @@ import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.Arrays; -import java.util.Collections; import java.util.Deque; import java.util.Iterator; import java.util.LinkedList; @@ -58,48 +57,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; public class ProfileManager { private static final Logger LOG = LogManager.getLogger(ProfileManager.class); private static volatile ProfileManager INSTANCE = null; - // private static final int ARRAY_SIZE = 100; - // private static final int TOTAL_LEN = 1000 * ARRAY_SIZE ; - // just use for load profile and export profile - public static final String JOB_ID = "Job ID"; - public static final String QUERY_ID = "Query ID"; - public static final String START_TIME = "Start Time"; - public static final String END_TIME = "End Time"; - public static final String TOTAL_TIME = "Total"; - public static final String QUERY_TYPE = "Query Type"; - public static final String QUERY_STATE = "Query State"; - public static final String DORIS_VERSION = "Doris Version"; - public static final String USER = "User"; - public static final String DEFAULT_DB = "Default Db"; - public static final String SQL_STATEMENT = "Sql Statement"; - public static final String IS_CACHED = "Is Cached"; - - public static final String TOTAL_INSTANCES_NUM = "Total Instances Num"; - - public static final String INSTANCES_NUM_PER_BE = "Instances Num Per BE"; - - public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE = "Parallel Fragment Exec Instance Num"; - - public static final String TRACE_ID = "Trace ID"; - public static final String ANALYSIS_TIME = "Analysis Time"; - public static final String FETCH_RESULT_TIME = "Fetch Result Time"; - public static final String PLAN_TIME = "Plan Time"; - public static final 
String SCHEDULE_TIME = "Schedule Time"; - public static final String WRITE_RESULT_TIME = "Write Result Time"; - public static final String WAIT_FETCH_RESULT_TIME = "Wait and Fetch Result Time"; public enum ProfileType { QUERY, LOAD, } - public static final List<String> PROFILE_HEADERS = Collections.unmodifiableList( - Arrays.asList(JOB_ID, QUERY_ID, USER, DEFAULT_DB, SQL_STATEMENT, QUERY_TYPE, - START_TIME, END_TIME, TOTAL_TIME, QUERY_STATE, TRACE_ID)); - public static final List<String> EXECUTION_HEADERS = Collections.unmodifiableList( - Arrays.asList(ANALYSIS_TIME, PLAN_TIME, SCHEDULE_TIME, FETCH_RESULT_TIME, - WRITE_RESULT_TIME, WAIT_FETCH_RESULT_TIME)); - public static class ProfileElement { public ProfileElement(RuntimeProfile profile) { this.profile = profile; @@ -164,13 +127,13 @@ public class ProfileManager { public ProfileElement createElement(RuntimeProfile profile) { ProfileElement element = new ProfileElement(profile); RuntimeProfile summaryProfile = profile.getChildList().get(0).first; - for (String header : PROFILE_HEADERS) { + for (String header : SummaryProfile.SUMMARY_KEYS) { element.infoStrings.put(header, summaryProfile.getInfoString(header)); } List<Pair<RuntimeProfile, Boolean>> childList = summaryProfile.getChildList(); if (!childList.isEmpty()) { RuntimeProfile executionProfile = childList.get(0).first; - for (String header : EXECUTION_HEADERS) { + for (String header : SummaryProfile.EXECUTION_SUMMARY_KEYS) { element.infoStrings.put(header, executionProfile.getInfoString(header)); } } @@ -194,7 +157,7 @@ public class ProfileManager { ProfileElement element = createElement(profile); // 'insert into' does have job_id, put all profiles key with query_id - String key = element.infoStrings.get(ProfileManager.QUERY_ID); + String key = element.infoStrings.get(SummaryProfile.PROFILE_ID); // check when push in, which can ensure every element in the list has QUERY_ID column, // so there is no need to check when remove element from list. 
if (Strings.isNullOrEmpty(key)) { @@ -235,15 +198,12 @@ public class ProfileManager { continue; } Map<String, String> infoStrings = profileElement.infoStrings; - if (type != null && !infoStrings.get(QUERY_TYPE).equalsIgnoreCase(type.name())) { + if (type != null && !infoStrings.get(SummaryProfile.TASK_TYPE).equalsIgnoreCase(type.name())) { continue; } List<String> row = Lists.newArrayList(); - for (String str : PROFILE_HEADERS) { - row.add(infoStrings.get(str)); - } - for (String str : EXECUTION_HEADERS) { + for (String str : SummaryProfile.SUMMARY_KEYS) { row.add(infoStrings.get(str)); } result.add(row); @@ -285,7 +245,7 @@ public class ProfileManager { if (element == null) { throw new AuthenticationException("query with id " + queryId + " not found"); } - if (!element.infoStrings.get(USER).equals(user)) { + if (!element.infoStrings.get(SummaryProfile.USER).equals(user)) { throw new AuthenticationException("Access deny to view query with id: " + queryId); } } finally { @@ -377,7 +337,7 @@ public class ProfileManager { readLock.lock(); try { for (Map.Entry<String, ProfileElement> entry : queryIdToProfileMap.entrySet()) { - if (entry.getValue().infoStrings.getOrDefault(TRACE_ID, "").equals(traceId)) { + if (entry.getValue().infoStrings.getOrDefault(SummaryProfile.TRACE_ID, "").equals(traceId)) { return entry.getKey(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileWriter.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileWriter.java deleted file mode 100644 index 3a472708de..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/ProfileWriter.java +++ /dev/null @@ -1,24 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.util; - -// this interface is used to write profile to ProfileManager when a task is running. -public interface ProfileWriter { - - void writeProfile(boolean waitReportDone); -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/QueryPlannerProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/QueryPlannerProfile.java deleted file mode 100644 index df5ae5aee2..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/QueryPlannerProfile.java +++ /dev/null @@ -1,130 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.util; - -import org.apache.doris.thrift.TUnit; - -/** - * This profile is mainly used to record the time-consuming situation related to - * executing SQL parsing, planning, scheduling, and fetching results on the FE side. - * Can be expanded later. - * - * All timestamp is in nona second - */ -public class QueryPlannerProfile { - public static final String KEY_ANALYSIS = "Analysis Time"; - public static final String KEY_PLAN = "Plan Time"; - public static final String KEY_SCHEDULE = "Schedule Time"; - public static final String KEY_WAIT_AND_FETCH = "Wait and Fetch Result Time"; - - public static final String KEY_FETCH = "Fetch Result Time"; - - public static final String KEY_WRITE = "Write Result Time"; - - // timestamp of query begin - private long queryBeginTime = -1; - // Analysis end time - private long queryAnalysisFinishTime = -1; - // Plan end time - private long queryPlanFinishTime = -1; - // Fragment schedule and send end time - private long queryScheduleFinishTime = -1; - // Query result fetch end time - private long queryFetchResultFinishTime = -1; - - private long tempStarTime = -1; - - private long queryFetchResultConsumeTime = 0; - - private long queryWriteResultConsumeTime = 0; - - public void setQueryBeginTime() { - this.queryBeginTime = TimeUtils.getStartTime(); - } - - public void setQueryAnalysisFinishTime() { - this.queryAnalysisFinishTime = TimeUtils.getStartTime(); - } - - public void setQueryPlanFinishTime() { - this.queryPlanFinishTime = TimeUtils.getStartTime(); - } - - public void setQueryScheduleFinishTime() { - this.queryScheduleFinishTime = TimeUtils.getStartTime(); - } - - public void setQueryFetchResultFinishTime() { - this.queryFetchResultFinishTime = TimeUtils.getStartTime(); - } - - public void setTempStartTime() { - this.tempStarTime = TimeUtils.getStartTime(); - } - - public void freshFetchResultConsumeTime() { - this.queryFetchResultConsumeTime += TimeUtils.getStartTime() - tempStarTime; - } - - public void freshWriteResultConsumeTime() { - this.queryWriteResultConsumeTime += TimeUtils.getStartTime() - tempStarTime; - } - - public long getQueryBeginTime() { - return queryBeginTime; - } - - private String getPrettyQueryAnalysisFinishTime() { - if (queryBeginTime == -1 || queryAnalysisFinishTime == -1) { - return "N/A"; - } - return RuntimeProfile.printCounter(queryAnalysisFinishTime - queryBeginTime, TUnit.TIME_NS); - } - - private String getPrettyQueryPlanFinishTime() { - if (queryAnalysisFinishTime == -1 || queryPlanFinishTime == -1) { - return "N/A"; - } - return RuntimeProfile.printCounter(queryPlanFinishTime - queryAnalysisFinishTime, TUnit.TIME_NS); - } - - private String getPrettyQueryScheduleFinishTime() { - if (queryPlanFinishTime == -1 || queryScheduleFinishTime == -1) { - return "N/A"; - } - return RuntimeProfile.printCounter(queryScheduleFinishTime - queryPlanFinishTime, TUnit.TIME_NS); - } - - private String getPrettyQueryFetchResultFinishTime() { - if (queryScheduleFinishTime == -1 || queryFetchResultFinishTime == -1) { - return "N/A"; - } - return 
RuntimeProfile.printCounter(queryFetchResultFinishTime - queryScheduleFinishTime, TUnit.TIME_NS); - } - - public void initRuntimeProfile(RuntimeProfile plannerProfile) { - plannerProfile.addInfoString(KEY_ANALYSIS, getPrettyQueryAnalysisFinishTime()); - plannerProfile.addInfoString(KEY_PLAN, getPrettyQueryPlanFinishTime()); - plannerProfile.addInfoString(KEY_SCHEDULE, getPrettyQueryScheduleFinishTime()); - plannerProfile.addInfoString(KEY_FETCH, - RuntimeProfile.printCounter(queryFetchResultConsumeTime, TUnit.TIME_NS)); - plannerProfile.addInfoString(KEY_WRITE, - RuntimeProfile.printCounter(queryWriteResultConsumeTime, TUnit.TIME_NS)); - plannerProfile.addInfoString(KEY_WAIT_AND_FETCH, getPrettyQueryFetchResultFinishTime()); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java index 6073ef35f9..075893e498 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java @@ -174,7 +174,7 @@ public class RuntimeProfile { LOG.error("Cannot update counters with the same name but different types" + " type=" + tcounter.type); } else { - counter.setValue(tcounter.value); + counter.setValue(tcounter.type, tcounter.value); } } } @@ -349,6 +349,15 @@ public class RuntimeProfile { } break; } + case TIME_MS: { + if (tmpValue >= DebugUtil.THOUSAND) { + // If the time is over a second, print it up to ms. + DebugUtil.printTimeMs(tmpValue, builder); + } else { + builder.append(tmpValue).append("ms"); + } + break; + } case BYTES: { Pair<Double, String> pair = DebugUtil.getByteUint(tmpValue); Formatter fmt = new Formatter(); @@ -505,3 +514,4 @@ public class RuntimeProfile { return infoStrings; } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java index 9d73d0b368..72443b5a26 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java @@ -116,12 +116,12 @@ public class TimeUtils { } } - public static long getStartTime() { - return System.nanoTime(); + public static long getStartTimeMs() { + return System.currentTimeMillis(); } - public static long getEstimatedTime(long startTime) { - return System.nanoTime() - startTime; + public static long getElapsedTimeMs(long startTime) { + return System.currentTimeMillis() - startTime; } public static synchronized String getCurrentFormatTime() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/QueryProfileController.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/QueryProfileController.java index 775d59c0aa..674c6d9014 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/QueryProfileController.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/controller/QueryProfileController.java @@ -17,6 +17,7 @@ package org.apache.doris.httpv2.controller; +import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.common.util.ProfileManager; import org.apache.doris.httpv2.entity.ResponseBody; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; @@ -70,50 +71,25 @@ public class QueryProfileController extends BaseController { private void addFinishedQueryInfo(Map<String, Object> result) { List<List<String>> finishedQueries = ProfileManager.getInstance().getAllQueries(); 
List<String> columnHeaders = Lists.newLinkedList(); - columnHeaders.addAll(ProfileManager.PROFILE_HEADERS); - columnHeaders.addAll(ProfileManager.EXECUTION_HEADERS); - int jobIdIndex = -1; - int queryIdIndex = -1; - int queryTypeIndex = -1; - for (int i = 0; i < columnHeaders.size(); ++i) { - if (columnHeaders.get(i).equals(ProfileManager.JOB_ID)) { - jobIdIndex = i; - continue; - } - if (columnHeaders.get(i).equals(ProfileManager.QUERY_ID)) { - queryIdIndex = i; - continue; - } - if (columnHeaders.get(i).equals(ProfileManager.QUERY_TYPE)) { - queryTypeIndex = i; - continue; - } - } - // set href as the first column - columnHeaders.add(0, DETAIL_COL); + columnHeaders.addAll(SummaryProfile.SUMMARY_KEYS); result.put("column_names", columnHeaders); - result.put("href_column", Lists.newArrayList(DETAIL_COL)); + // The first column is profile id, which is also a href column + result.put("href_column", Lists.newArrayList(columnHeaders.get(0))); List<Map<String, Object>> list = Lists.newArrayList(); result.put("rows", list); for (List<String> row : finishedQueries) { - List<String> realRow = Lists.newLinkedList(row); - - String queryType = realRow.get(queryTypeIndex); - String id = (QUERY_ID_TYPES.contains(queryType)) ? realRow.get(queryIdIndex) : realRow.get(jobIdIndex); - - realRow.add(0, id); Map<String, Object> rowMap = new HashMap<>(); - for (int i = 0; i < realRow.size(); ++i) { - rowMap.put(columnHeaders.get(i), realRow.get(i)); + for (int i = 0; i < row.size(); ++i) { + rowMap.put(columnHeaders.get(i), row.get(i)); } // add hyper link - if (Strings.isNullOrEmpty(id)) { + if (Strings.isNullOrEmpty(row.get(0))) { rowMap.put("__hrefPaths", Lists.newArrayList("/query_profile/-1")); } else { - rowMap.put("__hrefPaths", Lists.newArrayList("/query_profile/" + id)); + rowMap.put("__hrefPaths", Lists.newArrayList("/query_profile/" + row.get(0))); } list.add(rowMap); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index ac1dec8d7a..dfd213b047 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -158,9 +158,6 @@ public class ExportJob implements Writable { private String sql = ""; - // If set to true, the profile of export job with be pushed to ProfileManager - private volatile boolean enableProfile = false; - // The selectStmt is sql 'select ... into outfile ...' 
@Getter private List<QueryStmt> selectStmtList = Lists.newArrayList(); @@ -220,7 +217,6 @@ public class ExportJob implements Writable { this.exportPath = path; this.sessionVariables = stmt.getSessionVariables(); this.timeoutSecond = sessionVariables.getQueryTimeoutS(); - this.enableProfile = sessionVariables.enableProfile(); this.qualifiedUser = stmt.getQualifiedUser(); this.userIdentity = stmt.getUserIdentity(); @@ -619,10 +615,6 @@ public class ExportJob implements Writable { return queryId; } - public boolean getEnableProfile() { - return enableProfile; - } - @Override public String toString() { return "ExportJob [jobId=" + id diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 01bbd795f4..bfdce1dc8f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -31,12 +31,13 @@ import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.QuotaExceedException; import org.apache.doris.common.UserException; +import org.apache.doris.common.profile.Profile; +import org.apache.doris.common.profile.SummaryProfile.SummaryBuilder; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; import org.apache.doris.common.util.MetaLockUtils; -import org.apache.doris.common.util.ProfileManager; -import org.apache.doris.common.util.RuntimeProfile; +import org.apache.doris.common.util.ProfileManager.ProfileType; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey; @@ -73,7 +74,7 @@ public class BrokerLoadJob extends BulkLoadJob { private static final Logger LOG = LogManager.getLogger(BrokerLoadJob.class); // Profile of this load job, including all tasks' profiles - private RuntimeProfile jobProfile; + private Profile jobProfile; // If set to true, the profile of load job with be pushed to ProfileManager private boolean enableProfile = false; @@ -188,7 +189,7 @@ public class BrokerLoadJob extends BulkLoadJob { Lists.newArrayList(fileGroupAggInfo.getAllTableIds())); // divide job into broker loading task by table List<LoadLoadingTask> newLoadingTasks = Lists.newArrayList(); - this.jobProfile = new RuntimeProfile("BrokerLoadJob " + id + ". " + label); + this.jobProfile = new Profile("BrokerLoadJob " + id + ". 
" + label, true); MetaLockUtils.readLockTables(tableList); try { for (Map.Entry<FileGroupAggKey, List<BrokerFileGroup>> entry @@ -314,27 +315,24 @@ public class BrokerLoadJob extends BulkLoadJob { if (!enableProfile) { return; } + jobProfile.update(createTimestamp, getSummaryInfo(true), true); + } - RuntimeProfile summaryProfile = new RuntimeProfile("Summary"); - summaryProfile.addInfoString(ProfileManager.JOB_ID, String.valueOf(this.id)); - summaryProfile.addInfoString(ProfileManager.QUERY_ID, this.queryId); - summaryProfile.addInfoString(ProfileManager.START_TIME, TimeUtils.longToTimeString(createTimestamp)); - summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(finishTimestamp)); - summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, - DebugUtil.getPrettyStringMs(finishTimestamp - createTimestamp)); - - summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Load"); - summaryProfile.addInfoString(ProfileManager.QUERY_STATE, "N/A"); - summaryProfile.addInfoString(ProfileManager.USER, - getUserInfo() != null ? getUserInfo().getQualifiedUser() : "N/A"); - summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, getDefaultDb()); - summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, this.getOriginStmt().originStmt); - summaryProfile.addInfoString(ProfileManager.IS_CACHED, "N/A"); - - // Add the summary profile to the first - jobProfile.addFirstChild(summaryProfile); - jobProfile.computeTimeInChildProfile(); - ProfileManager.getInstance().pushProfile(jobProfile); + private Map<String, String> getSummaryInfo(boolean isFinished) { + long currentTimestamp = System.currentTimeMillis(); + SummaryBuilder builder = new SummaryBuilder(); + builder.profileId(String.valueOf(id)); + builder.taskType(ProfileType.LOAD.name()); + builder.startTime(TimeUtils.longToTimeString(createTimestamp)); + if (isFinished) { + builder.endTime(TimeUtils.longToTimeString(currentTimestamp)); + builder.totalTime(DebugUtil.getPrettyStringMs(currentTimestamp - createTimestamp)); + } + builder.taskState("FINISHED"); + builder.user(getUserInfo() != null ? 
getUserInfo().getQualifiedUser() : "N/A"); + builder.defaultDb(getDefaultDb()); + builder.sqlStatement(getOriginStmt().originStmt); + return builder.build(); } private String getDefaultDb() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java index cc8248004c..677bd449e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java @@ -33,7 +33,6 @@ import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.annotation.LogException; import org.apache.doris.common.io.Text; -import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; import org.apache.doris.common.util.SqlParserUtils; @@ -77,11 +76,9 @@ public abstract class BulkLoadJob extends LoadJob { // input params protected BrokerDesc brokerDesc; - // queryId of OriginStatement - protected String queryId; // this param is used to persist the expr of columns // the origin stmt is persisted instead of columns expr - // the expr of columns will be reanalyze when the log is replayed + // the expr of columns will be reanalyzed when the log is replayed private OriginStatement originStmt; // include broker desc and data desc @@ -104,11 +101,9 @@ public abstract class BulkLoadJob extends LoadJob { this.userInfo = userInfo; if (ConnectContext.get() != null) { - this.queryId = DebugUtil.printId(ConnectContext.get().queryId()); SessionVariable var = ConnectContext.get().getSessionVariable(); sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode())); } else { - this.queryId = "N/A"; sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT)); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index cb90c075ce..13e8a5beaa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -25,11 +25,10 @@ import org.apache.doris.common.Config; import org.apache.doris.common.LoadException; import org.apache.doris.common.Status; import org.apache.doris.common.UserException; +import org.apache.doris.common.profile.Profile; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; -import org.apache.doris.common.util.RuntimeProfile; -import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.load.FailMsg; import org.apache.doris.qe.Coordinator; @@ -74,7 +73,7 @@ public class LoadLoadingTask extends LoadTask { private LoadingTaskPlanner planner; - private RuntimeProfile jobProfile; + private Profile jobProfile; private long beginTime; public LoadLoadingTask(Database db, OlapTable table, @@ -82,7 +81,7 @@ public class LoadLoadingTask extends LoadTask { long jobDeadlineMs, long execMemLimit, boolean strictMode, long txnId, LoadTaskCallback callback, String timezone, long timeoutS, int loadParallelism, int sendBatchParallelism, - boolean loadZeroTolerance, RuntimeProfile profile, boolean singleTabletLoadPerSink, + boolean loadZeroTolerance, Profile jobProfile, boolean singleTabletLoadPerSink, boolean 
useNewLoadScanNode) { super(callback, TaskType.LOADING); this.db = db; @@ -100,7 +99,7 @@ public class LoadLoadingTask extends LoadTask { this.loadParallelism = loadParallelism; this.sendBatchParallelism = sendBatchParallelism; this.loadZeroTolerance = loadZeroTolerance; - this.jobProfile = profile; + this.jobProfile = jobProfile; this.singleTabletLoadPerSink = singleTabletLoadPerSink; this.useNewLoadScanNode = useNewLoadScanNode; } @@ -123,7 +122,7 @@ public class LoadLoadingTask extends LoadTask { LOG.info("begin to execute loading task. load id: {} job id: {}. db: {}, tbl: {}. left retry: {}", DebugUtil.printId(loadId), callback.getCallbackId(), db.getFullName(), table.getName(), retryTime); retryTime--; - beginTime = System.nanoTime(); + beginTime = System.currentTimeMillis(); if (!((BrokerLoadJob) callback).updateState(JobState.LOADING)) { // job may already be cancelled return; @@ -135,9 +134,13 @@ public class LoadLoadingTask extends LoadTask { // New one query id, Coordinator curCoordinator = new Coordinator(callback.getCallbackId(), loadId, planner.getDescTable(), planner.getFragments(), planner.getScanNodes(), planner.getTimezone(), loadZeroTolerance); + if (this.jobProfile != null) { + this.jobProfile.addExecutionProfile(curCoordinator.getExecutionProfile()); + } curCoordinator.setQueryType(TQueryType.LOAD); curCoordinator.setExecMemoryLimit(execMemLimit); curCoordinator.setExecPipEngine(Config.enable_pipeline_load); + /* * For broker load job, user only need to set mem limit by 'exec_mem_limit' property. * And the variable 'load_mem_limit' does not make any effect. @@ -200,9 +203,7 @@ public class LoadLoadingTask extends LoadTask { return; } // Summary profile - coord.getQueryProfile().getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(beginTime)); - coord.endProfile(); - jobProfile.addChild(coord.getQueryProfile()); + coord.getExecutionProfile().update(beginTime, true); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 4235a96bb5..334983b0c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -181,7 +181,7 @@ public class NereidsPlanner extends Planner { } if (statementContext.getConnectContext().getExecutor() != null) { - statementContext.getConnectContext().getExecutor().getPlannerProfile().setQueryAnalysisFinishTime(); + statementContext.getConnectContext().getExecutor().getSummaryProfile().setQueryAnalysisFinishTime(); } if (explainLevel == ExplainLevel.ANALYZED_PLAN || explainLevel == ExplainLevel.ALL_PLAN) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 018176bbb1..50546cc2cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -651,7 +651,7 @@ public class ConnectProcessor { && (executor.getParsedStmt() instanceof QueryStmt // currently only QueryStmt and insert need profile || executor.getParsedStmt() instanceof LogicalPlanAdapter || executor.getParsedStmt() instanceof InsertStmt)) { - executor.writeProfile(true); + executor.updateProfile(true); StatsErrorEstimator statsErrorEstimator = ConnectContext.get().getStatsErrorEstimator(); if (statsErrorEstimator != null) { 
statsErrorEstimator.updateProfile(ConnectContext.get().queryId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 87282dcaac..7becc8d35c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -24,17 +24,16 @@ import org.apache.doris.analysis.StorageBackend; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.FsBroker; import org.apache.doris.common.Config; -import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.Pair; import org.apache.doris.common.Reference; import org.apache.doris.common.Status; import org.apache.doris.common.UserException; +import org.apache.doris.common.profile.ExecutionProfile; import org.apache.doris.common.telemetry.ScopedSpan; import org.apache.doris.common.telemetry.Telemetry; import org.apache.doris.common.util.ConsistentHash; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.ListUtil; -import org.apache.doris.common.util.ProfileWriter; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.VectorizedUtil; @@ -189,14 +188,6 @@ public class Coordinator { // Once this is set to true, errors from remote fragments are ignored. private boolean returnedAllResults; - private RuntimeProfile queryProfile; - - private RuntimeProfile fragmentsProfile; - private List<RuntimeProfile> fragmentProfile; - private RuntimeProfile loadChannelProfile; - - private ProfileWriter profileWriter; - // populated in computeFragmentExecParams() private final Map<PlanFragmentId, FragmentExecParams> fragmentExecParamsMap = Maps.newHashMap(); @@ -219,8 +210,6 @@ public class Coordinator { // set in computeFragmentExecParams(); // same as backend_exec_states_.size() after Exec() private final Set<TUniqueId> instanceIds = Sets.newHashSet(); - // instance id -> dummy value - private MarkedCountDownLatch<TUniqueId, Long> profileDoneSignal; private final boolean isBlockQuery; @@ -270,6 +259,12 @@ public class Coordinator { private List<TPipelineResourceGroup> tResourceGroups = Lists.newArrayList(); + private final ExecutionProfile executionProfile; + + public ExecutionProfile getExecutionProfile() { + return executionProfile; + } + private static class BackendHash implements Funnel<Backend> { @Override public void funnel(Backend backend, PrimitiveSink primitiveSink) { @@ -289,13 +284,14 @@ public class Coordinator { } } + // Used for query/insert public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner, StatsErrorEstimator statsErrorEstimator) { this(context, analyzer, planner); this.statsErrorEstimator = statsErrorEstimator; } - // Used for query/insert + // Used for query/insert/test public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) { this.isBlockQuery = planner.isBlockQuery(); this.queryId = context.queryId(); @@ -350,12 +346,13 @@ public class Coordinator { nextInstanceId.setLo(queryId.lo + 1); this.assignedRuntimeFilters = planner.getRuntimeFilters(); this.tResourceGroups = analyzer == null ? 
null : analyzer.getResourceGroups(); + this.executionProfile = new ExecutionProfile(queryId, fragments.size()); + } // Used for broker load task/export task/update coordinator - public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable, - List<PlanFragment> fragments, List<ScanNode> scanNodes, String timezone, - boolean loadZeroTolerance) { + public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable, List<PlanFragment> fragments, + List<ScanNode> scanNodes, String timezone, boolean loadZeroTolerance) { this.isBlockQuery = true; this.jobId = jobId; this.queryId = queryId; @@ -372,6 +369,7 @@ public class Coordinator { this.nextInstanceId = new TUniqueId(); nextInstanceId.setHi(queryId.hi); nextInstanceId.setLo(queryId.lo + 1); + this.executionProfile = new ExecutionProfile(queryId, fragments.size()); } private void setFromUserProperty(ConnectContext connectContext) { @@ -427,18 +425,6 @@ public class Coordinator { return queryStatus; } - public RuntimeProfile getQueryProfile() { - return queryProfile; - } - - public ProfileWriter getProfileWriter() { - return profileWriter; - } - - public void setProfileWriter(ProfileWriter profileWriter) { - this.profileWriter = profileWriter; - } - public List<String> getDeltaUrls() { return deltaUrls; } @@ -525,20 +511,6 @@ public class Coordinator { coordAddress = new TNetworkAddress(localIP, Config.rpc_port); - int fragmentSize = fragments.size(); - queryProfile = new RuntimeProfile("Execution Profile " + DebugUtil.printId(queryId)); - - fragmentsProfile = new RuntimeProfile("Fragments"); - queryProfile.addChild(fragmentsProfile); - fragmentProfile = new ArrayList<RuntimeProfile>(); - for (int i = 0; i < fragmentSize; i++) { - fragmentProfile.add(new RuntimeProfile("Fragment " + i)); - fragmentsProfile.addChild(fragmentProfile.get(i)); - } - - loadChannelProfile = new RuntimeProfile("LoadChannels"); - queryProfile.addChild(loadChannelProfile); - this.idToBackend = Env.getCurrentSystemInfo().getIdToBackend(); if (LOG.isDebugEnabled()) { LOG.debug("idToBackend size={}", idToBackend.size()); @@ -636,14 +608,7 @@ public class Coordinator { relatedBackendIds); LOG.info("dispatch load job: {} to {}", DebugUtil.printId(queryId), addressToBackendID.keySet()); } - - // to keep things simple, make async Cancel() calls wait until plan fragment - // execution has been initiated, otherwise we might try to cancel fragment - // execution at backends where it hasn't even started - profileDoneSignal = new MarkedCountDownLatch<TUniqueId, Long>(instanceIds.size()); - for (TUniqueId instanceId : instanceIds) { - profileDoneSignal.addMark(instanceId, -1L /* value is meaningless */); - } + executionProfile.markInstances(instanceIds); if (!isPointQuery) { if (enablePipelineEngine) { sendPipelineCtx(); @@ -736,7 +701,8 @@ public class Coordinator { for (TExecPlanFragmentParams tParam : tParams) { BackendExecState execState = new BackendExecState(fragment.getFragmentId(), instanceId++, - profileFragmentId, tParam, this.addressToBackendID, loadChannelProfile); + profileFragmentId, tParam, this.addressToBackendID, + executionProfile.getLoadChannelProfile()); // Each tParam will set the total number of Fragments that need to be executed on the same BE, // and the BE will determine whether all Fragments have been executed based on this information. // Notice. load fragment has a small probability that FragmentNumOnHost is 0, for unknown reasons. 
@@ -1273,12 +1239,7 @@ public class Coordinator { return; } cancelRemoteFragmentsAsync(cancelReason); - if (profileDoneSignal != null) { - // count down to zero to notify all objects waiting for this - profileDoneSignal.countDownToZero(new Status()); - LOG.info("unfinished instance: {}", profileDoneSignal.getLeftMarks() - .stream().map(e -> DebugUtil.printId(e.getKey())).toArray()); - } + executionProfile.onCancel(); } private void cancelRemoteFragmentsAsync(Types.PPlanFragmentCancelReason cancelReason) { @@ -2164,7 +2125,7 @@ public class Coordinator { if (params.isSetErrorTabletInfos()) { updateErrorTabletInfos(params.getErrorTabletInfos()); } - profileDoneSignal.markedCountDown(params.getFragmentInstanceId(), -1L); + executionProfile.markOneInstanceDone(params.getFragmentInstanceId()); } if (params.isSetLoadedRows()) { @@ -2222,7 +2183,7 @@ public class Coordinator { if (params.isSetErrorTabletInfos()) { updateErrorTabletInfos(params.getErrorTabletInfos()); } - profileDoneSignal.markedCountDown(params.getFragmentInstanceId(), -1L); + executionProfile.markOneInstanceDone(params.getFragmentInstanceId()); } if (params.isSetLoadedRows()) { @@ -2233,35 +2194,6 @@ public class Coordinator { } } - public void endProfile() { - endProfile(true); - } - - public void endProfile(boolean waitProfileDone) { - if (enablePipelineEngine) { - if (pipelineExecContexts.isEmpty()) { - return; - } - } else { - if (backendExecStates.isEmpty()) { - return; - } - } - - // Wait for all backends to finish reporting when writing profile last time. - if (waitProfileDone && needReport) { - try { - profileDoneSignal.await(2, TimeUnit.SECONDS); - } catch (InterruptedException e1) { - LOG.warn("signal await error", e1); - } - } - - for (int i = 1; i < fragmentProfile.size(); ++i) { - fragmentProfile.get(i).sortChildren(); - } - } - /* * Waiting the coordinator finish executing. * return false if waiting timeout. 
@@ -2284,7 +2216,7 @@ public class Coordinator { long waitTime = Math.min(leftTimeoutS, fixedMaxWaitTime); boolean awaitRes = false; try { - awaitRes = profileDoneSignal.await(waitTime, TimeUnit.SECONDS); + awaitRes = executionProfile.awaitAllInstancesDone(waitTime); } catch (InterruptedException e) { // Do nothing } @@ -2327,7 +2259,7 @@ public class Coordinator { } public boolean isDone() { - return profileDoneSignal.getCount() == 0; + return executionProfile.isAllInstancesDone(); } // map from an impalad host address to the per-node assigned scan ranges; @@ -2577,7 +2509,7 @@ public class Coordinator { volatile boolean done; boolean hasCanceled; int profileFragmentId; - RuntimeProfile profile; + RuntimeProfile instanceProfile; RuntimeProfile loadChannelProfile; TNetworkAddress brpcAddress; TNetworkAddress address; @@ -2601,7 +2533,7 @@ public class Coordinator { String name = "Instance " + DebugUtil.printId(fi.instanceId) + " (host=" + address + ")"; this.loadChannelProfile = loadChannelProfile; - this.profile = new RuntimeProfile(name); + this.instanceProfile = new RuntimeProfile(name); this.hasCanceled = false; this.lastMissingHeartbeatTime = backend.getLastMissingHeartbeatTime(); } @@ -2628,7 +2560,7 @@ public class Coordinator { return false; } if (params.isSetProfile()) { - profile.update(params.profile); + instanceProfile.update(params.profile); } if (params.isSetLoadChannelProfile()) { loadChannelProfile.update(params.loadChannelProfile); @@ -2641,8 +2573,8 @@ public class Coordinator { } public synchronized void printProfile(StringBuilder builder) { - this.profile.computeTimeInProfile(); - this.profile.prettyPrint(builder, ""); + this.instanceProfile.computeTimeInProfile(); + this.instanceProfile.prettyPrint(builder, ""); } // cancel the fragment instance. 
@@ -2695,7 +2627,7 @@ public class Coordinator { LOG.warn("profileFragmentId {} should be in [0, {})", profileFragmentId, maxFragmentId); return false; } - profile.computeTimeInProfile(); + instanceProfile.computeTimeInProfile(); return true; } @@ -3378,18 +3310,13 @@ public class Coordinator { private void attachInstanceProfileToFragmentProfile() { if (enablePipelineEngine) { for (PipelineExecContext ctx : pipelineExecContexts.values()) { - if (!ctx.computeTimeInProfile(fragmentProfile.size())) { - return; - } ctx.fragmentInstancesMap.values().stream() - .forEach(p -> fragmentProfile.get(ctx.profileFragmentId).addChild(p)); + .forEach(p -> executionProfile.addInstanceProfile(ctx.profileFragmentId, p)); } } else { for (BackendExecState backendExecState : backendExecStates) { - if (!backendExecState.computeTimeInProfile(fragmentProfile.size())) { - return; - } - fragmentProfile.get(backendExecState.profileFragmentId).addChild(backendExecState.profile); + executionProfile.addInstanceProfile(backendExecState.profileFragmentId, + backendExecState.instanceProfile); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java index 9b20fe4f7f..eea008f143 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java @@ -20,8 +20,8 @@ package org.apache.doris.qe; import org.apache.doris.common.Config; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; +import org.apache.doris.common.profile.ExecutionProfile; import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.common.util.ProfileWriter; import org.apache.doris.metric.MetricRepo; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TQueryType; @@ -163,16 +163,12 @@ public final class QeProcessorImpl implements QeProcessor { continue; } final String queryIdStr = DebugUtil.printId(info.getConnectContext().queryId()); - final QueryStatisticsItem item = new QueryStatisticsItem.Builder() - .queryId(queryIdStr) - .queryStartTime(info.getStartExecTime()) - .sql(info.getSql()) - .user(context.getQualifiedUser()) - .connId(String.valueOf(context.getConnectionId())) - .db(context.getDatabase()) + final QueryStatisticsItem item = new QueryStatisticsItem.Builder().queryId(queryIdStr) + .queryStartTime(info.getStartExecTime()).sql(info.getSql()).user(context.getQualifiedUser()) + .connId(String.valueOf(context.getConnectionId())).db(context.getDatabase()) .catalog(context.getDefaultCatalog()) .fragmentInstanceInfos(info.getCoord().getFragmentInstanceInfos()) - .profile(info.getCoord().getQueryProfile()) + .profile(info.getCoord().getExecutionProfile().getExecutionProfile()) .isReportSucc(context.getSessionVariable().enableProfile()).build(); querySet.put(queryIdStr, item); } @@ -203,7 +199,7 @@ public final class QeProcessorImpl implements QeProcessor { } try { info.getCoord().updateFragmentExecStatus(params); - if (info.getCoord().getProfileWriter() != null && params.isSetProfile()) { + if (params.isSetProfile()) { writeProfileExecutor.submit(new WriteProfileTask(params, info)); } } catch (Exception e) { @@ -276,10 +272,8 @@ public final class QeProcessorImpl implements QeProcessor { return; } - ProfileWriter profileWriter = info.getCoord().getProfileWriter(); - if (profileWriter != null) { - profileWriter.writeProfile(false); - } + ExecutionProfile executionProfile = 
info.getCoord().getExecutionProfile(); + executionProfile.update(-1, false); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index c70554b1c7..d77d21e8b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -82,14 +82,13 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.NereidsException; import org.apache.doris.common.UserException; -import org.apache.doris.common.Version; +import org.apache.doris.common.profile.Profile; +import org.apache.doris.common.profile.SummaryProfile; +import org.apache.doris.common.profile.SummaryProfile.SummaryBuilder; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.LiteralUtils; import org.apache.doris.common.util.MetaLockUtils; -import org.apache.doris.common.util.ProfileManager; -import org.apache.doris.common.util.ProfileWriter; -import org.apache.doris.common.util.QueryPlannerProfile; -import org.apache.doris.common.util.RuntimeProfile; +import org.apache.doris.common.util.ProfileManager.ProfileType; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; @@ -176,7 +175,7 @@ import java.util.stream.Collectors; // Do one COM_QUERY process. // first: Parse receive byte array to statement struct. // second: Do handle function for statement. -public class StmtExecutor implements ProfileWriter { +public class StmtExecutor { private static final Logger LOG = LogManager.getLogger(StmtExecutor.class); private static final AtomicLong STMT_ID_GENERATOR = new AtomicLong(0); @@ -189,11 +188,7 @@ public class StmtExecutor implements ProfileWriter { private OriginStatement originStmt; private StatementBase parsedStmt; private Analyzer analyzer; - private RuntimeProfile profile; - private RuntimeProfile summaryProfile; - private RuntimeProfile plannerRuntimeProfile; - private volatile boolean isFinishedProfile = false; - private String queryType = "Query"; + private ProfileType profileType = ProfileType.QUERY; private volatile Coordinator coord = null; private MasterOpExecutor masterOpExecutor = null; private RedirectStatus redirectStatus = null; @@ -202,12 +197,13 @@ public class StmtExecutor implements ProfileWriter { private ShowResultSet proxyResultSet = null; private Data.PQueryStatistics.Builder statisticsForAuditLog; private boolean isCached; - private QueryPlannerProfile plannerProfile = new QueryPlannerProfile(); private String stmtName; private PrepareStmt prepareStmt = null; private String mysqlLoadId; // Distinguish from prepare and execute command private boolean isExecuteStmt = false; + // The profile of this execution + private final Profile profile; // The result schema if "dry_run_query" is true. // Only one column to indicate the real return row numbers. 
@@ -222,8 +218,10 @@ public class StmtExecutor implements ProfileWriter { this.isProxy = isProxy; this.statementContext = new StatementContext(context, originStmt); this.context.setStatementContext(statementContext); + this.profile = new Profile("Query", this.context.getSessionVariable().enableProfile); } + // for test public StmtExecutor(ConnectContext context, String stmt) { this(context, new OriginStatement(stmt, 0), false); this.stmtName = stmt; @@ -246,6 +244,7 @@ public class StmtExecutor implements ProfileWriter { this.statementContext.setParsedStatement(parsedStmt); } this.context.setStatementContext(statementContext); + this.profile = new Profile("Query", context.getSessionVariable().enableProfile()); } private static InternalService.PDataRow getRowStringValue(List<Expr> cols) throws UserException { @@ -269,74 +268,31 @@ public class StmtExecutor implements ProfileWriter { return row.build(); } - // At the end of query execution, we begin to add up profile - private void initProfile(QueryPlannerProfile plannerProfile, boolean waiteBeReport) { - RuntimeProfile queryProfile; - // when a query hits the sql cache, `coord` is null. - if (coord == null) { - queryProfile = new RuntimeProfile("Execution Profile " + DebugUtil.printId(context.queryId())); - } else { - queryProfile = coord.getQueryProfile(); - } - if (profile == null) { - profile = new RuntimeProfile("Query"); - summaryProfile = new RuntimeProfile("Summary"); - profile.addChild(summaryProfile); - summaryProfile.addInfoString(ProfileManager.START_TIME, TimeUtils.longToTimeString(context.getStartTime())); - updateSummaryProfile(waiteBeReport); - for (Map.Entry<String, String> entry : getSummaryInfo().entrySet()) { - summaryProfile.addInfoString(entry.getKey(), entry.getValue()); - } - summaryProfile.addInfoString(ProfileManager.TRACE_ID, context.getSessionVariable().getTraceId()); - plannerRuntimeProfile = new RuntimeProfile("Execution Summary"); - summaryProfile.addChild(plannerRuntimeProfile); - profile.addChild(queryProfile); - } else { - updateSummaryProfile(waiteBeReport); - } - plannerProfile.initRuntimeProfile(plannerRuntimeProfile); - - queryProfile.getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(plannerProfile.getQueryBeginTime())); - endProfile(waiteBeReport); - } - - private void endProfile(boolean waitProfileDone) { - if (context != null && context.getSessionVariable().enableProfile() && coord != null) { - coord.endProfile(waitProfileDone); - } - } - - private void updateSummaryProfile(boolean waiteBeReport) { - Preconditions.checkNotNull(summaryProfile); + private Map<String, String> getSummaryInfo(boolean isFinished) { long currentTimestamp = System.currentTimeMillis(); - long totalTimeMs = currentTimestamp - context.getStartTime(); - summaryProfile.addInfoString(ProfileManager.END_TIME, - waiteBeReport ? TimeUtils.longToTimeString(currentTimestamp) : "N/A"); - summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(totalTimeMs)); - summaryProfile.addInfoString(ProfileManager.QUERY_STATE, - !waiteBeReport && context.getState().getStateType().equals(MysqlStateType.OK) ? 
"RUNNING" : - context.getState().toString()); - } - - private Map<String, String> getSummaryInfo() { - Map<String, String> infos = Maps.newLinkedHashMap(); - infos.put(ProfileManager.JOB_ID, "N/A"); - infos.put(ProfileManager.QUERY_ID, DebugUtil.printId(context.queryId())); - infos.put(ProfileManager.QUERY_TYPE, queryType); - infos.put(ProfileManager.DORIS_VERSION, Version.DORIS_BUILD_VERSION); - infos.put(ProfileManager.USER, context.getQualifiedUser()); - infos.put(ProfileManager.DEFAULT_DB, context.getDatabase()); - infos.put(ProfileManager.SQL_STATEMENT, originStmt.originStmt); - infos.put(ProfileManager.IS_CACHED, isCached ? "Yes" : "No"); - - Map<String, Integer> beToInstancesNum = - coord == null ? Maps.newTreeMap() : coord.getBeToInstancesNum(); - infos.put(ProfileManager.TOTAL_INSTANCES_NUM, - String.valueOf(beToInstancesNum.values().stream().reduce(0, Integer::sum))); - infos.put(ProfileManager.INSTANCES_NUM_PER_BE, beToInstancesNum.toString()); - infos.put(ProfileManager.PARALLEL_FRAGMENT_EXEC_INSTANCE, - String.valueOf(context.sessionVariable.parallelExecInstanceNum)); - return infos; + SummaryBuilder builder = new SummaryBuilder(); + builder.profileId(DebugUtil.printId(context.queryId())); + builder.taskType(profileType.name()); + builder.startTime(TimeUtils.longToTimeString(context.getStartTime())); + if (isFinished) { + builder.endTime(TimeUtils.longToTimeString(currentTimestamp)); + builder.totalTime(DebugUtil.getPrettyStringMs(currentTimestamp - context.getStartTime())); + } + builder.taskState(!isFinished && context.getState().getStateType().equals(MysqlStateType.OK) ? "RUNNING" + : context.getState().toString()); + builder.user(context.getQualifiedUser()); + builder.defaultDb(context.getDatabase()); + builder.sqlStatement(originStmt.originStmt); + builder.isCached(isCached ? "Yes" : "No"); + + Map<String, Integer> beToInstancesNum = coord == null ? 
Maps.newTreeMap() : coord.getBeToInstancesNum(); + builder.totalInstancesNum(String.valueOf(beToInstancesNum.values().stream().reduce(0, Integer::sum))); + builder.instancesNumPerBe( + beToInstancesNum.entrySet().stream().map(entry -> entry.getKey() + ":" + entry.getValue()) + .collect(Collectors.joining(","))); + builder.parallelFragmentExecInstance(String.valueOf(context.sessionVariable.parallelExecInstanceNum)); + builder.traceId(context.getSessionVariable().getTraceId()); + return builder.build(); } public void addProfileToSpan() { @@ -344,7 +300,7 @@ public class StmtExecutor implements ProfileWriter { if (!span.isRecording()) { return; } - for (Map.Entry<String, String> entry : getSummaryInfo().entrySet()) { + for (Map.Entry<String, String> entry : getSummaryInfo(true).entrySet()) { span.setAttribute(entry.getKey(), entry.getValue()); } } @@ -490,7 +446,7 @@ public class StmtExecutor implements ProfileWriter { LOG.info("Nereids start to execute query:\n {}", originStmt.originStmt); context.setQueryId(queryId); context.setStartTime(); - plannerProfile.setQueryBeginTime(); + profile.getSummaryProfile().setQueryBeginTime(); context.setStmtId(STMT_ID_GENERATOR.incrementAndGet()); parseByNereids(); Preconditions.checkState(parsedStmt instanceof LogicalPlanAdapter, @@ -549,7 +505,7 @@ public class StmtExecutor implements ProfileWriter { if (checkBlockRules()) { return; } - plannerProfile.setQueryPlanFinishTime(); + profile.getSummaryProfile().setQueryPlanFinishTime(); handleQueryWithRetry(queryId); } } @@ -595,7 +551,7 @@ public class StmtExecutor implements ProfileWriter { // The final profile report occurs after be returns the query data, and the profile cannot be // received after unregisterQuery(), causing the instance profile to be lost, so we should wait // for the profile before unregisterQuery(). - endProfile(true); + updateProfile(true); QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId()); } } @@ -610,7 +566,7 @@ public class StmtExecutor implements ProfileWriter { public void executeByLegacy(TUniqueId queryId) throws Exception { context.setStartTime(); - plannerProfile.setQueryBeginTime(); + profile.getSummaryProfile().setQueryBeginTime(); context.setStmtId(STMT_ID_GENERATOR.incrementAndGet()); context.setQueryId(queryId); // set isQuery first otherwise this state will be lost if some error occurs @@ -687,10 +643,10 @@ public class StmtExecutor implements ProfileWriter { handleCtasStmt(); } else if (parsedStmt instanceof InsertStmt) { // Must ahead of DdlStmt because InsertStmt is its subclass try { - handleInsertStmt(); if (!((InsertStmt) parsedStmt).getQueryStmt().isExplain()) { - queryType = "Load"; + profileType = ProfileType.LOAD; } + handleInsertStmt(); } catch (Throwable t) { LOG.warn("handle insert stmt fail: {}", t.getMessage()); // the transaction of this insert may already begin, we will abort it at outer finally block. 
@@ -801,27 +757,18 @@ public class StmtExecutor implements ProfileWriter { } } - @Override - public void writeProfile(boolean isLastWriteProfile) { + public void updateProfile(boolean isFinished) { if (!context.getSessionVariable().enableProfile()) { return; } - synchronized (writeProfileLock) { - if (isFinishedProfile) { - return; - } - initProfile(plannerProfile, isLastWriteProfile); - profile.computeTimeInChildProfile(); - ProfileManager.getInstance().pushProfile(profile); - isFinishedProfile = isLastWriteProfile; - } + profile.update(context.startTime, getSummaryInfo(isFinished), isFinished); } // Analyze one statement to structure in memory. public void analyze(TQueryOptions tQueryOptions) throws UserException { if (LOG.isDebugEnabled()) { - LOG.debug("begin to analyze stmt: {}, forwarded stmt id: {}", - context.getStmtId(), context.getForwardedStmtId()); + LOG.debug("begin to analyze stmt: {}, forwarded stmt id: {}", context.getStmtId(), + context.getForwardedStmtId()); } parseByLegacy(); @@ -1071,15 +1018,12 @@ public class StmtExecutor implements ProfileWriter { } } } - plannerProfile.setQueryAnalysisFinishTime(); + profile.getSummaryProfile().setQueryAnalysisFinishTime(); planner = new OriginalPlanner(analyzer); if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt) { planner.plan(parsedStmt, tQueryOptions); } - // TODO(zc): - // Preconditions.checkState(!analyzer.hasUnassignedConjuncts()); - - plannerProfile.setQueryPlanFinishTime(); + profile.getSummaryProfile().setQueryPlanFinishTime(); } private void resetAnalyzerAndStmt() { @@ -1333,7 +1277,7 @@ public class StmtExecutor implements ProfileWriter { coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator()); QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); - coord.setProfileWriter(this); + profile.addExecutionProfile(coord.getExecutionProfile()); Span queryScheduleSpan = context.getTracer().spanBuilder("query schedule").setParent(Context.current()).startSpan(); try (Scope scope = queryScheduleSpan.makeCurrent()) { @@ -1344,15 +1288,15 @@ public class StmtExecutor implements ProfileWriter { } finally { queryScheduleSpan.end(); } - plannerProfile.setQueryScheduleFinishTime(); - writeProfile(false); + profile.getSummaryProfile().setQueryScheduleFinishTime(); + updateProfile(false); Span fetchResultSpan = context.getTracer().spanBuilder("fetch result").setParent(Context.current()).startSpan(); try (Scope scope = fetchResultSpan.makeCurrent()) { while (true) { // register the fetch result time. - plannerProfile.setTempStartTime(); + profile.getSummaryProfile().setTempStartTime(); batch = coord.getNext(); - plannerProfile.freshFetchResultConsumeTime(); + profile.getSummaryProfile().freshFetchResultConsumeTime(); // for outfile query, there will be only one empty batch send back with eos flag if (batch.getBatch() != null) { @@ -1361,7 +1305,7 @@ public class StmtExecutor implements ProfileWriter { } // register send field result time. 
- plannerProfile.setTempStartTime(); + profile.getSummaryProfile().setTempStartTime(); // For some language driver, getting error packet after fields packet // will be recognized as a success result // so We need to send fields after first batch arrived @@ -1376,7 +1320,7 @@ public class StmtExecutor implements ProfileWriter { for (ByteBuffer row : batch.getBatch().getRows()) { channel.sendOnePacket(row); } - plannerProfile.freshWriteResultConsumeTime(); + profile.getSummaryProfile().freshWriteResultConsumeTime(); context.updateReturnRows(batch.getBatch().getRows().size()); context.setResultAttachedInfo(batch.getBatch().getAttachedInfos()); } @@ -1413,7 +1357,7 @@ public class StmtExecutor implements ProfileWriter { statisticsForAuditLog = batch.getQueryStatistics() == null ? null : batch.getQueryStatistics().toBuilder(); context.getState().setEof(); - plannerProfile.setQueryFetchResultFinishTime(); + profile.getSummaryProfile().setQueryFetchResultFinishTime(); } catch (Exception e) { // notify all be cancel runing fragment // in some case may block all fragment handle threads @@ -1686,6 +1630,7 @@ public class StmtExecutor implements ProfileWriter { coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator()); coord.setLoadZeroTolerance(context.getSessionVariable().getEnableInsertStrict()); coord.setQueryType(TQueryType.LOAD); + profile.addExecutionProfile(coord.getExecutionProfile()); QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord); @@ -1775,7 +1720,7 @@ public class StmtExecutor implements ProfileWriter { */ throwable = t; } finally { - endProfile(true); + updateProfile(true); QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId()); } @@ -2210,6 +2155,7 @@ public class StmtExecutor implements ProfileWriter { planner.getFragments(); RowBatch batch; coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator()); + profile.addExecutionProfile(coord.getExecutionProfile()); try { QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); @@ -2217,7 +2163,6 @@ public class StmtExecutor implements ProfileWriter { LOG.warn(e.getMessage(), e); } - coord.setProfileWriter(this); Span queryScheduleSpan = context.getTracer() .spanBuilder("internal SQL schedule").setParent(Context.current()).startSpan(); try (Scope scope = queryScheduleSpan.makeCurrent()) { @@ -2273,8 +2218,8 @@ public class StmtExecutor implements ProfileWriter { return resultRows; } - public QueryPlannerProfile getPlannerProfile() { - return plannerProfile; + public SummaryProfile getSummaryProfile() { + return profile.getSummaryProfile(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java index ba0843d5e6..9f96bd689b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java @@ -21,11 +21,6 @@ import org.apache.doris.analysis.OutFileClause; import org.apache.doris.analysis.QueryStmt; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; -import org.apache.doris.common.Version; -import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.common.util.ProfileManager; -import org.apache.doris.common.util.RuntimeProfile; -import org.apache.doris.common.util.TimeUtils; import org.apache.doris.load.ExportFailMsg; import org.apache.doris.load.ExportJob; 
import org.apache.doris.load.ExportJob.JobState; @@ -48,10 +43,6 @@ public class ExportExportingTask extends MasterTask { private static final Logger LOG = LogManager.getLogger(ExportExportingTask.class); protected final ExportJob job; - - private RuntimeProfile profile = new RuntimeProfile("Export"); - private List<RuntimeProfile> fragmentProfiles = Lists.newArrayList(); - private StmtExecutor stmtExecutor; public ExportExportingTask(ExportJob job) { @@ -123,13 +114,11 @@ public class ExportExportingTask extends MasterTask { LOG.info("Exporting task progress is {}%, export job: {}", progress, job.getId()); if (isFailed) { - registerProfile(); job.cancel(errorMsg.getCancelType(), errorMsg.getMsg()); LOG.warn("Exporting task failed because Exception: {}", errorMsg.getMsg()); return; } - registerProfile(); if (job.finish(outfileInfoList)) { LOG.info("export job success. job: {}", job); // TODO(ftw): when we implement exporting tablet one by one, we should release snapshot here @@ -172,38 +161,6 @@ public class ExportExportingTask extends MasterTask { return outfileInfo; } - private void initProfile() { - profile = new RuntimeProfile("ExportJob"); - RuntimeProfile summaryProfile = new RuntimeProfile("Summary"); - summaryProfile.addInfoString(ProfileManager.JOB_ID, String.valueOf(job.getId())); - summaryProfile.addInfoString(ProfileManager.QUERY_ID, job.getQueryId()); - summaryProfile.addInfoString(ProfileManager.START_TIME, TimeUtils.longToTimeString(job.getStartTimeMs())); - - long currentTimestamp = System.currentTimeMillis(); - long totalTimeMs = currentTimestamp - job.getStartTimeMs(); - summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(currentTimestamp)); - summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(totalTimeMs)); - - summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Export"); - summaryProfile.addInfoString(ProfileManager.QUERY_STATE, job.getState().toString()); - summaryProfile.addInfoString(ProfileManager.DORIS_VERSION, Version.DORIS_BUILD_VERSION); - summaryProfile.addInfoString(ProfileManager.USER, job.getQualifiedUser()); - summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, String.valueOf(job.getDbId())); - summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, job.getSql()); - profile.addChild(summaryProfile); - } - - private void registerProfile() { - if (!job.getEnableProfile()) { - return; - } - initProfile(); - for (RuntimeProfile p : fragmentProfiles) { - profile.addChild(p); - } - ProfileManager.getInstance().pushProfile(profile); - } - private void handleInQueueState() { long dbId = job.getDbId(); Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java index 5e1c396f57..15b4175759 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/RuntimeProfileTest.java @@ -42,9 +42,9 @@ public class RuntimeProfileTest { RuntimeProfile profile1 = new RuntimeProfile("profile1"); RuntimeProfile profile2 = new RuntimeProfile("profile2"); RuntimeProfile profile3 = new RuntimeProfile("profile3"); - profile1.getCounterTotalTime().setValue(1); - profile2.getCounterTotalTime().setValue(3); - profile3.getCounterTotalTime().setValue(2); + profile1.getCounterTotalTime().setValue(TUnit.TIME_NS, 1); + 
profile2.getCounterTotalTime().setValue(TUnit.TIME_NS, 3); + profile3.getCounterTotalTime().setValue(TUnit.TIME_NS, 2); profile.addChild(profile1); profile.addChild(profile2); profile.addChild(profile3); @@ -102,7 +102,7 @@ public class RuntimeProfileTest { profile.addCounter("key", TUnit.UNIT, ""); Assert.assertNotNull(profile.getCounterMap().get("key")); Assert.assertNull(profile.getCounterMap().get("key2")); - profile.getCounterMap().get("key").setValue(1); + profile.getCounterMap().get("key").setValue(TUnit.TIME_NS, 1); Assert.assertEquals(profile.getCounterMap().get("key").getValue(), 1); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/TimeUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/TimeUtilsTest.java index 5ba3c16657..8e0db1e0dc 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/TimeUtilsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/TimeUtilsTest.java @@ -55,8 +55,8 @@ public class TimeUtilsTest { @Test public void testNormal() { Assert.assertNotNull(TimeUtils.getCurrentFormatTime()); - Assert.assertNotNull(TimeUtils.getStartTime()); - Assert.assertTrue(TimeUtils.getEstimatedTime(0L) > 0); + Assert.assertNotNull(TimeUtils.getStartTimeMs()); + Assert.assertTrue(TimeUtils.getElapsedTimeMs(0L) > 0); Assert.assertEquals(-62167420800000L, TimeUtils.MIN_DATE.getTime()); Assert.assertEquals(253402185600000L, TimeUtils.MAX_DATE.getTime()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java index 526fad2fd7..c64a616aaf 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java @@ -33,7 +33,7 @@ import org.apache.doris.catalog.Table; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.jmockit.Deencapsulation; -import org.apache.doris.common.util.RuntimeProfile; +import org.apache.doris.common.profile.Profile; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.load.BrokerFileGroupAggInfo; @@ -358,11 +358,9 @@ public class BrokerLoadJobTest { fileGroups.add(brokerFileGroup); UUID uuid = UUID.randomUUID(); TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); - RuntimeProfile jobProfile = new RuntimeProfile("test"); - LoadLoadingTask task = new LoadLoadingTask(database, olapTable, brokerDesc, fileGroups, - 100, 100, false, 100, callback, "", - 100, 1, 1, true, jobProfile, false, - false); + Profile jobProfile = new Profile("test", false); + LoadLoadingTask task = new LoadLoadingTask(database, olapTable, brokerDesc, fileGroups, 100, 100, false, 100, + callback, "", 100, 1, 1, true, jobProfile, false, false); try { UserIdentity userInfo = new UserIdentity("root", "localhost"); userInfo.setIsAnalyzed(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java index eea4788f5d..2ba2291bc4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java @@ -182,14 +182,6 @@ public class SessionVariablesTest extends TestWithFeService { } }; - new Expectations(profileManager) { - { - 
profileManager.pushProfile((RuntimeProfile) any); - // if enable_profile=true, method pushProfile will be called once - times = 1; - } - }; - ExportExportingTask task = new ExportExportingTask(job); task.run(); Assertions.assertTrue(job.isFinalState()); @@ -197,7 +189,6 @@ public class SessionVariablesTest extends TestWithFeService { e.printStackTrace(); Assertions.fail(e.getMessage()); } - } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java index 24d46ce1ca..507102fb0d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java @@ -33,7 +33,6 @@ import org.apache.doris.analysis.UseStmt; import org.apache.doris.catalog.Env; import org.apache.doris.common.DdlException; import org.apache.doris.common.jmockit.Deencapsulation; -import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.MysqlChannel; @@ -215,13 +214,6 @@ public class StmtExecutorTest { coordinator.exec(); minTimes = 0; - coordinator.endProfile(); - minTimes = 0; - - coordinator.getQueryProfile(); - minTimes = 0; - result = new RuntimeProfile(); - coordinator.getNext(); minTimes = 0; result = new RowBatch(); diff --git a/regression-test/suites/query_profile/test_profile.groovy b/regression-test/suites/query_profile/test_profile.groovy index 783c60b3b4..84779270eb 100644 --- a/regression-test/suites/query_profile/test_profile.groovy +++ b/regression-test/suites/query_profile/test_profile.groovy @@ -82,10 +82,8 @@ suite('test_profile') { def insert_order = len - i - 1 def stmt_query_info = obj.data.rows[i] - assertNotNull(stmt_query_info["Query ID"]) - assertNotEquals(stmt_query_info["Query ID"], "N/A") - assertNotNull(stmt_query_info["Detail"]) - assertNotEquals(stmt_query_info["Detail"], "N/A") + assertNotNull(stmt_query_info["Profile ID"]) + assertNotEquals(stmt_query_info["Profile ID"], "N/A") assertEquals(stmt_query_info['Sql Statement'].toString(), """ INSERT INTO ${table} values (${id_data[insert_order]}, "${value_data[insert_order]}") """.toString()) @@ -116,7 +114,7 @@ suite('test_profile') { for(int i = 0 ; i < QUERY_NUM ; i++){ def insert_order = QUERY_NUM - i - 1 def current_obj = show_query_profile_obj[i] - def stmt_query_info = current_obj[4] + def stmt_query_info = current_obj[8] assertNotEquals(current_obj[1].toString(), "N/A".toString()) assertEquals(stmt_query_info.toString(), """ SELECT * FROM ${table} WHERE cost ${ops[insert_order]} ${nums[insert_order]} """.toString()) } @@ -134,10 +132,8 @@ suite('test_profile') { def insert_order = QUERY_NUM - i - 1 def stmt_query_info = obj.data.rows[i] - assertNotNull(stmt_query_info["Query ID"]) - assertNotEquals(stmt_query_info["Query ID"].toString(), "N/A".toString()) - assertNotNull(stmt_query_info["Detail"]) - assertNotEquals(stmt_query_info["Detail"], "N/A") + assertNotNull(stmt_query_info["Profile ID"]) + assertNotEquals(stmt_query_info["Profile ID"].toString(), "N/A".toString()) assertEquals(stmt_query_info['Sql Statement'].toString(), """ SELECT * FROM ${table} WHERE cost ${ops[insert_order]} ${nums[insert_order]} """.toString())