This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 3f6688407974ba5b6dc692fa6f20891404f8fe0d
Author: morrySnow <101034200+morrys...@users.noreply.github.com>
AuthorDate: Thu Aug 24 09:49:35 2023 +0800

    [fix](Nereids) use Stopwatch to do timeout checker (#23383)
    
    1. avoid thread leak if exception thrown in planning
    2. avoid memory release delays since the timer task hold CascadesContext
       object
---
 .../src/main/java/org/apache/doris/nereids/NereidsPlanner.java | 10 ++--------
 .../main/java/org/apache/doris/nereids/StatementContext.java   |  7 +++++++
 .../doris/nereids/jobs/scheduler/SimpleJobScheduler.java       |  5 ++++-
 .../org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java    |  3 ++-
 4 files changed, 15 insertions(+), 10 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index 25e5ff6baa..4bef000639 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -67,7 +67,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -112,7 +111,9 @@ public class NereidsPlanner extends Planner {
         NereidsTracer.logImportantTime("EndParsePlan");
         setParsedPlan(parsedPlan);
         PhysicalProperties requireProperties = buildInitRequireProperties();
+        statementContext.getStopwatch().start();
         Plan resultPlan = plan(parsedPlan, requireProperties, explainLevel);
+        statementContext.getStopwatch().stop();
         setOptimizedPlan(resultPlan);
         if (explainLevel.isPlanLevel) {
             return;
@@ -179,7 +180,6 @@ public class NereidsPlanner extends Planner {
 
         try (Lock lock = new Lock(plan, cascadesContext)) {
             // resolve column, table and function
-
             Span queryAnalysisSpan =
                     statementContext.getConnectContext().getTracer()
                             .spanBuilder("query 
analysis").setParent(Context.current()).startSpan();
@@ -216,11 +216,6 @@ public class NereidsPlanner extends Planner {
                 }
             }
 
-            Optional<ScheduledExecutorService> timeoutExecutor = 
Optional.empty();
-            if 
(statementContext.getConnectContext().getSessionVariable().enableNereidsTimeout)
 {
-                timeoutExecutor = Optional.of(runTimeoutExecutor());
-            }
-
             // rule-based optimize
             rewrite();
             if (explainLevel == ExplainLevel.REWRITTEN_PLAN || explainLevel == 
ExplainLevel.ALL_PLAN) {
@@ -258,7 +253,6 @@ public class NereidsPlanner extends Planner {
                         
DebugUtil.printId(statementContext.getConnectContext().queryId()));
             }
             NereidsTracer.output(statementContext.getConnectContext());
-            timeoutExecutor.ifPresent(ExecutorService::shutdown);
 
             return physicalPlan;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
index 21246a5c70..c5c5eafcda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
@@ -35,6 +35,7 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
@@ -54,6 +55,8 @@ public class StatementContext {
 
     private ConnectContext connectContext;
 
+    private final Stopwatch stopwatch = Stopwatch.createUnstarted();
+
     @GuardedBy("this")
     private final Map<String, Supplier<Object>> contextCacheMap = 
Maps.newLinkedHashMap();
 
@@ -107,6 +110,10 @@ public class StatementContext {
         return originStatement;
     }
 
+    public Stopwatch getStopwatch() {
+        return stopwatch;
+    }
+
     public void setMaxNAryInnerJoin(int maxNAryInnerJoin) {
         if (maxNAryInnerJoin > this.maxNAryInnerJoin) {
             this.maxNAryInnerJoin = maxNAryInnerJoin;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/scheduler/SimpleJobScheduler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/scheduler/SimpleJobScheduler.java
index 940f9d572f..c52e2e90f1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/scheduler/SimpleJobScheduler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/scheduler/SimpleJobScheduler.java
@@ -20,6 +20,8 @@ package org.apache.doris.nereids.jobs.scheduler;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.jobs.Job;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * Single thread, serial scheduler.
  */
@@ -29,7 +31,8 @@ public class SimpleJobScheduler implements JobScheduler {
         JobPool pool = scheduleContext.getJobPool();
         while (!pool.isEmpty()) {
             CascadesContext context = (CascadesContext) scheduleContext;
-            if (context.isTimeout()) {
+            if 
(context.getConnectContext().getSessionVariable().enableNereidsTimeout
+                    && 
context.getStatementContext().getStopwatch().elapsed(TimeUnit.MILLISECONDS) > 
5000) {
                 throw new RuntimeException("Nereids cost too much time ( > 5s 
)");
             }
             Job job = pool.pop();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java
index 65d9607a1c..a465df6da1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java
@@ -25,6 +25,7 @@ import org.apache.doris.nereids.rules.Rule;
 import org.apache.doris.nereids.rules.RuleType;
 import org.apache.doris.nereids.trees.expressions.CTEId;
 import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
@@ -89,7 +90,7 @@ public class AnalyzeCTE extends OneAnalysisRuleFactory {
             innerCascadesCtx.newAnalyzer().analyze();
             LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
             checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
-            CTEId cteId = cascadesContext.getStatementContext().getNextCTEId();
+            CTEId cteId = StatementScopeIdGenerator.newCTEId();
             LogicalSubQueryAlias<Plan> logicalSubQueryAlias =
                     aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan));
             outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to