This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new c8d73369e4 KYLIN-5217 Create a initial commit
c8d73369e4 is described below

commit c8d73369e47013a564b263420d4250d87246e7f6
Author: junqing.cai <junqing....@kyligence.io>
AuthorDate: Tue Aug 9 14:27:42 2022 +0800

    KYLIN-5217 Create a initial commit
---
 .../apache/kylin/job/dao/ExecutableOutputPO.java   | 149 ++++++++++++
 .../org/apache/kylin/job/dao/ExecutablePO.java     | 152 ++++++++++++
 .../org/apache/kylin/job/dao/JobStatistics.java    |  77 ++++++
 .../apache/kylin/job/dao/JobStatisticsBasic.java   |  53 +++++
 .../apache/kylin/job/dao/JobStatisticsManager.java | 263 +++++++++++++++++++++
 .../org/apache/kylin/job/dao/NExecutableDao.java   | 133 +++++++++++
 .../runners/AbstractDefaultSchedulerRunner.java    |  93 ++++++++
 .../apache/kylin/job/runners/FetcherRunner.java    | 241 +++++++++++++++++++
 .../kylin/job/runners/ForkBasedJobRunner.java      |  48 ++++
 .../kylin/job/runners/InMemoryJobRunner.java       |  36 +++
 .../apache/kylin/job/runners/JobCheckRunner.java   |  90 +++++++
 .../org/apache/kylin/job/runners/JobRunner.java    |  76 ++++++
 .../apache/kylin/job/runners/JobRunnerFactory.java | 185 +++++++++++++++
 .../job/runners/LicenseCapacityCheckRunner.java    |  63 +++++
 .../kylin/job/runners/QuotaStorageCheckRunner.java |  77 ++++++
 .../cube/storage/GarbageStorageCollector.java      |  87 +++++++
 .../cube/storage/ProjectStorageInfoCollector.java  |  79 +++++++
 .../cube/storage/StorageInfoCollector.java         |  29 +++
 .../metadata/cube/storage/StorageInfoEnum.java     |  23 ++
 .../cube/storage/StorageQuotaCollector.java        |  35 +++
 .../metadata/cube/storage/StorageVolumeInfo.java   |  38 +++
 .../cube/storage/TotalStorageCollector.java        |  42 ++++
 22 files changed, 2069 insertions(+)

diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java 
b/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
new file mode 100644
index 0000000000..a3b69ab095
--- /dev/null
+++ 
b/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.dao;
+
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ *
+ */
+@Setter
+@Getter
+@SuppressWarnings("serial")
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, 
getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = 
JsonAutoDetect.Visibility.NONE, setterVisibility = 
JsonAutoDetect.Visibility.NONE)
+public class ExecutableOutputPO implements Serializable {
+
+    @JsonIgnore
+    private InputStream contentStream;
+
+    @JsonProperty("content")
+    private String content;
+
+    @JsonProperty("log_path")
+    private String logPath;
+
+    @JsonProperty("status")
+    private String status = "READY";
+
+    @JsonProperty("info")
+    private Map<String, String> info = Maps.newHashMap();
+
+    @JsonProperty("last_modified")
+    private long lastModified;
+
+    @JsonProperty("createTime")
+    private long createTime = System.currentTimeMillis();
+
+    @JsonProperty("start_time")
+    private long startTime;
+
+    @JsonProperty("end_time")
+    private long endTime;
+
+    @JsonProperty("wait_time")
+    private long waitTime;
+
+    @JsonProperty("duration")
+    private long duration;
+
+    @JsonProperty("last_running_start_time")
+    private long lastRunningStartTime;
+
+    @JsonProperty("is_resumable")
+    private boolean resumable = false;
+
+    @JsonProperty("byte_size")
+    private long byteSize;
+
+    @JsonProperty("failed_msg")
+    private String failedMsg;
+
+    @JsonProperty("failed_step_id")
+    private String failedStepId;
+
+    @JsonProperty("failed_segment_id")
+    private String failedSegmentId;
+
+    @JsonProperty("failed_stack")
+    private String failedStack;
+
+    @JsonProperty("failed_reason")
+    private String failedReason;
+
+    public void addStartTime(long time) {
+        //when ready -> suicidal/discarded, the start time is 0
+        if (startTime == 0) {
+            startTime = time;
+        }
+        endTime = 0;
+    }
+
+    public void addEndTime(long time) {
+        Preconditions.checkArgument(startTime > 0L);
+        endTime = time;
+    }
+
+    public void addDuration(long time) {
+        if (time != 0 && time > lastRunningStartTime) {
+            duration = duration + time - lastRunningStartTime;
+        }
+    }
+
+    public void addLastRunningStartTime(long time) {
+        lastRunningStartTime = time;
+    }
+
+    public void resetTime() {
+        createTime = System.currentTimeMillis();
+        startTime = 0L;
+        endTime = 0L;
+        waitTime = 0L;
+        duration = 0L;
+        lastRunningStartTime = 0L;
+    }
+}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java 
b/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
new file mode 100644
index 0000000000..b396efd047
--- /dev/null
+++ b/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.dao;
+
+import static 
org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_IDS_DELIMITER;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.JobTypeEnum;
+import org.apache.kylin.metadata.cube.model.NDataSegment;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ */
+@Setter
+@Getter
+@SuppressWarnings("serial")
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, 
getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = 
JsonAutoDetect.Visibility.NONE, setterVisibility = 
JsonAutoDetect.Visibility.NONE)
+public class ExecutablePO extends RootPersistentEntity {
+    public static final int HIGHEST_PRIORITY = 0;
+    public static final int DEFAULT_PRIORITY = 3;
+    public static final int LOWEST_PRIORITY = 4;
+    @JsonProperty("name")
+    private String name;
+
+    @JsonProperty("tasks")
+    private List<ExecutablePO> tasks;
+
+    @JsonProperty("type")
+    private String type;
+
+    @JsonProperty("handler_type")
+    private String handlerType;
+
+    @JsonProperty("params")
+    private Map<String, String> params = Maps.newHashMap();
+
+    @JsonProperty("segments")
+    private Set<NDataSegment> segments = Sets.newHashSet();
+
+    @JsonProperty("job_type")
+    private JobTypeEnum jobType;
+
+    @JsonProperty("data_range_start")
+    private long dataRangeStart;
+
+    @JsonProperty("data_range_end")
+    private long dataRangeEnd;
+
+    @JsonProperty("target_model")
+    private String targetModel;
+
+    @JsonProperty("target_segments")
+    private List<String> targetSegments;
+
+    @JsonProperty("output")
+    private ExecutableOutputPO output = new ExecutableOutputPO();
+
+    private String project;
+
+    @JsonProperty("target_partitions")
+    private Set<Long> targetPartitions = Sets.newHashSet();
+
+    @JsonProperty("priority")
+    private int priority = DEFAULT_PRIORITY;
+
+    @JsonProperty("tag")
+    private Object tag;
+
+    @JsonProperty("stages_map")
+    private Map<String, List<ExecutablePO>> stagesMap;
+
+    public void setPriority(int p) {
+        priority = isPriorityValid(p) ? p : DEFAULT_PRIORITY;
+    }
+
+    @Override
+    public String getResourcePath() {
+        return concatResourcePath(getUuid(), project);
+    }
+
+    public static String concatResourcePath(String name, String project) {
+        return new 
StringBuilder().append("/").append(project).append(ResourceStore.EXECUTABLE_JOB).append("/")
+                .append(name).toString();
+    }
+
+    public static boolean isPriorityValid(int priority) {
+        return priority >= HIGHEST_PRIORITY && priority <= LOWEST_PRIORITY;
+    }
+
+    public static boolean isHigherPriority(int p1, int p2) {
+        return p1 < p2;
+    }
+
+    public void addYarnApplicationJob(String appId) {
+        String oldAppIds = 
output.getInfo().getOrDefault(ExecutableConstants.YARN_APP_IDS, "");
+        Set<String> appIds = new 
HashSet<>(Arrays.asList(oldAppIds.split(YARN_APP_IDS_DELIMITER)));
+        if (!appIds.contains(appId)) {
+            String newAppIds = oldAppIds + (StringUtils.isEmpty(oldAppIds) ? 
"" : YARN_APP_IDS_DELIMITER) + appId;
+            output.getInfo().put(ExecutableConstants.YARN_APP_IDS, newAppIds);
+        }
+    }
+}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatistics.java 
b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatistics.java
new file mode 100644
index 0000000000..0b1928fbbf
--- /dev/null
+++ b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatistics.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.dao;
+
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Maps;
+
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+/**
+ * Job statistics for one {@code date}: the inherited overall counters plus a breakdown per model.
+ * The resource name of an entry is its {@code date} rendered as a string.
+ */
+@Getter
+@Setter
+@NoArgsConstructor
+public class JobStatistics extends JobStatisticsBasic {
+
+    @JsonProperty("date")
+    private long date;
+    @JsonProperty("model_stats")
+    private Map<String, JobStatisticsBasic> jobStatisticsByModels = Maps.newHashMap();
+
+    /** Creates an entry for {@code date} with a count of 1 and a single per-model record. */
+    public JobStatistics(long date, String model, long duration, long byteSize) {
+        this(date, duration, byteSize);
+        jobStatisticsByModels.put(model, new JobStatisticsBasic(duration, byteSize));
+    }
+
+    /** Creates an entry with explicit overall counters; {@code date} keeps its default of 0. */
+    public JobStatistics(int count, long totalDuration, long totalByteSize) {
+        setTotalByteSize(totalByteSize);
+        setTotalDuration(totalDuration);
+        setCount(count);
+    }
+
+    /** Creates an entry for {@code date} with a count of 1 and no per-model records. */
+    public JobStatistics(long date, long totalDuration, long totalByteSize) {
+        setTotalByteSize(totalByteSize);
+        setTotalDuration(totalDuration);
+        setCount(1);
+        this.date = date;
+    }
+
+    @Override
+    public String resourceName() {
+        return String.valueOf(date);
+    }
+
+    /**
+     * Adds the given amounts to the overall counters and to the record of {@code model}.
+     *
+     * @param model      key of the per-model record to update
+     * @param duration   duration to add
+     * @param byteSize   byte size to add
+     * @param deltaCount amount to add to the counts
+     */
+    public void update(String model, long duration, long byteSize, int deltaCount) {
+        super.update(duration, byteSize, deltaCount);
+
+        JobStatisticsBasic perModel = jobStatisticsByModels.get(model);
+        if (perModel != null) {
+            perModel.update(duration, byteSize, deltaCount);
+            jobStatisticsByModels.put(model, perModel);
+            return;
+        }
+        // NOTE(review): a model seen for the first time starts with count 1 regardless of
+        // deltaCount -- confirm this is intended.
+        jobStatisticsByModels.put(model, new JobStatisticsBasic(duration, byteSize));
+    }
+}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsBasic.java 
b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsBasic.java
new file mode 100644
index 0000000000..c05743c3cf
--- /dev/null
+++ 
b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsBasic.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.dao;
+
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+/**
+ * Basic job counters: number of jobs, total duration and total byte size.
+ */
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class JobStatisticsBasic extends RootPersistentEntity {
+    @JsonProperty("count")
+    private int count;
+    @JsonProperty("total_duration")
+    private long totalDuration;
+    @JsonProperty("total_byte_size")
+    private long totalByteSize;
+
+    /** Creates counters for a single job: the count is 1 and the totals are the given values. */
+    public JobStatisticsBasic(long totalDuration, long totalByteSize) {
+        this.totalByteSize = totalByteSize;
+        this.totalDuration = totalDuration;
+        this.count = 1;
+    }
+
+    /**
+     * Adds the given amounts to the counters.
+     *
+     * @param duration   amount added to the total duration
+     * @param byteSize   amount added to the total byte size
+     * @param deltaCount amount added to the count
+     */
+    public void update(long duration, long byteSize, int deltaCount) {
+        count = count + deltaCount;
+        totalDuration = totalDuration + duration;
+        totalByteSize = totalByteSize + byteSize;
+    }
+}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsManager.java 
b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsManager.java
new file mode 100644
index 0000000000..2f0a65bfd3
--- /dev/null
+++ 
b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsManager.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.dao;
+
+import java.time.DayOfWeek;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.TemporalAdjusters;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
+import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.metadata.model.NDataModelManager;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Per-project manager of {@link JobStatistics} entries kept under
+ * {@code "/" + project + ResourceStore.JOB_STATISTICS} in the Kylin metadata store.
+ *
+ * <p>Entries are looked up by their {@code date} value rendered as a string. Besides creating and
+ * updating entries, the manager aggregates them over a time range: as a whole, per time bucket,
+ * or per model. Time arguments are epoch milliseconds; ranges are start-inclusive, end-exclusive.
+ */
+public class JobStatisticsManager {
+
+    /** Returns the manager that {@code config} holds for {@code project}. */
+    public static JobStatisticsManager getInstance(KylinConfig config, String project) {
+        return config.getManager(project, JobStatisticsManager.class);
+    }
+
+    // called by reflection
+    static JobStatisticsManager newInstance(KylinConfig conf, String project) {
+        try {
+            String cls = JobStatisticsManager.class.getName();
+            Class<? extends JobStatisticsManager> clz = ClassUtil.forName(cls, JobStatisticsManager.class);
+            return clz.getConstructor(KylinConfig.class, String.class).newInstance(conf, project);
+        } catch (Exception e) {
+            // The message names this class; it previously said "DataModelManager" (copy-paste slip).
+            throw new RuntimeException("Failed to init JobStatisticsManager from " + conf, e);
+        }
+    }
+
+    // ============================================================================
+
+    private KylinConfig config;
+    private String project;
+
+    private CachedCrudAssist<JobStatistics> crud;
+
+    public JobStatisticsManager(KylinConfig config, String project) {
+        init(config, project);
+    }
+
+    /** Stores the config and project and creates the CRUD helper rooted at the project's path. */
+    protected void init(KylinConfig cfg, final String project) {
+        this.config = cfg;
+        this.project = project;
+        final ResourceStore store = ResourceStore.getKylinMetaStore(this.config);
+        String resourceRootPath = "/" + this.project + ResourceStore.JOB_STATISTICS;
+        this.crud = new CachedCrudAssist<JobStatistics>(store, resourceRootPath, JobStatistics.class) {
+            @Override
+            protected JobStatistics initEntityAfterReload(JobStatistics jobStatistics, String resourceName) {
+                return jobStatistics;
+            }
+        };
+    }
+
+    /** Returns every entry known to the CRUD helper. */
+    public List<JobStatistics> getAll() {
+        return crud.listAll();
+    }
+
+    /**
+     * Creates or updates the entry of {@code date}, including the per-model record of {@code model}.
+     * When no entry exists yet, a new one with count 1 is saved and {@code deltaCount} is not used.
+     *
+     * @return the saved entry
+     */
+    public JobStatistics updateStatistics(long date, String model, long duration, long byteSize, int deltaCount) {
+        JobStatistics jobStatistics = crud.get(String.valueOf(date));
+        JobStatistics jobStatisticsToUpdate;
+        if (jobStatistics == null) {
+            jobStatisticsToUpdate = new JobStatistics(date, model, duration, byteSize);
+            return crud.save(jobStatisticsToUpdate);
+        }
+
+        jobStatisticsToUpdate = crud.copyForWrite(jobStatistics);
+        jobStatisticsToUpdate.update(model, duration, byteSize, deltaCount);
+        return crud.save(jobStatisticsToUpdate);
+    }
+
+    /**
+     * Creates or updates the overall counters of the entry of {@code date}; per-model records are
+     * left alone. When no entry exists yet, a new one with count 1 is saved and {@code deltaCount}
+     * is not used.
+     *
+     * @return the saved entry
+     */
+    public JobStatistics updateStatistics(long date, long duration, long byteSize, int deltaCount) {
+        JobStatistics jobStatistics = crud.get(String.valueOf(date));
+        JobStatistics jobStatisticsToUpdate;
+        if (jobStatistics == null) {
+            jobStatisticsToUpdate = new JobStatistics(date, duration, byteSize);
+            return crud.save(jobStatisticsToUpdate);
+        }
+
+        jobStatisticsToUpdate = crud.copyForWrite(jobStatistics);
+        jobStatisticsToUpdate.update(duration, byteSize, deltaCount);
+        return crud.save(jobStatisticsToUpdate);
+    }
+
+    /**
+     * Aggregates all entries whose date lies in [{@code startTime}, {@code endTime}).
+     *
+     * @return a pair of the aggregated count and the aggregated statistics
+     */
+    public Pair<Integer, JobStatistics> getOverallJobStats(final long startTime, final long endTime) {
+        // filter
+        List<JobStatistics> filteredJobStats = getFilteredJobStatsByTime(crud.listAll(), startTime, endTime);
+        // aggregate all job stats
+        JobStatistics aggregatedStats = aggregateJobStats(filteredJobStats);
+
+        return new Pair<>(aggregatedStats.getCount(), aggregatedStats);
+    }
+
+    /**
+     * Returns the job count per time bucket.
+     *
+     * @param timeDimension bucket size: {@code "day"}, {@code "week"} or {@code "month"};
+     *                      any other value is treated like {@code "day"}
+     * @return map from formatted bucket start ({@code yyyy-MM-dd}) to job count
+     */
+    public Map<String, Integer> getJobCountByTime(final long startTime, final long endTime,
+            final String timeDimension) {
+        Map<String, Integer> result = Maps.newHashMap();
+        aggregateJobStatsByTime(startTime, endTime, timeDimension).forEach((key, value) -> {
+            result.put(key, value.getCount());
+        });
+        return result;
+    }
+
+    /**
+     * Returns the job count per model alias. Models that can no longer be resolved are skipped.
+     */
+    public Map<String, Integer> getJobCountByModel(long startTime, long endTime) {
+        Map<String, Integer> result = Maps.newHashMap();
+
+        aggregateStatsByModel(startTime, endTime).forEach((modelName, value) -> {
+            String modelAlias = getModelAlias(modelName);
+            if (modelAlias == null) {
+                return;
+            }
+            result.put(modelAlias, value.getCount());
+        });
+
+        return result;
+    }
+
+    /**
+     * Returns total duration divided by total byte size per time bucket (0 when no bytes).
+     *
+     * @param timeDimension bucket size, see {@link #getJobCountByTime(long, long, String)}
+     */
+    public Map<String, Double> getDurationPerByteByTime(final long startTime, final long endTime,
+            final String timeDimension) {
+        Map<String, JobStatisticsBasic> aggregateResult = aggregateJobStatsByTime(startTime, endTime, timeDimension);
+        return calculateDurationPerByte(aggregateResult);
+    }
+
+    /**
+     * Returns total duration divided by total byte size per model alias (0 when no bytes).
+     * Models that can no longer be resolved are skipped.
+     */
+    public Map<String, Double> getDurationPerByteByModel(long startTime, long endTime) {
+        Map<String, JobStatisticsBasic> transformedResult = Maps.newHashMap();
+
+        aggregateStatsByModel(startTime, endTime).forEach((modelName, value) -> {
+            String modelAlias = getModelAlias(modelName);
+            if (modelAlias == null) {
+                return;
+            }
+            transformedResult.put(modelAlias,
+                    new JobStatisticsBasic(value.getTotalDuration(), value.getTotalByteSize()));
+        });
+
+        return calculateDurationPerByte(transformedResult);
+    }
+
+    // Resolves a model id to its alias; returns null when the model manager has no such model.
+    private String getModelAlias(String modelId) {
+        NDataModelManager dataModelManager = NDataModelManager.getInstance(config, project);
+        NDataModel model = dataModelManager.getDataModelDesc(modelId);
+        if (model == null) {
+            return null;
+        }
+
+        return model.getAlias();
+    }
+
+    // Sums count, duration and byte size over the list; an empty list yields a fresh JobStatistics.
+    private JobStatistics aggregateJobStats(List<JobStatistics> jobStatisticsToAggregate) {
+        return jobStatisticsToAggregate.stream()
+                .reduce((x, y) -> new JobStatistics(x.getCount() + y.getCount(),
+                        x.getTotalDuration() + y.getTotalDuration(), x.getTotalByteSize() + y.getTotalByteSize()))
+                .orElse(new JobStatistics());
+    }
+
+    // key is the date, value is the aggregated job stats
+    private Map<String, JobStatisticsBasic> aggregateJobStatsByTime(final long startTime, final long endTime,
+            final String timeDimension) {
+        Map<String, JobStatisticsBasic> result = Maps.newHashMap();
+
+        List<JobStatistics> qualifiedJobStats = getFilteredJobStatsByTime(crud.listAll(), startTime, endTime);
+
+        // Walk bucket by bucket; each bucket covers [startDate, nextDate) and is keyed by its start.
+        long startDate = startTime;
+        while (startDate <= endTime) {
+            long nextDate = nextDate(startDate, timeDimension);
+            List<JobStatistics> list = getFilteredJobStatsByTime(qualifiedJobStats, startDate, nextDate);
+            result.put(formatDateTime(startDate), aggregateJobStats(list));
+            startDate = nextDate;
+        }
+
+        return result;
+    }
+
+    // format epoch time to date string
+    private String formatDateTime(long time) {
+        // Format in the configured time zone, the same zone nextDate() uses to cut the buckets;
+        // using the JVM default zone here could label a bucket with a neighbouring day.
+        ZoneId zoneId = TimeZone.getTimeZone(config.getTimeZone()).toZoneId();
+        LocalDateTime localDateTime = Instant.ofEpochMilli(time).atZone(zoneId).toLocalDateTime();
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd",
+                Locale.getDefault(Locale.Category.FORMAT));
+        return localDateTime.format(formatter);
+    }
+
+    // Returns the start (midnight in the configured time zone) of the bucket following the one
+    // that contains `date`: next day, next Monday, or first day of next month.
+    private long nextDate(final long date, final String timeDimension) {
+        ZoneId zoneId = TimeZone.getTimeZone(config.getTimeZone()).toZoneId();
+        LocalDate localDate = Instant.ofEpochMilli(date).atZone(zoneId).toLocalDate();
+        switch (timeDimension) {
+        case "day":
+            localDate = localDate.plusDays(1);
+            break;
+        case "week":
+            localDate = localDate.with(TemporalAdjusters.next(DayOfWeek.MONDAY));
+            break;
+        case "month":
+            localDate = localDate.with(TemporalAdjusters.firstDayOfNextMonth());
+            break;
+        default:
+            // unknown dimensions behave like "day"
+            localDate = localDate.plusDays(1);
+            break;
+        }
+
+        return localDate.atStartOfDay(zoneId).toInstant().toEpochMilli();
+    }
+
+    // Merges the per-model maps of all entries in [startTime, endTime), summing counters per model.
+    private Map<String, JobStatisticsBasic> aggregateStatsByModel(long startTime, long endTime) {
+        return getFilteredJobStatsByTime(crud.listAll(), startTime, endTime).stream()
+                .map(JobStatistics::getJobStatisticsByModels).reduce((x, y) -> {
+                    // merge two maps
+                    Map<String, JobStatisticsBasic> mergedMap = Maps.newHashMap(x);
+                    y.forEach((k, v) -> mergedMap.merge(k, v,
+                            (value1, value2) -> new JobStatisticsBasic(value1.getCount() + value2.getCount(),
+                                    value1.getTotalDuration() + value2.getTotalDuration(),
+                                    value1.getTotalByteSize() + value2.getTotalByteSize())));
+                    return mergedMap;
+                }).orElse(Maps.newHashMap());
+    }
+
+    // Keeps the entries whose date lies in [startTime, endTime).
+    private List<JobStatistics> getFilteredJobStatsByTime(final List<JobStatistics> list, final long startTime,
+            final long endTime) {
+        return list.stream()
+                .filter(singleStats -> singleStats.getDate() >= startTime && singleStats.getDate() < endTime)
+                .collect(Collectors.toList());
+    }
+
+    // Maps each key to totalDuration / totalByteSize, or 0 when the byte size is 0.
+    private Map<String, Double> calculateDurationPerByte(Map<String, JobStatisticsBasic> totalMetricMap) {
+        Map<String, Double> result = Maps.newHashMap();
+        for (Map.Entry<String, JobStatisticsBasic> entry : totalMetricMap.entrySet()) {
+            double totalDuration = entry.getValue().getTotalDuration();
+            double totalByteSize = entry.getValue().getTotalByteSize();
+            if (totalByteSize == 0) {
+                result.put(entry.getKey(), 0.0);
+            } else {
+                result.put(entry.getKey(), totalDuration / totalByteSize);
+            }
+        }
+        return result;
+    }
+}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java 
b/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java
new file mode 100644
index 0000000000..5c353d989c
--- /dev/null
+++ b/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.dao;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import lombok.val;
+
+/**
+ * Project-scoped data access object for {@link ExecutablePO} job metadata.
+ * <p>
+ * All reads and writes go through a {@link CachedCrudAssist} rooted at
+ * {@code "/" + project + ResourceStore.EXECUTE_RESOURCE_ROOT} in the metadata store of the given config.
+ */
+public class NExecutableDao {
+
+    private static final Serializer<ExecutablePO> JOB_SERIALIZER = new JsonSerializer<>(ExecutablePO.class);
+    private static final Logger logger = LoggerFactory.getLogger(NExecutableDao.class);
+
+    /** Returns the DAO instance that {@code config} manages for {@code project}. */
+    public static NExecutableDao getInstance(KylinConfig config, String project) {
+        return config.getManager(project, NExecutableDao.class);
+    }
+
+    // called by reflection
+    static NExecutableDao newInstance(KylinConfig config, String project) {
+        return new NExecutableDao(config, project);
+    }
+    // ============================================================================
+
+    private String project;
+
+    private KylinConfig config;
+
+    private CachedCrudAssist<ExecutablePO> crud;
+
+    private NExecutableDao(KylinConfig config, String project) {
+        logger.trace("Using metadata url: {}", config);
+        this.project = project;
+        // config must be assigned before getStore() is called below
+        this.config = config;
+        String resourceRootPath = "/" + project + ResourceStore.EXECUTE_RESOURCE_ROOT;
+        this.crud = new CachedCrudAssist<ExecutablePO>(getStore(), resourceRootPath, "", ExecutablePO.class) {
+            @Override
+            protected ExecutablePO initEntityAfterReload(ExecutablePO entity, String resourceName) {
+                // every reloaded entity is stamped with the project this DAO was created for
+                entity.setProject(project);
+                return entity;
+            }
+        };
+    }
+
+    /** Returns all jobs of the project. */
+    public List<ExecutablePO> getJobs() {
+        return crud.listAll();
+    }
+
+    /** Returns the jobs selected by {@code predicate}, as evaluated by {@code CachedCrudAssist#listPartial}. */
+    public List<ExecutablePO> getPartialJobs(Predicate<String> predicate) {
+        return crud.listPartial(predicate);
+    }
+
+    /**
+     * Returns the jobs whose last-modified time lies in {@code [timeStart, timeEndExclusive)}.
+     */
+    public List<ExecutablePO> getJobs(long timeStart, long timeEndExclusive) {
+        return crud.listAll().stream()
+                .filter(x -> x.getLastModified() >= timeStart && x.getLastModified() < timeEndExclusive)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Returns the resource names of all jobs, ordered by priority and, within the same priority, by
+     * create time.
+     */
+    public List<String> getJobIds() {
+        // A single stable sort with a composed comparator produces the same order as sorting by create time
+        // and then re-sorting by priority, without sorting the whole list twice.
+        Comparator<ExecutablePO> byPriority = Comparator.comparing(ExecutablePO::getPriority);
+        return crud.listAll().stream() //
+                .sorted(byPriority.thenComparing(ExecutablePO::getCreateTime)) //
+                .map(RootPersistentEntity::resourceName) //
+                .collect(Collectors.toList());
+    }
+
+    /** Returns the job with the given uuid as looked up by the underlying crud assist. */
+    public ExecutablePO getJobByUuid(String uuid) {
+        return crud.get(uuid);
+    }
+
+    /**
+     * Saves a new job.
+     *
+     * @return the job that was passed in
+     * @throws IllegalArgumentException if a job with the same uuid already exists
+     */
+    public ExecutablePO addJob(ExecutablePO job) {
+        if (getJobByUuid(job.getUuid()) != null) {
+            throw new IllegalArgumentException("job id:" + job.getUuid() + " already exists");
+        }
+        crud.save(job);
+        return job;
+    }
+
+    // for ut
+    @VisibleForTesting
+    public void deleteAllJob() {
+        getJobs().forEach(job -> deleteJob(job.getId()));
+    }
+
+    /** Deletes the job with the given uuid. */
+    public void deleteJob(String uuid) {
+        crud.delete(uuid);
+    }
+
+    /**
+     * Applies {@code updater} to a serialization-based copy of the job and saves the copy only when the
+     * updater returns {@code true}.
+     *
+     * @throws NullPointerException if no job with the given uuid exists
+     */
+    public void updateJob(String uuid, Predicate<ExecutablePO> updater) {
+        val job = getJobByUuid(uuid);
+        // same exception type as before, but the message now tells which job is missing
+        Preconditions.checkNotNull(job, "job %s does not exist", uuid);
+        val copyForWrite = JsonUtil.copyBySerialization(job, JOB_SERIALIZER, null);
+        if (updater.test(copyForWrite)) {
+            crud.save(copyForWrite);
+        }
+    }
+
+    private ResourceStore getStore() {
+        return ResourceStore.getKylinMetaStore(config);
+    }
+
+}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/AbstractDefaultSchedulerRunner.java
 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/AbstractDefaultSchedulerRunner.java
new file mode 100644
index 0000000000..a8c6f3c973
--- /dev/null
+++ 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/AbstractDefaultSchedulerRunner.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.logging.SetLogCategory;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.NExecutableManager;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.metadata.epoch.EpochManager;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.SneakyThrows;
+
+/**
+ * Common base of the scheduler's runnables.
+ * <p>
+ * {@link #run()} sets the "schedule" log category, checks that the scheduler's epoch id is still valid for
+ * the project and only then delegates to {@link #doRun()}.
+ */
+public abstract class AbstractDefaultSchedulerRunner implements Runnable {
+    private static final Logger logger = LoggerFactory.getLogger(AbstractDefaultSchedulerRunner.class);
+    protected final NDefaultScheduler nDefaultScheduler;
+
+    protected final ExecutableContext context;
+
+    protected final String project;
+
+    public AbstractDefaultSchedulerRunner(final NDefaultScheduler nDefaultScheduler) {
+        this.nDefaultScheduler = nDefaultScheduler;
+        this.context = nDefaultScheduler.getContext();
+        this.project = nDefaultScheduler.getProject();
+    }
+
+    /**
+     * Returns {@code true} after forcing a scheduler shutdown when the epoch id check fails.
+     * Throws an {@link InterruptedException} (sneakily) when the current thread is already interrupted.
+     */
+    @SneakyThrows
+    private boolean checkEpochIdFailed() {
+        // an interrupted thread counts as a failed check
+        if (Thread.currentThread().isInterrupted()) {
+            throw new InterruptedException();
+        }
+
+        // the epoch id is not checked in a UT environment
+        if (KylinConfig.getInstanceFromEnv().isUTEnv()) {
+            return false;
+        }
+        if (EpochManager.getInstance().checkEpochId(context.getEpochId(), project)) {
+            return false;
+        }
+        nDefaultScheduler.forceShutdown();
+        return true;
+    }
+
+    /**
+     * Pauses the given job when the context reports that the storage quota limit is reached.
+     * A failure to pause is logged and not propagated.
+     */
+    protected void stopJobIfSQLReached(String jobId) {
+        if (!context.isReachQuotaLimit()) {
+            return;
+        }
+        try {
+            EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+                // the limit is checked again inside the unit of work before the job is paused
+                if (!context.isReachQuotaLimit()) {
+                    return null;
+                }
+                NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).pauseJob(jobId);
+                logger.info("Job {} paused due to no available storage quota.", jobId);
+                logger.info("Please clean up low-efficient storage in time, "
+                        + "increase the low-efficient storage threshold, "
+                        + "or notify the administrator to increase the storage quota for this project.");
+                return null;
+            }, project, UnitOfWork.DEFAULT_MAX_RETRY, context.getEpochId(), jobId);
+        } catch (Exception e) {
+            logger.warn("[UNEXPECTED_THINGS_HAPPENED] project {} job {} failed to pause", project, jobId, e);
+        }
+    }
+
+    @Override
+    public void run() {
+        try (SetLogCategory ignored = new SetLogCategory("schedule")) {
+            if (!checkEpochIdFailed()) {
+                doRun();
+            }
+        }
+    }
+
+    /** The actual work of the runner; only invoked when the epoch id check passed. */
+    protected abstract void doRun();
+}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
new file mode 100644
index 0000000000..c9bb123a45
--- /dev/null
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.NExecutableManager;
+import org.apache.kylin.job.execution.Output;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.val;
+
+/**
+ * Runner that walks over all jobs of the project, counts them by state and submits READY jobs to the job
+ * pool as long as the pool, the memory budget and the storage quota allow it.
+ */
+public class FetcherRunner extends AbstractDefaultSchedulerRunner {
+
+    private static final Logger logger = LoggerFactory.getLogger(FetcherRunner.class);
+
+    private final ExecutorService jobPool;
+
+    private final ScheduledExecutorService fetcherPool;
+
+    public FetcherRunner(NDefaultScheduler nDefaultScheduler, ExecutorService jobPool,
+            ScheduledExecutorService fetcherPool) {
+        super(nDefaultScheduler);
+        this.jobPool = jobPool;
+        this.fetcherPool = fetcherPool;
+    }
+
+    /**
+     * Returns whether a job that is not yet in a final state reports that it should commit suicide.
+     */
+    private boolean checkSuicide(String jobId) {
+        val executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+        // look the job up once so that both checks are made against the same job object
+        AbstractExecutable job = executableManager.getJob(jobId);
+        if (job.getStatus().isFinalState()) {
+            return false;
+        }
+        return job.checkSuicide();
+    }
+
+    /**
+     * Marks the job as suicidal if {@link #checkSuicide(String)} holds both before and inside the unit of
+     * work.
+     *
+     * @return {@code true} if the job was marked; {@code false} otherwise, including when marking failed
+     */
+    private boolean markSuicideJob(String jobId) {
+        try {
+            if (checkSuicide(jobId)) {
+                return EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+                    if (checkSuicide(jobId)) {
+                        NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).suicideJob(jobId);
+                        return true;
+                    }
+                    return false;
+                }, project, UnitOfWork.DEFAULT_MAX_RETRY, context.getEpochId(), jobId);
+            }
+        } catch (Exception e) {
+            logger.warn("[UNEXPECTED_THINGS_HAPPENED] project {} job {} should be suicidal but discard failed", project,
+                    jobId, e);
+        }
+        return false;
+    }
+
+    /**
+     * Marks the job as error.
+     *
+     * @return {@code true} if the job was marked; {@code false} when marking failed
+     */
+    private boolean markErrorJob(String jobId) {
+        try {
+            return EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+                val manager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+                manager.errorJob(jobId);
+                return true;
+            }, project, UnitOfWork.DEFAULT_MAX_RETRY, context.getEpochId(), jobId);
+        } catch (Exception e) {
+            logger.warn("[UNEXPECTED_THINGS_HAPPENED] project {} job {} should be error but mark failed", project,
+                    jobId, e);
+        }
+        return false;
+    }
+
+    @Override
+    public void doRun() {
+        try {
+            val executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+            Map<String, Executable> runningJobs = context.getRunningJobs();
+
+            // per-state counters, only used for the summary log line at the end
+            int nRunning = 0;
+            int nReady = 0;
+            int nStopped = 0;
+            int nOthers = 0;
+            int nError = 0;
+            int nDiscarded = 0;
+            int nSucceed = 0;
+            int nSuicidal = 0;
+            for (final String id : executableManager.getJobs()) {
+                if (markSuicideJob(id)) {
+                    nSuicidal++;
+                    continue;
+                }
+
+                if (runningJobs.containsKey(id)) {
+
+                    // This is very important to prevent the same job from being scheduled twice at the same
+                    // time, e.g. when a job is restarted, the old job may still be running (even if we tried
+                    // to interrupt it). Until the old job is finished, the new job should not start.
+                    nRunning++;
+                    continue;
+                }
+
+                final Output output = executableManager.getOutput(id);
+
+                switch (output.getState()) {
+                case READY:
+                    nReady++;
+                    if (isJobPoolFull()) {
+                        break;
+                    }
+
+                    if (context.isReachQuotaLimit()) {
+                        stopJobIfSQLReached(id);
+                        break;
+                    }
+
+                    logger.info("fetcher schedule {} ", id);
+                    scheduleJob(id);
+                    break;
+                case DISCARDED:
+                    nDiscarded++;
+                    break;
+                case ERROR:
+                    nError++;
+                    break;
+                case SUCCEED:
+                    nSucceed++;
+                    break;
+                case PAUSED:
+                    nStopped++;
+                    break;
+                case SUICIDAL:
+                    nSuicidal++;
+                    break;
+                default:
+                    // a job that is not tracked as running yet is in none of the states handled above
+                    if (allSubTasksSuccess(id)) {
+                        logger.info("All sub tasks are successful, reschedule job {}", id);
+                        scheduleJob(id);
+                        break;
+                    }
+                    logger.warn("Unexpected status for {} <{}>", id, output.getState());
+                    if (markErrorJob(id)) {
+                        nError++;
+                    } else {
+                        nOthers++;
+                    }
+                    break;
+                }
+            }
+
+            logger.info(
+                    "Job Status in project {}: {} should running, {} actual running, {} stopped, {} ready, {} already succeed, {} error, {} discarded, {} suicidal,  {} others",
+                    project, nRunning, runningJobs.size(), nStopped, nReady, nSucceed, nError, nDiscarded, nSuicidal,
+                    nOthers);
+        } catch (Exception e) {
+            // a failing fetch round is only logged and not propagated
+            logger.warn("Job Fetcher caught a exception ", e);
+        }
+    }
+
+    /**
+     * Returns {@code true} when the job is a {@link DefaultChainedExecutable} and every one of its tasks is
+     * in state SUCCEED.
+     */
+    private boolean allSubTasksSuccess(String id) {
+        val executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+
+        // special case: all sub tasks succeeded, so the job itself should be driven to success
+        AbstractExecutable job = executableManager.getJob(id);
+        if (job instanceof DefaultChainedExecutable) {
+            return ((DefaultChainedExecutable) job).getTasks().stream()
+                    .allMatch(abstractExecutable -> abstractExecutable.getStatus() == ExecutableState.SUCCEED);
+        }
+
+        return false;
+    }
+
+    /**
+     * Acquires the memory the job needs, registers it as running and hands a {@link JobRunner} to the job
+     * pool. If anything fails before the pool has accepted the runner, the registration and the acquired
+     * memory are rolled back here.
+     */
+    private void scheduleJob(String id) {
+        AbstractExecutable executable = null;
+        String jobDesc = null;
+
+        boolean memoryLock = false;
+        // true once the job pool has accepted the JobRunner; from then on JobRunner#doRun removes the
+        // running job and releases the memory in its finally block
+        boolean submitted = false;
+        int useMemoryCapacity = 0;
+        try {
+            val config = KylinConfig.getInstanceFromEnv();
+            val executableManager = NExecutableManager.getInstance(config, project);
+            executable = executableManager.getJob(id);
+            // in "cluster" deploy mode no memory is reserved, i.e. zero permits are acquired
+            if (!config.getDeployMode().equals("cluster")) {
+                useMemoryCapacity = executable.computeStepDriverMemory();
+            }
+            memoryLock = NDefaultScheduler.getMemoryRemaining().tryAcquire(useMemoryCapacity);
+            if (memoryLock) {
+                jobDesc = executable.toString();
+                logger.info("{} prepare to schedule", jobDesc);
+                context.addRunningJob(executable);
+                jobPool.execute(new JobRunner(nDefaultScheduler, executable, this));
+                submitted = true;
+                logger.info("{} scheduled", jobDesc);
+            } else {
+                logger.info("memory is not enough, remaining: {} MB",
+                        NDefaultScheduler.getMemoryRemaining().availablePermits());
+            }
+        } catch (Exception ex) {
+            // Roll back only while this method still owns the bookkeeping. After a successful submit the
+            // rollback would release the memory a second time and drop a job that is actually running.
+            if (executable != null && !submitted) {
+                context.removeRunningJob(executable);
+                if (memoryLock) {
+                    NDefaultScheduler.getMemoryRemaining().release(useMemoryCapacity);
+                }
+            }
+            logger.warn("{} fail to schedule", jobDesc, ex);
+        }
+    }
+
+    /** Returns whether the number of running jobs has reached the configured concurrent job limit. */
+    private boolean isJobPoolFull() {
+        if (context.getRunningJobs().size() >= nDefaultScheduler.getJobEngineConfig().getMaxConcurrentJobLimit()) {
+            logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time.");
+            return true;
+        }
+        return false;
+    }
+
+    /** Schedules another run of this fetcher on the fetcher pool with zero delay. */
+    void scheduleNext() {
+        fetcherPool.schedule(this, 0, TimeUnit.SECONDS);
+    }
+}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/ForkBasedJobRunner.java
 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/ForkBasedJobRunner.java
new file mode 100644
index 0000000000..fc29a5ebb4
--- /dev/null
+++ 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/ForkBasedJobRunner.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BufferedLogger;
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.ExecutableApplication;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Job runner that starts the application through {@code sbin/bootstrap.sh} under the Kylin home directory,
+ * using a bash command line executed by a {@link CliCommandExecutor}.
+ */
+@Slf4j
+public class ForkBasedJobRunner extends JobRunnerFactory.AbstractJobRunner {
+
+    private final CliCommandExecutor cliExecutor = new CliCommandExecutor();
+
+    public ForkBasedJobRunner(KylinConfig kylinConfig, String project, List<String> originResources) {
+        super(kylinConfig, project, originResources);
+    }
+
+    /**
+     * Builds the bootstrap command for the application class and its formatted arguments, appends stderr to
+     * {@code job.log} in the job's temporary directory, and executes the command.
+     */
+    @Override
+    protected void doExecute(ExecutableApplication app, Map<String, String> args) throws Exception {
+        final String kylinHome = KylinConfig.getKylinHome();
+        final String appClassName = app.getClass().getName();
+        final String cliArgs = formatArgs(args);
+        final String stderrLog = getJobTmpDir() + "/job.log";
+        final String finalCommand = String.format(Locale.ROOT, "bash -x %s/sbin/bootstrap.sh %s %s 2>>%s", kylinHome,
+                appClassName, cliArgs, stderrLog);
+        log.info("Try to execute {}", finalCommand);
+        cliExecutor.execute(finalCommand, new BufferedLogger(log), jobId);
+    }
+
+}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/InMemoryJobRunner.java
 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/InMemoryJobRunner.java
new file mode 100644
index 0000000000..119c3313e3
--- /dev/null
+++ 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/InMemoryJobRunner.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ExecutableApplication;
+
+/**
+ * Job runner that executes the application by calling {@code execute} on it directly.
+ */
+public class InMemoryJobRunner extends JobRunnerFactory.AbstractJobRunner {
+
+    public InMemoryJobRunner(KylinConfig config, String project, List<String> resources) {
+        super(config, project, resources);
+    }
+
+    @Override
+    public void doExecute(ExecutableApplication app, Map<String, String> args) throws Exception {
+        // the formatted argument string is split on single spaces into the argument array
+        final String[] argv = formatArgs(args).split(" ");
+        app.execute(argv);
+    }
+}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java
new file mode 100644
index 0000000000..195a244900
--- /dev/null
+++ 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import java.util.Map;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.NExecutableManager;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.val;
+
+/**
+ * Runner that inspects the jobs tracked as running: jobs that exceed the configured scheduler timeout are
+ * marked as error, and jobs are paused when the storage quota limit is reached.
+ */
+public class JobCheckRunner extends AbstractDefaultSchedulerRunner {
+
+    private static final Logger logger = LoggerFactory.getLogger(JobCheckRunner.class);
+
+    public JobCheckRunner(NDefaultScheduler nDefaultScheduler) {
+        super(nDefaultScheduler);
+    }
+
+    /**
+     * Returns whether the job has been running for at least the configured number of timeout minutes.
+     *
+     * @param jobId     id of the job to check
+     * @param startTime start time in epoch milliseconds, or {@code null} when none is recorded
+     * @return {@code false} when the timeout is configured as 0, no start time is known or the job is
+     *         already in a final state
+     */
+    private boolean checkTimeoutIfNeeded(String jobId, Long startTime) {
+        Integer timeOutMinute = KylinConfig.getInstanceFromEnv().getSchedulerJobTimeOutMinute();
+        if (timeOutMinute == 0) {
+            return false;
+        }
+        if (startTime == null) {
+            // Without a recorded start time there is nothing to measure. Unboxing it would only raise an
+            // NPE that ends up logged as a failed discard.
+            return false;
+        }
+        val executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+        if (executableManager.getJob(jobId).getStatus().isFinalState()) {
+            return false;
+        }
+        long duration = System.currentTimeMillis() - startTime;
+        // plain long arithmetic; narrowing to int first adds nothing but a possible ArithmeticException
+        long durationMins = duration / (60 * 1000);
+        return durationMins >= timeOutMinute;
+    }
+
+    /**
+     * Marks the job as error if the timeout check holds both before and inside the unit of work.
+     *
+     * @return {@code true} if the job was marked; {@code false} otherwise, including when marking failed
+     */
+    private boolean discardTimeoutJob(String jobId, Long startTime) {
+        try {
+            if (checkTimeoutIfNeeded(jobId, startTime)) {
+                return EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+                    if (checkTimeoutIfNeeded(jobId, startTime)) {
+                        logger.error("project {} job {} running timeout.", project, jobId);
+                        NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).errorJob(jobId);
+                        return true;
+                    }
+                    return false;
+                }, project, UnitOfWork.DEFAULT_MAX_RETRY, context.getEpochId(), jobId);
+            }
+        } catch (Exception e) {
+            logger.warn("[UNEXPECTED_THINGS_HAPPENED] project {} job {} should be timeout but discard failed", project,
+                    jobId, e);
+        }
+        return false;
+    }
+
+    @Override
+    protected void doRun() {
+
+        logger.info("start check project {} job pool.", project);
+
+        val executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+        Map<String, Executable> runningJobs = context.getRunningJobs();
+        Map<String, Long> runningJobInfos = context.getRunningJobInfos();
+        for (final String id : executableManager.getJobs()) {
+            // only jobs that the context tracks as running are checked
+            if (runningJobs.containsKey(id)) {
+                discardTimeoutJob(id, runningJobInfos.get(id));
+                stopJobIfSQLReached(id);
+            }
+        }
+
+    }
+}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
new file mode 100644
index 0000000000..3e839f57f4
--- /dev/null
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.SetThreadName;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.exception.JobStoppedNonVoluntarilyException;
+import org.apache.kylin.job.exception.JobStoppedVoluntarilyException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.common.logging.SetLogCategory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.val;
+import lombok.var;
+
+/**
+ * Runner that executes a single job and afterwards removes it from the running jobs of the context and
+ * releases the memory permits computed for it.
+ */
+public class JobRunner extends AbstractDefaultSchedulerRunner {
+    private static final Logger logger = LoggerFactory.getLogger(JobRunner.class);
+
+    private final AbstractExecutable executable;
+
+    private final FetcherRunner fetcherRunner;
+
+    public JobRunner(NDefaultScheduler nDefaultScheduler, AbstractExecutable executable, FetcherRunner fetcherRunner) {
+        super(nDefaultScheduler);
+        this.fetcherRunner = fetcherRunner;
+        this.executable = executable;
+    }
+
+    @Override
+    protected void doRun() {
+        // Only the first 8 chars of the job uuid are used in thread names and logs. An id shorter than that
+        // is used as is, so no exception can escape before the try block and skip the cleanup in finally.
+        final String jobId = executable.getId();
+        final String jobIdSimple = jobId.length() > 8 ? jobId.substring(0, 8) : jobId;
+        try (SetThreadName ignored = new SetThreadName("JobWorker(project:%s,jobid:%s)", project, jobIdSimple);
+                SetLogCategory logCategory = new SetLogCategory("schedule")) {
+            executable.execute(context);
+            // trigger the next step asap
+            fetcherRunner.scheduleNext();
+        } catch (JobStoppedVoluntarilyException | JobStoppedNonVoluntarilyException e) {
+            logger.info("Job quits either voluntarily or non-voluntarily,job: {}", jobIdSimple, e);
+        } catch (ExecuteException e) {
+            logger.error("ExecuteException occurred while job: {}", executable.getId(), e);
+        } catch (Exception e) {
+            logger.error("unknown error execute job: {}", executable.getId(), e);
+        } finally {
+            // whatever happened above, the job is no longer running and its memory permits are returned
+            context.removeRunningJob(executable);
+            val config = KylinConfig.getInstanceFromEnv();
+            var usingMemory = 0;
+            // in "cluster" deploy mode zero permits are released
+            if (!config.getDeployMode().equals("cluster")) {
+                usingMemory = executable.computeStepDriverMemory();
+            }
+            logger.info("Before job:{} global memory release {}", jobIdSimple,
+                    NDefaultScheduler.getMemoryRemaining().availablePermits());
+            NDefaultScheduler.getMemoryRemaining().release(usingMemory);
+            logger.info("After job:{} global memory release {}", jobIdSimple,
+                    NDefaultScheduler.getMemoryRemaining().availablePermits());
+        }
+    }
+}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunnerFactory.java 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunnerFactory.java
new file mode 100644
index 0000000000..0713558d35
--- /dev/null
+++ 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunnerFactory.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigBase;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ExecutableApplication;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.ZipFileUtils;
+import org.apache.kylin.common.persistence.metadata.MetadataStore;
+import org.apache.kylin.common.persistence.transaction.UnitOfWorkParams;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+
+import com.google.common.collect.Maps;
+
+import lombok.RequiredArgsConstructor;
+import lombok.val;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class JobRunnerFactory {
+
+    private JobRunnerFactory() {
+        // Private on purpose: this class only exposes the static createRunner factory.
+    }
+
+    /**
+     * Builds the job runner implementation selected by {@code type}.
+     *
+     * @param config    Kylin configuration passed to the runner
+     * @param type      runner kind, either {@code "fork"} or {@code "in-memory"}
+     * @param project   project name passed to the runner
+     * @param resources resource paths passed to the runner
+     * @return a new runner of the requested kind
+     * @throws NotImplementedException if {@code type} is any other value
+     */
+    public static AbstractJobRunner createRunner(KylinConfig config, String type, String project,
+            List<String> resources) {
+        final AbstractJobRunner runner;
+        switch (type) {
+        case "fork":
+            runner = new ForkBasedJobRunner(config, project, resources);
+            break;
+        case "in-memory":
+            runner = new InMemoryJobRunner(config, project, resources);
+            break;
+        default:
+            throw new NotImplementedException("Runner type " + type + " not implement");
+        }
+        return runner;
+    }
+
+    @RequiredArgsConstructor
+    public abstract static class AbstractJobRunner {
+
+        protected final KylinConfig kylinConfig;
+        protected final String project;
+        protected final List<String> originResources;
+        private Consumer<Properties> configUpdater;
+
+        protected String metaDumpUrl;
+        protected String jobId;
+
+        /**
+         * Remembers the job id, creates the job's local temp directory and derives
+         * the metadata dump URL that points into that directory.
+         *
+         * @param jobId id of the job this runner is prepared for
+         * @return path of the job's local temp directory
+         * @throws IOException if the directory cannot be created
+         */
+        public String prepareEnv(String jobId) throws IOException {
+            this.jobId = jobId;
+
+            final String tmpDirPath = getJobTmpDir();
+            final File tmpDir = new File(tmpDirPath);
+            FileUtils.forceMkdir(tmpDir);
+
+            final String urlPrefix = kylinConfig.getMetadataUrlPrefix();
+            this.metaDumpUrl = urlPrefix + "@hdfs,path=file://" + tmpDirPath + "/meta";
+            return tmpDirPath;
+        }
+
+        /**
+         * Dumps metadata for the job, adds the {@code meta} and {@code metaOutput}
+         * entries to {@code args}, runs the application and finally uploads the job log.
+         * The upload step is not reached when the execution throws.
+         *
+         * @param app  application to execute
+         * @param args argument map; it is modified in place
+         * @throws Exception whatever the metadata dump or the execution throws
+         */
+        public void start(ExecutableApplication app, Map<String, String> args) throws Exception {
+            attachMetadataAndKylinProps(false);
+
+            final String metaOutputUrl = metaDumpUrl + "_output";
+            args.put("meta", metaDumpUrl);
+            args.put("metaOutput", metaOutputUrl);
+
+            doExecute(app, args);
+            uploadJobLog();
+        }
+
+        /** Removes the job's local temp directory; failures are ignored. */
+        public void cleanupEnv() {
+            final File jobTmpDir = new File(getJobTmpDir());
+            FileUtils.deleteQuietly(jobTmpDir);
+        }
+
+        /**
+         * Runs the application with the given arguments. Called by {@code start}
+         * after the {@code meta} and {@code metaOutput} entries were added to {@code args}.
+         *
+         * @param app  application to execute
+         * @param args arguments for the application
+         * @throws Exception if the execution fails
+         */
+        protected abstract void doExecute(ExecutableApplication app, 
Map<String, String> args) throws Exception;
+
+        /**
+         * Renders the arguments as {@code --key=value} tokens separated by single
+         * spaces, in the iteration order of the map.
+         *
+         * @param args arguments to render
+         * @return the joined argument string, empty for an empty map
+         */
+        protected String formatArgs(Map<String, String> args) {
+            final StringBuilder rendered = new StringBuilder();
+            for (Map.Entry<String, String> arg : args.entrySet()) {
+                if (rendered.length() > 0) {
+                    rendered.append(" ");
+                }
+                rendered.append("--" + arg.getKey() + "=" + arg.getValue());
+            }
+            return rendered.toString();
+        }
+
+        /**
+         * @return the local temp directory of the current job, i.e.
+         *         {@code <kylin home>/tmp/<jobId>}
+         */
+        public String getJobTmpDir() {
+            final String kylinHome = KylinConfigBase.getKylinHome();
+            return kylinHome + "/tmp/" + jobId;
+        }
+
+        /**
+         * Registers a callback that may adjust the exported Kylin properties.
+         * It is applied in {@code attachMetadataAndKylinProps} after
+         * {@code kylin.metadata.url} has been set and before the properties are dumped.
+         *
+         * @param consumer callback receiving the mutable properties; may be {@code null} to disable
+         */
+        public void setConfigUpdater(Consumer<Properties> consumer) {
+            this.configUpdater = consumer;
+        }
+
+        /**
+         * Zips the job's local temp directory and copies the archive to
+         * {@code <job dir of the project><jobId>.zip} on the file system resolved
+         * for that directory. The local zip file is removed in every case.
+         *
+         * @return {@code true} when the archive was copied, {@code false} when an
+         *         {@link IOException} occurred (it is logged as a warning)
+         */
+        protected boolean uploadJobLog() {
+            final String localZipPath = getJobTmpDir() + ".zip";
+            final File localZip = new File(localZipPath);
+            try {
+                ZipFileUtils.compressZipFile(getJobTmpDir(), localZipPath);
+
+                val remoteDir = kylinConfig.getJobTmpDir(project, true);
+                val remoteFs = HadoopUtil.getFileSystem(remoteDir);
+                val remoteZip = new Path(remoteDir + jobId + ".zip");
+
+                try (val source = new FileInputStream(localZip);
+                        val target = remoteFs.create(remoteZip, true)) {
+                    IOUtils.copy(source, target);
+                }
+                return true;
+            } catch (IOException e) {
+                log.warn("Upload Job Evidence failed {}", jobId, e);
+                return false;
+            } finally {
+                FileUtils.deleteQuietly(localZip);
+            }
+        }
+
+        /**
+         * Exports the Kylin properties (re-targeted to {@code metaDumpUrl}) and,
+         * unless {@code kylinPropsOnly} is set, the resources listed in
+         * {@code originResources} into a local temp directory, then uploads that
+         * directory to the metadata store addressed by {@code metaDumpUrl}.
+         * <p>
+         * When the resource map read from the metadata store is null or empty the
+         * method returns without dumping or uploading anything.
+         *
+         * @param kylinPropsOnly {@code true} to dump only the properties
+         * @throws IOException      if temp file handling, dumping or uploading fails
+         * @throws RuntimeException if {@code metaDumpUrl} has not been set
+         */
+        protected void attachMetadataAndKylinProps(boolean kylinPropsOnly) throws IOException {
+            if (StringUtils.isEmpty(metaDumpUrl)) {
+                throw new RuntimeException("Missing metaUrl");
+            }
+
+            // createTempFile is only used to reserve a unique path.
+            // NOTE(review): delete-then-reuse-as-directory leaves a small window in which the
+            // path is free; a dedicated temp-directory API would close it.
+            File tmpDir = File.createTempFile("kylin_job_meta", "");
+            try {
+                FileUtils.forceDelete(tmpDir); // we need a directory, so delete the file first
+
+                Properties props = kylinConfig.exportToProperties();
+                props.setProperty("kylin.metadata.url", metaDumpUrl);
+                if (configUpdater != null) {
+                    configUpdater.accept(props);
+                }
+
+                if (kylinPropsOnly) {
+                    ResourceStore.dumpKylinProps(tmpDir, props);
+                } else {
+                    // The way of Updating metadata is CopyOnWrite. So it is safe to use Reference in the value.
+                    Map<String, RawResource> dumpMap = EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(
+                            UnitOfWorkParams.<Map> builder().readonly(true).unitName(project).processor(() -> {
+                                Map<String, RawResource> retMap = Maps.newHashMap();
+                                for (String resPath : originResources) {
+                                    ResourceStore resourceStore = ResourceStore.getKylinMetaStore(kylinConfig);
+                                    RawResource rawResource = resourceStore.getResource(resPath);
+                                    retMap.put(resPath, rawResource);
+                                }
+                                return retMap;
+                            }).build());
+
+                    if (Objects.isNull(dumpMap) || dumpMap.isEmpty()) {
+                        return;
+                    }
+                    // dump metadata
+                    ResourceStore.dumpResourceMaps(kylinConfig, tmpDir, dumpMap, props);
+                }
+
+                // copy metadata to target metaUrl
+                KylinConfig dstConfig = KylinConfig.createKylinConfig(props);
+                MetadataStore.createMetadataStore(dstConfig).uploadFromFile(tmpDir);
+                // clean up
+                log.debug("Copied metadata to the target metaUrl, delete the temp dir: {}", tmpDir);
+            } finally {
+                // The reserved path was deleted above and may never have been re-created
+                // (early return on an empty dump map, or a failure before the dump).
+                // forceDelete throws FileNotFoundException for a missing path, which would
+                // turn the early return into an error or mask the original exception.
+                // Clean up best-effort instead, like cleanupEnv and uploadJobLog do.
+                FileUtils.deleteQuietly(tmpDir);
+            }
+        }
+    }
+}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/LicenseCapacityCheckRunner.java
 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/LicenseCapacityCheckRunner.java
new file mode 100644
index 0000000000..1b51dd2c3b
--- /dev/null
+++ 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/LicenseCapacityCheckRunner.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import static 
org.apache.kylin.common.exception.CommonErrorCode.LICENSE_OVER_CAPACITY;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.annotation.Clarification;
+import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.metadata.sourceusage.SourceUsageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.val;
+
+/**
+ * Scheduler task that checks whether the project's source usage exceeds the
+ * licensed capacity and records the result on the scheduler context.
+ */
+@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise")
+public class LicenseCapacityCheckRunner extends AbstractDefaultSchedulerRunner {
+    private static final Logger logger = LoggerFactory.getLogger(LicenseCapacityCheckRunner.class);
+
+    public LicenseCapacityCheckRunner(NDefaultScheduler nDefaultScheduler) {
+        super(nDefaultScheduler);
+    }
+
+    /** Runs the capacity check and stores the outcome via {@code context.setLicenseOverCapacity}. */
+    @Override
+    protected void doRun() {
+        logger.info("start check license capacity for project {}", project);
+        context.setLicenseOverCapacity(isLicenseOverCapacity());
+    }
+
+    /**
+     * Asks the {@link SourceUsageManager} to check the project's capacity.
+     *
+     * @return {@code true} only when the check fails with the
+     *         {@code LICENSE_OVER_CAPACITY} error code; any other failure is
+     *         logged and treated as "not over capacity"
+     */
+    private boolean isLicenseOverCapacity() {
+        val sourceUsageManager = SourceUsageManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+        try {
+            sourceUsageManager.checkIsOverCapacity(project);
+        } catch (KylinException e) {
+            // NOTE(review): identity comparison of error codes - confirm that the exception
+            // always carries the same instance that toErrorCode() returns.
+            if (LICENSE_OVER_CAPACITY.toErrorCode() == e.getErrorCode()) {
+                logger.warn("Source usage over capacity, no job will be scheduled.", e);
+                return true;
+            }
+            // A KylinException with a different code used to vanish without a trace;
+            // report it like every other failed check.
+            logger.warn("Check source usage over capacity failed.", e);
+        } catch (Throwable e) {
+            logger.warn("Check source usage over capacity failed.", e);
+        }
+
+        // not over capacity
+        return false;
+    }
+}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/QuotaStorageCheckRunner.java
 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/QuotaStorageCheckRunner.java
new file mode 100644
index 0000000000..c7f3140671
--- /dev/null
+++ 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/QuotaStorageCheckRunner.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.annotation.Clarification;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.metadata.cube.storage.ProjectStorageInfoCollector;
+import org.apache.kylin.metadata.cube.storage.StorageInfoEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import lombok.val;
+import lombok.var;
+
+/**
+ * Scheduler task that compares the project's total storage size with its
+ * storage quota and records on the scheduler context whether the quota is reached.
+ */
+@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise")
+public class QuotaStorageCheckRunner extends AbstractDefaultSchedulerRunner {
+    private static final Logger logger = LoggerFactory.getLogger(QuotaStorageCheckRunner.class);
+
+    private final ProjectStorageInfoCollector collector;
+
+    public QuotaStorageCheckRunner(NDefaultScheduler nDefaultScheduler) {
+        super(nDefaultScheduler);
+
+        // only the quota and the total size are needed for this check
+        collector = new ProjectStorageInfoCollector(
+                Lists.newArrayList(StorageInfoEnum.STORAGE_QUOTA, StorageInfoEnum.TOTAL_STORAGE));
+    }
+
+    /** Runs the quota check and stores the outcome via {@code context.setReachQuotaLimit}. */
+    @Override
+    protected void doRun() {
+        logger.info("start check project {} storage quota.", nDefaultScheduler.getProject());
+        context.setReachQuotaLimit(reachStorageQuota());
+    }
+
+    /**
+     * Collects the storage volume info, repeating the collection up to three
+     * more times while the total size is still negative.
+     *
+     * @return {@code true} when the total size could not be determined or is at
+     *         least the quota size, {@code false} otherwise
+     */
+    private boolean reachStorageQuota() {
+        var volumeInfo = collector.getStorageVolumeInfo(KylinConfig.getInstanceFromEnv(),
+                nDefaultScheduler.getProject());
+        for (int retriesLeft = 3; retriesLeft > 0 && volumeInfo.getTotalStorageSize() < 0; retriesLeft--) {
+            volumeInfo = collector.getStorageVolumeInfo(KylinConfig.getInstanceFromEnv(),
+                    nDefaultScheduler.getProject());
+        }
+
+        val usedSize = volumeInfo.getTotalStorageSize();
+        val quotaSize = volumeInfo.getStorageQuotaSize();
+
+        // a still-negative size means every attempt failed: block scheduling to be safe
+        if (usedSize < 0) {
+            logger.error(
+                    "Project '{}' : an exception occurs when getting storage volume info, no job will be scheduled!!! The error info : {}",
+                    nDefaultScheduler.getProject(),
+                    volumeInfo.getThrowableMap().get(StorageInfoEnum.TOTAL_STORAGE));
+            return true;
+        }
+
+        val quotaReached = usedSize >= quotaSize;
+        if (quotaReached) {
+            logger.info("Project '{}' reach storage quota, no job will be scheduled!!!",
+                    nDefaultScheduler.getProject());
+        }
+        return quotaReached;
+    }
+}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java
 
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java
new file mode 100644
index 0000000000..5fb6ecc8ff
--- /dev/null
+++ 
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.cube.storage;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.cube.model.NDataLayout;
+import org.apache.kylin.metadata.cube.model.NDataSegment;
+import org.apache.kylin.metadata.cube.model.NDataflow;
+import org.apache.kylin.metadata.cube.model.NDataflowManager;
+import org.apache.kylin.metadata.cube.optimization.IndexOptimizer;
+import org.apache.kylin.metadata.cube.optimization.IndexOptimizerFactory;
+import org.apache.kylin.metadata.model.NDataModel;
+
+import com.google.common.collect.Maps;
+
+import lombok.val;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Collects, per model of a project, the layouts the index optimizer reports
+ * as garbage and the total byte size those layouts occupy.
+ */
+@Slf4j
+public class GarbageStorageCollector implements StorageInfoCollector {
+
+    /**
+     * Fills the garbage index map and the garbage storage size of
+     * {@code storageVolumeInfo}. Models without garbage layouts are left out of the map.
+     * The managers are looked up with {@code KylinConfig.getInstanceFromEnv()};
+     * the {@code config} argument is not read.
+     */
+    @Override
+    public void collect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) {
+        final Map<String, Set<Long>> garbageLayoutsByModel = Maps.newHashMap();
+        long garbageBytes = 0L;
+
+        final List<NDataModel> models = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
+                .listUnderliningDataModels();
+        for (NDataModel model : models) {
+            // work on a copy of the dataflow, as the optimizer is handed the same object
+            final NDataflow dataflowCopy = lookupDataflow(model).copy();
+            final IndexOptimizer optimizer = IndexOptimizerFactory.getOptimizer(dataflowCopy, false);
+            final Set<Long> garbageLayoutIds = optimizer.getGarbageLayoutMap(dataflowCopy).keySet();
+            if (CollectionUtils.isEmpty(garbageLayoutIds)) {
+                continue;
+            }
+            garbageBytes += sumLayoutBytes(garbageLayoutIds, dataflowCopy);
+            garbageLayoutsByModel.put(model.getId(), garbageLayoutIds);
+        }
+
+        storageVolumeInfo.setGarbageModelIndexMap(garbageLayoutsByModel);
+        storageVolumeInfo.setGarbageStorageSize(garbageBytes);
+    }
+
+    /** Looks up the dataflow that shares the model's uuid, within the model's own project. */
+    private NDataflow lookupDataflow(NDataModel model) {
+        val manager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), model.getProject());
+        return manager.getDataflow(model.getUuid());
+    }
+
+    /**
+     * Sums the byte size of the given layouts over all READY and WARNING
+     * segments of the dataflow; layouts missing from a segment contribute nothing.
+     */
+    private long sumLayoutBytes(Set<Long> layoutIds, NDataflow dataflow) {
+        long totalBytes = 0L;
+        for (NDataSegment segment : dataflow.getSegments(SegmentStatusEnum.READY, SegmentStatusEnum.WARNING)) {
+            for (Long layoutId : layoutIds) {
+                final NDataLayout layout = segment.getSegDetails().getLayoutById(layoutId);
+                if (layout == null) {
+                    continue;
+                }
+                totalBytes += layout.getByteSize();
+            }
+        }
+        return totalBytes;
+    }
+
+}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollector.java
 
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollector.java
new file mode 100644
index 0000000000..6a5f4d7c56
--- /dev/null
+++ 
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.cube.storage;
+
+import java.util.List;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+/**
+ * Runs a configurable set of {@link StorageInfoCollector}s for a project and
+ * gathers their results, and any exception they throw, in a {@link StorageVolumeInfo}.
+ */
+public class ProjectStorageInfoCollector {
+
+    /** Maps each collector implementation to the kind of information it provides. */
+    private static final ImmutableMap<Class<? extends StorageInfoCollector>, StorageInfoEnum> collectorType = ImmutableMap
+            .<Class<? extends StorageInfoCollector>, StorageInfoEnum> builder()
+            .put(GarbageStorageCollector.class, StorageInfoEnum.GARBAGE_STORAGE)
+            .put(TotalStorageCollector.class, StorageInfoEnum.TOTAL_STORAGE)
+            .put(StorageQuotaCollector.class, StorageInfoEnum.STORAGE_QUOTA).build();
+
+    /** Collectors to run, in the order they were requested. */
+    private final List<StorageInfoCollector> collectors = Lists.newArrayList();
+
+    /**
+     * @param storageInfoList kinds of information to collect; a null or empty
+     *                        list leaves this instance without collectors
+     */
+    public ProjectStorageInfoCollector(List<StorageInfoEnum> storageInfoList) {
+        if (CollectionUtils.isNotEmpty(storageInfoList)) {
+            storageInfoList.forEach(this::addCollectors);
+        }
+    }
+
+    /**
+     * Runs every collector in turn. An exception from one collector is stored in
+     * the throwable map under that collector's kind and does not stop the others.
+     */
+    private void collect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) {
+        for (StorageInfoCollector collector : collectors) {
+            try {
+                collector.collect(config, project, storageVolumeInfo);
+            } catch (Exception e) {
+                storageVolumeInfo.getThrowableMap().put(collectorType.get(collector.getClass()), e);
+            }
+        }
+    }
+
+    /** Adds the collector that provides the given kind of information. */
+    private void addCollectors(StorageInfoEnum storageInfoEnum) {
+        switch (storageInfoEnum) {
+        case GARBAGE_STORAGE:
+            collectors.add(new GarbageStorageCollector());
+            break;
+        case TOTAL_STORAGE:
+            collectors.add(new TotalStorageCollector());
+            break;
+        case STORAGE_QUOTA:
+            collectors.add(new StorageQuotaCollector());
+            break;
+        default:
+            break;
+        }
+    }
+
+    /**
+     * Collects the configured storage information for a project.
+     *
+     * @param config  configuration handed to the collectors
+     * @param project project name
+     * @return a fresh info object; it keeps its default values when the project
+     *         name is blank or no collector is configured
+     */
+    public StorageVolumeInfo getStorageVolumeInfo(KylinConfig config, String project) {
+        StorageVolumeInfo storageVolumeInfo = new StorageVolumeInfo();
+        if (StringUtils.isBlank(project) || CollectionUtils.isEmpty(collectors)) {
+            return storageVolumeInfo;
+        }
+        collect(config, project, storageVolumeInfo);
+        return storageVolumeInfo;
+    }
+}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoCollector.java
 
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoCollector.java
new file mode 100644
index 0000000000..4218244b06
--- /dev/null
+++ 
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoCollector.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.cube.storage;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.KylinConfig;
+
+/**
+ * Collects one kind of storage information for a project and writes it into
+ * a {@link StorageVolumeInfo}.
+ */
+public interface StorageInfoCollector {
+
+    /**
+     * @param config            Kylin configuration to work with
+     * @param project           project name
+     * @param storageVolumeInfo result object to fill
+     * @throws IOException if the information cannot be read
+     */
+    void collect(KylinConfig config, String project, StorageVolumeInfo 
storageVolumeInfo) throws IOException;
+
+}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoEnum.java
 
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoEnum.java
new file mode 100644
index 0000000000..ac29378738
--- /dev/null
+++ 
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoEnum.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.cube.storage;
+
+/** Kinds of storage information a project storage collector can provide. */
+public enum StorageInfoEnum {
+    GARBAGE_STORAGE, TOTAL_STORAGE, STORAGE_QUOTA
+}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageQuotaCollector.java
 
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageQuotaCollector.java
new file mode 100644
index 0000000000..951f896ef6
--- /dev/null
+++ 
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageQuotaCollector.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.cube.storage;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.annotation.Clarification;
+import org.apache.kylin.metadata.project.NProjectManager;
+
+/** Reads the storage quota from the project-level configuration. */
+@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise")
+public class StorageQuotaCollector implements StorageInfoCollector {
+
+    /**
+     * Looks up the project's own configuration and stores its storage quota
+     * size in {@code storageVolumeInfo}.
+     */
+    @Override
+    public void collect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) {
+        final KylinConfig projectConfig = NProjectManager.getInstance(config).getProject(project).getConfig();
+        storageVolumeInfo.setStorageQuotaSize(projectConfig.getStorageQuotaSize());
+    }
+
+}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageVolumeInfo.java
 
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageVolumeInfo.java
new file mode 100644
index 0000000000..d77b18f18f
--- /dev/null
+++ 
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageVolumeInfo.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.metadata.cube.storage;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Maps;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.kylin.common.annotation.Clarification;
+
+/**
+ * Result holder filled by the storage info collectors. The size fields start
+ * at -1 and keep that value until a collector sets them.
+ */
+@Getter
+@Setter
+@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise")
+public class StorageVolumeInfo {
+    // storage quota of the project; -1 until set
+    private long storageQuotaSize = -1L;
+    // total storage size of the project; -1 until set
+    private long totalStorageSize = -1L;
+    // size of the layouts reported as garbage; -1 until set
+    private long garbageStorageSize = -1L;
+    // model id -> ids of the layouts reported as garbage for that model
+    private Map<String, Set<Long>> garbageModelIndexMap = Maps.newHashMap();
+    // kind of information -> exception thrown while collecting it
+    private Map<StorageInfoEnum, Throwable> throwableMap = Maps.newHashMap();
+}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java
 
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java
new file mode 100644
index 0000000000..e5101a9f1a
--- /dev/null
+++ 
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.cube.storage;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+
+/** Measures the size of the project's working directory. */
+public class TotalStorageCollector implements StorageInfoCollector {
+
+    /**
+     * Stores the content length of the project's working directory in
+     * {@code storageVolumeInfo}, or 0 when the directory does not exist.
+     *
+     * @throws IOException if the file system cannot be accessed
+     */
+    @Override
+    public void collect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) throws IOException {
+        final Path projectDir = new Path(config.getWorkingDirectoryWithConfiguredFs(project));
+        final FileSystem fileSystem = projectDir.getFileSystem(HadoopUtil.getCurrentConfiguration());
+
+        final long usedBytes = fileSystem.exists(projectDir)
+                ? HadoopUtil.getContentSummary(fileSystem, projectDir).getLength()
+                : 0L;
+        storageVolumeInfo.setTotalStorageSize(usedBytes);
+    }
+
+}

Reply via email to