This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a90a1a76f168709d7695efaf1cbb4b49d2abc9ce
Author: yiguolei <676222...@qq.com>
AuthorDate: Fri Mar 15 17:41:47 2024 +0800

    [bugfix](profile) support multi execution profile for brokerload (#32280)
    
    The bug was introduced by #27184.
    The profile format is:
    Summary
    MergedProfile
    ExecutionProfile1
    ExecutionProfile2
    ...
    
    There may be multiple execution profiles for a single broker load job.
---
 .../doris/common/profile/AggregatedProfile.java    |  41 -----
 .../org/apache/doris/common/profile/Profile.java   |  40 +++--
 .../apache/doris/load/loadv2/LoadLoadingTask.java  |   2 +-
 .../trees/plans/commands/InsertExecutor.java       |   2 +-
 .../commands/insert/AbstractInsertExecutor.java    | 194 +++++++++++++++++++++
 .../java/org/apache/doris/qe/StmtExecutor.java     |   6 +-
 6 files changed, 222 insertions(+), 63 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/AggregatedProfile.java
 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/AggregatedProfile.java
deleted file mode 100644
index f8481b21e65..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/AggregatedProfile.java
+++ /dev/null
@@ -1,41 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.common.profile;
-
-import org.apache.doris.common.util.RuntimeProfile;
-
-import java.util.Map;
-
-/**
-* AggregatedProfile is part of a query profile.
-* It contains the aggregated information of a query.
-*/
-public class AggregatedProfile {
-
-    public static final String PROFILE_NAME = "MergedProfile";
-    private ExecutionProfile executionProfile;
-
-    public AggregatedProfile(RuntimeProfile rootProfile, ExecutionProfile 
executionProfile) {
-        this.executionProfile = executionProfile;
-    }
-
-    public RuntimeProfile getAggregatedFragmentsProfile(Map<Integer, String> 
planNodeMap) {
-        return executionProfile.getAggregatedFragmentsProfile(planNodeMap);
-    }
-
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
index 19a51a1aaa6..5f3ed601630 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
@@ -21,11 +21,13 @@ import org.apache.doris.common.util.ProfileManager;
 import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.planner.Planner;
 
+import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -33,20 +35,23 @@ import java.util.Map;
  * following structure: root profile: // summary of this profile, such as start
  * time, end time, query id, etc. [SummaryProfile] // each execution profile is
  * a complete execution of a query, a job may contain multiple queries.
- * [List<ExecutionProfile>]
+ * [List<ExecutionProfile>].
+ * There maybe multi execution profiles for one job, for example broker load 
job.
+ * It will create one execution profile for every single load task.
  *
  * SummaryProfile: Summary: Execution Summary:
  *
  *
- * ExecutionProfile: Fragment 0: Fragment 1: ...
+ * ExecutionProfile1: Fragment 0: Fragment 1: ...
+ * ExecutionProfile2: Fragment 0: Fragment 1: ...
+ *
  */
 public class Profile {
     private static final Logger LOG = LogManager.getLogger(Profile.class);
     private static final int MergedProfileLevel = 1;
     private RuntimeProfile rootProfile;
     private SummaryProfile summaryProfile;
-    private AggregatedProfile aggregatedProfile;
-    private ExecutionProfile executionProfile;
+    private List<ExecutionProfile> executionProfiles = Lists.newArrayList();
     private boolean isFinished;
     private Map<Integer, String> planNodeMap;
 
@@ -59,14 +64,13 @@ public class Profile {
         this.isFinished = !isEnable;
     }
 
-    public void setExecutionProfile(ExecutionProfile executionProfile) {
+    public void addExecutionProfile(ExecutionProfile executionProfile) {
         if (executionProfile == null) {
             LOG.warn("try to set a null excecution profile, it is abnormal", 
new Exception());
             return;
         }
-        this.executionProfile = executionProfile;
-        this.executionProfile.addToProfileAsChild(rootProfile);
-        this.aggregatedProfile = new AggregatedProfile(rootProfile, 
executionProfile);
+        this.executionProfiles.add(executionProfile);
+        executionProfile.addToProfileAsChild(rootProfile);
     }
 
     public synchronized void update(long startTime, Map<String, String> 
summaryInfo, boolean isFinished,
@@ -75,12 +79,10 @@ public class Profile {
             if (this.isFinished) {
                 return;
             }
-            if (executionProfile == null) {
-                // Sometimes execution profile is not set
-                return;
-            }
             summaryProfile.update(summaryInfo);
-            executionProfile.update(startTime, isFinished);
+            for (ExecutionProfile executionProfile : executionProfiles) {
+                executionProfile.update(startTime, isFinished);
+            }
             rootProfile.computeTimeInProfile();
             // Nerids native insert not set planner, so it is null
             if (planner != null) {
@@ -109,18 +111,22 @@ public class Profile {
         // add summary to builder
         summaryProfile.prettyPrint(builder);
         LOG.info(builder.toString());
-        if (this.profileLevel == MergedProfileLevel) {
+        // Only generate merged profile for select, insert into select.
+        // Not support broker load now.
+        if (this.profileLevel == MergedProfileLevel && 
this.executionProfiles.size() == 1) {
             try {
                 builder.append("\n MergedProfile \n");
-                
aggregatedProfile.getAggregatedFragmentsProfile(planNodeMap).prettyPrint(builder,
 "     ");
+                
this.executionProfiles.get(0).getAggregatedFragmentsProfile(planNodeMap).prettyPrint(builder,
 "     ");
             } catch (Throwable aggProfileException) {
                 LOG.warn("build merged simple profile failed", 
aggProfileException);
                 builder.append("build merged simple profile failed");
             }
         }
         try {
-            builder.append("\n");
-            executionProfile.getExecutionProfile().prettyPrint(builder, "");
+            for (ExecutionProfile executionProfile : executionProfiles) {
+                builder.append("\n");
+                executionProfile.getExecutionProfile().prettyPrint(builder, 
"");
+            }
         } catch (Throwable aggProfileException) {
             LOG.warn("build profile failed", aggProfileException);
             builder.append("build  profile failed");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
index 8fbabea8629..94fb49d6c85 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java
@@ -143,7 +143,7 @@ public class LoadLoadingTask extends LoadTask {
         Coordinator curCoordinator = new Coordinator(callback.getCallbackId(), 
loadId, planner.getDescTable(),
                 planner.getFragments(), planner.getScanNodes(), 
planner.getTimezone(), loadZeroTolerance);
         if (this.jobProfile != null) {
-            
this.jobProfile.setExecutionProfile(curCoordinator.getExecutionProfile());
+            
this.jobProfile.addExecutionProfile(curCoordinator.getExecutionProfile());
         }
         curCoordinator.setQueryType(TQueryType.LOAD);
         curCoordinator.setExecMemoryLimit(execMemLimit);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
index da4a1c6ac68..295478349cf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java
@@ -222,7 +222,7 @@ public class InsertExecutor {
         try {
             
coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict());
             coordinator.setQueryType(TQueryType.LOAD);
-            
executor.getProfile().setExecutionProfile(coordinator.getExecutionProfile());
+            
executor.getProfile().addExecutionProfile(coordinator.getExecutionProfile());
             QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), 
executor.getOriginStmtInString(), coordinator);
             QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), queryInfo);
             coordinator.exec();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
new file mode 100644
index 00000000000..77eeef22ebf
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.insert;
+
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.EnvFactory;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.DebugUtil;
+import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
+import org.apache.doris.planner.DataSink;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.Coordinator;
+import org.apache.doris.qe.QeProcessorImpl;
+import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.task.LoadEtlTask;
+import org.apache.doris.thrift.TQueryType;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Optional;
+
+/**
+ * Abstract insert executor.
+ * The derived class should implement the abstract method for certain type of 
target table
+ */
+public abstract class AbstractInsertExecutor {
+    private static final Logger LOG = 
LogManager.getLogger(AbstractInsertExecutor.class);
+    protected long jobId;
+    protected final ConnectContext ctx;
+    protected final Coordinator coordinator;
+    protected String labelName;
+    protected final DatabaseIf database;
+    protected final TableIf table;
+    protected final long createTime = System.currentTimeMillis();
+    protected long loadedRows = 0;
+    protected int filteredRows = 0;
+
+    protected String errMsg = "";
+    protected Optional<InsertCommandContext> insertCtx;
+
+    /**
+     * Constructor
+     */
+    public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String 
labelName, NereidsPlanner planner,
+            Optional<InsertCommandContext> insertCtx) {
+        this.ctx = ctx;
+        this.coordinator = EnvFactory.getInstance().createCoordinator(ctx, 
null, planner, ctx.getStatsErrorEstimator());
+        this.labelName = labelName;
+        this.table = table;
+        this.database = table.getDatabase();
+        this.insertCtx = insertCtx;
+    }
+
+    public Coordinator getCoordinator() {
+        return coordinator;
+    }
+
+    public DatabaseIf getDatabase() {
+        return database;
+    }
+
+    public TableIf getTable() {
+        return table;
+    }
+
+    public String getLabelName() {
+        return labelName;
+    }
+
+    /**
+     * begin transaction if necessary
+     */
+    public abstract void beginTransaction();
+
+    /**
+     * finalize sink to complete enough info for sink execution
+     */
+    protected abstract void finalizeSink(PlanFragment fragment, DataSink sink, 
PhysicalSink physicalSink);
+
+    /**
+     * Do something before exec
+     */
+    protected abstract void beforeExec();
+
+    /**
+     * Do something after exec finished
+     */
+    protected abstract void onComplete() throws UserException;
+
+    /**
+     * Do something when exec throw exception
+     */
+    protected abstract void onFail(Throwable t);
+
+    /**
+     * Do something after exec
+     */
+    protected abstract void afterExec(StmtExecutor executor);
+
+    protected final void execImpl(StmtExecutor executor, long jobId) throws 
Exception {
+        String queryId = DebugUtil.printId(ctx.queryId());
+        
coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict());
+        coordinator.setQueryType(TQueryType.LOAD);
+        
executor.getProfile().addExecutionProfile(coordinator.getExecutionProfile());
+        QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), 
executor.getOriginStmtInString(), coordinator);
+        QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), queryInfo);
+        coordinator.exec();
+        int execTimeout = ctx.getExecTimeout();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("insert [{}] with query id {} execution timeout is {}", 
labelName, queryId, execTimeout);
+        }
+        boolean notTimeout = coordinator.join(execTimeout);
+        if (!coordinator.isDone()) {
+            coordinator.cancel();
+            if (notTimeout) {
+                errMsg = coordinator.getExecStatus().getErrorMsg();
+                ErrorReport.reportDdlException("there exists unhealthy 
backend. "
+                        + errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
+            } else {
+                ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT);
+            }
+        }
+        if (!coordinator.getExecStatus().ok()) {
+            errMsg = coordinator.getExecStatus().getErrorMsg();
+            LOG.warn("insert [{}] with query id {} failed, {}", labelName, 
queryId, errMsg);
+            ErrorReport.reportDdlException(errMsg, 
ErrorCode.ERR_FAILED_WHEN_INSERT);
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("insert [{}] with query id {} delta files is {}",
+                    labelName, queryId, coordinator.getDeltaUrls());
+        }
+        if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != 
null) {
+            loadedRows = 
Long.parseLong(coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL));
+        }
+        if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != 
null) {
+            filteredRows = 
Integer.parseInt(coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL));
+        }
+    }
+
+    private boolean checkStrictMode() {
+        // if in strict mode, insert will fail if there are filtered rows
+        if (ctx.getSessionVariable().getEnableInsertStrict()) {
+            if (filteredRows > 0) {
+                ctx.getState().setError(ErrorCode.ERR_FAILED_WHEN_INSERT,
+                        "Insert has filtered data in strict mode, 
tracking_url=" + coordinator.getTrackingUrl());
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * execute insert txn for insert into select command.
+     */
+    public void executeSingleInsert(StmtExecutor executor, long jobId) {
+        beforeExec();
+        try {
+            execImpl(executor, jobId);
+            if (!checkStrictMode()) {
+                return;
+            }
+            onComplete();
+        } catch (Throwable t) {
+            onFail(t);
+            return;
+        } finally {
+            executor.updateProfile(true);
+            QeProcessorImpl.INSTANCE.unregisterQuery(ctx.queryId());
+        }
+        afterExec(executor);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 063cbf0ad71..b6a8c882fae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1576,7 +1576,7 @@ public class StmtExecutor {
             coord = new Coordinator(context, analyzer, planner, 
context.getStatsErrorEstimator());
             QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
                     new QeProcessorImpl.QueryInfo(context, 
originStmt.originStmt, coord));
-            profile.setExecutionProfile(coord.getExecutionProfile());
+            profile.addExecutionProfile(coord.getExecutionProfile());
             coordBase = coord;
         }
 
@@ -2053,7 +2053,7 @@ public class StmtExecutor {
                 coord = new Coordinator(context, analyzer, planner, 
context.getStatsErrorEstimator());
                 
coord.setLoadZeroTolerance(context.getSessionVariable().getEnableInsertStrict());
                 coord.setQueryType(TQueryType.LOAD);
-                profile.setExecutionProfile(coord.getExecutionProfile());
+                profile.addExecutionProfile(coord.getExecutionProfile());
                 QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), 
this.getOriginStmtInString(), coord);
                 QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), 
queryInfo);
 
@@ -2875,7 +2875,7 @@ public class StmtExecutor {
             }
             RowBatch batch;
             coord = new Coordinator(context, analyzer, planner, 
context.getStatsErrorEstimator());
-            profile.setExecutionProfile(coord.getExecutionProfile());
+            profile.addExecutionProfile(coord.getExecutionProfile());
             try {
                 QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
                         new QeProcessorImpl.QueryInfo(context, 
originStmt.originStmt, coord));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to