weizhengte commented on code in PR #8860: URL: https://github.com/apache/incubator-doris/pull/8860#discussion_r855814433
########## fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java: ########## @@ -19,55 +19,160 @@ import org.apache.doris.analysis.AnalyzeStmt; import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Table; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; +import com.google.common.base.Strings; import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; -/* -For unified management of statistics job, -including job addition, cancellation, scheduling, etc. +/** + * For unified management of statistics job, + * including job addition, cancellation, scheduling, etc. 
*/ public class StatisticsJobManager { private static final Logger LOG = LogManager.getLogger(StatisticsJobManager.class); - // statistics job - private Map<Long, StatisticsJob> idToStatisticsJob = Maps.newConcurrentMap(); + /** + * save statistics job status information + */ + private final Map<Long, StatisticsJob> idToStatisticsJob = Maps.newConcurrentMap(); - public void createStatisticsJob(AnalyzeStmt analyzeStmt) { - // step0: init statistics job by analyzeStmt + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); + + public void readLock() { + lock.readLock().lock(); + } + + public void readUnlock() { + lock.readLock().unlock(); + } + + private void writeLock() { + lock.writeLock().lock(); + } + + private void writeUnlock() { + lock.writeLock().unlock(); + } + + public Map<Long, StatisticsJob> getIdToStatisticsJob() { + return this.idToStatisticsJob; + } + + public void createStatisticsJob(AnalyzeStmt analyzeStmt) throws UserException { + // step1: init statistics job by analyzeStmt StatisticsJob statisticsJob = StatisticsJob.fromAnalyzeStmt(analyzeStmt); - // step1: get statistics to be analyzed - Set<Long> tableIdList = statisticsJob.relatedTableId(); - // step2: check restrict - checkRestrict(tableIdList); - // step3: check permission - checkPermission(); - // step4: create it - createStatisticsJob(statisticsJob); + writeLock(); + try { + // step2: check restrict + this.checkRestrict(analyzeStmt.getDbId(), statisticsJob.getTblIds()); + // step3: create it + this.createStatisticsJob(statisticsJob); + } finally { + writeUnlock(); + } } - public void createStatisticsJob(StatisticsJob statisticsJob) { - idToStatisticsJob.put(statisticsJob.getId(), statisticsJob); + public void createStatisticsJob(StatisticsJob statisticsJob) throws DdlException { + this.idToStatisticsJob.put(statisticsJob.getId(), statisticsJob); try { Catalog.getCurrentCatalog().getStatisticsJobScheduler().addPendingJob(statisticsJob); } catch (IllegalStateException 
e) { LOG.info("The pending statistics job is full. Please submit it again later."); + throw new DdlException("The pending statistics job is full, Please submit it again later."); } } - // Rule1: The same table cannot have two unfinished statistics jobs - // Rule2: The unfinished statistics job could not more then Config.max_statistics_job_num - // Rule3: The job for external table is not supported - private void checkRestrict(Set<Long> tableIdList) { - // TODO + /** + * The statistical job has the following restrict: + * - Rule1: The same table cannot have two unfinished statistics jobs + * - Rule2: The unfinished statistics job could not more than Config.max_statistics_job_num + * - Rule3: The job for external table is not supported + */ + private void checkRestrict(long dbId, Set<Long> tableIds) throws AnalysisException { + Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(dbId); + db.readLock(); + try { + // check table type + for (Long tableId : tableIds) { + Table table = db.getTableOrAnalysisException(tableId); + if (table.getType() != Table.TableType.OLAP) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_NOT_OLAP_TABLE, db.getFullName(), table.getName(), "ANALYZE"); + } + } + } finally { + db.readUnlock(); + } + + + int unfinishedJobs = 0; + + // check table unfinished job + for (StatisticsJob statisticsJob : this.idToStatisticsJob.values()) { + StatisticsJob.JobState jobState = statisticsJob.getJobState(); + Set<Long> tblIds = statisticsJob.getTblIds(); + if (jobState == StatisticsJob.JobState.PENDING + || jobState == StatisticsJob.JobState.SCHEDULING + || jobState == StatisticsJob.JobState.RUNNING) { + for (Long tableId : tableIds) { + if (tblIds.contains(tableId)) { + throw new AnalysisException("The table(id=" + tableId + ") have unfinished statistics jobs"); + } + } + unfinishedJobs++; + } + } + + // check the number of unfinished tasks + if (unfinishedJobs > Config.cbo_max_statistics_job_num) { + throw new AnalysisException("The 
unfinished statistics job could not more than cbo_max_statistics_job_num: " + + Config.cbo_max_statistics_job_num); + } } - private void checkPermission() { - // TODO + public void alterStatisticsJobInfo(Long jobId, Long taskId, String errorMsg) { Review Comment: I have removed this method and do the same thing elsewhere -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org