This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new dd99468b8f [fix](stats) Fix jdbc timeout with multiple FE when execute
analyze table (#21115)
dd99468b8f is described below
commit dd99468b8f6bf8237c654bbd34fbe0eab27411ab
Author: AKIRA <[email protected]>
AuthorDate: Sun Jun 25 16:49:36 2023 +0900
[fix](stats) Fix jdbc timeout with multiple FE when execute analyze table
(#21115)
When connected to a follower node, the SQL may be forwarded to the master for execution; in that case the
result should be set to `StmtExecutor#proxyResultSet`.
Before this PR, in the above scenario, submitting an analyze SQL statement via the mysql
client/JDBC would fail with a "malformed packet" / "Communication failed" error.
---
.../org/apache/doris/analysis/AnalyzeStmt.java | 7 +++++-
.../org/apache/doris/analysis/AnalyzeTblStmt.java | 5 -----
.../org/apache/doris/analysis/RedirectStatus.java | 4 ++--
.../main/java/org/apache/doris/catalog/Env.java | 14 ------------
.../main/java/org/apache/doris/qe/DdlExecutor.java | 6 ------
.../java/org/apache/doris/qe/StmtExecutor.java | 11 ++++++++++
.../apache/doris/statistics/AnalysisManager.java | 25 ++++++++++++++++------
7 files changed, 38 insertions(+), 34 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java
index 202a870f12..6f1f7c64d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java
@@ -25,7 +25,7 @@ import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
import java.util.Map;
-public class AnalyzeStmt extends DdlStmt {
+public class AnalyzeStmt extends StatementBase {
protected AnalyzeProperties analyzeProperties;
@@ -81,4 +81,9 @@ public class AnalyzeStmt extends DdlStmt {
public AnalyzeProperties getAnalyzeProperties() {
return analyzeProperties;
}
+
+ @Override
+ public RedirectStatus getRedirectStatus() {
+ return RedirectStatus.FORWARD_WITH_SYNC;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java
index 71d25c97f6..03681bdb36 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java
@@ -252,11 +252,6 @@ public class AnalyzeTblStmt extends AnalyzeStmt {
return table instanceof HMSExternalTable &&
table.getPartitionNames().size() > partNum;
}
- @Override
- public RedirectStatus getRedirectStatus() {
- return RedirectStatus.FORWARD_NO_SYNC;
- }
-
private void checkAnalyzePriv(String dbName, String tblName) throws
AnalysisException {
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), dbName, tblName,
PrivPredicate.SELECT)) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/RedirectStatus.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/RedirectStatus.java
index 478f9fdf07..628d5a3080 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RedirectStatus.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RedirectStatus.java
@@ -28,7 +28,7 @@ public class RedirectStatus {
public RedirectStatus(boolean isForwardToMaster, boolean
needToWaitJournalSync) {
this.isForwardToMaster = isForwardToMaster;
- this.needToWaitJournalSync = needToWaitJournalSync;
+ this.needToWaitJournalSync = needToWaitJournalSync;
}
public boolean isForwardToMaster() {
@@ -47,7 +47,7 @@ public class RedirectStatus {
this.needToWaitJournalSync = needToWaitJournalSync;
}
- public static RedirectStatus FORWARD_NO_SYNC = new RedirectStatus(true,
false);
+ public static RedirectStatus FORWARD_NO_SYNC = new RedirectStatus(true,
false);
public static RedirectStatus FORWARD_WITH_SYNC = new RedirectStatus(true,
true);
public static RedirectStatus NO_FORWARD = new RedirectStatus(false,
false);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 90e4d06118..933b7714a6 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -39,7 +39,6 @@ import org.apache.doris.analysis.AlterMaterializedViewStmt;
import org.apache.doris.analysis.AlterSystemStmt;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.AlterViewStmt;
-import org.apache.doris.analysis.AnalyzeTblStmt;
import org.apache.doris.analysis.BackupStmt;
import org.apache.doris.analysis.CancelAlterSystemStmt;
import org.apache.doris.analysis.CancelAlterTableStmt;
@@ -211,7 +210,6 @@ import org.apache.doris.resource.Tag;
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.AnalysisManager;
-import org.apache.doris.statistics.AnalysisTaskScheduler;
import org.apache.doris.statistics.StatisticsAutoAnalyzer;
import org.apache.doris.statistics.StatisticsCache;
import org.apache.doris.statistics.StatisticsCleaner;
@@ -5345,18 +5343,6 @@ public class Env {
return count;
}
- public AnalysisTaskScheduler getAnalysisJobScheduler() {
- return analysisManager.taskScheduler;
- }
-
- // TODO:
- // 1. handle partition level analysis statement properly
- // 2. support sample job
- // 3. support period job
- public void createAnalysisJob(AnalyzeTblStmt analyzeTblStmt) throws
DdlException {
- analysisManager.createAnalysisJob(analyzeTblStmt);
- }
-
public AnalysisManager getAnalysisManager() {
return analysisManager;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
index faff8fdfb4..15b7bcc883 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -43,8 +43,6 @@ import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.AlterUserStmt;
import org.apache.doris.analysis.AlterViewStmt;
import org.apache.doris.analysis.AlterWorkloadGroupStmt;
-import org.apache.doris.analysis.AnalyzeDBStmt;
-import org.apache.doris.analysis.AnalyzeTblStmt;
import org.apache.doris.analysis.BackupStmt;
import org.apache.doris.analysis.CancelAlterSystemStmt;
import org.apache.doris.analysis.CancelAlterTableStmt;
@@ -297,8 +295,6 @@ public class DdlExecutor {
env.getRefreshManager().handleRefreshTable((RefreshTableStmt)
ddlStmt);
} else if (ddlStmt instanceof RefreshDbStmt) {
env.getRefreshManager().handleRefreshDb((RefreshDbStmt) ddlStmt);
- } else if (ddlStmt instanceof AnalyzeTblStmt) {
- env.createAnalysisJob((AnalyzeTblStmt) ddlStmt);
} else if (ddlStmt instanceof AlterResourceStmt) {
env.getResourceMgr().alterResource((AlterResourceStmt) ddlStmt);
} else if (ddlStmt instanceof AlterWorkloadGroupStmt) {
@@ -337,8 +333,6 @@ public class DdlExecutor {
env.getAnalysisManager().dropStats((DropStatsStmt) ddlStmt);
} else if (ddlStmt instanceof KillAnalysisJobStmt) {
env.getAnalysisManager().handleKillAnalyzeStmt((KillAnalysisJobStmt) ddlStmt);
- } else if (ddlStmt instanceof AnalyzeDBStmt) {
- env.getAnalysisManager().createAnalysisJobs((AnalyzeDBStmt)
ddlStmt);
} else if (ddlStmt instanceof CleanQueryStatsStmt) {
CleanQueryStatsStmt stmt = (CleanQueryStatsStmt) ddlStmt;
CleanQueryStatsInfo cleanQueryStatsInfo = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index ed68e03889..9195738b75 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -736,6 +736,8 @@ public class StmtExecutor {
handleLockTablesStmt();
} else if (parsedStmt instanceof UnsupportedStmt) {
handleUnsupportedStmt();
+ } else if (parsedStmt instanceof AnalyzeStmt) {
+ handleAnalyzeStmt();
} else {
context.getState().setError(ErrorCode.ERR_NOT_SUPPORTED_YET,
"Do not support this query.");
}
@@ -1899,6 +1901,10 @@ public class StmtExecutor {
context.getState().setOk();
}
+ private void handleAnalyzeStmt() throws DdlException {
+ context.env.getAnalysisManager().createAnalyze((AnalyzeStmt)
parsedStmt, isProxy);
+ }
+
// Process switch catalog
private void handleSwitchStmt() throws AnalysisException {
SwitchStmt switchStmt = (SwitchStmt) parsedStmt;
@@ -2537,5 +2543,10 @@ public class StmtExecutor {
public void setProfileType(ProfileType profileType) {
this.profileType = profileType;
}
+
+
+ public void setProxyResultSet(ShowResultSet proxyResultSet) {
+ this.proxyResultSet = proxyResultSet;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index fc29affadb..c4443e5733 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -18,6 +18,7 @@
package org.apache.doris.statistics;
import org.apache.doris.analysis.AnalyzeDBStmt;
+import org.apache.doris.analysis.AnalyzeStmt;
import org.apache.doris.analysis.AnalyzeTblStmt;
import org.apache.doris.analysis.DropAnalyzeJobStmt;
import org.apache.doris.analysis.DropStatsStmt;
@@ -151,7 +152,15 @@ public class AnalysisManager extends Daemon implements
Writable {
return statisticsCache;
}
- public void createAnalysisJobs(AnalyzeDBStmt analyzeDBStmt) throws
DdlException {
+ public void createAnalyze(AnalyzeStmt analyzeStmt, boolean proxy) throws
DdlException {
+ if (analyzeStmt instanceof AnalyzeDBStmt) {
+ createAnalysisJobs((AnalyzeDBStmt) analyzeStmt, proxy);
+ } else if (analyzeStmt instanceof AnalyzeTblStmt) {
+ createAnalysisJob((AnalyzeTblStmt) analyzeStmt, proxy);
+ }
+ }
+
+ public void createAnalysisJobs(AnalyzeDBStmt analyzeDBStmt, boolean proxy)
throws DdlException {
DatabaseIf<TableIf> db = analyzeDBStmt.getDb();
List<TableIf> tbls = db.getTables();
List<AnalysisInfo> analysisInfos = new ArrayList<>();
@@ -179,7 +188,7 @@ public class AnalysisManager extends Daemon implements
Writable {
analysisInfos.add(buildAndAssignJob(analyzeTblStmt));
}
if (!analyzeDBStmt.isSync()) {
- sendJobId(analysisInfos);
+ sendJobId(analysisInfos, proxy);
}
} finally {
db.readUnlock();
@@ -188,12 +197,12 @@ public class AnalysisManager extends Daemon implements
Writable {
}
// Each analyze stmt corresponding to an analysis job.
- public void createAnalysisJob(AnalyzeTblStmt stmt) throws DdlException {
+ public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy) throws
DdlException {
AnalysisInfo jobInfo = buildAndAssignJob(stmt);
if (jobInfo == null) {
return;
}
- sendJobId(ImmutableList.of(jobInfo));
+ sendJobId(ImmutableList.of(jobInfo), proxy);
}
@Nullable
@@ -259,7 +268,7 @@ public class AnalysisManager extends Daemon implements
Writable {
analysisTaskInfos.values().forEach(taskScheduler::schedule);
}
- private void sendJobId(List<AnalysisInfo> analysisInfos) {
+ private void sendJobId(List<AnalysisInfo> analysisInfos, boolean proxy) {
List<Column> columns = new ArrayList<>();
columns.add(new Column("Catalog_Name",
ScalarType.createVarchar(1024)));
columns.add(new Column("DB_Name", ScalarType.createVarchar(1024)));
@@ -279,7 +288,11 @@ public class AnalysisManager extends Daemon implements
Writable {
}
ShowResultSet commonResultSet = new
ShowResultSet(commonResultSetMetaData, resultRows);
try {
- ConnectContext.get().getExecutor().sendResultSet(commonResultSet);
+ if (!proxy) {
+
ConnectContext.get().getExecutor().sendResultSet(commonResultSet);
+ } else {
+
ConnectContext.get().getExecutor().setProxyResultSet(commonResultSet);
+ }
} catch (Throwable t) {
LOG.warn("Failed to send job id to user", t);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]