This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new 205bf2c50f KYLIN-5217 Create a initial commit
205bf2c50f is described below

commit 205bf2c50fc6416910fd83846cf62d86747f2330
Author: junqing.cai <junqing....@kyligence.io>
AuthorDate: Tue Aug 9 14:20:14 2022 +0800

    KYLIN-5217 Create a initial commit
---
 .../apache/kylin/job/dao/ExecutableOutputPO.java   | 149 ------------
 .../org/apache/kylin/job/dao/ExecutablePO.java     | 152 ------------
 .../org/apache/kylin/job/dao/JobStatistics.java    |  77 ------
 .../apache/kylin/job/dao/JobStatisticsBasic.java   |  53 -----
 .../apache/kylin/job/dao/JobStatisticsManager.java | 263 ---------------------
 .../org/apache/kylin/job/dao/NExecutableDao.java   | 133 -----------
 .../runners/AbstractDefaultSchedulerRunner.java    |  93 --------
 .../apache/kylin/job/runners/FetcherRunner.java    | 241 -------------------
 .../kylin/job/runners/ForkBasedJobRunner.java      |  48 ----
 .../kylin/job/runners/InMemoryJobRunner.java       |  36 ---
 .../apache/kylin/job/runners/JobCheckRunner.java   |  90 -------
 .../org/apache/kylin/job/runners/JobRunner.java    |  76 ------
 .../apache/kylin/job/runners/JobRunnerFactory.java | 185 ---------------
 .../job/runners/LicenseCapacityCheckRunner.java    |  63 -----
 .../kylin/job/runners/QuotaStorageCheckRunner.java |  77 ------
 .../cube/storage/GarbageStorageCollector.java      |  87 -------
 .../cube/storage/ProjectStorageInfoCollector.java  |  79 -------
 .../cube/storage/StorageInfoCollector.java         |  29 ---
 .../metadata/cube/storage/StorageInfoEnum.java     |  23 --
 .../cube/storage/StorageQuotaCollector.java        |  35 ---
 .../metadata/cube/storage/StorageVolumeInfo.java   |  38 ---
 .../cube/storage/TotalStorageCollector.java        |  42 ----
 22 files changed, 2069 deletions(-)

diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java 
b/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
deleted file mode 100644
index a3b69ab095..0000000000
--- 
a/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.dao;
-
-import java.io.InputStream;
-import java.io.Serializable;
-import java.util.Map;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-import lombok.Getter;
-import lombok.Setter;
-
-/**
- * Serializable holder for an executable's output: content, log path, status, info map, timing fields and failure details. Jackson auto-detection is disabled on the class, so only the @JsonProperty-annotated fields are JSON-mapped; accessors come from Lombok's @Getter/@Setter.
- */
-@Setter
-@Getter
-@SuppressWarnings("serial")
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, 
getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = 
JsonAutoDetect.Visibility.NONE, setterVisibility = 
JsonAutoDetect.Visibility.NONE)
-public class ExecutableOutputPO implements Serializable {
-
-    @JsonIgnore
-    private InputStream contentStream; // explicitly excluded from JSON
-
-    @JsonProperty("content")
-    private String content;
-
-    @JsonProperty("log_path")
-    private String logPath;
-
-    @JsonProperty("status")
-    private String status = "READY"; // value a fresh instance starts with
-
-    @JsonProperty("info")
-    private Map<String, String> info = Maps.newHashMap();
-
-    @JsonProperty("last_modified")
-    private long lastModified;
-
-    @JsonProperty("createTime") // NOTE(review): camelCase key, unlike the snake_case keys of the other fields
-    private long createTime = System.currentTimeMillis();
-
-    @JsonProperty("start_time")
-    private long startTime;
-
-    @JsonProperty("end_time")
-    private long endTime;
-
-    @JsonProperty("wait_time")
-    private long waitTime;
-
-    @JsonProperty("duration")
-    private long duration;
-
-    @JsonProperty("last_running_start_time")
-    private long lastRunningStartTime;
-
-    @JsonProperty("is_resumable")
-    private boolean resumable = false;
-
-    @JsonProperty("byte_size")
-    private long byteSize;
-
-    @JsonProperty("failed_msg")
-    private String failedMsg;
-
-    @JsonProperty("failed_step_id")
-    private String failedStepId;
-
-    @JsonProperty("failed_segment_id")
-    private String failedSegmentId;
-
-    @JsonProperty("failed_stack")
-    private String failedStack;
-
-    @JsonProperty("failed_reason")
-    private String failedReason;
-
-    public void addStartTime(long time) { // keeps an already-set startTime; always clears endTime
-        //when ready -> suicidal/discarded, the start time is 0
-        if (startTime == 0) {
-            startTime = time;
-        }
-        endTime = 0;
-    }
-
-    public void addEndTime(long time) { // stores endTime; only legal once startTime has been set
-        Preconditions.checkArgument(startTime > 0L); // throws IllegalArgumentException when startTime is still 0
-        endTime = time;
-    }
-
-    public void addDuration(long time) { // accumulates the span since lastRunningStartTime into duration
-        if (time != 0 && time > lastRunningStartTime) { // ignore a zero time or one not later than the last running start
-            duration = duration + time - lastRunningStartTime;
-        }
-    }
-
-    public void addLastRunningStartTime(long time) { // plain assignment despite the "add" prefix
-        lastRunningStartTime = time;
-    }
-
-    public void resetTime() { // createTime becomes "now"; the other timing fields go back to 0 (lastModified is untouched)
-        createTime = System.currentTimeMillis();
-        startTime = 0L;
-        endTime = 0L;
-        waitTime = 0L;
-        duration = 0L;
-        lastRunningStartTime = 0L;
-    }
-}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java 
b/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
deleted file mode 100644
index b396efd047..0000000000
--- a/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.dao;
-
-import static 
org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_IDS_DELIMITER;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.JobTypeEnum;
-import org.apache.kylin.metadata.cube.model.NDataSegment;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import lombok.Getter;
-import lombok.Setter;
-
-/** Persistent entity (extends RootPersistentEntity) describing an executable: name, sub-tasks, params, target model/segments/partitions, priority and an embedded ExecutableOutputPO. Only the @JsonProperty-annotated fields are JSON-mapped because Jackson auto-detection is disabled on the class.
- */
-@Setter
-@Getter
-@SuppressWarnings("serial")
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, 
getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = 
JsonAutoDetect.Visibility.NONE, setterVisibility = 
JsonAutoDetect.Visibility.NONE)
-public class ExecutablePO extends RootPersistentEntity {
-    public static final int HIGHEST_PRIORITY = 0; // smaller number = higher priority (see isHigherPriority)
-    public static final int DEFAULT_PRIORITY = 3;
-    public static final int LOWEST_PRIORITY = 4;
-    @JsonProperty("name")
-    private String name;
-
-    @JsonProperty("tasks")
-    private List<ExecutablePO> tasks;
-
-    @JsonProperty("type")
-    private String type;
-
-    @JsonProperty("handler_type")
-    private String handlerType;
-
-    @JsonProperty("params")
-    private Map<String, String> params = Maps.newHashMap();
-
-    @JsonProperty("segments")
-    private Set<NDataSegment> segments = Sets.newHashSet();
-
-    @JsonProperty("job_type")
-    private JobTypeEnum jobType;
-
-    @JsonProperty("data_range_start")
-    private long dataRangeStart;
-
-    @JsonProperty("data_range_end")
-    private long dataRangeEnd;
-
-    @JsonProperty("target_model")
-    private String targetModel;
-
-    @JsonProperty("target_segments")
-    private List<String> targetSegments;
-
-    @JsonProperty("output")
-    private ExecutableOutputPO output = new ExecutableOutputPO();
-
-    private String project; // no @JsonProperty, so not JSON-mapped; used by getResourcePath()
-
-    @JsonProperty("target_partitions")
-    private Set<Long> targetPartitions = Sets.newHashSet();
-
-    @JsonProperty("priority")
-    private int priority = DEFAULT_PRIORITY;
-
-    @JsonProperty("tag")
-    private Object tag;
-
-    @JsonProperty("stages_map")
-    private Map<String, List<ExecutablePO>> stagesMap;
-
-    public void setPriority(int p) { // hand-written setter: out-of-range values fall back to DEFAULT_PRIORITY
-        priority = isPriorityValid(p) ? p : DEFAULT_PRIORITY;
-    }
-
-    @Override
-    public String getResourcePath() { // path is derived from this entity's uuid and project
-        return concatResourcePath(getUuid(), project);
-    }
-
-    public static String concatResourcePath(String name, String project) { // "/" + project + ResourceStore.EXECUTABLE_JOB + "/" + name
-        return new 
StringBuilder().append("/").append(project).append(ResourceStore.EXECUTABLE_JOB).append("/")
-                .append(name).toString();
-    }
-
-    public static boolean isPriorityValid(int priority) { // valid range is [HIGHEST_PRIORITY, LOWEST_PRIORITY], both inclusive
-        return priority >= HIGHEST_PRIORITY && priority <= LOWEST_PRIORITY;
-    }
-
-    public static boolean isHigherPriority(int p1, int p2) { // true when p1 outranks p2, i.e. p1 is numerically smaller
-        return p1 < p2;
-    }
-
-    public void addYarnApplicationJob(String appId) { // appends appId to the delimited YARN_APP_IDS entry of output.info unless already listed
-        String oldAppIds = 
output.getInfo().getOrDefault(ExecutableConstants.YARN_APP_IDS, "");
-        Set<String> appIds = new 
HashSet<>(Arrays.asList(oldAppIds.split(YARN_APP_IDS_DELIMITER)));
-        if (!appIds.contains(appId)) {
-            String newAppIds = oldAppIds + (StringUtils.isEmpty(oldAppIds) ? 
"" : YARN_APP_IDS_DELIMITER) + appId; // delimiter only when something is already stored
-            output.getInfo().put(ExecutableConstants.YARN_APP_IDS, newAppIds);
-        }
-    }
-}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatistics.java 
b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatistics.java
deleted file mode 100644
index 0b1928fbbf..0000000000
--- a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatistics.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.dao;
-
-import java.util.Map;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Maps;
-
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-
-@Getter
-@Setter
-@NoArgsConstructor
-public class JobStatistics extends JobStatisticsBasic { // totals for one `date` value plus a per-model breakdown
-
-    @JsonProperty("date")
-    private long date;
-    @JsonProperty("model_stats")
-    private Map<String, JobStatisticsBasic> jobStatisticsByModels = 
Maps.newHashMap(); // keyed by the model string handed to the constructor / update()
-
-    @Override
-    public String resourceName() { // the entry is named after its date value
-        return String.valueOf(date);
-    }
-
-    public JobStatistics(long date, String model, long duration, long 
byteSize) { // first record for a date: count 1, with a matching per-model entry
-        this.date = date;
-        setCount(1);
-        setTotalDuration(duration);
-        setTotalByteSize(byteSize);
-        jobStatisticsByModels.put(model, new JobStatisticsBasic(duration, 
byteSize));
-    }
-
-    public JobStatistics(int count, long totalDuration, long totalByteSize) { // totals only; date stays 0 and the per-model map stays empty
-        setCount(count);
-        setTotalDuration(totalDuration);
-        setTotalByteSize(totalByteSize);
-    }
-
-    public JobStatistics(long date, long totalDuration, long totalByteSize) { // first record for a date without a per-model entry; count starts at 1
-        this.date = date;
-        setCount(1);
-        setTotalDuration(totalDuration);
-        setTotalByteSize(totalByteSize);
-    }
-
-    public void update(String model, long duration, long byteSize, int 
deltaCount) { // adds the deltas to the overall totals and to the entry for `model`
-        super.update(duration, byteSize, deltaCount);
-        JobStatisticsBasic jobStatisticsByModel = 
jobStatisticsByModels.get(model);
-        if (jobStatisticsByModel == null) {
-            jobStatisticsByModel = new JobStatisticsBasic(duration, byteSize); // NOTE(review): a new model entry starts with count 1 (two-arg constructor) regardless of deltaCount - confirm intended
-        } else {
-            jobStatisticsByModel.update(duration, byteSize, deltaCount);
-        }
-
-        jobStatisticsByModels.put(model, jobStatisticsByModel);
-    }
-}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsBasic.java 
b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsBasic.java
deleted file mode 100644
index c05743c3cf..0000000000
--- 
a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsBasic.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.dao;
-
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-
-@Getter
-@Setter
-@NoArgsConstructor
-@AllArgsConstructor
-public class JobStatisticsBasic extends RootPersistentEntity { // accumulator of count, total duration and total byte size
-    @JsonProperty("count")
-    private int count;
-    @JsonProperty("total_duration")
-    private long totalDuration;
-    @JsonProperty("total_byte_size")
-    private long totalByteSize;
-
-    public void update(long duration, long byteSize, int deltaCount) { // adds each argument to the matching running total
-        this.count += deltaCount;
-        this.totalDuration += duration;
-        this.totalByteSize += byteSize;
-    }
-
-    public JobStatisticsBasic(long totalDuration, long totalByteSize) { // two-arg form always starts count at 1
-        this.count = 1;
-        this.totalDuration = totalDuration;
-        this.totalByteSize = totalByteSize;
-    }
-}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsManager.java 
b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsManager.java
deleted file mode 100644
index 2f0a65bfd3..0000000000
--- 
a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsManager.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.dao;
-
-import java.time.DayOfWeek;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.time.temporal.TemporalAdjusters;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.TimeZone;
-import java.util.stream.Collectors;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
-import org.apache.kylin.metadata.model.NDataModel;
-import org.apache.kylin.metadata.model.NDataModelManager;
-
-import com.google.common.collect.Maps;
-
-public class JobStatisticsManager { // per-project manager that stores, updates and aggregates JobStatistics entries through a CachedCrudAssist
-
-    public static JobStatisticsManager getInstance(KylinConfig config, String 
project) { // delegates to config.getManager for the given project
-        return config.getManager(project, JobStatisticsManager.class);
-    }
-
-    // called by reflection
-    static JobStatisticsManager newInstance(KylinConfig conf, String project) {
-        try {
-            String cls = JobStatisticsManager.class.getName();
-            Class<? extends JobStatisticsManager> clz = ClassUtil.forName(cls, 
JobStatisticsManager.class);
-            return clz.getConstructor(KylinConfig.class, 
String.class).newInstance(conf, project);
-        } catch (Exception e) {
-            throw new RuntimeException("Failed to init DataModelManager from " 
+ conf, e); // NOTE(review): message names DataModelManager although this method builds a JobStatisticsManager
-        }
-    }
-
-    // 
============================================================================
-
-    private KylinConfig config;
-    private String project;
-
-    private CachedCrudAssist<JobStatistics> crud;
-
-    public JobStatisticsManager(KylinConfig config, String project) {
-        init(config, project); // NOTE(review): init is protected, so a subclass override would run from this constructor
-    }
-
-    protected void init(KylinConfig cfg, final String project) { // stores config/project and builds the CRUD helper rooted at "/" + project + ResourceStore.JOB_STATISTICS
-        this.config = cfg;
-        this.project = project;
-        final ResourceStore store = 
ResourceStore.getKylinMetaStore(this.config);
-        String resourceRootPath = "/" + this.project + 
ResourceStore.JOB_STATISTICS;
-        this.crud = new CachedCrudAssist<JobStatistics>(store, 
resourceRootPath, JobStatistics.class) {
-            @Override
-            protected JobStatistics initEntityAfterReload(JobStatistics 
jobStatistics, String resourceName) {
-                return jobStatistics; // reloaded entity is returned unchanged
-            }
-        };
-    }
-
-    public List<JobStatistics> getAll() { // everything crud.listAll() returns
-        return crud.listAll();
-    }
-
-    public JobStatistics updateStatistics(long date, String model, long 
duration, long byteSize, int deltaCount) { // per-model variant: create the entry for `date`, or update a write copy, then save
-        JobStatistics jobStatistics = crud.get(String.valueOf(date));
-        JobStatistics jobStatisticsToUpdate;
-        if (jobStatistics == null) {
-            jobStatisticsToUpdate = new JobStatistics(date, model, duration, 
byteSize);
-            return crud.save(jobStatisticsToUpdate); // first entry for this date; deltaCount is not used on this path (count starts at 1)
-        }
-
-        jobStatisticsToUpdate = crud.copyForWrite(jobStatistics);
-        jobStatisticsToUpdate.update(model, duration, byteSize, deltaCount);
-        return crud.save(jobStatisticsToUpdate);
-    }
-
-    public JobStatistics updateStatistics(long date, long duration, long 
byteSize, int deltaCount) { // same flow as above but without a per-model entry
-        JobStatistics jobStatistics = crud.get(String.valueOf(date));
-        JobStatistics jobStatisticsToUpdate;
-        if (jobStatistics == null) {
-            jobStatisticsToUpdate = new JobStatistics(date, duration, 
byteSize);
-            return crud.save(jobStatisticsToUpdate); // first entry for this date; deltaCount is not used on this path (count starts at 1)
-        }
-
-        jobStatisticsToUpdate = crud.copyForWrite(jobStatistics);
-        jobStatisticsToUpdate.update(duration, byteSize, deltaCount);
-        return crud.save(jobStatisticsToUpdate);
-    }
-
-    public Pair<Integer, JobStatistics> getOverallJobStats(final long 
startTime, final long endTime) { // returns (aggregated count, aggregated stats) over entries with startTime <= date < endTime
-        // filter
-        List<JobStatistics> filteredJobStats = 
getFilteredJobStatsByTime(crud.listAll(), startTime, endTime);
-        // aggregate all job stats
-        JobStatistics aggregatedStats = aggregateJobStats(filteredJobStats);
-
-        return new Pair<>(aggregatedStats.getCount(), aggregatedStats);
-    }
-
-    public Map<String, Integer> getJobCountByTime(final long startTime, final 
long endTime,
-            final String timeDimension) { // bucket label (yyyy-MM-dd) -> job count of that bucket
-        Map<String, Integer> result = Maps.newHashMap();
-        aggregateJobStatsByTime(startTime, endTime, 
timeDimension).forEach((key, value) -> {
-            result.put(key, value.getCount());
-        });
-        return result;
-    }
-
-    public Map<String, Integer> getJobCountByModel(long startTime, long 
endTime) { // model alias -> job count
-        Map<String, Integer> result = Maps.newHashMap();
-
-        aggregateStatsByModel(startTime, endTime).forEach((modelName, value) 
-> {
-            String modelAlias = getModelAlias(modelName);
-            if (modelAlias == null) // skip entries whose model cannot be resolved
-                return;
-            result.put(modelAlias, value.getCount());
-        });
-
-        return result;
-    }
-
-    public Map<String, Double> getDurationPerByteByTime(final long startTime, 
final long endTime,
-            final String timeDimension) { // bucket label -> totalDuration / totalByteSize
-        Map<String, JobStatisticsBasic> aggregateResult = 
aggregateJobStatsByTime(startTime, endTime, timeDimension);
-        return calculateDurationPerByte(aggregateResult);
-    }
-
-    public Map<String, Double> getDurationPerByteByModel(long startTime, long 
endTime) { // model alias -> totalDuration / totalByteSize
-        Map<String, JobStatisticsBasic> transformedResult = Maps.newHashMap();
-
-        aggregateStatsByModel(startTime, endTime).forEach((modelName, value) 
-> {
-            String modelAlias = getModelAlias(modelName);
-            if (modelAlias == null) // skip entries whose model cannot be resolved
-                return;
-            transformedResult.put(modelAlias,
-                    new JobStatisticsBasic(value.getTotalDuration(), 
value.getTotalByteSize())); // this constructor sets count to 1; calculateDurationPerByte only reads duration and byte size
-        });
-
-        return calculateDurationPerByte(transformedResult);
-    }
-
-    private String getModelAlias(String modelId) { // alias of the model, or null when getDataModelDesc finds none
-        NDataModelManager dataModelManager = 
NDataModelManager.getInstance(config, project);
-        NDataModel model = dataModelManager.getDataModelDesc(modelId);
-        if (model == null)
-            return null;
-
-        return model.getAlias();
-    }
-
-    private JobStatistics aggregateJobStats(List<JobStatistics> 
jobStatisticsToAggregate) { // sums count, duration and byte size; merged results carry no date or per-model map
-        return jobStatisticsToAggregate.stream()
-                .reduce((x, y) -> new JobStatistics(x.getCount() + 
y.getCount(),
-                        x.getTotalDuration() + y.getTotalDuration(), 
x.getTotalByteSize() + y.getTotalByteSize()))
-                .orElse(new JobStatistics()); // empty input yields a default-constructed instance
-    }
-
-    // key is the date, value is the aggregated job stats
-    private Map<String, JobStatisticsBasic> aggregateJobStatsByTime(final long 
startTime, final long endTime,
-            final String timeDimension) {
-        Map<String, JobStatisticsBasic> result = Maps.newHashMap();
-
-        List<JobStatistics> qulifiedJobStats = 
getFilteredJobStatsByTime(crud.listAll(), startTime, endTime);
-
-        long startDate = startTime;
-        while (startDate <= endTime) { // NOTE(review): inclusive bound here, while getFilteredJobStatsByTime treats endTime as exclusive - a bucket starting exactly at endTime is still emitted; confirm intended
-            long nextDate = nextDate(startDate, timeDimension);
-            List<JobStatistics> list = 
getFilteredJobStatsByTime(qulifiedJobStats, startDate, nextDate);
-            result.put(formatDateTime(startDate), aggregateJobStats(list));
-            startDate = nextDate;
-        }
-
-        return result;
-    }
-
-    // format epoch time to date string
-    private String formatDateTime(long time) {
-        ZoneId zoneId = TimeZone.getDefault().toZoneId(); // NOTE(review): default zone here, whereas nextDate() uses config.getTimeZone() - confirm the two are meant to differ
-        LocalDateTime localDateTime = 
Instant.ofEpochMilli(time).atZone(zoneId).toLocalDateTime();
-        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd",
-                Locale.getDefault(Locale.Category.FORMAT));
-        return localDateTime.format(formatter);
-    }
-
-    private long nextDate(final long date, final String timeDimension) { // epoch millis of the start of the next day / next Monday / first day of next month
-        ZoneId zoneId = TimeZone.getTimeZone(config.getTimeZone()).toZoneId();
-        LocalDate localDate = 
Instant.ofEpochMilli(date).atZone(zoneId).toLocalDate();
-        switch (timeDimension) {
-        case "day":
-            localDate = localDate.plusDays(1);
-            break;
-        case "week":
-            localDate = 
localDate.with(TemporalAdjusters.next(DayOfWeek.MONDAY));
-            break;
-        case "month":
-            localDate = 
localDate.with(TemporalAdjusters.firstDayOfNextMonth());
-            break;
-        default: // any other dimension string is treated like "day"
-            localDate = localDate.plusDays(1);
-            break;
-        }
-
-        return localDate.atStartOfDay(zoneId).toInstant().toEpochMilli();
-    }
-
-    private Map<String, JobStatisticsBasic> aggregateStatsByModel(long 
startTime, long endTime) { // merges the per-model maps of all entries in [startTime, endTime), summing values for equal keys
-        return getFilteredJobStatsByTime(crud.listAll(), startTime, 
endTime).stream()
-                .map(JobStatistics::getJobStatisticsByModels).reduce((x, y) -> 
{
-                    // merge two maps
-                    Map<String, JobStatisticsBasic> mergedMap = 
Maps.newHashMap(x);
-                    y.forEach((k, v) -> mergedMap.merge(k, v,
-                            (value1, value2) -> new 
JobStatisticsBasic(value1.getCount() + value2.getCount(),
-                                    value1.getTotalDuration() + 
value2.getTotalDuration(),
-                                    value1.getTotalByteSize() + 
value2.getTotalByteSize())));
-                    return mergedMap;
-                }).orElse(Maps.newHashMap());
-    }
-
-    private List<JobStatistics> getFilteredJobStatsByTime(final 
List<JobStatistics> list, final long startTime,
-            final long endTime) { // keeps entries whose date lies in the half-open range [startTime, endTime)
-        return list.stream()
-                .filter(singleStats -> singleStats.getDate() >= startTime && 
singleStats.getDate() < endTime)
-                .collect(Collectors.toList());
-    }
-
-    private Map<String, Double> calculateDurationPerByte(Map<String, 
JobStatisticsBasic> totalMetricMap) { // key -> totalDuration / totalByteSize, computed in double arithmetic
-        Map<String, Double> result = Maps.newHashMap();
-        for (Map.Entry<String, JobStatisticsBasic> entry : 
totalMetricMap.entrySet()) {
-            double totalDuration = entry.getValue().getTotalDuration();
-            double totalByteSize = entry.getValue().getTotalByteSize();
-            if (totalByteSize == 0) // report 0.0 instead of dividing by zero
-                result.put(entry.getKey(), .0);
-            else {
-                result.put(entry.getKey(), totalDuration / totalByteSize);
-            }
-        }
-        return result;
-    }
-}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java 
b/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java
deleted file mode 100644
index 5c353d989c..0000000000
--- a/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.dao;
-
-import java.util.Comparator;
-import java.util.List;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import lombok.val;
-
-/**
- */
-public class NExecutableDao {
-
-    private static final Serializer<ExecutablePO> JOB_SERIALIZER = new 
JsonSerializer<>(ExecutablePO.class);
-    private static final Logger logger = 
LoggerFactory.getLogger(NExecutableDao.class);
-
-    public static NExecutableDao getInstance(KylinConfig config, String 
project) {
-        return config.getManager(project, NExecutableDao.class);
-    }
-
-    // called by reflection
-    static NExecutableDao newInstance(KylinConfig config, String project) {
-        return new NExecutableDao(config, project);
-    }
-    // 
============================================================================
-
-    private String project;
-
-    private KylinConfig config;
-
-    private CachedCrudAssist<ExecutablePO> crud;
-
-    private NExecutableDao(KylinConfig config, String project) {
-        logger.trace("Using metadata url: {}", config);
-        this.project = project;
-        this.config = config;
-        String resourceRootPath = "/" + project + 
ResourceStore.EXECUTE_RESOURCE_ROOT;
-        this.crud = new CachedCrudAssist<ExecutablePO>(getStore(), 
resourceRootPath, "", ExecutablePO.class) {
-            @Override
-            protected ExecutablePO initEntityAfterReload(ExecutablePO entity, 
String resourceName) {
-                entity.setProject(project);
-                return entity;
-            }
-        };
-    }
-
-    public List<ExecutablePO> getJobs() {
-        return crud.listAll();
-    }
-
-    public List<ExecutablePO> getPartialJobs(Predicate<String> predicate) {
-        return crud.listPartial(predicate);
-    }
-
-    public List<ExecutablePO> getJobs(long timeStart, long timeEndExclusive) {
-        return crud.listAll().stream()
-                .filter(x -> x.getLastModified() >= timeStart && 
x.getLastModified() < timeEndExclusive)
-                .collect(Collectors.toList());
-    }
-
-    public List<String> getJobIds() {
-        return 
crud.listAll().stream().sorted(Comparator.comparing(ExecutablePO::getCreateTime))
-                
.sorted(Comparator.comparing(ExecutablePO::getPriority)).map(RootPersistentEntity::resourceName)
-                .collect(Collectors.toList());
-    }
-
-    public ExecutablePO getJobByUuid(String uuid) {
-        return crud.get(uuid);
-    }
-
-    public ExecutablePO addJob(ExecutablePO job) {
-        if (getJobByUuid(job.getUuid()) != null) {
-            throw new IllegalArgumentException("job id:" + job.getUuid() + " 
already exists");
-        }
-        crud.save(job);
-        return job;
-    }
-
-    // for ut
-    @VisibleForTesting
-    public void deleteAllJob() {
-        getJobs().forEach(job -> deleteJob(job.getId()));
-    }
-
-    public void deleteJob(String uuid) {
-        crud.delete(uuid);
-    }
-
-    public void updateJob(String uuid, Predicate<ExecutablePO> updater) {
-        val job = getJobByUuid(uuid);
-        Preconditions.checkNotNull(job);
-        val copyForWrite = JsonUtil.copyBySerialization(job, JOB_SERIALIZER, 
null);
-        if (updater.test(copyForWrite)) {
-            crud.save(copyForWrite);
-        }
-    }
-
-    private ResourceStore getStore() {
-        return ResourceStore.getKylinMetaStore(config);
-    }
-
-}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/AbstractDefaultSchedulerRunner.java
 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/AbstractDefaultSchedulerRunner.java
deleted file mode 100644
index a8c6f3c973..0000000000
--- 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/AbstractDefaultSchedulerRunner.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.job.runners;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.logging.SetLogCategory;
-import org.apache.kylin.common.persistence.transaction.UnitOfWork;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.NExecutableManager;
-import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
-import org.apache.kylin.metadata.epoch.EpochManager;
-import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import lombok.SneakyThrows;
-
-public abstract class AbstractDefaultSchedulerRunner implements Runnable {
-    private static final Logger logger = 
LoggerFactory.getLogger(AbstractDefaultSchedulerRunner.class);
-    protected final NDefaultScheduler nDefaultScheduler;
-
-    protected final ExecutableContext context;
-
-    protected final String project;
-
-    public AbstractDefaultSchedulerRunner(final NDefaultScheduler 
nDefaultScheduler) {
-        this.nDefaultScheduler = nDefaultScheduler;
-        this.context = nDefaultScheduler.getContext();
-        this.project = nDefaultScheduler.getProject();
-    }
-
-    @SneakyThrows
-    private boolean checkEpochIdFailed() {
-        //check failed if isInterrupted
-        if (Thread.currentThread().isInterrupted()) {
-            throw new InterruptedException();
-        }
-
-        if (!KylinConfig.getInstanceFromEnv().isUTEnv()
-                && 
!EpochManager.getInstance().checkEpochId(context.getEpochId(), project)) {
-            nDefaultScheduler.forceShutdown();
-            return true;
-        }
-        return false;
-    }
-
-    // stop job if Storage Quota Limit reached
-    protected void stopJobIfSQLReached(String jobId) {
-        if (context.isReachQuotaLimit()) {
-            try {
-                EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
-                    if (context.isReachQuotaLimit()) {
-                        
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), 
project).pauseJob(jobId);
-                        logger.info("Job {} paused due to no available storage 
quota.", jobId);
-                        logger.info("Please clean up low-efficient storage in 
time, "
-                                + "increase the low-efficient storage 
threshold, "
-                                + "or notify the administrator to increase the 
storage quota for this project.");
-                    }
-                    return null;
-                }, project, UnitOfWork.DEFAULT_MAX_RETRY, 
context.getEpochId(), jobId);
-            } catch (Exception e) {
-                logger.warn("[UNEXPECTED_THINGS_HAPPENED] project {} job {} 
failed to pause", project, jobId, e);
-            }
-        }
-    }
-
-    @Override
-    public void run() {
-        try (SetLogCategory ignored = new SetLogCategory("schedule")) {
-            if (checkEpochIdFailed()) {
-                return;
-            }
-            doRun();
-        }
-    }
-
-    protected abstract void doRun();
-}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
deleted file mode 100644
index c9bb123a45..0000000000
--- a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.job.runners;
-
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.transaction.UnitOfWork;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.NExecutableManager;
-import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
-import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import lombok.val;
-
-public class FetcherRunner extends AbstractDefaultSchedulerRunner {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(FetcherRunner.class);
-
-    private final ExecutorService jobPool;
-
-    private final ScheduledExecutorService fetcherPool;
-
-    public FetcherRunner(NDefaultScheduler nDefaultScheduler, ExecutorService 
jobPool,
-            ScheduledExecutorService fetcherPool) {
-        super(nDefaultScheduler);
-        this.jobPool = jobPool;
-        this.fetcherPool = fetcherPool;
-    }
-
-    private boolean checkSuicide(String jobId) {
-        val executableManager = 
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
-        if (executableManager.getJob(jobId).getStatus().isFinalState()) {
-            return false;
-        }
-        return executableManager.getJob(jobId).checkSuicide();
-    }
-
-    private boolean markSuicideJob(String jobId) {
-        try {
-            if (checkSuicide(jobId)) {
-                return EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() 
-> {
-                    if (checkSuicide(jobId)) {
-                        
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), 
project).suicideJob(jobId);
-                        return true;
-                    }
-                    return false;
-                }, project, UnitOfWork.DEFAULT_MAX_RETRY, 
context.getEpochId(), jobId);
-            }
-        } catch (Exception e) {
-            logger.warn("[UNEXPECTED_THINGS_HAPPENED] project {} job {} should 
be suicidal but discard failed", project,
-                    jobId, e);
-        }
-        return false;
-    }
-
-    private boolean markErrorJob(String jobId) {
-        try {
-            return EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
-                val manager = 
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
-                manager.errorJob(jobId);
-                return true;
-            }, project, UnitOfWork.DEFAULT_MAX_RETRY, context.getEpochId(), 
jobId);
-        } catch (Exception e) {
-            logger.warn("[UNEXPECTED_THINGS_HAPPENED] project {} job {} should 
be error but mark failed", project,
-                    jobId, e);
-        }
-        return false;
-    }
-
-    @Override
-    public void doRun() {
-        try {
-            val executableManager = 
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
-            Map<String, Executable> runningJobs = context.getRunningJobs();
-
-            int nRunning = 0;
-            int nReady = 0;
-            int nStopped = 0;
-            int nOthers = 0;
-            int nError = 0;
-            int nDiscarded = 0;
-            int nSucceed = 0;
-            int nSuicidal = 0;
-            for (final String id : executableManager.getJobs()) {
-                if (markSuicideJob(id)) {
-                    nSuicidal++;
-                    continue;
-                }
-
-                if (runningJobs.containsKey(id)) {
-
-                    // this is very important to prevent from same job being 
scheduled at same time.
-                    // e.g. when a job is restarted, the old job may still be 
running (even if we tried to interrupt it)
-                    // until the old job is finished, the new job should not 
start
-                    nRunning++;
-                    continue;
-                }
-
-                final Output output = executableManager.getOutput(id);
-
-                switch (output.getState()) {
-                case READY:
-                    nReady++;
-                    if (isJobPoolFull()) {
-                        break;
-                    }
-
-                    if (context.isReachQuotaLimit()) {
-                        stopJobIfSQLReached(id);
-                        break;
-                    }
-
-                    logger.info("fetcher schedule {} ", id);
-                    scheduleJob(id);
-                    break;
-                case DISCARDED:
-                    nDiscarded++;
-                    break;
-                case ERROR:
-                    nError++;
-                    break;
-                case SUCCEED:
-                    nSucceed++;
-                    break;
-                case PAUSED:
-                    nStopped++;
-                    break;
-                case SUICIDAL:
-                    nSuicidal++;
-                    break;
-                default:
-                    if (allSubTasksSuccess(id)) {
-                        logger.info("All sub tasks are successful, reschedule 
job {}", id);
-                        scheduleJob(id);
-                        break;
-                    }
-                    logger.warn("Unexpected status for {} <{}>", id, 
output.getState());
-                    if (markErrorJob(id)) {
-                        nError++;
-                    } else {
-                        nOthers++;
-                    }
-                    break;
-                }
-            }
-
-            logger.info(
-                    "Job Status in project {}: {} should running, {} actual 
running, {} stopped, {} ready, {} already succeed, {} error, {} discarded, {} 
suicidal,  {} others",
-                    project, nRunning, runningJobs.size(), nStopped, nReady, 
nSucceed, nError, nDiscarded, nSuicidal,
-                    nOthers);
-        } catch (Exception e) {
-            logger.warn("Job Fetcher caught a exception ", e);
-        }
-    }
-
-    private boolean allSubTasksSuccess(String id) {
-        val executableManager = 
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
-
-        // check special case, all sub task success, show make current job to 
success
-        AbstractExecutable job = executableManager.getJob(id);
-        if (job instanceof DefaultChainedExecutable) {
-            return ((DefaultChainedExecutable) job).getTasks().stream()
-                    .allMatch(abstractExecutable -> 
abstractExecutable.getStatus() == ExecutableState.SUCCEED);
-        }
-
-        return false;
-    }
-
-    private void scheduleJob(String id) {
-        AbstractExecutable executable = null;
-        String jobDesc = null;
-
-        boolean memoryLock = false;
-        int useMemoryCapacity = 0;
-        try {
-            val config = KylinConfig.getInstanceFromEnv();
-            val executableManager = NExecutableManager.getInstance(config, 
project);
-            executable = executableManager.getJob(id);
-            if (!config.getDeployMode().equals("cluster")) {
-                useMemoryCapacity = executable.computeStepDriverMemory();
-            }
-            memoryLock = 
NDefaultScheduler.getMemoryRemaining().tryAcquire(useMemoryCapacity);
-            if (memoryLock) {
-                jobDesc = executable.toString();
-                logger.info("{} prepare to schedule", jobDesc);
-                context.addRunningJob(executable);
-                jobPool.execute(new JobRunner(nDefaultScheduler, executable, 
this));
-                logger.info("{} scheduled", jobDesc);
-            } else {
-                logger.info("memory is not enough, remaining: {} MB",
-                        
NDefaultScheduler.getMemoryRemaining().availablePermits());
-            }
-        } catch (Exception ex) {
-            if (executable != null) {
-                context.removeRunningJob(executable);
-                if (memoryLock) {
-                    // may release twice when exception raise after jobPool 
execute executable
-                    
NDefaultScheduler.getMemoryRemaining().release(useMemoryCapacity);
-                }
-            }
-            logger.warn("{} fail to schedule", jobDesc, ex);
-        }
-    }
-
-    private boolean isJobPoolFull() {
-        if (context.getRunningJobs().size() >= 
nDefaultScheduler.getJobEngineConfig().getMaxConcurrentJobLimit()) {
-            logger.warn("There are too many jobs running, Job Fetch will wait 
until next schedule time.");
-            return true;
-        }
-        return false;
-    }
-
-    void scheduleNext() {
-        fetcherPool.schedule(this, 0, TimeUnit.SECONDS);
-    }
-}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/ForkBasedJobRunner.java
 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/ForkBasedJobRunner.java
deleted file mode 100644
index fc29a5ebb4..0000000000
--- 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/ForkBasedJobRunner.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.job.runners;
-
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.BufferedLogger;
-import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.common.util.ExecutableApplication;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class ForkBasedJobRunner extends JobRunnerFactory.AbstractJobRunner {
-
-    private final CliCommandExecutor cliExecutor = new CliCommandExecutor();
-
-    public ForkBasedJobRunner(KylinConfig kylinConfig, String project, 
List<String> originResources) {
-        super(kylinConfig, project, originResources);
-    }
-
-    @Override
-    protected void doExecute(ExecutableApplication app, Map<String, String> 
args) throws Exception {
-        String finalCommand = String.format(Locale.ROOT, "bash -x 
%s/sbin/bootstrap.sh %s %s 2>>%s",
-                KylinConfig.getKylinHome(), app.getClass().getName(), 
formatArgs(args), getJobTmpDir() + "/job.log");
-        log.info("Try to execute {}", finalCommand);
-        cliExecutor.execute(finalCommand, new BufferedLogger(log), jobId);
-    }
-
-}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/InMemoryJobRunner.java
 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/InMemoryJobRunner.java
deleted file mode 100644
index 119c3313e3..0000000000
--- 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/InMemoryJobRunner.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.job.runners;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ExecutableApplication;
-
-public class InMemoryJobRunner extends JobRunnerFactory.AbstractJobRunner {
-
-    public InMemoryJobRunner(KylinConfig config, String project, List<String> 
resources) {
-        super(config, project, resources);
-    }
-
-    @Override
-    public void doExecute(ExecutableApplication app, Map<String, String> args) 
throws Exception {
-        app.execute(formatArgs(args).split(" "));
-    }
-}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java
deleted file mode 100644
index 195a244900..0000000000
--- 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.job.runners;
-
-import java.util.Map;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.transaction.UnitOfWork;
-import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.execution.NExecutableManager;
-import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
-import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import lombok.val;
-
-public class JobCheckRunner extends AbstractDefaultSchedulerRunner {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(JobCheckRunner.class);
-
-    public JobCheckRunner(NDefaultScheduler nDefaultScheduler) {
-        super(nDefaultScheduler);
-    }
-
-    private boolean checkTimeoutIfNeeded(String jobId, Long startTime) {
-        Integer timeOutMinute = 
KylinConfig.getInstanceFromEnv().getSchedulerJobTimeOutMinute();
-        if (timeOutMinute == 0) {
-            return false;
-        }
-        val executableManager = 
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
-        if (executableManager.getJob(jobId).getStatus().isFinalState()) {
-            return false;
-        }
-        long duration = System.currentTimeMillis() - startTime;
-        long durationMins = Math.toIntExact(duration / (60 * 1000));
-        return durationMins >= timeOutMinute;
-    }
-
-    private boolean discardTimeoutJob(String jobId, Long startTime) {
-        try {
-            if (checkTimeoutIfNeeded(jobId, startTime)) {
-                return EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() 
-> {
-                    if (checkTimeoutIfNeeded(jobId, startTime)) {
-                        logger.error("project {} job {} running timeout.", 
project, jobId);
-                        
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), 
project).errorJob(jobId);
-                        return true;
-                    }
-                    return false;
-                }, project, UnitOfWork.DEFAULT_MAX_RETRY, 
context.getEpochId(), jobId);
-            }
-        } catch (Exception e) {
-            logger.warn("[UNEXPECTED_THINGS_HAPPENED] project " + project + " 
job " + jobId
-                    + " should be timeout but discard failed", e);
-        }
-        return false;
-    }
-
-    @Override
-    protected void doRun() {
-
-        logger.info("start check project {} job pool.", project);
-
-        val executableManager = 
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
-        Map<String, Executable> runningJobs = context.getRunningJobs();
-        Map<String, Long> runningJobInfos = context.getRunningJobInfos();
-        for (final String id : executableManager.getJobs()) {
-            if (runningJobs.containsKey(id)) {
-                discardTimeoutJob(id, runningJobInfos.get(id));
-                stopJobIfSQLReached(id);
-            }
-        }
-
-    }
-}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
deleted file mode 100644
index 3e839f57f4..0000000000
--- a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.job.runners;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.SetThreadName;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.exception.JobStoppedNonVoluntarilyException;
-import org.apache.kylin.job.exception.JobStoppedVoluntarilyException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
-import org.apache.kylin.common.logging.SetLogCategory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import lombok.val;
-import lombok.var;
-
-public class JobRunner extends AbstractDefaultSchedulerRunner {
-    private static final Logger logger = 
LoggerFactory.getLogger(JobRunner.class);
-
-    private final AbstractExecutable executable;
-
-    private final FetcherRunner fetcherRunner;
-
-    public JobRunner(NDefaultScheduler nDefaultScheduler, AbstractExecutable 
executable, FetcherRunner fetcherRunner) {
-        super(nDefaultScheduler);
-        this.fetcherRunner = fetcherRunner;
-        this.executable = executable;
-    }
-
-    @Override
-    protected void doRun() {
-        //only the first 8 chars of the job uuid
-        val jobIdSimple = executable.getId().substring(0, 8);
-        try (SetThreadName ignored = new 
SetThreadName("JobWorker(project:%s,jobid:%s)", project, jobIdSimple);
-                SetLogCategory logCategory = new SetLogCategory("schedule")) {
-            executable.execute(context);
-            // trigger the next step asap
-            fetcherRunner.scheduleNext();
-        } catch (JobStoppedVoluntarilyException | 
JobStoppedNonVoluntarilyException e) {
-            logger.info("Job quits either voluntarily or non-voluntarily,job: 
{}", jobIdSimple, e);
-        } catch (ExecuteException e) {
-            logger.error("ExecuteException occurred while job: " + 
executable.getId(), e);
-        } catch (Exception e) {
-            logger.error("unknown error execute job: " + executable.getId(), 
e);
-        } finally {
-            context.removeRunningJob(executable);
-            val config = KylinConfig.getInstanceFromEnv();
-            var usingMemory = 0;
-            if (!config.getDeployMode().equals("cluster")) {
-                usingMemory = executable.computeStepDriverMemory();
-            }
-            logger.info("Before job:{} global memory release {}", jobIdSimple,
-                    NDefaultScheduler.getMemoryRemaining().availablePermits());
-            NDefaultScheduler.getMemoryRemaining().release(usingMemory);
-            logger.info("After job:{} global memory release {}", jobIdSimple,
-                    NDefaultScheduler.getMemoryRemaining().availablePermits());
-        }
-    }
-}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunnerFactory.java 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunnerFactory.java
deleted file mode 100644
index 0713558d35..0000000000
--- 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunnerFactory.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.job.runners;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.KylinConfigBase;
-import org.apache.kylin.common.persistence.RawResource;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.ExecutableApplication;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.ZipFileUtils;
-import org.apache.kylin.common.persistence.metadata.MetadataStore;
-import org.apache.kylin.common.persistence.transaction.UnitOfWorkParams;
-import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
-
-import com.google.common.collect.Maps;
-
-import lombok.RequiredArgsConstructor;
-import lombok.val;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class JobRunnerFactory {
-
-    private JobRunnerFactory() {
-        // Just implement it
-    }
-
-    public static AbstractJobRunner createRunner(KylinConfig config, String 
type, String project,
-            List<String> resources) {
-        switch (type) {
-        case "fork":
-            return new ForkBasedJobRunner(config, project, resources);
-        case "in-memory":
-            return new InMemoryJobRunner(config, project, resources);
-        default:
-            throw new NotImplementedException("Runner type " + type + " not 
implement");
-        }
-    }
-
-    @RequiredArgsConstructor
-    public abstract static class AbstractJobRunner {
-
-        protected final KylinConfig kylinConfig;
-        protected final String project;
-        protected final List<String> originResources;
-        private Consumer<Properties> configUpdater;
-
-        protected String metaDumpUrl;
-        protected String jobId;
-
-        public String prepareEnv(String jobId) throws IOException {
-            this.jobId = jobId;
-            val jobTmpDir = getJobTmpDir();
-            FileUtils.forceMkdir(new File(jobTmpDir));
-
-            metaDumpUrl = kylinConfig.getMetadataUrlPrefix() + 
"@hdfs,path=file://" + jobTmpDir + "/meta";
-            return jobTmpDir;
-        }
-
-        public void start(ExecutableApplication app, Map<String, String> args) 
throws Exception {
-            attachMetadataAndKylinProps(false);
-            args.put("meta", metaDumpUrl);
-            args.put("metaOutput", metaDumpUrl + "_output");
-            doExecute(app, args);
-            uploadJobLog();
-        }
-
-        public void cleanupEnv() {
-            FileUtils.deleteQuietly(new File(getJobTmpDir()));
-        }
-
-        protected abstract void doExecute(ExecutableApplication app, 
Map<String, String> args) throws Exception;
-
-        protected String formatArgs(Map<String, String> args) {
-            return args.entrySet().stream().map(entry -> "--" + entry.getKey() 
+ "=" + entry.getValue())
-                    .collect(Collectors.joining(" "));
-        }
-
-        public String getJobTmpDir() {
-            return KylinConfigBase.getKylinHome() + "/tmp/" + jobId;
-        }
-
-        public void setConfigUpdater(Consumer<Properties> consumer) {
-            this.configUpdater = consumer;
-        }
-
-        protected boolean uploadJobLog() {
-            val jobContentZip = getJobTmpDir() + ".zip";
-            try {
-                ZipFileUtils.compressZipFile(getJobTmpDir(), jobContentZip);
-                val jobDir = kylinConfig.getJobTmpDir(project, true);
-                val fs = HadoopUtil.getFileSystem(jobDir);
-
-                try (val in = new FileInputStream(new File(jobContentZip));
-                        val out = fs.create(new Path(jobDir + jobId + ".zip"), 
true)) {
-                    IOUtils.copy(in, out);
-                }
-                return true;
-            } catch (IOException e) {
-                log.warn("Upload Job Evidence failed {}", jobId, e);
-            } finally {
-                FileUtils.deleteQuietly(new File(jobContentZip));
-            }
-            return false;
-        }
-
-        protected void attachMetadataAndKylinProps(boolean kylinPropsOnly) 
throws IOException {
-            if (StringUtils.isEmpty(metaDumpUrl)) {
-                throw new RuntimeException("Missing metaUrl");
-            }
-
-            File tmpDir = File.createTempFile("kylin_job_meta", "");
-            try {
-                org.apache.commons.io.FileUtils.forceDelete(tmpDir); // we 
need a directory, so delete the file first
-
-                Properties props = kylinConfig.exportToProperties();
-                props.setProperty("kylin.metadata.url", metaDumpUrl);
-                if (configUpdater != null) {
-                    configUpdater.accept(props);
-                }
-
-                if (kylinPropsOnly) {
-                    ResourceStore.dumpKylinProps(tmpDir, props);
-                } else {
-                    // The way of Updating metadata is CopyOnWrite. So it is 
safe to use Reference in the value.
-                    Map<String, RawResource> dumpMap = 
EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(
-                            UnitOfWorkParams.<Map> 
builder().readonly(true).unitName(project).processor(() -> {
-                                Map<String, RawResource> retMap = 
Maps.newHashMap();
-                                for (String resPath : originResources) {
-                                    ResourceStore resourceStore = 
ResourceStore.getKylinMetaStore(kylinConfig);
-                                    RawResource rawResource = 
resourceStore.getResource(resPath);
-                                    retMap.put(resPath, rawResource);
-                                }
-                                return retMap;
-                            }).build());
-
-                    if (Objects.isNull(dumpMap) || dumpMap.isEmpty()) {
-                        return;
-                    }
-                    // dump metadata
-                    ResourceStore.dumpResourceMaps(kylinConfig, tmpDir, 
dumpMap, props);
-                }
-
-                // copy metadata to target metaUrl
-                KylinConfig dstConfig = KylinConfig.createKylinConfig(props);
-                
MetadataStore.createMetadataStore(dstConfig).uploadFromFile(tmpDir);
-                // clean up
-                log.debug("Copied metadata to the target metaUrl, delete the 
temp dir: {}", tmpDir);
-            } finally {
-                FileUtils.forceDelete(tmpDir);
-            }
-        }
-    }
-}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/LicenseCapacityCheckRunner.java
 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/LicenseCapacityCheckRunner.java
deleted file mode 100644
index 1b51dd2c3b..0000000000
--- 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/LicenseCapacityCheckRunner.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.job.runners;
-
-import static 
org.apache.kylin.common.exception.CommonErrorCode.LICENSE_OVER_CAPACITY;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.annotation.Clarification;
-import org.apache.kylin.common.exception.KylinException;
-import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
-import org.apache.kylin.metadata.sourceusage.SourceUsageManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import lombok.val;
-
-@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise")
-public class LicenseCapacityCheckRunner extends AbstractDefaultSchedulerRunner 
{
-    private static final Logger logger = 
LoggerFactory.getLogger(LicenseCapacityCheckRunner.class);
-
-    public LicenseCapacityCheckRunner(NDefaultScheduler nDefaultScheduler) {
-        super(nDefaultScheduler);
-    }
-
-    @Override
-    protected void doRun() {
-        logger.info("start check license capacity for project {}", project);
-        context.setLicenseOverCapacity(isLicenseOverCapacity());
-    }
-
-    private boolean isLicenseOverCapacity() {
-        val sourceUsageManager = 
SourceUsageManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-        try {
-            sourceUsageManager.checkIsOverCapacity(project);
-        } catch (KylinException e) {
-            if (LICENSE_OVER_CAPACITY.toErrorCode() == e.getErrorCode()) {
-                logger.warn("Source usage over capacity, no job will be 
scheduled.", e);
-                return true;
-            }
-        } catch (Throwable e) {
-            logger.warn("Check source usage over capacity failed.", e);
-        }
-
-        // not over capacity
-        return false;
-    }
-}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/QuotaStorageCheckRunner.java
 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/QuotaStorageCheckRunner.java
deleted file mode 100644
index c7f3140671..0000000000
--- 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/QuotaStorageCheckRunner.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.job.runners;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.annotation.Clarification;
-import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
-import org.apache.kylin.metadata.cube.storage.ProjectStorageInfoCollector;
-import org.apache.kylin.metadata.cube.storage.StorageInfoEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-import lombok.val;
-import lombok.var;
-
-@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise")
-public class QuotaStorageCheckRunner extends AbstractDefaultSchedulerRunner {
-    private static final Logger logger = 
LoggerFactory.getLogger(QuotaStorageCheckRunner.class);
-
-    private final ProjectStorageInfoCollector collector;
-
-    public QuotaStorageCheckRunner(NDefaultScheduler nDefaultScheduler) {
-        super(nDefaultScheduler);
-
-        val storageInfoEnumList = 
Lists.newArrayList(StorageInfoEnum.STORAGE_QUOTA, 
StorageInfoEnum.TOTAL_STORAGE);
-        collector = new ProjectStorageInfoCollector(storageInfoEnumList);
-    }
-
-    @Override
-    protected void doRun() {
-        logger.info("start check project {} storage quota.", 
nDefaultScheduler.getProject());
-        context.setReachQuotaLimit(reachStorageQuota());
-    }
-
-    private boolean reachStorageQuota() {
-        var storageVolumeInfo = 
collector.getStorageVolumeInfo(KylinConfig.getInstanceFromEnv(),
-                nDefaultScheduler.getProject());
-        var totalSize = storageVolumeInfo.getTotalStorageSize();
-        int retryCount = 3;
-        while (retryCount-- > 0 && totalSize < 0) {
-            storageVolumeInfo = 
collector.getStorageVolumeInfo(KylinConfig.getInstanceFromEnv(),
-                    nDefaultScheduler.getProject());
-            totalSize = storageVolumeInfo.getTotalStorageSize();
-        }
-        val storageQuotaSize = storageVolumeInfo.getStorageQuotaSize();
-        if (totalSize < 0) {
-            logger.error(
-                    "Project '{}' : an exception occurs when getting storage 
volume info, no job will be scheduled!!! The error info : {}",
-                    nDefaultScheduler.getProject(),
-                    
storageVolumeInfo.getThrowableMap().get(StorageInfoEnum.TOTAL_STORAGE));
-            return true;
-        }
-        if (totalSize >= storageQuotaSize) {
-            logger.info("Project '{}' reach storage quota, no job will be 
scheduled!!!",
-                    nDefaultScheduler.getProject());
-            return true;
-        }
-        return false;
-    }
-}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java
 
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java
deleted file mode 100644
index 5fb6ecc8ff..0000000000
--- 
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.cube.storage;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.cube.model.NDataLayout;
-import org.apache.kylin.metadata.cube.model.NDataSegment;
-import org.apache.kylin.metadata.cube.model.NDataflow;
-import org.apache.kylin.metadata.cube.model.NDataflowManager;
-import org.apache.kylin.metadata.cube.optimization.IndexOptimizer;
-import org.apache.kylin.metadata.cube.optimization.IndexOptimizerFactory;
-import org.apache.kylin.metadata.model.NDataModel;
-
-import com.google.common.collect.Maps;
-
-import lombok.val;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class GarbageStorageCollector implements StorageInfoCollector {
-
-    @Override
-    public void collect(KylinConfig config, String project, StorageVolumeInfo 
storageVolumeInfo) {
-        Map<String, Set<Long>> garbageIndexMap = Maps.newHashMap();
-        long storageSize = 0L;
-
-        for (val model : getModels(project)) {
-            val dataflow = getDataflow(model).copy();
-
-            final IndexOptimizer indexOptimizer = 
IndexOptimizerFactory.getOptimizer(dataflow, false);
-            val garbageLayouts = 
indexOptimizer.getGarbageLayoutMap(dataflow).keySet();
-            if (CollectionUtils.isNotEmpty(garbageLayouts)) {
-                storageSize += calculateLayoutSize(garbageLayouts, dataflow);
-                garbageIndexMap.put(model.getId(), garbageLayouts);
-            }
-        }
-
-        storageVolumeInfo.setGarbageModelIndexMap(garbageIndexMap);
-        storageVolumeInfo.setGarbageStorageSize(storageSize);
-    }
-
-    private List<NDataModel> getModels(String project) {
-        val dataflowManager = 
NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
-        return dataflowManager.listUnderliningDataModels();
-    }
-
-    private NDataflow getDataflow(NDataModel model) {
-        val dataflowManager = 
NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), 
model.getProject());
-        return dataflowManager.getDataflow(model.getUuid());
-    }
-
-    private long calculateLayoutSize(Set<Long> cuboidLayoutIdSet, NDataflow 
dataflow) {
-        long cuboidLayoutSize = 0L;
-        for (NDataSegment segment : 
dataflow.getSegments(SegmentStatusEnum.READY, SegmentStatusEnum.WARNING)) {
-            for (Long cuboidLayoutId : cuboidLayoutIdSet) {
-                NDataLayout dataCuboid = 
segment.getSegDetails().getLayoutById(cuboidLayoutId);
-                if (dataCuboid != null) {
-                    cuboidLayoutSize += dataCuboid.getByteSize();
-                }
-            }
-        }
-        return cuboidLayoutSize;
-    }
-
-}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollector.java
 
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollector.java
deleted file mode 100644
index 6a5f4d7c56..0000000000
--- 
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollector.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.cube.storage;
-
-import java.util.List;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
-public class ProjectStorageInfoCollector {
-
-    private List<StorageInfoCollector> collectors = Lists.newArrayList();
-
-    private static final ImmutableMap<Class, StorageInfoEnum> collectorType = 
ImmutableMap
-            .<Class, StorageInfoEnum> 
builder().put(GarbageStorageCollector.class, StorageInfoEnum.GARBAGE_STORAGE)
-            .put(TotalStorageCollector.class, StorageInfoEnum.TOTAL_STORAGE)
-            .put(StorageQuotaCollector.class, 
StorageInfoEnum.STORAGE_QUOTA).build();
-
-    public ProjectStorageInfoCollector(List<StorageInfoEnum> storageInfoList) {
-        if (CollectionUtils.isNotEmpty(storageInfoList)) {
-            storageInfoList.forEach(si -> addCollectors(si));
-        }
-    }
-
-    private void collect(KylinConfig config, String project, StorageVolumeInfo 
storageVolumeInfo) {
-        for (StorageInfoCollector collector : collectors) {
-            try {
-                collector.collect(config, project, storageVolumeInfo);
-            } catch (Exception e) {
-                
storageVolumeInfo.getThrowableMap().put(collectorType.get(collector.getClass()),
 e);
-            }
-        }
-    }
-
-    private void addCollectors(StorageInfoEnum storageInfoEnum) {
-        switch (storageInfoEnum) {
-        case GARBAGE_STORAGE:
-            collectors.add(new GarbageStorageCollector());
-            break;
-        case TOTAL_STORAGE:
-            collectors.add(new TotalStorageCollector());
-            break;
-        case STORAGE_QUOTA:
-            collectors.add(new StorageQuotaCollector());
-            break;
-        default:
-            break;
-        }
-    }
-
-    public StorageVolumeInfo getStorageVolumeInfo(KylinConfig config, String 
project) {
-        StorageVolumeInfo storageVolumeInfo = new StorageVolumeInfo();
-        if (StringUtils.isBlank(project) || 
CollectionUtils.isEmpty(collectors)) {
-            return storageVolumeInfo;
-        }
-        collect(config, project, storageVolumeInfo);
-        return storageVolumeInfo;
-    }
-}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoCollector.java
 
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoCollector.java
deleted file mode 100644
index 4218244b06..0000000000
--- 
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoCollector.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.cube.storage;
-
-import java.io.IOException;
-
-import org.apache.kylin.common.KylinConfig;
-
-public interface StorageInfoCollector {
-
-    void collect(KylinConfig config, String project, StorageVolumeInfo 
storageVolumeInfo) throws IOException;
-
-}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoEnum.java
 
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoEnum.java
deleted file mode 100644
index ac29378738..0000000000
--- 
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoEnum.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.cube.storage;
-
-public enum StorageInfoEnum {
-    GARBAGE_STORAGE, TOTAL_STORAGE, STORAGE_QUOTA
-}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageQuotaCollector.java
 
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageQuotaCollector.java
deleted file mode 100644
index 951f896ef6..0000000000
--- 
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageQuotaCollector.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.cube.storage;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.annotation.Clarification;
-import org.apache.kylin.metadata.project.NProjectManager;
-
-@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise")
-public class StorageQuotaCollector implements StorageInfoCollector {
-
-    @Override
-    public void collect(KylinConfig config, String project, StorageVolumeInfo 
storageVolumeInfo) {
-        config = 
NProjectManager.getInstance(config).getProject(project).getConfig();
-        long quotaSize = config.getStorageQuotaSize();
-        storageVolumeInfo.setStorageQuotaSize(quotaSize);
-    }
-
-}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageVolumeInfo.java
 
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageVolumeInfo.java
deleted file mode 100644
index d77b18f18f..0000000000
--- 
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageVolumeInfo.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.metadata.cube.storage;
-
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.collect.Maps;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.kylin.common.annotation.Clarification;
-
-@Getter
-@Setter
-@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise")
-public class StorageVolumeInfo {
-    private long storageQuotaSize = -1L;
-    private long totalStorageSize = -1L;
-    private long garbageStorageSize = -1L;
-    private Map<String, Set<Long>> garbageModelIndexMap = Maps.newHashMap();
-    private Map<StorageInfoEnum, Throwable> throwableMap = Maps.newHashMap();
-}
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java
 
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java
deleted file mode 100644
index e5101a9f1a..0000000000
--- 
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.cube.storage;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
-
-public class TotalStorageCollector implements StorageInfoCollector {
-
-    @Override
-    public void collect(KylinConfig config, String project, StorageVolumeInfo 
storageVolumeInfo) throws IOException {
-        String strPath = config.getWorkingDirectoryWithConfiguredFs(project);
-        Path path = new Path(strPath);
-        FileSystem fs = 
path.getFileSystem(HadoopUtil.getCurrentConfiguration());
-        long totalStorageSize = 0L;
-        if (fs.exists(path)) {
-            totalStorageSize = HadoopUtil.getContentSummary(fs, 
path).getLength();
-        }
-        storageVolumeInfo.setTotalStorageSize(totalStorageSize);
-    }
-
-}

Reply via email to