This is an automated email from the ASF dual-hosted git repository. adonisling pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new f5d958ccf9 [fix](MTMV) Reset insert timeout in handleInsert (#17249) f5d958ccf9 is described below commit f5d958ccf9127e13385bfbbaff8fa2ac1afac2b6 Author: huangzhaowei <carlmartin...@gmail.com> AuthorDate: Fri Mar 3 11:32:50 2023 +0800 [fix](MTMV) Reset insert timeout in handleInsert (#17249) In #16343, we split the timeout variable into two (one for queries and another for insertions). The function `ConnectProcessor::handleQuery` uses the corresponding session variable to change the timeout for the queries requested by the MySQL client. However, the function `StmtExecutor::handleInsert` doesn't use the session variable to change the timeout, so we can't change the timeout for the CTAS and MTMV insertion jobs. --- .../main/java/org/apache/doris/analysis/InsertStmt.java | 2 +- .../main/java/org/apache/doris/qe/ConnectContext.java | 17 +++++++++-------- .../main/java/org/apache/doris/qe/ConnectProcessor.java | 2 -- .../src/main/java/org/apache/doris/qe/StmtExecutor.java | 4 +++- .../java/org/apache/doris/qe/ConnectContextTest.java | 4 ++-- 5 files changed, 15 insertions(+), 14 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java index 79173f42e8..10cffc53d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java @@ -305,7 +305,7 @@ public class InsertStmt extends DdlStmt { db = analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb()); // create label and begin transaction - long timeoutSecond = ConnectContext.get().getExecTimeout(); + long timeoutSecond = ConnectContext.get().resetExecTimeoutByInsert(); if (Strings.isNullOrEmpty(label)) { label = "insert_" + DebugUtil.printId(analyzer.getContext().queryId()).replace("-", "_"); } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index fb8c650004..1a5143a5f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -155,8 +155,8 @@ public class ConnectContext { * the global execution timeout in seconds, currently set according to query_timeout and insert_timeout. * <p> * when a connection is established, exec_timeout is set by query_timeout, when the statement is an insert stmt, - * then it is set to max(query_timeout, insert_timeout) with {@link #resetExecTimeout()} in - * after the StmtExecutor is specified. + * then it is set to max(executionTimeoutS, insert_timeout) using {@link #setExecTimeout(int timeout)} at + * {@link StmtExecutor}. */ private int executionTimeoutS; @@ -640,12 +640,13 @@ public class ConnectContext { return currentConnectedFEIp; } - public void resetExecTimeout() { - if (executor != null && executor.isInsertStmt()) { - // particular timeout for insert stmt, we can make other particular timeout in the same way. 
- // set the execution timeout as max(insert_timeout,query_timeout) to be compatible with older versions - executionTimeoutS = Math.max(sessionVariable.getInsertTimeoutS(), executionTimeoutS); - } + public void setExecTimeout(int timeout) { + executionTimeoutS = timeout; + } + + public long resetExecTimeoutByInsert() { + executionTimeoutS = Math.max(executionTimeoutS, sessionVariable.getInsertTimeoutS()); + return executionTimeoutS; } public int getExecTimeout() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 6574929d7d..b45564ec74 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -408,8 +408,6 @@ public class ConnectProcessor { parsedStmt.setUserInfo(ctx.getCurrentUserIdentity()); executor = new StmtExecutor(ctx, parsedStmt); ctx.setExecutor(executor); - // reset the executionTimeout corresponding with the StmtExecutor - ctx.resetExecTimeout(); try { executor.execute(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 5c032c28db..bf34dd03a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1489,7 +1489,7 @@ public class StmtExecutor implements ProfileWriter { InterruptedException, ExecutionException, TimeoutException { TransactionEntry txnEntry = context.getTxnEntry(); TTxnParams txnConf = txnEntry.getTxnConf(); - long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS(); + long timeoutSecond = ConnectContext.get().getExecTimeout(); TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING; Database dbObj = Env.getCurrentInternalCatalog() .getDbOrException(dbName, s -> new TException("database 
is invalid for dbName: " + s)); @@ -1550,6 +1550,8 @@ public class StmtExecutor implements ProfileWriter { } analyzeVariablesInStmt(insertStmt.getQueryStmt()); + // reset the executionTimeout since query hint maybe change the insert_timeout again + context.resetExecTimeoutByInsert(); long createTime = System.currentTimeMillis(); Throwable throwable = null; diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java index a0f3a942a4..68e4394f67 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java @@ -176,14 +176,14 @@ public class ConnectContextTest { // sleep no time out Assert.assertFalse(ctx.isKilled()); ctx.setExecutor(executor); - ctx.resetExecTimeout(); + ctx.setExecTimeout(ctx.getSessionVariable().getInsertTimeoutS()); long now = ctx.getExecTimeout() * 1000L - 1; ctx.checkTimeout(now); Assert.assertFalse(ctx.isKilled()); // Timeout ctx.setExecutor(executor); - ctx.resetExecTimeout(); + ctx.setExecTimeout(ctx.getSessionVariable().getInsertTimeoutS()); now = ctx.getExecTimeout() * 1000L + 1; ctx.checkTimeout(now); Assert.assertFalse(ctx.isKilled()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org