This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0-beta in repository https://gitbox.apache.org/repos/asf/doris.git
commit c43dcb222e4bb8f060842e6833db8a133f2b84a7 Author: AKIRA <33112463+kikyou1...@users.noreply.github.com> AuthorDate: Fri Jun 9 16:43:11 2023 +0900 [fix](stats) set analysis job status to finished when be crashed by mistake (#20485) If BE crashed the error would be logged, and the analysis task would be mark as finished, which is incorrect. In this PR, update analysis task according to the query state --- .../doris/catalog/InternalSchemaInitializer.java | 2 +- .../main/java/org/apache/doris/qe/StmtExecutor.java | 10 ++++------ .../org/apache/doris/statistics/AnalysisManager.java | 16 +++++++++------- .../apache/doris/statistics/AnalysisTaskExecutor.java | 2 +- .../apache/doris/statistics/AnalysisTaskWrapper.java | 4 ++-- .../org/apache/doris/statistics/ColumnStatistic.java | 3 ++- .../org/apache/doris/statistics/OlapAnalysisTask.java | 12 ++++++------ .../doris/statistics/StatisticsAutoAnalyzer.java | 1 - .../apache/doris/statistics/util/StatisticsUtil.java | 4 +++- .../org/apache/doris/statistics/AnalysisJobTest.java | 9 +++++++++ .../doris/statistics/AnalysisTaskExecutorTest.java | 18 ++++++++++++++++++ 11 files changed, 55 insertions(+), 26 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java index c5a3197dee..04bbbe263a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java @@ -147,7 +147,7 @@ public class InternalSchemaInitializer extends Thread { columnDefs.add(partId); columnDefs.add(new ColumnDef("count", TypeDef.create(PrimitiveType.BIGINT))); columnDefs.add(new ColumnDef("ndv", TypeDef.create(PrimitiveType.BIGINT))); - columnDefs.add(new ColumnDef("null_count", TypeDef.create(PrimitiveType.BIGINT))); + columnDefs.add(new ColumnDef("null_count", TypeDef.create(PrimitiveType.BIGINT), true)); columnDefs.add(new ColumnDef("min", TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH), true)); columnDefs.add(new ColumnDef("max", TypeDef.createVarchar(ScalarType.MAX_VARCHAR_LENGTH), true)); columnDefs.add(new ColumnDef("data_size_in_bytes", TypeDef.create(PrimitiveType.BIGINT))); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 021a179c46..9b0b8708aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -2421,8 +2421,7 @@ public class StmtExecutor { analyze(context.getSessionVariable().toThrift()); } } catch (Exception e) { - LOG.warn("Internal SQL execution failed, SQL: {}", originStmt, e); - return resultRows; + throw new RuntimeException("Failed to execute internal SQL", e); } planner.getFragments(); RowBatch batch; @@ -2432,7 +2431,7 @@ public class StmtExecutor { QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord)); } catch (UserException e) { - LOG.warn(e.getMessage(), e); + throw new RuntimeException("Failed to execute internal SQL", e); } Span queryScheduleSpan = context.getTracer() @@ -2441,7 +2440,7 @@ public class StmtExecutor { coord.exec(); } catch (Exception e) { queryScheduleSpan.recordException(e); - LOG.warn("Unexpected exception when SQL running", e); + throw new RuntimeException("Failed to execute internal SQL", e); } finally { queryScheduleSpan.end(); } @@ -2457,9 +2456,8 @@ public class StmtExecutor { } } } catch (Exception e) { - LOG.warn("Unexpected exception when SQL running", e); fetchResultSpan.recordException(e); - return resultRows; + throw new RuntimeException("Failed to execute internal SQL", e); } finally { fetchResultSpan.end(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index d48eed9618..a138a356d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -234,10 +234,10 @@ public class AnalysisManager extends Daemon { Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>(); createTaskForEachColumns(jobInfo, analysisTaskInfos, false); createTaskForMVIdx(jobInfo, analysisTaskInfos, false); - - persistAnalysisJob(jobInfo); - analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos); - + if (!jobInfo.jobType.equals(JobType.SYSTEM)) { + persistAnalysisJob(jobInfo); + analysisJobIdToTaskMap.put(jobInfo.jobId, analysisTaskInfos); + } try { updateTableStats(jobInfo); } catch (Throwable e) { @@ -502,7 +502,9 @@ public class AnalysisManager extends Daemon { continue; } try { - logCreateAnalysisTask(analysisInfo); + if (!jobInfo.jobType.equals(JobType.SYSTEM)) { + logCreateAnalysisTask(analysisInfo); + } } catch (Exception e) { throw new DdlException("Failed to create analysis task", e); } @@ -510,13 +512,13 @@ public class AnalysisManager extends Daemon { } private void logCreateAnalysisTask(AnalysisInfo analysisInfo) { - Env.getCurrentEnv().getEditLog().logCreateAnalysisTasks(analysisInfo); analysisTaskInfoMap.put(analysisInfo.taskId, analysisInfo); + Env.getCurrentEnv().getEditLog().logCreateAnalysisTasks(analysisInfo); } private void logCreateAnalysisJob(AnalysisInfo analysisJob) { - Env.getCurrentEnv().getEditLog().logCreateAnalysisJob(analysisJob); analysisJobInfoMap.put(analysisJob.jobId, analysisJob); + Env.getCurrentEnv().getEditLog().logCreateAnalysisJob(analysisJob); } private void createTaskForExternalTable(AnalysisInfo jobInfo, diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java index 301d46644d..3f22d1ccf3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java @@ -78,7 +78,7 @@ public class AnalysisTaskExecutor extends Thread { long timeout = TimeUnit.MINUTES.toMillis(Config.analyze_task_timeout_in_minutes); taskWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS); } catch (Exception e) { - taskWrapper.cancel(); + taskWrapper.cancel(e.getMessage()); } } catch (Throwable throwable) { LOG.warn(throwable); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java index 4590e138f6..b2615e5d05 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java @@ -76,12 +76,12 @@ public class AnalysisTaskWrapper extends FutureTask<Void> { } } - public boolean cancel() { + public boolean cancel(String msg) { try { LOG.warn("{} cancelled, cost time:{}", task.toString(), System.currentTimeMillis() - startTime); task.cancel(); } catch (Exception e) { - LOG.warn(String.format("Cancel job failed job info : %s", task.toString())); + LOG.warn(String.format("Cancel job failed job info : %s", msg)); } return super.cancel(false); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java index 9485ecf662..5735c01430 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java @@ -134,7 +134,8 @@ public class ColumnStatistic { ndv = count; } columnStatisticBuilder.setNdv(ndv); - columnStatisticBuilder.setNumNulls(Double.parseDouble(resultRow.getColumnValue("null_count"))); + String nullCount = resultRow.getColumnValue("null_count"); + columnStatisticBuilder.setNumNulls(nullCount == null ? 0 : Double.parseDouble(nullCount)); columnStatisticBuilder.setDataSize(Double .parseDouble(resultRow.getColumnValue("data_size_in_bytes"))); columnStatisticBuilder.setAvgSizeByte(columnStatisticBuilder.getDataSize() diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java index a65553a838..4391d87f52 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java @@ -20,8 +20,8 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Partition; import org.apache.doris.common.FeConstants; -import org.apache.doris.qe.AutoCloseConnectContext; -import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.qe.QueryState; +import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.statistics.util.StatisticsUtil; import com.google.common.annotations.VisibleForTesting; @@ -108,10 +108,10 @@ public class OlapAnalysisTask extends BaseAnalysisTask { @VisibleForTesting public void execSQL(String sql) throws Exception { - try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { - r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); - this.stmtExecutor = new StmtExecutor(r.connectContext, sql); - this.stmtExecutor.execute(); + QueryState queryState = StatisticsUtil.execUpdate(sql); + if (queryState.getStateType().equals(MysqlStateType.ERR)) { + throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s", + info.catalogName, info.dbName, info.colName, queryState.getErrorMessage())); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java index 8128d3a7d3..5120bd23ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoAnalyzer.java @@ -39,7 +39,6 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; - public class StatisticsAutoAnalyzer extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(StatisticsAutoAnalyzer.class); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 8f77df2748..2ee244fbe6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -47,6 +47,7 @@ import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral; import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral; import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.QueryState; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.statistics.AnalysisInfo; @@ -105,12 +106,13 @@ public class StatisticsUtil { } } - public static void execUpdate(String sql) throws Exception { + public static QueryState execUpdate(String sql) throws Exception { try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql); r.connectContext.setExecutor(stmtExecutor); stmtExecutor.execute(); + return r.connectContext.getState(); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java index e8984cd209..86c3d63463 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java @@ -26,6 +26,7 @@ import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; import org.apache.doris.statistics.AnalysisInfo.AnalysisMode; import org.apache.doris.statistics.AnalysisInfo.AnalysisType; import org.apache.doris.statistics.AnalysisInfo.JobType; +import org.apache.doris.statistics.util.InternalQueryResult.ResultRow; import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.doris.utframe.TestWithFeService; @@ -39,6 +40,7 @@ import org.junit.jupiter.api.Test; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Set; public class AnalysisJobTest extends TestWithFeService { @@ -80,6 +82,13 @@ public class AnalysisJobTest extends TestWithFeService { } }; + new MockUp<StmtExecutor>() { + @Mock + public List<ResultRow> executeInternalQuery() { + return Collections.emptyList(); + } + }; + new MockUp<ConnectContext>() { @Mock diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java index c2b355067d..522a28d9c2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java @@ -19,11 +19,13 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.InternalSchemaInitializer; import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.qe.StmtExecutor; import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; import org.apache.doris.statistics.AnalysisInfo.AnalysisMode; import org.apache.doris.statistics.AnalysisInfo.AnalysisType; import org.apache.doris.statistics.AnalysisInfo.JobType; import org.apache.doris.statistics.util.BlockingCounter; +import org.apache.doris.statistics.util.InternalQueryResult.ResultRow; import org.apache.doris.utframe.TestWithFeService; import com.google.common.collect.Maps; @@ -36,6 +38,7 @@ import org.junit.jupiter.api.Test; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Set; import java.util.concurrent.BlockingQueue; @@ -94,6 +97,21 @@ public class AnalysisTaskExecutorTest extends TestWithFeService { @Test public void testTaskExecution() throws Exception { + + new MockUp<StmtExecutor>() { + @Mock + public List<ResultRow> executeInternalQuery() { + return Collections.emptyList(); + } + }; + + new MockUp<OlapAnalysisTask>() { + @Mock + public void execSQL(String sql) throws Exception { + + } + }; + AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(analysisTaskScheduler); HashMap<String, Set<String>> colToPartitions = Maps.newHashMap(); colToPartitions.put("col1", Collections.singleton("t1")); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org