EmmyMiao87 commented on code in PR #8860:
URL: https://github.com/apache/incubator-doris/pull/8860#discussion_r854748367


##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java:
##########
@@ -18,62 +18,200 @@
 package org.apache.doris.statistics;
 
 import org.apache.doris.analysis.AnalyzeStmt;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
 
-import com.google.common.collect.Maps;
+import com.clearspring.analytics.util.Lists;
 
-import org.glassfish.jersey.internal.guava.Sets;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.clearspring.analytics.util.Lists;
-
-/*
-Used to store statistics job info,
-including job status, progress, etc.
+/***
+ * Used to store statistics job info,
+ * including job status, progress, etc.
  */
 public class StatisticsJob {
+    private static final Logger LOG = 
LogManager.getLogger(StatisticsJob.class);
 
     public enum JobState {
         PENDING,
         SCHEDULING,
         RUNNING,
         FINISHED,
+        FAILED,
         CANCELLED
     }
 
-    private long id = -1;
+    private long id = Catalog.getCurrentCatalog().getNextId();
+
+    /**
+     * to be collected database stats.
+     */
+    private final long dbId;
+
+    /**
+     * to be collected table stats.
+     */
+    private final Set<Long> tblIds;
+
+    /**
+     * to be collected column stats.
+     */
+    private final Map<Long, List<String>> tableIdToColumnName;
+
+    /**
+     * timeout of a statistics task
+     */
+    private long taskTimeout;
+
+    /**
+     * to be executed tasks.
+     */
+    private List<StatisticsTask> tasks = Lists.newArrayList();
+
     private JobState jobState = JobState.PENDING;
-    // optional
-    // to be collected table stats
-    private List<Long> tableId = Lists.newArrayList();
-    // to be collected column stats
-    private Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
-    private Map<String, String> properties;
-    // end
+    private final List<String> errorMsgs = Lists.newArrayList();
 
-    private List<StatisticsTask> taskList = Lists.newArrayList();
+    private final long createTime = System.currentTimeMillis();
+    private long startTime = -1L;
+    private long finishTime = -1L;
+    private int progress = 0;
+
+    public StatisticsJob(Long dbId,
+                         Set<Long> tblIds,
+                         Map<Long, List<String>> tableIdToColumnName) {
+        this.dbId = dbId;
+        this.tblIds = tblIds;
+        this.tableIdToColumnName = tableIdToColumnName;
+    }
 
     public long getId() {
-        return id;
+        return this.id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public long getDbId() {
+        return this.dbId;
+    }
+
+    public Set<Long> getTblIds() {
+        return this.tblIds;
+    }
+
+    public Map<Long, List<String>> getTableIdToColumnName() {
+        return this.tableIdToColumnName;
+    }
+
+    public long getTaskTimeout() {
+        return taskTimeout;
     }
 
-    /*
-        AnalyzeStmt: Analyze t1(c1), t2
-        StatisticsJob:
-          tableId [t1, t2]
-          tableIdToColumnName <t1, [c1]> <t2, [c1,c2,c3]>
-         */
-    public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt analyzeStmt) {
-        // TODO
-        return new StatisticsJob();
+    public List<StatisticsTask> getTasks() {
+        return this.tasks;
+    }
+
+    public void setTasks(List<StatisticsTask> tasks) {
+        this.tasks = tasks;
+    }
+
+    public List<String> getErrorMsgs() {
+        return errorMsgs;
+    }
+
+    public JobState getJobState() {
+        return this.jobState;
+    }
+
+    public long getCreateTime() {
+        return this.createTime;
+    }
+
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getFinishTime() {
+        return this.finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+        this.finishTime = finishTime;
+    }
+
+    public int getProgress() {
+        return this.progress;
+    }
+
+    public void setProgress(int progress) {
+        this.progress = progress;
+    }
+
+    private void setOptional(AnalyzeStmt stmt) {
+        if (stmt.getTaskTimeout() != -1) {
+            this.taskTimeout = stmt.getTaskTimeout();
+        }
+    }
+
+    public synchronized void updateJobState(JobState jobState) {
+        // PENDING -> SCHEDULING/FAILED/CANCELLED
+        if (this.jobState == JobState.PENDING) {
+            if (jobState == JobState.SCHEDULING) {
+                this.jobState = JobState.SCHEDULING;
+            } else if (jobState == JobState.FAILED) {
+                this.jobState = JobState.FAILED;
+            } else if (jobState == JobState.CANCELLED) {
+                this.jobState = JobState.CANCELLED;
+            }
+            return;
+        }
+
+        // SCHEDULING -> RUNNING/FAILED/CANCELLED
+        if (this.jobState == JobState.SCHEDULING) {
+            if (jobState == JobState.RUNNING) {
+                this.jobState = JobState.RUNNING;
+            } else if (jobState == JobState.FAILED) {
+                this.jobState = JobState.FAILED;
+            } else if (jobState == JobState.CANCELLED) {
+                this.jobState = JobState.CANCELLED;
+            }
+            return;
+        }
+
+        // RUNNING -> FINISHED/FAILED/CANCELLED
+        if (this.jobState == JobState.RUNNING) {
+            if (jobState == JobState.FINISHED) {
+                this.jobState = JobState.FINISHED;
+            } else if (jobState == JobState.FAILED) {
+                this.jobState = JobState.FAILED;
+            } else if (jobState == JobState.CANCELLED) {
+                this.jobState = JobState.CANCELLED;
+            }
+        }

Review Comment:
   Also update ```startTime``` and ```finishTime``` here when the state changes.



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java:
##########
@@ -17,63 +17,95 @@
 
 package org.apache.doris.statistics;
 
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.statistics.StatisticsJob.JobState;
+import org.apache.doris.statistics.StatisticsTask.TaskState;
+import org.apache.doris.statistics.StatsCategoryDesc.StatsCategory;
 
+import com.clearspring.analytics.util.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-
-import com.clearspring.analytics.util.Lists;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /*
 Schedule statistics task
  */
 public class StatisticsTaskScheduler extends MasterDaemon {
     private final static Logger LOG = 
LogManager.getLogger(StatisticsTaskScheduler.class);
 
-    private Queue<StatisticsTask> queue = Queues.newLinkedBlockingQueue();
+    private final Queue<StatisticsTask> queue = 
Queues.newLinkedBlockingQueue();
 
     public StatisticsTaskScheduler() {
         super("Statistics task scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
-        // step1: task n concurrent tasks from the queue
+        // task n concurrent tasks from the queue
         List<StatisticsTask> tasks = peek();
-        // step2: execute tasks
-        ExecutorService executor = Executors.newFixedThreadPool(tasks.size());
-        List<Future<StatisticsTaskResult>> taskResultList = null;
-        try {
-            taskResultList = executor.invokeAll(tasks);
-        } catch (InterruptedException e) {
-            LOG.warn("Failed to execute this turn of statistics tasks", e);
-        }
-        // step3: update job and statistics
-        handleTaskResult(taskResultList);
-        // step4: remove task from queue
-        remove(tasks.size());
 
+        if (!tasks.isEmpty()) {
+            ThreadPoolExecutor executor = 
ThreadPoolManager.newDaemonCacheThreadPool(tasks.size(),
+                    "statistic-pool", false);
+            StatisticsJobManager jobManager = 
Catalog.getCurrentCatalog().getStatisticsJobManager();
+            Map<Long, StatisticsJob> statisticsJobs = 
jobManager.getIdToStatisticsJob();
+            Map<Long, Future<StatisticsTaskResult>> taskMap = 
Maps.newLinkedHashMap();
+
+            long jobId = -1;
+            int taskSize = 0;
+            for (StatisticsTask task : tasks) {
+                this.queue.remove();

Review Comment:
   Check that the job state is not CANCELLED or FAILED before running its tasks.



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java:
##########
@@ -17,63 +17,95 @@
 
 package org.apache.doris.statistics;
 
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.statistics.StatisticsJob.JobState;
+import org.apache.doris.statistics.StatisticsTask.TaskState;
+import org.apache.doris.statistics.StatsCategoryDesc.StatsCategory;
 
+import com.clearspring.analytics.util.Lists;

Review Comment:
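   Wrong import: ```com.clearspring.analytics.util.Lists``` is from an unrelated library. Presumably Guava's ```com.google.common.collect.Lists``` was intended, since Guava is already used here for ```Maps``` and ```Queues```.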



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobScheduler.java:
##########
@@ -33,31 +47,211 @@
 Switch job state from pending to scheduling.
  */
 public class StatisticsJobScheduler extends MasterDaemon {
+    private static final Logger LOG = 
LogManager.getLogger(StatisticsJobScheduler.class);
+
+    /**
+     * If the table row-count is greater than the maximum number of Be scans 
for a single BE,
+     * we'll divide subtasks by partition. relevant 
values(3700000000L&600000000L) are derived from test.
+     * COUNT_MAX_SCAN_PER_TASK is for count(expr), NDV_MAX_SCAN_PER_TASK is 
for min(c1)/max(c1)/ndv(c1).
+     */
+    private static final long COUNT_MAX_SCAN_PER_TASK = 3700000000L;
+    private static final long NDV_MAX_SCAN_PER_TASK = 600000000L;
 
-    public Queue<StatisticsJob> pendingJobQueue = 
Queues.newLinkedBlockingQueue();
+    /**
+     * Different statistics need to be collected for the jobs submitted by 
users.
+     * if all statistics be collected at the same time, the cluster may be 
overburdened
+     * and normal query services may be affected. Therefore, we put the jobs 
into the queue
+     * and schedule them one by one, and finally divide each job to several 
subtasks and execute them.
+     */
+    public final Queue<StatisticsJob> pendingJobQueue = 
Queues.newLinkedBlockingQueue(Config.cbo_max_statistics_job_num);
 
     public StatisticsJobScheduler() {
         super("Statistics job scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
-        StatisticsJob pendingJob = pendingJobQueue.peek();
-        // step0: check job state again
-        // step1: divide statistics job to task
-        List<StatisticsTask> statisticsTaskList = divide(pendingJob);
-        // step2: submit
-        
Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(statisticsTaskList);
+        StatisticsJob pendingJob = this.pendingJobQueue.poll();
+        if (pendingJob != null) {
+            try {
+                List<StatisticsTask> tasks = this.divide(pendingJob);
+                
Catalog.getCurrentCatalog().getStatisticsTaskScheduler().addTasks(tasks);
+                pendingJob.updateJobState(StatisticsJob.JobState.SCHEDULING);
+            } catch (DdlException e) {

Review Comment:
   Handle the ```IllegalStateException``` that is thrown when the queue is full.
   



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsManager.java:
##########
@@ -104,7 +111,7 @@ public List<List<String>> showColumnStatsList(TableName 
tableName) throws Analys
         }
         // get stats
         List<List<String>> result = Lists.newArrayList();
-        Map<String, ColumnStats> nameToColumnStats = 
statistics.getColumnStats(table.getId());
+        Map<String, ColumnStats> nameToColumnStats = 
this.statistics.getColumnStats(table.getId());

Review Comment:
   I think it would be better to stick to the previous code style.
   Don't use the ```this``` keyword when a method's parameters or locals cannot be confused with member variables.
   Only use ```this``` where that ambiguity actually exists in the method.



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsManager.java:
##########
@@ -128,12 +135,47 @@ private List<String> showTableStats(Table table) throws 
AnalysisException {
         return row;
     }
 
+    public void alterTableStatistics(StatisticsTaskResult taskResult) throws 
AnalysisException {
+        StatsCategoryDesc categoryDesc = taskResult.getCategoryDesc();
+        validateTableAndColumn(categoryDesc);
+        long tblId = categoryDesc.getTableId();
+        Map<String, String> statsNameToValue = 
taskResult.getStatsNameToValue();
+        this.statistics.updateTableStats(tblId, statsNameToValue);
+    }
+
+    public void alterColumnStatistics(StatisticsTaskResult taskResult) throws 
AnalysisException {
+        StatsCategoryDesc categoryDesc = taskResult.getCategoryDesc();
+        validateTableAndColumn(categoryDesc);
+        long dbId = categoryDesc.getDbId();
+        long tblId = categoryDesc.getTableId();
+        Database db = 
Catalog.getCurrentCatalog().getDbOrAnalysisException(dbId);
+        Table table = db.getTableOrAnalysisException(tblId);
+        String columnName = categoryDesc.getColumnName();
+        Type columnType = table.getColumn(columnName).getType();
+        Map<String, String> statsNameToValue = 
taskResult.getStatsNameToValue();
+        this.statistics.updateColumnStats(tblId, columnName, columnType, 
statsNameToValue);
+    }
+
     private Table validateTableName(TableName dbTableName) throws 
AnalysisException {
         String dbName = dbTableName.getDb();
         String tableName = dbTableName.getTbl();
 
         Database db = 
Catalog.getCurrentCatalog().getDbOrAnalysisException(dbName);
-        Table table = db.getTableOrAnalysisException(tableName);
-        return table;
+        return db.getTableOrAnalysisException(tableName);
+    }
+
+    private void validateTableAndColumn(StatsCategoryDesc categoryDesc) throws 
AnalysisException {
+        long dbId = categoryDesc.getDbId();
+        long tblId = categoryDesc.getTableId();
+        String columnName = categoryDesc.getColumnName();
+
+        Database db = 
Catalog.getCurrentCatalog().getDbOrAnalysisException(dbId);
+        Table table = db.getTableOrAnalysisException(tblId);
+        if (!Strings.isNullOrEmpty(columnName)) {
+            Column column = table.getColumn(columnName);
+            if (column == null) {

Review Comment:
   If the column does not exist, don't report an error; just print an info log.



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskScheduler.java:
##########
@@ -17,63 +17,95 @@
 
 package org.apache.doris.statistics;
 
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.util.MasterDaemon;
+import org.apache.doris.statistics.StatisticsJob.JobState;
+import org.apache.doris.statistics.StatisticsTask.TaskState;
+import org.apache.doris.statistics.StatsCategoryDesc.StatsCategory;
 
+import com.clearspring.analytics.util.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-
-import com.clearspring.analytics.util.Lists;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /*
 Schedule statistics task
  */
 public class StatisticsTaskScheduler extends MasterDaemon {
     private final static Logger LOG = 
LogManager.getLogger(StatisticsTaskScheduler.class);
 
-    private Queue<StatisticsTask> queue = Queues.newLinkedBlockingQueue();
+    private final Queue<StatisticsTask> queue = 
Queues.newLinkedBlockingQueue();
 
     public StatisticsTaskScheduler() {
         super("Statistics task scheduler", 0);
     }
 
     @Override
     protected void runAfterCatalogReady() {
-        // TODO
-        // step1: task n concurrent tasks from the queue
+        // task n concurrent tasks from the queue
         List<StatisticsTask> tasks = peek();
-        // step2: execute tasks
-        ExecutorService executor = Executors.newFixedThreadPool(tasks.size());
-        List<Future<StatisticsTaskResult>> taskResultList = null;
-        try {
-            taskResultList = executor.invokeAll(tasks);
-        } catch (InterruptedException e) {
-            LOG.warn("Failed to execute this turn of statistics tasks", e);
-        }
-        // step3: update job and statistics
-        handleTaskResult(taskResultList);
-        // step4: remove task from queue
-        remove(tasks.size());
 
+        if (!tasks.isEmpty()) {
+            ThreadPoolExecutor executor = 
ThreadPoolManager.newDaemonCacheThreadPool(tasks.size(),
+                    "statistic-pool", false);
+            StatisticsJobManager jobManager = 
Catalog.getCurrentCatalog().getStatisticsJobManager();
+            Map<Long, StatisticsJob> statisticsJobs = 
jobManager.getIdToStatisticsJob();
+            Map<Long, Future<StatisticsTaskResult>> taskMap = 
Maps.newLinkedHashMap();
+
+            long jobId = -1;
+            int taskSize = 0;
+            for (StatisticsTask task : tasks) {
+                this.queue.remove();
+                // handle task result for each job
+                if (taskSize > 0 && jobId != task.getJobId()) {

Review Comment:
   With the current implementation, the tasks of a single job run in parallel, but the tasks of different jobs run serially.
   My understanding is that limiting the total number of concurrent tasks is enough. There is no need to differentiate between jobs.



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTaskResult.java:
##########
@@ -17,17 +17,40 @@
 
 package org.apache.doris.statistics;
 
+import com.google.common.collect.Maps;
 import java.util.Map;
 
 public class StatisticsTaskResult {
-    private StatsGranularityDesc granularityDesc;
-    private StatsCategoryDesc categoryDesc;
-    private Map<StatsType, String> statsTypeToValue;
+    private final StatsGranularityDesc granularityDesc;
+    private final StatsCategoryDesc categoryDesc;
+    private final Map<StatsType, String> statsTypeToValue;
 
     public StatisticsTaskResult(StatsGranularityDesc granularityDesc, 
StatsCategoryDesc categoryDesc,
                                 Map<StatsType, String> statsTypeToValue) {
         this.granularityDesc = granularityDesc;
         this.categoryDesc = categoryDesc;
         this.statsTypeToValue = statsTypeToValue;
     }
+
+    public StatsGranularityDesc getGranularityDesc() {
+        return this.granularityDesc;
+    }
+
+    public StatsCategoryDesc getCategoryDesc() {
+        return this.categoryDesc;
+    }
+
+    public Map<StatsType, String> getStatsTypeToValue() {
+        return this.statsTypeToValue;
+    }
+
+    public Map<String, String> getStatsNameToValue() {

Review Comment:
   Alternatively, you could pass a ```Map<String, String>``` directly into the statistics.



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java:
##########
@@ -33,23 +37,88 @@
  * @granularityDesc: StatsGranularity=partition
  */
 public class StatisticsTask implements Callable<StatisticsTaskResult> {
-    protected long id = Catalog.getCurrentCatalog().getNextId();;
+    protected static final Logger LOG = 
LogManager.getLogger(StatisticsTask.class);
+
+    public enum TaskState {
+        CREATED,
+        RUNNING,
+        FINISHED,
+        FAILED
+    }
+
+    protected long id = Catalog.getCurrentCatalog().getNextId();
+
     protected long jobId;
     protected StatsGranularityDesc granularityDesc;
     protected StatsCategoryDesc categoryDesc;
     protected List<StatsType> statsTypeList;
+    protected TaskState taskState = TaskState.CREATED;
+
+    protected final long createTime = System.currentTimeMillis();
+    protected long scheduleTime;
+    protected long finishTime;
 
-    public StatisticsTask(long jobId, StatsGranularityDesc granularityDesc,
-                          StatsCategoryDesc categoryDesc, List<StatsType> 
statsTypeList) {
+    public StatisticsTask(long jobId,
+                          StatsGranularityDesc granularityDesc,
+                          StatsCategoryDesc categoryDesc,
+                          List<StatsType> statsTypeList) {
         this.jobId = jobId;
         this.granularityDesc = granularityDesc;
         this.categoryDesc = categoryDesc;
         this.statsTypeList = statsTypeList;
     }
 
+    public long getId() {
+        return this.id;
+    }
+
+    public long getJobId() {
+        return this.jobId;
+    }
+
+    public StatsGranularityDesc getGranularityDesc() {
+        return this.granularityDesc;
+    }
+
+    public StatsCategoryDesc getCategoryDesc() {
+        return this.categoryDesc;
+    }
+
+    public List<StatsType> getStatsTypeList() {
+        return this.statsTypeList;
+    }
+
+    public TaskState getTaskState() {
+        return this.taskState;
+    }
+
+    public void setTaskState(TaskState taskState) {
+        this.taskState = taskState;
+    }
+
+    public long getCreateTime() {
+        return this.createTime;
+    }
+
+    public long getScheduleTime() {
+        return this.scheduleTime;
+    }
+
+    public void setScheduleTime(long scheduleTime) {
+        this.scheduleTime = scheduleTime;
+    }
+
+    public long getFinishTime() {
+        return this.finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+        this.finishTime = finishTime;
+    }
+
     @Override
     public StatisticsTaskResult call() throws Exception {

Review Comment:
   Consider declaring this as an interface (abstract) method here instead of giving it a default implementation.



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java:
##########
@@ -33,23 +37,108 @@
  * @granularityDesc: StatsGranularity=partition
  */
 public class StatisticsTask implements Callable<StatisticsTaskResult> {
+    protected static final Logger LOG = 
LogManager.getLogger(StatisticsTask.class);
+
+    public enum TaskState {
+        PENDING,
+        RUNNING,
+        FINISHED,
+        FAILED
+    }
+
     protected long id = Catalog.getCurrentCatalog().getNextId();;
     protected long jobId;
     protected StatsGranularityDesc granularityDesc;
     protected StatsCategoryDesc categoryDesc;
     protected List<StatsType> statsTypeList;
+    protected TaskState taskState = TaskState.PENDING;
 
-    public StatisticsTask(long jobId, StatsGranularityDesc granularityDesc,
-                          StatsCategoryDesc categoryDesc, List<StatsType> 
statsTypeList) {
+    protected final long createTime = System.currentTimeMillis();
+    protected long startTime = -1L;
+    protected long finishTime = -1L;
+
+    public StatisticsTask(long jobId,
+                          StatsGranularityDesc granularityDesc,
+                          StatsCategoryDesc categoryDesc,
+                          List<StatsType> statsTypeList) {
         this.jobId = jobId;
         this.granularityDesc = granularityDesc;
         this.categoryDesc = categoryDesc;
         this.statsTypeList = statsTypeList;
     }
 
+    public long getId() {
+        return this.id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public long getJobId() {
+        return this.jobId;
+    }
+
+    public StatsGranularityDesc getGranularityDesc() {
+        return this.granularityDesc;
+    }
+
+    public StatsCategoryDesc getCategoryDesc() {
+        return this.categoryDesc;
+    }
+
+    public List<StatsType> getStatsTypeList() {
+        return this.statsTypeList;
+    }
+
+    public TaskState getTaskState() {
+        return this.taskState;
+    }
+
+    public long getCreateTime() {
+        return this.createTime;
+    }
+
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getFinishTime() {
+        return this.finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+        this.finishTime = finishTime;
+    }
+
     @Override
     public StatisticsTaskResult call() throws Exception {
-        // TODO
+        LOG.warn("execute invalid statistics task.");
         return null;
     }
+
+    public synchronized void updateTaskState(StatisticsTask.TaskState 
taskState) {
+        // PENDING -> RUNNING/FAILED

Review Comment:
   Update the timestamps (start/finish time) together with the task state here.



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java:
##########
@@ -19,55 +19,160 @@
 
 import org.apache.doris.analysis.AnalyzeStmt;
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.common.ErrorReport;
+import org.apache.doris.common.UserException;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-/*
-For unified management of statistics job,
-including job addition, cancellation, scheduling, etc.
+/**
+ * For unified management of statistics job,
+ * including job addition, cancellation, scheduling, etc.
  */
 public class StatisticsJobManager {
     private static final Logger LOG = 
LogManager.getLogger(StatisticsJobManager.class);
 
-    // statistics job
-    private Map<Long, StatisticsJob> idToStatisticsJob = 
Maps.newConcurrentMap();
+    /**
+     * save statistics job status information
+     */
+    private final Map<Long, StatisticsJob> idToStatisticsJob = 
Maps.newConcurrentMap();
 
-    public void createStatisticsJob(AnalyzeStmt analyzeStmt) {
-        // step0: init statistics job by analyzeStmt
+    private final ReentrantReadWriteLock lock = new 
ReentrantReadWriteLock(true);
+
+    public void readLock() {
+        lock.readLock().lock();
+    }
+
+    public void readUnlock() {
+        lock.readLock().unlock();
+    }
+
+    private void writeLock() {
+        lock.writeLock().lock();
+    }
+
+    private void writeUnlock() {
+        lock.writeLock().unlock();
+    }
+
+    public Map<Long, StatisticsJob> getIdToStatisticsJob() {
+        return this.idToStatisticsJob;
+    }
+
+    public void createStatisticsJob(AnalyzeStmt analyzeStmt) throws 
UserException {
+        // step1: init statistics job by analyzeStmt
         StatisticsJob statisticsJob = 
StatisticsJob.fromAnalyzeStmt(analyzeStmt);
-        // step1: get statistics to be analyzed
-        Set<Long> tableIdList = statisticsJob.relatedTableId();
-        // step2: check restrict
-        checkRestrict(tableIdList);
-        // step3: check permission
-        checkPermission();
-        // step4: create it
-        createStatisticsJob(statisticsJob);
+        writeLock();
+        try {
+            // step2: check restrict
+            this.checkRestrict(analyzeStmt.getDbId(), 
statisticsJob.getTblIds());
+            // step3: create it
+            this.createStatisticsJob(statisticsJob);
+        } finally {
+            writeUnlock();
+        }
     }
 
-    public void createStatisticsJob(StatisticsJob statisticsJob) {
-        idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
+    public void createStatisticsJob(StatisticsJob statisticsJob) throws 
DdlException {
+        this.idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
         try {
             
Catalog.getCurrentCatalog().getStatisticsJobScheduler().addPendingJob(statisticsJob);
         } catch (IllegalStateException e) {
             LOG.info("The pending statistics job is full. Please submit it 
again later.");
+            throw new DdlException("The pending statistics job is full, Please 
submit it again later.");
         }
     }
 
-    // Rule1: The same table cannot have two unfinished statistics jobs
-    // Rule2: The unfinished statistics job could not more then 
Config.max_statistics_job_num
-    // Rule3: The job for external table is not supported
-    private void checkRestrict(Set<Long> tableIdList) {
-        // TODO
+    /**
+     * The statistical job has the following restrict:
+     * - Rule1: The same table cannot have two unfinished statistics jobs
+     * - Rule2: The unfinished statistics job could not more than 
Config.max_statistics_job_num
+     * - Rule3: The job for external table is not supported
+     */
+    private void checkRestrict(long dbId, Set<Long> tableIds) throws 
AnalysisException {
+        Database db = 
Catalog.getCurrentCatalog().getDbOrAnalysisException(dbId);
+        db.readLock();
+        try {
+            // check table type
+            for (Long tableId : tableIds) {
+                Table table = db.getTableOrAnalysisException(tableId);
+                if (table.getType() != Table.TableType.OLAP) {
+                    
ErrorReport.reportAnalysisException(ErrorCode.ERR_NOT_OLAP_TABLE, 
db.getFullName(), table.getName(), "ANALYZE");
+                }
+            }
+        } finally {
+            db.readUnlock();
+        }
+
+
+        int unfinishedJobs = 0;
+
+        // check table unfinished job
+        for (StatisticsJob statisticsJob : this.idToStatisticsJob.values()) {
+            StatisticsJob.JobState jobState = statisticsJob.getJobState();
+            Set<Long> tblIds = statisticsJob.getTblIds();
+            if (jobState == StatisticsJob.JobState.PENDING
+                    || jobState == StatisticsJob.JobState.SCHEDULING
+                    || jobState == StatisticsJob.JobState.RUNNING) {
+                for (Long tableId : tableIds) {
+                    if (tblIds.contains(tableId)) {
+                        throw new AnalysisException("The table(id=" + tableId 
+ ") have unfinished statistics jobs");
+                    }
+                }
+                unfinishedJobs++;
+            }
+        }
+
+        // check the number of unfinished tasks
+        if (unfinishedJobs > Config.cbo_max_statistics_job_num) {
+            throw new AnalysisException("The unfinished statistics job could 
not more than cbo_max_statistics_job_num: " +
+                    Config.cbo_max_statistics_job_num);
+        }
     }
 
-    private void checkPermission() {
-        // TODO
+    public void alterStatisticsJobInfo(Long jobId, Long taskId, String 
errorMsg)  {

Review Comment:
   This logic has nothing to do with the statistics job manager; it would be better placed in ```StatisticsJob```.



##########
fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsTask.java:
##########
@@ -33,23 +37,108 @@
  * @granularityDesc: StatsGranularity=partition
  */
 public class StatisticsTask implements Callable<StatisticsTaskResult> {
+    protected static final Logger LOG = 
LogManager.getLogger(StatisticsTask.class);
+
+    public enum TaskState {
+        PENDING,
+        RUNNING,
+        FINISHED,
+        FAILED
+    }
+
     protected long id = Catalog.getCurrentCatalog().getNextId();;
     protected long jobId;
     protected StatsGranularityDesc granularityDesc;
     protected StatsCategoryDesc categoryDesc;
     protected List<StatsType> statsTypeList;
+    protected TaskState taskState = TaskState.PENDING;
 
-    public StatisticsTask(long jobId, StatsGranularityDesc granularityDesc,
-                          StatsCategoryDesc categoryDesc, List<StatsType> 
statsTypeList) {
+    protected final long createTime = System.currentTimeMillis();
+    protected long startTime = -1L;
+    protected long finishTime = -1L;
+
+    public StatisticsTask(long jobId,
+                          StatsGranularityDesc granularityDesc,
+                          StatsCategoryDesc categoryDesc,
+                          List<StatsType> statsTypeList) {
         this.jobId = jobId;
         this.granularityDesc = granularityDesc;
         this.categoryDesc = categoryDesc;
         this.statsTypeList = statsTypeList;
     }
 
+    public long getId() {
+        return this.id;
+    }
+
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    public long getJobId() {
+        return this.jobId;
+    }
+
+    public StatsGranularityDesc getGranularityDesc() {
+        return this.granularityDesc;
+    }
+
+    public StatsCategoryDesc getCategoryDesc() {
+        return this.categoryDesc;
+    }
+
+    public List<StatsType> getStatsTypeList() {
+        return this.statsTypeList;
+    }
+
+    public TaskState getTaskState() {
+        return this.taskState;
+    }
+
+    public long getCreateTime() {
+        return this.createTime;
+    }
+
+    public long getStartTime() {
+        return this.startTime;
+    }
+
+    public void setStartTime(long startTime) {
+        this.startTime = startTime;
+    }
+
+    public long getFinishTime() {
+        return this.finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+        this.finishTime = finishTime;
+    }
+
     @Override
     public StatisticsTaskResult call() throws Exception {
-        // TODO
+        LOG.warn("execute invalid statistics task.");
         return null;
     }
+
+    public synchronized void updateTaskState(StatisticsTask.TaskState 
taskState) {
+        // PENDING -> RUNNING/FAILED
+        if (this.taskState == TaskState.PENDING) {
+            if (taskState == TaskState.RUNNING) {
+                this.taskState = TaskState.RUNNING;
+            } else if (taskState == TaskState.FAILED) {
+                this.taskState = TaskState.FAILED;
+            }
+            return;
+        }
+
+        // RUNNING -> FINISHED/FAILED
+        if (this.taskState == TaskState.RUNNING) {
+            if (taskState == TaskState.FINISHED) {
+                this.taskState = TaskState.FINISHED;
+            } else if (taskState == TaskState.FAILED) {
+                this.taskState = TaskState.FAILED;
+            }
+        }
+    }

Review Comment:
   If there is an invalid state transition, it should throw an exception instead of silently ignoring it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to