This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit a90a1a76f168709d7695efaf1cbb4b49d2abc9ce Author: yiguolei <676222...@qq.com> AuthorDate: Fri Mar 15 17:41:47 2024 +0800 [bugfix](profile) support multi execution profile for brokerload (#32280) The bug is introduced by #27184 Profile Format is : Summary MergedProfile ExecutionProfile1 ExecutionProfile2 ... There maybe multiple execution profiles for broker load. --- .../doris/common/profile/AggregatedProfile.java | 41 ----- .../org/apache/doris/common/profile/Profile.java | 40 +++-- .../apache/doris/load/loadv2/LoadLoadingTask.java | 2 +- .../trees/plans/commands/InsertExecutor.java | 2 +- .../commands/insert/AbstractInsertExecutor.java | 194 +++++++++++++++++++++ .../java/org/apache/doris/qe/StmtExecutor.java | 6 +- 6 files changed, 222 insertions(+), 63 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/AggregatedProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/AggregatedProfile.java deleted file mode 100644 index f8481b21e65..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/AggregatedProfile.java +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.profile; - -import org.apache.doris.common.util.RuntimeProfile; - -import java.util.Map; - -/** -* AggregatedProfile is part of a query profile. -* It contains the aggregated information of a query. -*/ -public class AggregatedProfile { - - public static final String PROFILE_NAME = "MergedProfile"; - private ExecutionProfile executionProfile; - - public AggregatedProfile(RuntimeProfile rootProfile, ExecutionProfile executionProfile) { - this.executionProfile = executionProfile; - } - - public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String> planNodeMap) { - return executionProfile.getAggregatedFragmentsProfile(planNodeMap); - } - -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java index 19a51a1aaa6..5f3ed601630 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java @@ -21,11 +21,13 @@ import org.apache.doris.common.util.ProfileManager; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.planner.Planner; +import com.google.common.collect.Lists; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.List; import java.util.Map; /** @@ -33,20 +35,23 @@ import java.util.Map; * following structure: root profile: // summary of this profile, such as start * time, end time, query id, etc. [SummaryProfile] // each execution profile is * a complete execution of a query, a job may contain multiple queries. - * [List<ExecutionProfile>] + * [List<ExecutionProfile>]. + * There maybe multi execution profiles for one job, for example broker load job. + * It will create one execution profile for every single load task. * * SummaryProfile: Summary: Execution Summary: * * - * ExecutionProfile: Fragment 0: Fragment 1: ... + * ExecutionProfile1: Fragment 0: Fragment 1: ... + * ExecutionProfile2: Fragment 0: Fragment 1: ... + * */ public class Profile { private static final Logger LOG = LogManager.getLogger(Profile.class); private static final int MergedProfileLevel = 1; private RuntimeProfile rootProfile; private SummaryProfile summaryProfile; - private AggregatedProfile aggregatedProfile; - private ExecutionProfile executionProfile; + private List<ExecutionProfile> executionProfiles = Lists.newArrayList(); private boolean isFinished; private Map<Integer, String> planNodeMap; @@ -59,14 +64,13 @@ public class Profile { this.isFinished = !isEnable; } - public void setExecutionProfile(ExecutionProfile executionProfile) { + public void addExecutionProfile(ExecutionProfile executionProfile) { if (executionProfile == null) { LOG.warn("try to set a null excecution profile, it is abnormal", new Exception()); return; } - this.executionProfile = executionProfile; - this.executionProfile.addToProfileAsChild(rootProfile); - this.aggregatedProfile = new AggregatedProfile(rootProfile, executionProfile); + this.executionProfiles.add(executionProfile); + executionProfile.addToProfileAsChild(rootProfile); } public synchronized void update(long startTime, Map<String, String> summaryInfo, boolean isFinished, @@ -75,12 +79,10 @@ public class Profile { if (this.isFinished) { return; } - if (executionProfile == null) { - // Sometimes execution profile is not set - return; - } summaryProfile.update(summaryInfo); - executionProfile.update(startTime, isFinished); + for (ExecutionProfile executionProfile : executionProfiles) { + executionProfile.update(startTime, isFinished); + } rootProfile.computeTimeInProfile(); // Nerids native insert not set planner, so it is null if (planner != null) { @@ -109,18 +111,22 @@ public class Profile { // add summary to builder summaryProfile.prettyPrint(builder); LOG.info(builder.toString()); - if (this.profileLevel == MergedProfileLevel) { + // Only generate merged profile for select, insert into select. + // Not support broker load now. + if (this.profileLevel == MergedProfileLevel && this.executionProfiles.size() == 1) { try { builder.append("\n MergedProfile \n"); - aggregatedProfile.getAggregatedFragmentsProfile(planNodeMap).prettyPrint(builder, " "); + this.executionProfiles.get(0).getAggregatedFragmentsProfile(planNodeMap).prettyPrint(builder, " "); } catch (Throwable aggProfileException) { LOG.warn("build merged simple profile failed", aggProfileException); builder.append("build merged simple profile failed"); } } try { - builder.append("\n"); - executionProfile.getExecutionProfile().prettyPrint(builder, ""); + for (ExecutionProfile executionProfile : executionProfiles) { + builder.append("\n"); + executionProfile.getExecutionProfile().prettyPrint(builder, ""); + } } catch (Throwable aggProfileException) { LOG.warn("build profile failed", aggProfileException); builder.append("build profile failed"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index 8fbabea8629..94fb49d6c85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -143,7 +143,7 @@ public class LoadLoadingTask extends LoadTask { Coordinator curCoordinator = new Coordinator(callback.getCallbackId(), loadId, planner.getDescTable(), planner.getFragments(), planner.getScanNodes(), planner.getTimezone(), loadZeroTolerance); if (this.jobProfile != null) { - this.jobProfile.setExecutionProfile(curCoordinator.getExecutionProfile()); + this.jobProfile.addExecutionProfile(curCoordinator.getExecutionProfile()); } curCoordinator.setQueryType(TQueryType.LOAD); curCoordinator.setExecMemoryLimit(execMemLimit); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java index da4a1c6ac68..295478349cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java @@ -222,7 +222,7 @@ public class InsertExecutor { try { coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict()); coordinator.setQueryType(TQueryType.LOAD); - executor.getProfile().setExecutionProfile(coordinator.getExecutionProfile()); + executor.getProfile().addExecutionProfile(coordinator.getExecutionProfile()); QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), executor.getOriginStmtInString(), coordinator); QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), queryInfo); coordinator.exec(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java new file mode 100644 index 00000000000..77eeef22ebf --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java @@ -0,0 +1,194 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.insert; + +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.EnvFactory; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; +import org.apache.doris.planner.DataSink; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.Coordinator; +import org.apache.doris.qe.QeProcessorImpl; +import org.apache.doris.qe.QeProcessorImpl.QueryInfo; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.task.LoadEtlTask; +import org.apache.doris.thrift.TQueryType; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Optional; + +/** + * Abstract insert executor. + * The derived class should implement the abstract method for certain type of target table + */ +public abstract class AbstractInsertExecutor { + private static final Logger LOG = LogManager.getLogger(AbstractInsertExecutor.class); + protected long jobId; + protected final ConnectContext ctx; + protected final Coordinator coordinator; + protected String labelName; + protected final DatabaseIf database; + protected final TableIf table; + protected final long createTime = System.currentTimeMillis(); + protected long loadedRows = 0; + protected int filteredRows = 0; + + protected String errMsg = ""; + protected Optional<InsertCommandContext> insertCtx; + + /** + * Constructor + */ + public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String labelName, NereidsPlanner planner, + Optional<InsertCommandContext> insertCtx) { + this.ctx = ctx; + this.coordinator = EnvFactory.getInstance().createCoordinator(ctx, null, planner, ctx.getStatsErrorEstimator()); + this.labelName = labelName; + this.table = table; + this.database = table.getDatabase(); + this.insertCtx = insertCtx; + } + + public Coordinator getCoordinator() { + return coordinator; + } + + public DatabaseIf getDatabase() { + return database; + } + + public TableIf getTable() { + return table; + } + + public String getLabelName() { + return labelName; + } + + /** + * begin transaction if necessary + */ + public abstract void beginTransaction(); + + /** + * finalize sink to complete enough info for sink execution + */ + protected abstract void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink); + + /** + * Do something before exec + */ + protected abstract void beforeExec(); + + /** + * Do something after exec finished + */ + protected abstract void onComplete() throws UserException; + + /** + * Do something when exec throw exception + */ + protected abstract void onFail(Throwable t); + + /** + * Do something after exec + */ + protected abstract void afterExec(StmtExecutor executor); + + protected final void execImpl(StmtExecutor executor, long jobId) throws Exception { + String queryId = DebugUtil.printId(ctx.queryId()); + coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict()); + coordinator.setQueryType(TQueryType.LOAD); + executor.getProfile().addExecutionProfile(coordinator.getExecutionProfile()); + QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), executor.getOriginStmtInString(), coordinator); + QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), queryInfo); + coordinator.exec(); + int execTimeout = ctx.getExecTimeout(); + if (LOG.isDebugEnabled()) { + LOG.debug("insert [{}] with query id {} execution timeout is {}", labelName, queryId, execTimeout); + } + boolean notTimeout = coordinator.join(execTimeout); + if (!coordinator.isDone()) { + coordinator.cancel(); + if (notTimeout) { + errMsg = coordinator.getExecStatus().getErrorMsg(); + ErrorReport.reportDdlException("there exists unhealthy backend. " + + errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); + } else { + ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT); + } + } + if (!coordinator.getExecStatus().ok()) { + errMsg = coordinator.getExecStatus().getErrorMsg(); + LOG.warn("insert [{}] with query id {} failed, {}", labelName, queryId, errMsg); + ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT); + } + if (LOG.isDebugEnabled()) { + LOG.debug("insert [{}] with query id {} delta files is {}", + labelName, queryId, coordinator.getDeltaUrls()); + } + if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) { + loadedRows = Long.parseLong(coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL)); + } + if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) { + filteredRows = Integer.parseInt(coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL)); + } + } + + private boolean checkStrictMode() { + // if in strict mode, insert will fail if there are filtered rows + if (ctx.getSessionVariable().getEnableInsertStrict()) { + if (filteredRows > 0) { + ctx.getState().setError(ErrorCode.ERR_FAILED_WHEN_INSERT, + "Insert has filtered data in strict mode, tracking_url=" + coordinator.getTrackingUrl()); + return false; + } + } + return true; + } + + /** + * execute insert txn for insert into select command. + */ + public void executeSingleInsert(StmtExecutor executor, long jobId) { + beforeExec(); + try { + execImpl(executor, jobId); + if (!checkStrictMode()) { + return; + } + onComplete(); + } catch (Throwable t) { + onFail(t); + return; + } finally { + executor.updateProfile(true); + QeProcessorImpl.INSTANCE.unregisterQuery(ctx.queryId()); + } + afterExec(executor); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 063cbf0ad71..b6a8c882fae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1576,7 +1576,7 @@ public class StmtExecutor { coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator()); QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); - profile.setExecutionProfile(coord.getExecutionProfile()); + profile.addExecutionProfile(coord.getExecutionProfile()); coordBase = coord; } @@ -2053,7 +2053,7 @@ public class StmtExecutor { coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator()); coord.setLoadZeroTolerance(context.getSessionVariable().getEnableInsertStrict()); coord.setQueryType(TQueryType.LOAD); - profile.setExecutionProfile(coord.getExecutionProfile()); + profile.addExecutionProfile(coord.getExecutionProfile()); QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), this.getOriginStmtInString(), coord); QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), queryInfo); @@ -2875,7 +2875,7 @@ public class StmtExecutor { } RowBatch batch; coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator()); - profile.setExecutionProfile(coord.getExecutionProfile()); + profile.addExecutionProfile(coord.getExecutionProfile()); try { QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org