This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
     new b640991  [Enhance] Add profile for load job (#5052)

b640991 is described below

commit b640991e43147b8659045cd0a976de047aaf4431
Author: Mingyu Chen <morningman....@gmail.com>
AuthorDate: Wed Dec 16 23:52:10 2020 +0800

    [Enhance] Add profile for load job (#5052)

    Add a viewable profile for broker load. As with the query profile, the user
    can set the session variable is_report_success to true before submitting the
    import job, and then view the job's runtime profile on the FE web page for
    easy analysis and debugging.
---
 .../load-data/broker-load-manual.md                |  8 +++
 .../load-data/broker-load-manual.md                |  8 +++
 .../apache/doris/common/util/RuntimeProfile.java   | 19 +++++--
 .../apache/doris/load/loadv2/BrokerLoadJob.java    | 58 ++++++++++++++++++++--
 .../apache/doris/load/loadv2/LoadLoadingTask.java  | 25 +++++++++-
 .../doris/load/loadv2/BrokerLoadJobTest.java       | 35 +++++++------
 6 files changed, 127 insertions(+), 26 deletions(-)

diff --git a/docs/en/administrator-guide/load-data/broker-load-manual.md b/docs/en/administrator-guide/load-data/broker-load-manual.md
index 6275576..f6dd7e1 100644
--- a/docs/en/administrator-guide/load-data/broker-load-manual.md
+++ b/docs/en/administrator-guide/load-data/broker-load-manual.md
@@ -456,6 +456,14 @@ We will only discuss the case of a single BE. If the user cluster has more than
 ```
 
+### Performance analysis
+
+You can execute `set is_report_success=true` to enable the load job profile before submitting the import job. After the import job is completed, you can view the profile of the import job in the `Queries` tab of the FE web page.
+
+This profile can help analyze the running status of the import job.
+
+Currently, the profile can only be viewed after the job has been executed successfully.
+
 ### Complete examples
 
 Data situation: User data in HDFS, file address is hdfs://abc.com:8888/store_sales, HDFS authentication user name is root, password is password, data size is about 30G, hope to import into database bj_sales table store_sales.
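The workflow described in the new "Performance analysis" section can be driven from any MySQL-protocol client. The sketch below shows the idea with plain JDBC; the FE host and port, credentials, label, table, broker name, and HDFS path are placeholders rather than values taken from this commit, and the MySQL Connector/J driver is assumed to be on the classpath.

```
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class BrokerLoadProfileExample {
    public static void main(String[] args) throws Exception {
        // Connect to the Doris FE through its MySQL-protocol port (placeholder endpoint and credentials).
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://fe_host:9030/bj_sales", "root", "");
             Statement stmt = conn.createStatement()) {

            // Enable profile reporting for this session before submitting the load job.
            stmt.execute("SET is_report_success = true");

            // Submit a broker load job (label, path, and broker properties are placeholders).
            stmt.execute("LOAD LABEL bj_sales.store_sales_label "
                    + "(DATA INFILE('hdfs://abc.com:8888/store_sales') INTO TABLE store_sales) "
                    + "WITH BROKER 'my_broker' ('username'='root', 'password'='password')");

            // Once the job finishes successfully, its profile appears in the Queries tab
            // of the FE web page, alongside ordinary query profiles.
        }
    }
}
```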
diff --git a/docs/zh-CN/administrator-guide/load-data/broker-load-manual.md b/docs/zh-CN/administrator-guide/load-data/broker-load-manual.md
index ff6b0f0..33969fd 100644
--- a/docs/zh-CN/administrator-guide/load-data/broker-load-manual.md
+++ b/docs/zh-CN/administrator-guide/load-data/broker-load-manual.md
@@ -464,6 +464,14 @@ LoadFinishTime: 2019-07-27 11:50:16
 注意:一般用户的环境可能达不到 10M/s 的速度,所以建议超过 500G 的文件都进行文件切分,再导入。
 ```
+
+### 性能分析
+
+可以在提交 LOAD 作业前,先执行 `set is_report_success=true` 打开会话变量。然后提交导入作业。待导入作业完成后,可以在 FE 的 web 页面的 `Queries` 标签中查看到导入作业的 Profile。
+
+这个 Profile 可以帮助分析导入作业的运行状态。
+
+当前只有作业成功执行后,才能查看 Profile。
 
 ### 完整例子
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
index 8d10831..0eb78d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RuntimeProfile.java
@@ -24,16 +24,17 @@ import org.apache.doris.thrift.TRuntimeProfileNode;
 import org.apache.doris.thrift.TRuntimeProfileTree;
 import org.apache.doris.thrift.TUnit;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Formatter;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -58,7 +59,7 @@ public class RuntimeProfile {
     private Map<String, RuntimeProfile> childMap = Maps.newConcurrentMap();
     private Map<String, TreeSet<String>> childCounterMap = Maps.newHashMap();
 
-    private List<Pair<RuntimeProfile, Boolean>> childList = Lists.newArrayList();
+    private LinkedList<Pair<RuntimeProfile, Boolean>> childList = Lists.newLinkedList();
 
     private String name;
 
@@ -318,6 +319,16 @@ public class RuntimeProfile {
         this.childList.add(pair);
     }
 
+    public void addFirstChild(RuntimeProfile child) {
+        if (child == null) {
+            return;
+        }
+
+        this.childMap.put(child.name, child);
+        Pair<RuntimeProfile, Boolean> pair = Pair.create(child, true);
+        this.childList.addFirst(pair);
+    }
+
     // Because the profile of summary and child fragment is not a real parent-child relationship
     // Each child profile needs to calculate the time proportion consumed by itself
     public void computeTimeInChildProfile() {
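The new `addFirstChild()` above relies on `childList` now being a `LinkedList` (`Lists.newLinkedList()`), since `addFirst()` is not available on an `ArrayList`-backed list. The standalone sketch below illustrates the same prepend pattern with a simplified stand-in type instead of Doris's `Pair<RuntimeProfile, Boolean>`:

```
import java.util.LinkedList;

public class ChildListDemo {
    // Simplified stand-in for Pair<RuntimeProfile, Boolean>; not the real Doris class.
    static final class Child {
        final String name;
        Child(String name) { this.name = name; }
    }

    public static void main(String[] args) {
        // Task children are appended as they finish; the Summary child is prepended at the end,
        // which is exactly what LinkedList.addFirst() provides.
        LinkedList<Child> childList = new LinkedList<>();
        childList.add(new Child("LoadTask: 1"));
        childList.add(new Child("LoadTask: 2"));
        childList.addFirst(new Child("Summary"));

        for (Child c : childList) {
            System.out.println(c.name); // prints: Summary, LoadTask: 1, LoadTask: 2
        }
    }
}
```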
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index cba085f..aeae05b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -29,13 +29,18 @@ import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.LabelAlreadyUsedException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
+import org.apache.doris.common.util.ProfileManager;
+import org.apache.doris.common.util.RuntimeProfile;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.FailMsg;
 import org.apache.doris.metric.MetricRepo;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.thrift.TUniqueId;
@@ -44,12 +49,12 @@ import org.apache.doris.transaction.TransactionState;
 import org.apache.doris.transaction.TransactionState.TxnCoordinator;
 import org.apache.doris.transaction.TransactionState.TxnSourceType;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -65,7 +70,12 @@ public class BrokerLoadJob extends BulkLoadJob {
 
     private static final Logger LOG = LogManager.getLogger(BrokerLoadJob.class);
 
-    // only for log replay
+    // Profile of this load job, including all tasks' profiles
+    private RuntimeProfile jobProfile;
+    // If set to true, the profile of the load job will be pushed to ProfileManager
+    private boolean isReportSuccess = false;
+
+    // for log replay and unit test
     public BrokerLoadJob() {
         super();
         this.jobType = EtlJobType.BROKER;
@@ -78,6 +88,9 @@ public class BrokerLoadJob extends BulkLoadJob {
         this.timeoutSecond = Config.broker_load_default_timeout_second;
         this.brokerDesc = brokerDesc;
         this.jobType = EtlJobType.BROKER;
+        if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isReportSucc()) {
+            isReportSuccess = true;
+        }
     }
 
     @Override
@@ -173,6 +186,7 @@ public class BrokerLoadJob extends BulkLoadJob {
     private void createLoadingTask(Database db, BrokerPendingTaskAttachment attachment) throws UserException {
         // divide job into broker loading task by table
         List<LoadLoadingTask> newLoadingTasks = Lists.newArrayList();
+        this.jobProfile = new RuntimeProfile("BrokerLoadJob " + id + ". " + label);
         db.readLock();
         try {
             for (Map.Entry<FileGroupAggKey, List<BrokerFileGroup>> entry : fileGroupAggInfo.getAggKeyToFileGroups().entrySet()) {
@@ -193,7 +207,8 @@ public class BrokerLoadJob extends BulkLoadJob {
                 // Generate loading task and init the plan of task
                 LoadLoadingTask task = new LoadLoadingTask(db, table, brokerDesc, brokerFileGroups,
                         getDeadlineMs(), execMemLimit,
-                        strictMode, transactionId, this, timezone, timeoutSecond);
+                        strictMode, transactionId, this, timezone, timeoutSecond,
+                        isReportSuccess ? jobProfile : null);
                 UUID uuid = UUID.randomUUID();
                 TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
                 task.init(loadId, attachment.getFileStatusByTable(aggKey),
@@ -300,6 +315,32 @@
         }
     }
 
+    private void writeProfile() {
+        if (!isReportSuccess) {
+            return;
+        }
+
+        RuntimeProfile summaryProfile = new RuntimeProfile("Summary");
+        summaryProfile.addInfoString(ProfileManager.QUERY_ID, String.valueOf(id));
+        summaryProfile.addInfoString(ProfileManager.START_TIME, TimeUtils.longToTimeString(createTimestamp));
+        summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(finishTimestamp));
+        summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(finishTimestamp - createTimestamp));
+
+        summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Load");
+        summaryProfile.addInfoString(ProfileManager.QUERY_STATE, "N/A");
+        summaryProfile.addInfoString(ProfileManager.USER, "N/A");
+        summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, "N/A");
+        summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, "N/A");
+        summaryProfile.addInfoString(ProfileManager.IS_CACHED, "N/A");
+
+        // Add the summary profile as the first child
+        jobProfile.addFirstChild(summaryProfile);
+        jobProfile.computeTimeInChildProfile();
+        StringBuilder builder = new StringBuilder();
+        jobProfile.prettyPrint(builder, "");
+        ProfileManager.getInstance().pushProfile(jobProfile);
+    }
+
     private void updateLoadingStatus(BrokerLoadingTaskAttachment attachment) {
         loadingStatus.replaceCounter(DPP_ABNORMAL_ALL,
                 increaseCounter(DPP_ABNORMAL_ALL, attachment.getCounter(DPP_ABNORMAL_ALL)));
@@ -327,4 +368,11 @@
         }
         return String.valueOf(value);
     }
+
+    @Override
+    public void afterVisible(TransactionState txnState, boolean txnOperated) {
+        super.afterVisible(txnState, txnOperated);
+        writeProfile();
+    }
 }
+
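`writeProfile()` assembles a small profile tree: a "Summary" node carrying job-level info strings is prepended ahead of the per-task profiles, child time proportions are computed, and the pretty-printed tree is pushed to `ProfileManager`. The following self-contained sketch shows that assembly-and-print pattern with a simplified profile class; it is not the real `RuntimeProfile`, and the job id and task name are made up for illustration.

```
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;

// Simplified stand-in for RuntimeProfile, only to show how the job profile tree is built and printed.
class SimpleProfile {
    private final String name;
    private final Map<String, String> infoStrings = new LinkedHashMap<>();
    private final LinkedList<SimpleProfile> children = new LinkedList<>();

    SimpleProfile(String name) { this.name = name; }

    void addInfoString(String key, String value) { infoStrings.put(key, value); }

    void addChild(SimpleProfile child) { children.add(child); }

    // Same idea as RuntimeProfile.addFirstChild(): the summary goes in front of the task profiles.
    void addFirstChild(SimpleProfile child) { children.addFirst(child); }

    void prettyPrint(StringBuilder builder, String prefix) {
        builder.append(prefix).append(name).append(":\n");
        for (Map.Entry<String, String> e : infoStrings.entrySet()) {
            builder.append(prefix).append("   - ").append(e.getKey()).append(": ").append(e.getValue()).append("\n");
        }
        for (SimpleProfile child : children) {
            child.prettyPrint(builder, prefix + "  ");
        }
    }
}

public class LoadProfileDemo {
    public static void main(String[] args) {
        // Hypothetical job id and label, mirroring "BrokerLoadJob " + id + ". " + label.
        SimpleProfile jobProfile = new SimpleProfile("BrokerLoadJob 10001. example_label");
        jobProfile.addChild(new SimpleProfile("LoadTask: example_load_id"));

        SimpleProfile summary = new SimpleProfile("Summary");
        summary.addInfoString("Query ID", "10001");
        summary.addInfoString("Query Type", "Load");
        jobProfile.addFirstChild(summary);

        StringBuilder sb = new StringBuilder();
        jobProfile.prettyPrint(sb, "");
        System.out.println(sb);
    }
}
```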
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 33f0049..0e288ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -27,6 +27,8 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
+import org.apache.doris.common.util.RuntimeProfile;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.FailMsg;
 import org.apache.doris.qe.Coordinator;
@@ -64,11 +66,15 @@ public class LoadLoadingTask extends LoadTask {
 
     private LoadingTaskPlanner planner;
 
+    private RuntimeProfile jobProfile;
+    private RuntimeProfile profile;
+    private long beginTime;
+
     public LoadLoadingTask(Database db, OlapTable table,
             BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups,
             long jobDeadlineMs, long execMemLimit, boolean strictMode,
             long txnId, LoadTaskCallback callback, String timezone,
-            long timeoutS) {
+            long timeoutS, RuntimeProfile profile) {
         super(callback, TaskType.LOADING);
         this.db = db;
         this.table = table;
@@ -82,6 +88,7 @@ public class LoadLoadingTask extends LoadTask {
         this.retryTime = 2; // 2 times is enough
         this.timezone = timezone;
         this.timeoutS = timeoutS;
+        this.jobProfile = profile;
     }
 
     public void init(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusList,
             int fileNum, UserIdentity userInfo) throws UserException {
@@ -100,6 +107,7 @@
         LOG.info("begin to execute loading task. load id: {} job: {}. db: {}, tbl: {}. left retry: {}",
                 DebugUtil.printId(loadId), callback.getCallbackId(), db.getFullName(), table.getName(), retryTime);
         retryTime--;
+        beginTime = System.nanoTime();
         executeOnce();
     }
 
@@ -148,6 +156,8 @@
                     curCoordinator.getLoadCounters(),
                     curCoordinator.getTrackingUrl(),
                     TabletCommitInfo.fromThrift(curCoordinator.getCommitInfos()));
+            // Create profile of this task and add it to the job profile.
+            createProfile(curCoordinator);
         } else {
             throw new LoadException(status.getErrorMsg());
         }
@@ -160,6 +170,19 @@
         return jobDeadlineMs - System.currentTimeMillis();
     }
 
+    public void createProfile(Coordinator coord) {
+        if (jobProfile == null) {
+            // No need to gather profile
+            return;
+        }
+        // Profile of this load task
+        profile = new RuntimeProfile("LoadTask: " + DebugUtil.printId(loadId));
+        coord.getQueryProfile().getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(beginTime));
+        coord.endProfile();
+        profile.addChild(coord.getQueryProfile());
+        jobProfile.addChild(profile);
+    }
+
     @Override
     public void updateRetryInfo() {
         super.updateRetryInfo();
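`LoadLoadingTask` records `beginTime` with `System.nanoTime()` when execution starts and later uses it, via `TimeUtils.getEstimatedTime(beginTime)` (assumed here to return the elapsed time since that timestamp), to set the coordinator's total-time counter before the task profile is attached to the job profile. A minimal sketch of that elapsed-time pattern:

```
import java.util.concurrent.TimeUnit;

public class ElapsedTimeDemo {
    public static void main(String[] args) throws InterruptedException {
        // Capture a monotonic start timestamp, as LoadLoadingTask.beginTime does.
        long beginTime = System.nanoTime();

        Thread.sleep(50); // stand-in for the actual work done by the loading task

        // Derive the elapsed time once the work is finished.
        long elapsedNanos = System.nanoTime() - beginTime;
        System.out.println("elapsed ms: " + TimeUnit.NANOSECONDS.toMillis(elapsedNanos));
    }
}
```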
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
index 10a5732..c23cf95 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
@@ -17,22 +17,23 @@
 
 package org.apache.doris.load.loadv2;
 
+import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.DataDescription;
 import org.apache.doris.analysis.LabelName;
 import org.apache.doris.analysis.LoadStmt;
-import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.UserIdentity;
+import org.apache.doris.catalog.BrokerTable;
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Table;
-import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.catalog.BrokerTable;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.BrokerFileGroupAggInfo;
 import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
@@ -45,29 +46,29 @@ import org.apache.doris.planner.BrokerScanNode;
 import org.apache.doris.planner.OlapTableSink;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.task.MasterTaskExecutor;
+import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionState;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.UUID;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 import mockit.Expectations;
 import mockit.Injectable;
 import mockit.Mock;
 import mockit.MockUp;
 import mockit.Mocked;
-import org.apache.doris.thrift.TUniqueId;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 
 public class BrokerLoadJobTest {
 
@@ -358,8 +359,10 @@ public class BrokerLoadJobTest {
         fileGroups.add(brokerFileGroup);
         UUID uuid = UUID.randomUUID();
         TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
+        RuntimeProfile jobProfile = new RuntimeProfile("test");
         LoadLoadingTask task = new LoadLoadingTask(database, olapTable,brokerDesc, fileGroups,
-                100, 100,false, 100, callback, "", 100);
+                100, 100, false, 100, callback, "", 100,
+                jobProfile);
         try {
             UserIdentity userInfo = new UserIdentity("root", "localhost");
             userInfo.setIsAnalyzed();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org