This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 3f6688407974ba5b6dc692fa6f20891404f8fe0d Author: morrySnow <101034200+morrys...@users.noreply.github.com> AuthorDate: Thu Aug 24 09:49:35 2023 +0800 [fix](Nereids) use Stopwatch to do timeout checker (#23383) 1. avoid a thread leak if an exception is thrown during planning 2. avoid delayed memory release, since the timer task holds the CascadesContext object --- .../src/main/java/org/apache/doris/nereids/NereidsPlanner.java | 10 ++-------- .../main/java/org/apache/doris/nereids/StatementContext.java | 7 +++++++ .../doris/nereids/jobs/scheduler/SimpleJobScheduler.java | 5 ++++- .../org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java | 3 ++- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 25e5ff6baa..4bef000639 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -67,7 +67,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -112,7 +111,9 @@ public class NereidsPlanner extends Planner { NereidsTracer.logImportantTime("EndParsePlan"); setParsedPlan(parsedPlan); PhysicalProperties requireProperties = buildInitRequireProperties(); + statementContext.getStopwatch().start(); Plan resultPlan = plan(parsedPlan, requireProperties, explainLevel); + statementContext.getStopwatch().stop(); setOptimizedPlan(resultPlan); if (explainLevel.isPlanLevel) { return; @@ -179,7 +180,6 @@ public class NereidsPlanner extends Planner { try (Lock lock = new Lock(plan, cascadesContext)) { // resolve column, table and function - Span queryAnalysisSpan = 
statementContext.getConnectContext().getTracer() .spanBuilder("query analysis").setParent(Context.current()).startSpan(); @@ -216,11 +216,6 @@ public class NereidsPlanner extends Planner { } } - Optional<ScheduledExecutorService> timeoutExecutor = Optional.empty(); - if (statementContext.getConnectContext().getSessionVariable().enableNereidsTimeout) { - timeoutExecutor = Optional.of(runTimeoutExecutor()); - } - // rule-based optimize rewrite(); if (explainLevel == ExplainLevel.REWRITTEN_PLAN || explainLevel == ExplainLevel.ALL_PLAN) { @@ -258,7 +253,6 @@ public class NereidsPlanner extends Planner { DebugUtil.printId(statementContext.getConnectContext().queryId())); } NereidsTracer.output(statementContext.getConnectContext()); - timeoutExecutor.ifPresent(ExecutorService::shutdown); return physicalPlan; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index 21246a5c70..c5c5eafcda 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -35,6 +35,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; +import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; @@ -54,6 +55,8 @@ public class StatementContext { private ConnectContext connectContext; + private final Stopwatch stopwatch = Stopwatch.createUnstarted(); + @GuardedBy("this") private final Map<String, Supplier<Object>> contextCacheMap = Maps.newLinkedHashMap(); @@ -107,6 +110,10 @@ public class StatementContext { return originStatement; } + public Stopwatch getStopwatch() { + return stopwatch; + } + public void setMaxNAryInnerJoin(int maxNAryInnerJoin) { if (maxNAryInnerJoin > 
this.maxNAryInnerJoin) { this.maxNAryInnerJoin = maxNAryInnerJoin; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/scheduler/SimpleJobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/scheduler/SimpleJobScheduler.java index 940f9d572f..c52e2e90f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/scheduler/SimpleJobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/scheduler/SimpleJobScheduler.java @@ -20,6 +20,8 @@ package org.apache.doris.nereids.jobs.scheduler; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.jobs.Job; +import java.util.concurrent.TimeUnit; + /** * Single thread, serial scheduler. */ @@ -29,7 +31,8 @@ public class SimpleJobScheduler implements JobScheduler { JobPool pool = scheduleContext.getJobPool(); while (!pool.isEmpty()) { CascadesContext context = (CascadesContext) scheduleContext; - if (context.isTimeout()) { + if (context.getConnectContext().getSessionVariable().enableNereidsTimeout + && context.getStatementContext().getStopwatch().elapsed(TimeUnit.MILLISECONDS) > 5000) { throw new RuntimeException("Nereids cost too much time ( > 5s )"); } Job job = pool.pop(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java index 65d9607a1c..a465df6da1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java @@ -25,6 +25,7 @@ import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.trees.expressions.CTEId; import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; import org.apache.doris.nereids.trees.plans.Plan; import 
org.apache.doris.nereids.trees.plans.logical.LogicalCTE; import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor; @@ -89,7 +90,7 @@ public class AnalyzeCTE extends OneAnalysisRuleFactory { innerCascadesCtx.newAnalyzer().analyze(); LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); - CTEId cteId = cascadesContext.getStatementContext().getNextCTEId(); + CTEId cteId = StatementScopeIdGenerator.newCTEId(); LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan)); outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org