This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new c8d73369e4 KYLIN-5217 Create a initial commit
c8d73369e4 is described below
commit c8d73369e47013a564b263420d4250d87246e7f6
Author: junqing.cai <[email protected]>
AuthorDate: Tue Aug 9 14:27:42 2022 +0800
KYLIN-5217 Create a initial commit
---
.../apache/kylin/job/dao/ExecutableOutputPO.java | 149 ++++++++++++
.../org/apache/kylin/job/dao/ExecutablePO.java | 152 ++++++++++++
.../org/apache/kylin/job/dao/JobStatistics.java | 77 ++++++
.../apache/kylin/job/dao/JobStatisticsBasic.java | 53 +++++
.../apache/kylin/job/dao/JobStatisticsManager.java | 263 +++++++++++++++++++++
.../org/apache/kylin/job/dao/NExecutableDao.java | 133 +++++++++++
.../runners/AbstractDefaultSchedulerRunner.java | 93 ++++++++
.../apache/kylin/job/runners/FetcherRunner.java | 241 +++++++++++++++++++
.../kylin/job/runners/ForkBasedJobRunner.java | 48 ++++
.../kylin/job/runners/InMemoryJobRunner.java | 36 +++
.../apache/kylin/job/runners/JobCheckRunner.java | 90 +++++++
.../org/apache/kylin/job/runners/JobRunner.java | 76 ++++++
.../apache/kylin/job/runners/JobRunnerFactory.java | 185 +++++++++++++++
.../job/runners/LicenseCapacityCheckRunner.java | 63 +++++
.../kylin/job/runners/QuotaStorageCheckRunner.java | 77 ++++++
.../cube/storage/GarbageStorageCollector.java | 87 +++++++
.../cube/storage/ProjectStorageInfoCollector.java | 79 +++++++
.../cube/storage/StorageInfoCollector.java | 29 +++
.../metadata/cube/storage/StorageInfoEnum.java | 23 ++
.../cube/storage/StorageQuotaCollector.java | 35 +++
.../metadata/cube/storage/StorageVolumeInfo.java | 38 +++
.../cube/storage/TotalStorageCollector.java | 42 ++++
22 files changed, 2069 insertions(+)
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
b/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
new file mode 100644
index 0000000000..a3b69ab095
--- /dev/null
+++
b/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.dao;
+
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ *
+ */
+@Setter
+@Getter
+@SuppressWarnings("serial")
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE,
getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility =
JsonAutoDetect.Visibility.NONE, setterVisibility =
JsonAutoDetect.Visibility.NONE)
+public class ExecutableOutputPO implements Serializable {
+
+ @JsonIgnore
+ private InputStream contentStream;
+
+ @JsonProperty("content")
+ private String content;
+
+ @JsonProperty("log_path")
+ private String logPath;
+
+ @JsonProperty("status")
+ private String status = "READY";
+
+ @JsonProperty("info")
+ private Map<String, String> info = Maps.newHashMap();
+
+ @JsonProperty("last_modified")
+ private long lastModified;
+
+ @JsonProperty("createTime")
+ private long createTime = System.currentTimeMillis();
+
+ @JsonProperty("start_time")
+ private long startTime;
+
+ @JsonProperty("end_time")
+ private long endTime;
+
+ @JsonProperty("wait_time")
+ private long waitTime;
+
+ @JsonProperty("duration")
+ private long duration;
+
+ @JsonProperty("last_running_start_time")
+ private long lastRunningStartTime;
+
+ @JsonProperty("is_resumable")
+ private boolean resumable = false;
+
+ @JsonProperty("byte_size")
+ private long byteSize;
+
+ @JsonProperty("failed_msg")
+ private String failedMsg;
+
+ @JsonProperty("failed_step_id")
+ private String failedStepId;
+
+ @JsonProperty("failed_segment_id")
+ private String failedSegmentId;
+
+ @JsonProperty("failed_stack")
+ private String failedStack;
+
+ @JsonProperty("failed_reason")
+ private String failedReason;
+
+ public void addStartTime(long time) {
+ //when ready -> suicidal/discarded, the start time is 0
+ if (startTime == 0) {
+ startTime = time;
+ }
+ endTime = 0;
+ }
+
+ public void addEndTime(long time) {
+ Preconditions.checkArgument(startTime > 0L);
+ endTime = time;
+ }
+
+ public void addDuration(long time) {
+ if (time != 0 && time > lastRunningStartTime) {
+ duration = duration + time - lastRunningStartTime;
+ }
+ }
+
+ public void addLastRunningStartTime(long time) {
+ lastRunningStartTime = time;
+ }
+
+ public void resetTime() {
+ createTime = System.currentTimeMillis();
+ startTime = 0L;
+ endTime = 0L;
+ waitTime = 0L;
+ duration = 0L;
+ lastRunningStartTime = 0L;
+ }
+}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
b/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
new file mode 100644
index 0000000000..b396efd047
--- /dev/null
+++ b/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.dao;
+
+import static
org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_IDS_DELIMITER;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.JobTypeEnum;
+import org.apache.kylin.metadata.cube.model.NDataSegment;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ */
+@Setter
+@Getter
+@SuppressWarnings("serial")
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE,
getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility =
JsonAutoDetect.Visibility.NONE, setterVisibility =
JsonAutoDetect.Visibility.NONE)
+public class ExecutablePO extends RootPersistentEntity {
+ public static final int HIGHEST_PRIORITY = 0;
+ public static final int DEFAULT_PRIORITY = 3;
+ public static final int LOWEST_PRIORITY = 4;
+ @JsonProperty("name")
+ private String name;
+
+ @JsonProperty("tasks")
+ private List<ExecutablePO> tasks;
+
+ @JsonProperty("type")
+ private String type;
+
+ @JsonProperty("handler_type")
+ private String handlerType;
+
+ @JsonProperty("params")
+ private Map<String, String> params = Maps.newHashMap();
+
+ @JsonProperty("segments")
+ private Set<NDataSegment> segments = Sets.newHashSet();
+
+ @JsonProperty("job_type")
+ private JobTypeEnum jobType;
+
+ @JsonProperty("data_range_start")
+ private long dataRangeStart;
+
+ @JsonProperty("data_range_end")
+ private long dataRangeEnd;
+
+ @JsonProperty("target_model")
+ private String targetModel;
+
+ @JsonProperty("target_segments")
+ private List<String> targetSegments;
+
+ @JsonProperty("output")
+ private ExecutableOutputPO output = new ExecutableOutputPO();
+
+ private String project;
+
+ @JsonProperty("target_partitions")
+ private Set<Long> targetPartitions = Sets.newHashSet();
+
+ @JsonProperty("priority")
+ private int priority = DEFAULT_PRIORITY;
+
+ @JsonProperty("tag")
+ private Object tag;
+
+ @JsonProperty("stages_map")
+ private Map<String, List<ExecutablePO>> stagesMap;
+
+ public void setPriority(int p) {
+ priority = isPriorityValid(p) ? p : DEFAULT_PRIORITY;
+ }
+
+ @Override
+ public String getResourcePath() {
+ return concatResourcePath(getUuid(), project);
+ }
+
+ public static String concatResourcePath(String name, String project) {
+ return new
StringBuilder().append("/").append(project).append(ResourceStore.EXECUTABLE_JOB).append("/")
+ .append(name).toString();
+ }
+
+ public static boolean isPriorityValid(int priority) {
+ return priority >= HIGHEST_PRIORITY && priority <= LOWEST_PRIORITY;
+ }
+
+ public static boolean isHigherPriority(int p1, int p2) {
+ return p1 < p2;
+ }
+
+ public void addYarnApplicationJob(String appId) {
+ String oldAppIds =
output.getInfo().getOrDefault(ExecutableConstants.YARN_APP_IDS, "");
+ Set<String> appIds = new
HashSet<>(Arrays.asList(oldAppIds.split(YARN_APP_IDS_DELIMITER)));
+ if (!appIds.contains(appId)) {
+ String newAppIds = oldAppIds + (StringUtils.isEmpty(oldAppIds) ?
"" : YARN_APP_IDS_DELIMITER) + appId;
+ output.getInfo().put(ExecutableConstants.YARN_APP_IDS, newAppIds);
+ }
+ }
+}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatistics.java
b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatistics.java
new file mode 100644
index 0000000000..0b1928fbbf
--- /dev/null
+++ b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatistics.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.dao;
+
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Maps;
+
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@Getter
+@Setter
+@NoArgsConstructor
+public class JobStatistics extends JobStatisticsBasic {
+
+ @JsonProperty("date")
+ private long date;
+ @JsonProperty("model_stats")
+ private Map<String, JobStatisticsBasic> jobStatisticsByModels =
Maps.newHashMap();
+
+ @Override
+ public String resourceName() {
+ return String.valueOf(date);
+ }
+
+ public JobStatistics(long date, String model, long duration, long
byteSize) {
+ this.date = date;
+ setCount(1);
+ setTotalDuration(duration);
+ setTotalByteSize(byteSize);
+ jobStatisticsByModels.put(model, new JobStatisticsBasic(duration,
byteSize));
+ }
+
+ public JobStatistics(int count, long totalDuration, long totalByteSize) {
+ setCount(count);
+ setTotalDuration(totalDuration);
+ setTotalByteSize(totalByteSize);
+ }
+
+ public JobStatistics(long date, long totalDuration, long totalByteSize) {
+ this.date = date;
+ setCount(1);
+ setTotalDuration(totalDuration);
+ setTotalByteSize(totalByteSize);
+ }
+
+ public void update(String model, long duration, long byteSize, int
deltaCount) {
+ super.update(duration, byteSize, deltaCount);
+ JobStatisticsBasic jobStatisticsByModel =
jobStatisticsByModels.get(model);
+ if (jobStatisticsByModel == null) {
+ jobStatisticsByModel = new JobStatisticsBasic(duration, byteSize);
+ } else {
+ jobStatisticsByModel.update(duration, byteSize, deltaCount);
+ }
+
+ jobStatisticsByModels.put(model, jobStatisticsByModel);
+ }
+}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsBasic.java
b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsBasic.java
new file mode 100644
index 0000000000..c05743c3cf
--- /dev/null
+++
b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsBasic.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.dao;
+
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class JobStatisticsBasic extends RootPersistentEntity {
+ @JsonProperty("count")
+ private int count;
+ @JsonProperty("total_duration")
+ private long totalDuration;
+ @JsonProperty("total_byte_size")
+ private long totalByteSize;
+
+ public void update(long duration, long byteSize, int deltaCount) {
+ this.count += deltaCount;
+ this.totalDuration += duration;
+ this.totalByteSize += byteSize;
+ }
+
+ public JobStatisticsBasic(long totalDuration, long totalByteSize) {
+ this.count = 1;
+ this.totalDuration = totalDuration;
+ this.totalByteSize = totalByteSize;
+ }
+}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsManager.java
b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsManager.java
new file mode 100644
index 0000000000..2f0a65bfd3
--- /dev/null
+++
b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsManager.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.dao;
+
+import java.time.DayOfWeek;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.TemporalAdjusters;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
+import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.metadata.model.NDataModelManager;
+
+import com.google.common.collect.Maps;
+
+public class JobStatisticsManager {
+
+ public static JobStatisticsManager getInstance(KylinConfig config, String
project) {
+ return config.getManager(project, JobStatisticsManager.class);
+ }
+
+ // called by reflection
+ static JobStatisticsManager newInstance(KylinConfig conf, String project) {
+ try {
+ String cls = JobStatisticsManager.class.getName();
+ Class<? extends JobStatisticsManager> clz = ClassUtil.forName(cls,
JobStatisticsManager.class);
+ return clz.getConstructor(KylinConfig.class,
String.class).newInstance(conf, project);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to init DataModelManager from "
+ conf, e);
+ }
+ }
+
+ //
============================================================================
+
+ private KylinConfig config;
+ private String project;
+
+ private CachedCrudAssist<JobStatistics> crud;
+
+ public JobStatisticsManager(KylinConfig config, String project) {
+ init(config, project);
+ }
+
+ protected void init(KylinConfig cfg, final String project) {
+ this.config = cfg;
+ this.project = project;
+ final ResourceStore store =
ResourceStore.getKylinMetaStore(this.config);
+ String resourceRootPath = "/" + this.project +
ResourceStore.JOB_STATISTICS;
+ this.crud = new CachedCrudAssist<JobStatistics>(store,
resourceRootPath, JobStatistics.class) {
+ @Override
+ protected JobStatistics initEntityAfterReload(JobStatistics
jobStatistics, String resourceName) {
+ return jobStatistics;
+ }
+ };
+ }
+
+ public List<JobStatistics> getAll() {
+ return crud.listAll();
+ }
+
+ public JobStatistics updateStatistics(long date, String model, long
duration, long byteSize, int deltaCount) {
+ JobStatistics jobStatistics = crud.get(String.valueOf(date));
+ JobStatistics jobStatisticsToUpdate;
+ if (jobStatistics == null) {
+ jobStatisticsToUpdate = new JobStatistics(date, model, duration,
byteSize);
+ return crud.save(jobStatisticsToUpdate);
+ }
+
+ jobStatisticsToUpdate = crud.copyForWrite(jobStatistics);
+ jobStatisticsToUpdate.update(model, duration, byteSize, deltaCount);
+ return crud.save(jobStatisticsToUpdate);
+ }
+
+ public JobStatistics updateStatistics(long date, long duration, long
byteSize, int deltaCount) {
+ JobStatistics jobStatistics = crud.get(String.valueOf(date));
+ JobStatistics jobStatisticsToUpdate;
+ if (jobStatistics == null) {
+ jobStatisticsToUpdate = new JobStatistics(date, duration,
byteSize);
+ return crud.save(jobStatisticsToUpdate);
+ }
+
+ jobStatisticsToUpdate = crud.copyForWrite(jobStatistics);
+ jobStatisticsToUpdate.update(duration, byteSize, deltaCount);
+ return crud.save(jobStatisticsToUpdate);
+ }
+
+ public Pair<Integer, JobStatistics> getOverallJobStats(final long
startTime, final long endTime) {
+ // filter
+ List<JobStatistics> filteredJobStats =
getFilteredJobStatsByTime(crud.listAll(), startTime, endTime);
+ // aggregate all job stats
+ JobStatistics aggregatedStats = aggregateJobStats(filteredJobStats);
+
+ return new Pair<>(aggregatedStats.getCount(), aggregatedStats);
+ }
+
+ public Map<String, Integer> getJobCountByTime(final long startTime, final
long endTime,
+ final String timeDimension) {
+ Map<String, Integer> result = Maps.newHashMap();
+ aggregateJobStatsByTime(startTime, endTime,
timeDimension).forEach((key, value) -> {
+ result.put(key, value.getCount());
+ });
+ return result;
+ }
+
+ public Map<String, Integer> getJobCountByModel(long startTime, long
endTime) {
+ Map<String, Integer> result = Maps.newHashMap();
+
+ aggregateStatsByModel(startTime, endTime).forEach((modelName, value)
-> {
+ String modelAlias = getModelAlias(modelName);
+ if (modelAlias == null)
+ return;
+ result.put(modelAlias, value.getCount());
+ });
+
+ return result;
+ }
+
+ public Map<String, Double> getDurationPerByteByTime(final long startTime,
final long endTime,
+ final String timeDimension) {
+ Map<String, JobStatisticsBasic> aggregateResult =
aggregateJobStatsByTime(startTime, endTime, timeDimension);
+ return calculateDurationPerByte(aggregateResult);
+ }
+
+ public Map<String, Double> getDurationPerByteByModel(long startTime, long
endTime) {
+ Map<String, JobStatisticsBasic> transformedResult = Maps.newHashMap();
+
+ aggregateStatsByModel(startTime, endTime).forEach((modelName, value)
-> {
+ String modelAlias = getModelAlias(modelName);
+ if (modelAlias == null)
+ return;
+ transformedResult.put(modelAlias,
+ new JobStatisticsBasic(value.getTotalDuration(),
value.getTotalByteSize()));
+ });
+
+ return calculateDurationPerByte(transformedResult);
+ }
+
+ private String getModelAlias(String modelId) {
+ NDataModelManager dataModelManager =
NDataModelManager.getInstance(config, project);
+ NDataModel model = dataModelManager.getDataModelDesc(modelId);
+ if (model == null)
+ return null;
+
+ return model.getAlias();
+ }
+
+ private JobStatistics aggregateJobStats(List<JobStatistics>
jobStatisticsToAggregate) {
+ return jobStatisticsToAggregate.stream()
+ .reduce((x, y) -> new JobStatistics(x.getCount() +
y.getCount(),
+ x.getTotalDuration() + y.getTotalDuration(),
x.getTotalByteSize() + y.getTotalByteSize()))
+ .orElse(new JobStatistics());
+ }
+
+ // key is the date, value is the aggregated job stats
+ private Map<String, JobStatisticsBasic> aggregateJobStatsByTime(final long
startTime, final long endTime,
+ final String timeDimension) {
+ Map<String, JobStatisticsBasic> result = Maps.newHashMap();
+
+ List<JobStatistics> qulifiedJobStats =
getFilteredJobStatsByTime(crud.listAll(), startTime, endTime);
+
+ long startDate = startTime;
+ while (startDate <= endTime) {
+ long nextDate = nextDate(startDate, timeDimension);
+ List<JobStatistics> list =
getFilteredJobStatsByTime(qulifiedJobStats, startDate, nextDate);
+ result.put(formatDateTime(startDate), aggregateJobStats(list));
+ startDate = nextDate;
+ }
+
+ return result;
+ }
+
+ // format epoch time to date string
+ private String formatDateTime(long time) {
+ ZoneId zoneId = TimeZone.getDefault().toZoneId();
+ LocalDateTime localDateTime =
Instant.ofEpochMilli(time).atZone(zoneId).toLocalDateTime();
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd",
+ Locale.getDefault(Locale.Category.FORMAT));
+ return localDateTime.format(formatter);
+ }
+
+ private long nextDate(final long date, final String timeDimension) {
+ ZoneId zoneId = TimeZone.getTimeZone(config.getTimeZone()).toZoneId();
+ LocalDate localDate =
Instant.ofEpochMilli(date).atZone(zoneId).toLocalDate();
+ switch (timeDimension) {
+ case "day":
+ localDate = localDate.plusDays(1);
+ break;
+ case "week":
+ localDate =
localDate.with(TemporalAdjusters.next(DayOfWeek.MONDAY));
+ break;
+ case "month":
+ localDate =
localDate.with(TemporalAdjusters.firstDayOfNextMonth());
+ break;
+ default:
+ localDate = localDate.plusDays(1);
+ break;
+ }
+
+ return localDate.atStartOfDay(zoneId).toInstant().toEpochMilli();
+ }
+
+ private Map<String, JobStatisticsBasic> aggregateStatsByModel(long
startTime, long endTime) {
+ return getFilteredJobStatsByTime(crud.listAll(), startTime,
endTime).stream()
+ .map(JobStatistics::getJobStatisticsByModels).reduce((x, y) ->
{
+ // merge two maps
+ Map<String, JobStatisticsBasic> mergedMap =
Maps.newHashMap(x);
+ y.forEach((k, v) -> mergedMap.merge(k, v,
+ (value1, value2) -> new
JobStatisticsBasic(value1.getCount() + value2.getCount(),
+ value1.getTotalDuration() +
value2.getTotalDuration(),
+ value1.getTotalByteSize() +
value2.getTotalByteSize())));
+ return mergedMap;
+ }).orElse(Maps.newHashMap());
+ }
+
+ private List<JobStatistics> getFilteredJobStatsByTime(final
List<JobStatistics> list, final long startTime,
+ final long endTime) {
+ return list.stream()
+ .filter(singleStats -> singleStats.getDate() >= startTime &&
singleStats.getDate() < endTime)
+ .collect(Collectors.toList());
+ }
+
+ private Map<String, Double> calculateDurationPerByte(Map<String,
JobStatisticsBasic> totalMetricMap) {
+ Map<String, Double> result = Maps.newHashMap();
+ for (Map.Entry<String, JobStatisticsBasic> entry :
totalMetricMap.entrySet()) {
+ double totalDuration = entry.getValue().getTotalDuration();
+ double totalByteSize = entry.getValue().getTotalByteSize();
+ if (totalByteSize == 0)
+ result.put(entry.getKey(), .0);
+ else {
+ result.put(entry.getKey(), totalDuration / totalByteSize);
+ }
+ }
+ return result;
+ }
+}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java
b/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java
new file mode 100644
index 0000000000..5c353d989c
--- /dev/null
+++ b/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.dao;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import lombok.val;
+
+/**
+ */
+public class NExecutableDao {
+
+ private static final Serializer<ExecutablePO> JOB_SERIALIZER = new
JsonSerializer<>(ExecutablePO.class);
+ private static final Logger logger =
LoggerFactory.getLogger(NExecutableDao.class);
+
+ public static NExecutableDao getInstance(KylinConfig config, String
project) {
+ return config.getManager(project, NExecutableDao.class);
+ }
+
+ // called by reflection
+ static NExecutableDao newInstance(KylinConfig config, String project) {
+ return new NExecutableDao(config, project);
+ }
+ //
============================================================================
+
+ private String project;
+
+ private KylinConfig config;
+
+ private CachedCrudAssist<ExecutablePO> crud;
+
+ private NExecutableDao(KylinConfig config, String project) {
+ logger.trace("Using metadata url: {}", config);
+ this.project = project;
+ this.config = config;
+ String resourceRootPath = "/" + project +
ResourceStore.EXECUTE_RESOURCE_ROOT;
+ this.crud = new CachedCrudAssist<ExecutablePO>(getStore(),
resourceRootPath, "", ExecutablePO.class) {
+ @Override
+ protected ExecutablePO initEntityAfterReload(ExecutablePO entity,
String resourceName) {
+ entity.setProject(project);
+ return entity;
+ }
+ };
+ }
+
+ public List<ExecutablePO> getJobs() {
+ return crud.listAll();
+ }
+
+ public List<ExecutablePO> getPartialJobs(Predicate<String> predicate) {
+ return crud.listPartial(predicate);
+ }
+
+ public List<ExecutablePO> getJobs(long timeStart, long timeEndExclusive) {
+ return crud.listAll().stream()
+ .filter(x -> x.getLastModified() >= timeStart &&
x.getLastModified() < timeEndExclusive)
+ .collect(Collectors.toList());
+ }
+
+ public List<String> getJobIds() {
+ return
crud.listAll().stream().sorted(Comparator.comparing(ExecutablePO::getCreateTime))
+
.sorted(Comparator.comparing(ExecutablePO::getPriority)).map(RootPersistentEntity::resourceName)
+ .collect(Collectors.toList());
+ }
+
+ public ExecutablePO getJobByUuid(String uuid) {
+ return crud.get(uuid);
+ }
+
+ public ExecutablePO addJob(ExecutablePO job) {
+ if (getJobByUuid(job.getUuid()) != null) {
+ throw new IllegalArgumentException("job id:" + job.getUuid() + "
already exists");
+ }
+ crud.save(job);
+ return job;
+ }
+
+ // for ut
+ @VisibleForTesting
+ public void deleteAllJob() {
+ getJobs().forEach(job -> deleteJob(job.getId()));
+ }
+
+ public void deleteJob(String uuid) {
+ crud.delete(uuid);
+ }
+
+ public void updateJob(String uuid, Predicate<ExecutablePO> updater) {
+ val job = getJobByUuid(uuid);
+ Preconditions.checkNotNull(job);
+ val copyForWrite = JsonUtil.copyBySerialization(job, JOB_SERIALIZER,
null);
+ if (updater.test(copyForWrite)) {
+ crud.save(copyForWrite);
+ }
+ }
+
+ private ResourceStore getStore() {
+ return ResourceStore.getKylinMetaStore(config);
+ }
+
+}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/runners/AbstractDefaultSchedulerRunner.java
b/src/core-job/src/main/java/org/apache/kylin/job/runners/AbstractDefaultSchedulerRunner.java
new file mode 100644
index 0000000000..a8c6f3c973
--- /dev/null
+++
b/src/core-job/src/main/java/org/apache/kylin/job/runners/AbstractDefaultSchedulerRunner.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.logging.SetLogCategory;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.NExecutableManager;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.metadata.epoch.EpochManager;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.SneakyThrows;
+
+public abstract class AbstractDefaultSchedulerRunner implements Runnable {
+ private static final Logger logger =
LoggerFactory.getLogger(AbstractDefaultSchedulerRunner.class);
+ protected final NDefaultScheduler nDefaultScheduler;
+
+ protected final ExecutableContext context;
+
+ protected final String project;
+
+ public AbstractDefaultSchedulerRunner(final NDefaultScheduler
nDefaultScheduler) {
+ this.nDefaultScheduler = nDefaultScheduler;
+ this.context = nDefaultScheduler.getContext();
+ this.project = nDefaultScheduler.getProject();
+ }
+
+ @SneakyThrows
+ private boolean checkEpochIdFailed() {
+ //check failed if isInterrupted
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException();
+ }
+
+ if (!KylinConfig.getInstanceFromEnv().isUTEnv()
+ &&
!EpochManager.getInstance().checkEpochId(context.getEpochId(), project)) {
+ nDefaultScheduler.forceShutdown();
+ return true;
+ }
+ return false;
+ }
+
+ // stop job if Storage Quota Limit reached
+ protected void stopJobIfSQLReached(String jobId) {
+ if (context.isReachQuotaLimit()) {
+ try {
+ EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+ if (context.isReachQuotaLimit()) {
+
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(),
project).pauseJob(jobId);
+ logger.info("Job {} paused due to no available storage
quota.", jobId);
+ logger.info("Please clean up low-efficient storage in
time, "
+ + "increase the low-efficient storage
threshold, "
+ + "or notify the administrator to increase the
storage quota for this project.");
+ }
+ return null;
+ }, project, UnitOfWork.DEFAULT_MAX_RETRY,
context.getEpochId(), jobId);
+ } catch (Exception e) {
+ logger.warn("[UNEXPECTED_THINGS_HAPPENED] project {} job {}
failed to pause", project, jobId, e);
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ try (SetLogCategory ignored = new SetLogCategory("schedule")) {
+ if (checkEpochIdFailed()) {
+ return;
+ }
+ doRun();
+ }
+ }
+
+ protected abstract void doRun();
+}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
new file mode 100644
index 0000000000..c9bb123a45
--- /dev/null
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.NExecutableManager;
+import org.apache.kylin.job.execution.Output;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.val;
+
+public class FetcherRunner extends AbstractDefaultSchedulerRunner {
+
+ private static final Logger logger =
LoggerFactory.getLogger(FetcherRunner.class);
+
+ private final ExecutorService jobPool;
+
+ private final ScheduledExecutorService fetcherPool;
+
+ public FetcherRunner(NDefaultScheduler nDefaultScheduler, ExecutorService
jobPool,
+ ScheduledExecutorService fetcherPool) {
+ super(nDefaultScheduler);
+ this.jobPool = jobPool;
+ this.fetcherPool = fetcherPool;
+ }
+
+ private boolean checkSuicide(String jobId) {
+ val executableManager =
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+ if (executableManager.getJob(jobId).getStatus().isFinalState()) {
+ return false;
+ }
+ return executableManager.getJob(jobId).checkSuicide();
+ }
+
+ private boolean markSuicideJob(String jobId) {
+ try {
+ if (checkSuicide(jobId)) {
+ return EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(()
-> {
+ if (checkSuicide(jobId)) {
+
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(),
project).suicideJob(jobId);
+ return true;
+ }
+ return false;
+ }, project, UnitOfWork.DEFAULT_MAX_RETRY,
context.getEpochId(), jobId);
+ }
+ } catch (Exception e) {
+ logger.warn("[UNEXPECTED_THINGS_HAPPENED] project {} job {} should
be suicidal but discard failed", project,
+ jobId, e);
+ }
+ return false;
+ }
+
+ private boolean markErrorJob(String jobId) {
+ try {
+ return EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+ val manager =
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+ manager.errorJob(jobId);
+ return true;
+ }, project, UnitOfWork.DEFAULT_MAX_RETRY, context.getEpochId(),
jobId);
+ } catch (Exception e) {
+ logger.warn("[UNEXPECTED_THINGS_HAPPENED] project {} job {} should
be error but mark failed", project,
+ jobId, e);
+ }
+ return false;
+ }
+
+ @Override
+ public void doRun() {
+ try {
+ val executableManager =
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+ Map<String, Executable> runningJobs = context.getRunningJobs();
+
+ int nRunning = 0;
+ int nReady = 0;
+ int nStopped = 0;
+ int nOthers = 0;
+ int nError = 0;
+ int nDiscarded = 0;
+ int nSucceed = 0;
+ int nSuicidal = 0;
+ for (final String id : executableManager.getJobs()) {
+ if (markSuicideJob(id)) {
+ nSuicidal++;
+ continue;
+ }
+
+ if (runningJobs.containsKey(id)) {
+
+ // this is very important to prevent from same job being
scheduled at same time.
+ // e.g. when a job is restarted, the old job may still be
running (even if we tried to interrupt it)
+ // until the old job is finished, the new job should not
start
+ nRunning++;
+ continue;
+ }
+
+ final Output output = executableManager.getOutput(id);
+
+ switch (output.getState()) {
+ case READY:
+ nReady++;
+ if (isJobPoolFull()) {
+ break;
+ }
+
+ if (context.isReachQuotaLimit()) {
+ stopJobIfSQLReached(id);
+ break;
+ }
+
+ logger.info("fetcher schedule {} ", id);
+ scheduleJob(id);
+ break;
+ case DISCARDED:
+ nDiscarded++;
+ break;
+ case ERROR:
+ nError++;
+ break;
+ case SUCCEED:
+ nSucceed++;
+ break;
+ case PAUSED:
+ nStopped++;
+ break;
+ case SUICIDAL:
+ nSuicidal++;
+ break;
+ default:
+ if (allSubTasksSuccess(id)) {
+ logger.info("All sub tasks are successful, reschedule
job {}", id);
+ scheduleJob(id);
+ break;
+ }
+ logger.warn("Unexpected status for {} <{}>", id,
output.getState());
+ if (markErrorJob(id)) {
+ nError++;
+ } else {
+ nOthers++;
+ }
+ break;
+ }
+ }
+
+ logger.info(
+ "Job Status in project {}: {} should running, {} actual
running, {} stopped, {} ready, {} already succeed, {} error, {} discarded, {}
suicidal, {} others",
+ project, nRunning, runningJobs.size(), nStopped, nReady,
nSucceed, nError, nDiscarded, nSuicidal,
+ nOthers);
+ } catch (Exception e) {
+ logger.warn("Job Fetcher caught a exception ", e);
+ }
+ }
+
+ private boolean allSubTasksSuccess(String id) {
+ val executableManager =
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+
+ // check special case, all sub task success, show make current job to
success
+ AbstractExecutable job = executableManager.getJob(id);
+ if (job instanceof DefaultChainedExecutable) {
+ return ((DefaultChainedExecutable) job).getTasks().stream()
+ .allMatch(abstractExecutable ->
abstractExecutable.getStatus() == ExecutableState.SUCCEED);
+ }
+
+ return false;
+ }
+
+ private void scheduleJob(String id) {
+ AbstractExecutable executable = null;
+ String jobDesc = null;
+
+ boolean memoryLock = false;
+ int useMemoryCapacity = 0;
+ try {
+ val config = KylinConfig.getInstanceFromEnv();
+ val executableManager = NExecutableManager.getInstance(config,
project);
+ executable = executableManager.getJob(id);
+ if (!config.getDeployMode().equals("cluster")) {
+ useMemoryCapacity = executable.computeStepDriverMemory();
+ }
+ memoryLock =
NDefaultScheduler.getMemoryRemaining().tryAcquire(useMemoryCapacity);
+ if (memoryLock) {
+ jobDesc = executable.toString();
+ logger.info("{} prepare to schedule", jobDesc);
+ context.addRunningJob(executable);
+ jobPool.execute(new JobRunner(nDefaultScheduler, executable,
this));
+ logger.info("{} scheduled", jobDesc);
+ } else {
+ logger.info("memory is not enough, remaining: {} MB",
+
NDefaultScheduler.getMemoryRemaining().availablePermits());
+ }
+ } catch (Exception ex) {
+ if (executable != null) {
+ context.removeRunningJob(executable);
+ if (memoryLock) {
+ // may release twice when exception raise after jobPool
execute executable
+
NDefaultScheduler.getMemoryRemaining().release(useMemoryCapacity);
+ }
+ }
+ logger.warn("{} fail to schedule", jobDesc, ex);
+ }
+ }
+
+ private boolean isJobPoolFull() {
+ if (context.getRunningJobs().size() >=
nDefaultScheduler.getJobEngineConfig().getMaxConcurrentJobLimit()) {
+ logger.warn("There are too many jobs running, Job Fetch will wait
until next schedule time.");
+ return true;
+ }
+ return false;
+ }
+
+ void scheduleNext() {
+ fetcherPool.schedule(this, 0, TimeUnit.SECONDS);
+ }
+}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/runners/ForkBasedJobRunner.java
b/src/core-job/src/main/java/org/apache/kylin/job/runners/ForkBasedJobRunner.java
new file mode 100644
index 0000000000..fc29a5ebb4
--- /dev/null
+++
b/src/core-job/src/main/java/org/apache/kylin/job/runners/ForkBasedJobRunner.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BufferedLogger;
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.ExecutableApplication;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ForkBasedJobRunner extends JobRunnerFactory.AbstractJobRunner {
+
+ private final CliCommandExecutor cliExecutor = new CliCommandExecutor();
+
+ public ForkBasedJobRunner(KylinConfig kylinConfig, String project,
List<String> originResources) {
+ super(kylinConfig, project, originResources);
+ }
+
+ @Override
+ protected void doExecute(ExecutableApplication app, Map<String, String>
args) throws Exception {
+ String finalCommand = String.format(Locale.ROOT, "bash -x
%s/sbin/bootstrap.sh %s %s 2>>%s",
+ KylinConfig.getKylinHome(), app.getClass().getName(),
formatArgs(args), getJobTmpDir() + "/job.log");
+ log.info("Try to execute {}", finalCommand);
+ cliExecutor.execute(finalCommand, new BufferedLogger(log), jobId);
+ }
+
+}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/runners/InMemoryJobRunner.java
b/src/core-job/src/main/java/org/apache/kylin/job/runners/InMemoryJobRunner.java
new file mode 100644
index 0000000000..119c3313e3
--- /dev/null
+++
b/src/core-job/src/main/java/org/apache/kylin/job/runners/InMemoryJobRunner.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ExecutableApplication;
+
+public class InMemoryJobRunner extends JobRunnerFactory.AbstractJobRunner {
+
+ public InMemoryJobRunner(KylinConfig config, String project, List<String>
resources) {
+ super(config, project, resources);
+ }
+
+ @Override
+ public void doExecute(ExecutableApplication app, Map<String, String> args)
throws Exception {
+ app.execute(formatArgs(args).split(" "));
+ }
+}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java
b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java
new file mode 100644
index 0000000000..195a244900
--- /dev/null
+++
b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import java.util.Map;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.NExecutableManager;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.val;
+
+public class JobCheckRunner extends AbstractDefaultSchedulerRunner {
+
+ private static final Logger logger =
LoggerFactory.getLogger(JobCheckRunner.class);
+
+ public JobCheckRunner(NDefaultScheduler nDefaultScheduler) {
+ super(nDefaultScheduler);
+ }
+
+ private boolean checkTimeoutIfNeeded(String jobId, Long startTime) {
+ Integer timeOutMinute =
KylinConfig.getInstanceFromEnv().getSchedulerJobTimeOutMinute();
+ if (timeOutMinute == 0) {
+ return false;
+ }
+ val executableManager =
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+ if (executableManager.getJob(jobId).getStatus().isFinalState()) {
+ return false;
+ }
+ long duration = System.currentTimeMillis() - startTime;
+ long durationMins = Math.toIntExact(duration / (60 * 1000));
+ return durationMins >= timeOutMinute;
+ }
+
+ private boolean discardTimeoutJob(String jobId, Long startTime) {
+ try {
+ if (checkTimeoutIfNeeded(jobId, startTime)) {
+ return EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(()
-> {
+ if (checkTimeoutIfNeeded(jobId, startTime)) {
+ logger.error("project {} job {} running timeout.",
project, jobId);
+
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(),
project).errorJob(jobId);
+ return true;
+ }
+ return false;
+ }, project, UnitOfWork.DEFAULT_MAX_RETRY,
context.getEpochId(), jobId);
+ }
+ } catch (Exception e) {
+ logger.warn("[UNEXPECTED_THINGS_HAPPENED] project " + project + "
job " + jobId
+ + " should be timeout but discard failed", e);
+ }
+ return false;
+ }
+
+ @Override
+ protected void doRun() {
+
+ logger.info("start check project {} job pool.", project);
+
+ val executableManager =
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+ Map<String, Executable> runningJobs = context.getRunningJobs();
+ Map<String, Long> runningJobInfos = context.getRunningJobInfos();
+ for (final String id : executableManager.getJobs()) {
+ if (runningJobs.containsKey(id)) {
+ discardTimeoutJob(id, runningJobInfos.get(id));
+ stopJobIfSQLReached(id);
+ }
+ }
+
+ }
+}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
new file mode 100644
index 0000000000..3e839f57f4
--- /dev/null
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.SetThreadName;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.exception.JobStoppedNonVoluntarilyException;
+import org.apache.kylin.job.exception.JobStoppedVoluntarilyException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.common.logging.SetLogCategory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.val;
+import lombok.var;
+
+public class JobRunner extends AbstractDefaultSchedulerRunner {
+ private static final Logger logger =
LoggerFactory.getLogger(JobRunner.class);
+
+ private final AbstractExecutable executable;
+
+ private final FetcherRunner fetcherRunner;
+
+ public JobRunner(NDefaultScheduler nDefaultScheduler, AbstractExecutable
executable, FetcherRunner fetcherRunner) {
+ super(nDefaultScheduler);
+ this.fetcherRunner = fetcherRunner;
+ this.executable = executable;
+ }
+
+ @Override
+ protected void doRun() {
+ //only the first 8 chars of the job uuid
+ val jobIdSimple = executable.getId().substring(0, 8);
+ try (SetThreadName ignored = new
SetThreadName("JobWorker(project:%s,jobid:%s)", project, jobIdSimple);
+ SetLogCategory logCategory = new SetLogCategory("schedule")) {
+ executable.execute(context);
+ // trigger the next step asap
+ fetcherRunner.scheduleNext();
+ } catch (JobStoppedVoluntarilyException |
JobStoppedNonVoluntarilyException e) {
+ logger.info("Job quits either voluntarily or non-voluntarily,job:
{}", jobIdSimple, e);
+ } catch (ExecuteException e) {
+ logger.error("ExecuteException occurred while job: " +
executable.getId(), e);
+ } catch (Exception e) {
+ logger.error("unknown error execute job: " + executable.getId(),
e);
+ } finally {
+ context.removeRunningJob(executable);
+ val config = KylinConfig.getInstanceFromEnv();
+ var usingMemory = 0;
+ if (!config.getDeployMode().equals("cluster")) {
+ usingMemory = executable.computeStepDriverMemory();
+ }
+ logger.info("Before job:{} global memory release {}", jobIdSimple,
+ NDefaultScheduler.getMemoryRemaining().availablePermits());
+ NDefaultScheduler.getMemoryRemaining().release(usingMemory);
+ logger.info("After job:{} global memory release {}", jobIdSimple,
+ NDefaultScheduler.getMemoryRemaining().availablePermits());
+ }
+ }
+}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunnerFactory.java
b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunnerFactory.java
new file mode 100644
index 0000000000..0713558d35
--- /dev/null
+++
b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunnerFactory.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigBase;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ExecutableApplication;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.ZipFileUtils;
+import org.apache.kylin.common.persistence.metadata.MetadataStore;
+import org.apache.kylin.common.persistence.transaction.UnitOfWorkParams;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+
+import com.google.common.collect.Maps;
+
+import lombok.RequiredArgsConstructor;
+import lombok.val;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class JobRunnerFactory {
+
+ private JobRunnerFactory() {
+ // Just implement it
+ }
+
+ public static AbstractJobRunner createRunner(KylinConfig config, String
type, String project,
+ List<String> resources) {
+ switch (type) {
+ case "fork":
+ return new ForkBasedJobRunner(config, project, resources);
+ case "in-memory":
+ return new InMemoryJobRunner(config, project, resources);
+ default:
+ throw new NotImplementedException("Runner type " + type + " not
implement");
+ }
+ }
+
+ @RequiredArgsConstructor
+ public abstract static class AbstractJobRunner {
+
+ protected final KylinConfig kylinConfig;
+ protected final String project;
+ protected final List<String> originResources;
+ private Consumer<Properties> configUpdater;
+
+ protected String metaDumpUrl;
+ protected String jobId;
+
+ public String prepareEnv(String jobId) throws IOException {
+ this.jobId = jobId;
+ val jobTmpDir = getJobTmpDir();
+ FileUtils.forceMkdir(new File(jobTmpDir));
+
+ metaDumpUrl = kylinConfig.getMetadataUrlPrefix() +
"@hdfs,path=file://" + jobTmpDir + "/meta";
+ return jobTmpDir;
+ }
+
+ public void start(ExecutableApplication app, Map<String, String> args)
throws Exception {
+ attachMetadataAndKylinProps(false);
+ args.put("meta", metaDumpUrl);
+ args.put("metaOutput", metaDumpUrl + "_output");
+ doExecute(app, args);
+ uploadJobLog();
+ }
+
+ public void cleanupEnv() {
+ FileUtils.deleteQuietly(new File(getJobTmpDir()));
+ }
+
+ protected abstract void doExecute(ExecutableApplication app,
Map<String, String> args) throws Exception;
+
+ protected String formatArgs(Map<String, String> args) {
+ return args.entrySet().stream().map(entry -> "--" + entry.getKey()
+ "=" + entry.getValue())
+ .collect(Collectors.joining(" "));
+ }
+
+ public String getJobTmpDir() {
+ return KylinConfigBase.getKylinHome() + "/tmp/" + jobId;
+ }
+
+ public void setConfigUpdater(Consumer<Properties> consumer) {
+ this.configUpdater = consumer;
+ }
+
+ protected boolean uploadJobLog() {
+ val jobContentZip = getJobTmpDir() + ".zip";
+ try {
+ ZipFileUtils.compressZipFile(getJobTmpDir(), jobContentZip);
+ val jobDir = kylinConfig.getJobTmpDir(project, true);
+ val fs = HadoopUtil.getFileSystem(jobDir);
+
+ try (val in = new FileInputStream(new File(jobContentZip));
+ val out = fs.create(new Path(jobDir + jobId + ".zip"),
true)) {
+ IOUtils.copy(in, out);
+ }
+ return true;
+ } catch (IOException e) {
+ log.warn("Upload Job Evidence failed {}", jobId, e);
+ } finally {
+ FileUtils.deleteQuietly(new File(jobContentZip));
+ }
+ return false;
+ }
+
+ protected void attachMetadataAndKylinProps(boolean kylinPropsOnly)
throws IOException {
+ if (StringUtils.isEmpty(metaDumpUrl)) {
+ throw new RuntimeException("Missing metaUrl");
+ }
+
+ File tmpDir = File.createTempFile("kylin_job_meta", "");
+ try {
+ org.apache.commons.io.FileUtils.forceDelete(tmpDir); // we
need a directory, so delete the file first
+
+ Properties props = kylinConfig.exportToProperties();
+ props.setProperty("kylin.metadata.url", metaDumpUrl);
+ if (configUpdater != null) {
+ configUpdater.accept(props);
+ }
+
+ if (kylinPropsOnly) {
+ ResourceStore.dumpKylinProps(tmpDir, props);
+ } else {
+ // The way of Updating metadata is CopyOnWrite. So it is
safe to use Reference in the value.
+ Map<String, RawResource> dumpMap =
EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(
+ UnitOfWorkParams.<Map>
builder().readonly(true).unitName(project).processor(() -> {
+ Map<String, RawResource> retMap =
Maps.newHashMap();
+ for (String resPath : originResources) {
+ ResourceStore resourceStore =
ResourceStore.getKylinMetaStore(kylinConfig);
+ RawResource rawResource =
resourceStore.getResource(resPath);
+ retMap.put(resPath, rawResource);
+ }
+ return retMap;
+ }).build());
+
+ if (Objects.isNull(dumpMap) || dumpMap.isEmpty()) {
+ return;
+ }
+ // dump metadata
+ ResourceStore.dumpResourceMaps(kylinConfig, tmpDir,
dumpMap, props);
+ }
+
+ // copy metadata to target metaUrl
+ KylinConfig dstConfig = KylinConfig.createKylinConfig(props);
+
MetadataStore.createMetadataStore(dstConfig).uploadFromFile(tmpDir);
+ // clean up
+ log.debug("Copied metadata to the target metaUrl, delete the
temp dir: {}", tmpDir);
+ } finally {
+ FileUtils.forceDelete(tmpDir);
+ }
+ }
+ }
+}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/runners/LicenseCapacityCheckRunner.java
b/src/core-job/src/main/java/org/apache/kylin/job/runners/LicenseCapacityCheckRunner.java
new file mode 100644
index 0000000000..1b51dd2c3b
--- /dev/null
+++
b/src/core-job/src/main/java/org/apache/kylin/job/runners/LicenseCapacityCheckRunner.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import static
org.apache.kylin.common.exception.CommonErrorCode.LICENSE_OVER_CAPACITY;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.annotation.Clarification;
+import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.metadata.sourceusage.SourceUsageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.val;
+
+@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise")
+public class LicenseCapacityCheckRunner extends AbstractDefaultSchedulerRunner
{
+ private static final Logger logger =
LoggerFactory.getLogger(LicenseCapacityCheckRunner.class);
+
+ public LicenseCapacityCheckRunner(NDefaultScheduler nDefaultScheduler) {
+ super(nDefaultScheduler);
+ }
+
+ @Override
+ protected void doRun() {
+ logger.info("start check license capacity for project {}", project);
+ context.setLicenseOverCapacity(isLicenseOverCapacity());
+ }
+
+ private boolean isLicenseOverCapacity() {
+ val sourceUsageManager =
SourceUsageManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+ try {
+ sourceUsageManager.checkIsOverCapacity(project);
+ } catch (KylinException e) {
+ if (LICENSE_OVER_CAPACITY.toErrorCode() == e.getErrorCode()) {
+ logger.warn("Source usage over capacity, no job will be
scheduled.", e);
+ return true;
+ }
+ } catch (Throwable e) {
+ logger.warn("Check source usage over capacity failed.", e);
+ }
+
+ // not over capacity
+ return false;
+ }
+}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/runners/QuotaStorageCheckRunner.java
b/src/core-job/src/main/java/org/apache/kylin/job/runners/QuotaStorageCheckRunner.java
new file mode 100644
index 0000000000..c7f3140671
--- /dev/null
+++
b/src/core-job/src/main/java/org/apache/kylin/job/runners/QuotaStorageCheckRunner.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.annotation.Clarification;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.metadata.cube.storage.ProjectStorageInfoCollector;
+import org.apache.kylin.metadata.cube.storage.StorageInfoEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import lombok.val;
+import lombok.var;
+
+@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise")
+public class QuotaStorageCheckRunner extends AbstractDefaultSchedulerRunner {
+ private static final Logger logger =
LoggerFactory.getLogger(QuotaStorageCheckRunner.class);
+
+ private final ProjectStorageInfoCollector collector;
+
+ public QuotaStorageCheckRunner(NDefaultScheduler nDefaultScheduler) {
+ super(nDefaultScheduler);
+
+ val storageInfoEnumList =
Lists.newArrayList(StorageInfoEnum.STORAGE_QUOTA,
StorageInfoEnum.TOTAL_STORAGE);
+ collector = new ProjectStorageInfoCollector(storageInfoEnumList);
+ }
+
+ @Override
+ protected void doRun() {
+ logger.info("start check project {} storage quota.",
nDefaultScheduler.getProject());
+ context.setReachQuotaLimit(reachStorageQuota());
+ }
+
+ private boolean reachStorageQuota() {
+ var storageVolumeInfo =
collector.getStorageVolumeInfo(KylinConfig.getInstanceFromEnv(),
+ nDefaultScheduler.getProject());
+ var totalSize = storageVolumeInfo.getTotalStorageSize();
+ int retryCount = 3;
+ while (retryCount-- > 0 && totalSize < 0) {
+ storageVolumeInfo =
collector.getStorageVolumeInfo(KylinConfig.getInstanceFromEnv(),
+ nDefaultScheduler.getProject());
+ totalSize = storageVolumeInfo.getTotalStorageSize();
+ }
+ val storageQuotaSize = storageVolumeInfo.getStorageQuotaSize();
+ if (totalSize < 0) {
+ logger.error(
+ "Project '{}' : an exception occurs when getting storage
volume info, no job will be scheduled!!! The error info : {}",
+ nDefaultScheduler.getProject(),
+
storageVolumeInfo.getThrowableMap().get(StorageInfoEnum.TOTAL_STORAGE));
+ return true;
+ }
+ if (totalSize >= storageQuotaSize) {
+ logger.info("Project '{}' reach storage quota, no job will be
scheduled!!!",
+ nDefaultScheduler.getProject());
+ return true;
+ }
+ return false;
+ }
+}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java
new file mode 100644
index 0000000000..5fb6ecc8ff
--- /dev/null
+++
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.cube.storage;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.cube.model.NDataLayout;
+import org.apache.kylin.metadata.cube.model.NDataSegment;
+import org.apache.kylin.metadata.cube.model.NDataflow;
+import org.apache.kylin.metadata.cube.model.NDataflowManager;
+import org.apache.kylin.metadata.cube.optimization.IndexOptimizer;
+import org.apache.kylin.metadata.cube.optimization.IndexOptimizerFactory;
+import org.apache.kylin.metadata.model.NDataModel;
+
+import com.google.common.collect.Maps;
+
+import lombok.val;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class GarbageStorageCollector implements StorageInfoCollector {
+
+ @Override
+ public void collect(KylinConfig config, String project, StorageVolumeInfo
storageVolumeInfo) {
+ Map<String, Set<Long>> garbageIndexMap = Maps.newHashMap();
+ long storageSize = 0L;
+
+ for (val model : getModels(project)) {
+ val dataflow = getDataflow(model).copy();
+
+ final IndexOptimizer indexOptimizer =
IndexOptimizerFactory.getOptimizer(dataflow, false);
+ val garbageLayouts =
indexOptimizer.getGarbageLayoutMap(dataflow).keySet();
+ if (CollectionUtils.isNotEmpty(garbageLayouts)) {
+ storageSize += calculateLayoutSize(garbageLayouts, dataflow);
+ garbageIndexMap.put(model.getId(), garbageLayouts);
+ }
+ }
+
+ storageVolumeInfo.setGarbageModelIndexMap(garbageIndexMap);
+ storageVolumeInfo.setGarbageStorageSize(storageSize);
+ }
+
+ private List<NDataModel> getModels(String project) {
+ val dataflowManager =
NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+ return dataflowManager.listUnderliningDataModels();
+ }
+
+ private NDataflow getDataflow(NDataModel model) {
+ val dataflowManager =
NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(),
model.getProject());
+ return dataflowManager.getDataflow(model.getUuid());
+ }
+
+ private long calculateLayoutSize(Set<Long> cuboidLayoutIdSet, NDataflow
dataflow) {
+ long cuboidLayoutSize = 0L;
+ for (NDataSegment segment :
dataflow.getSegments(SegmentStatusEnum.READY, SegmentStatusEnum.WARNING)) {
+ for (Long cuboidLayoutId : cuboidLayoutIdSet) {
+ NDataLayout dataCuboid =
segment.getSegDetails().getLayoutById(cuboidLayoutId);
+ if (dataCuboid != null) {
+ cuboidLayoutSize += dataCuboid.getByteSize();
+ }
+ }
+ }
+ return cuboidLayoutSize;
+ }
+
+}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollector.java
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollector.java
new file mode 100644
index 0000000000..6a5f4d7c56
--- /dev/null
+++
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.cube.storage;
+
+import java.util.List;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+public class ProjectStorageInfoCollector {
+
+ private List<StorageInfoCollector> collectors = Lists.newArrayList();
+
+ private static final ImmutableMap<Class, StorageInfoEnum> collectorType =
ImmutableMap
+ .<Class, StorageInfoEnum>
builder().put(GarbageStorageCollector.class, StorageInfoEnum.GARBAGE_STORAGE)
+ .put(TotalStorageCollector.class, StorageInfoEnum.TOTAL_STORAGE)
+ .put(StorageQuotaCollector.class,
StorageInfoEnum.STORAGE_QUOTA).build();
+
+ public ProjectStorageInfoCollector(List<StorageInfoEnum> storageInfoList) {
+ if (CollectionUtils.isNotEmpty(storageInfoList)) {
+ storageInfoList.forEach(si -> addCollectors(si));
+ }
+ }
+
+ private void collect(KylinConfig config, String project, StorageVolumeInfo
storageVolumeInfo) {
+ for (StorageInfoCollector collector : collectors) {
+ try {
+ collector.collect(config, project, storageVolumeInfo);
+ } catch (Exception e) {
+
storageVolumeInfo.getThrowableMap().put(collectorType.get(collector.getClass()),
e);
+ }
+ }
+ }
+
+ private void addCollectors(StorageInfoEnum storageInfoEnum) {
+ switch (storageInfoEnum) {
+ case GARBAGE_STORAGE:
+ collectors.add(new GarbageStorageCollector());
+ break;
+ case TOTAL_STORAGE:
+ collectors.add(new TotalStorageCollector());
+ break;
+ case STORAGE_QUOTA:
+ collectors.add(new StorageQuotaCollector());
+ break;
+ default:
+ break;
+ }
+ }
+
+ public StorageVolumeInfo getStorageVolumeInfo(KylinConfig config, String
project) {
+ StorageVolumeInfo storageVolumeInfo = new StorageVolumeInfo();
+ if (StringUtils.isBlank(project) ||
CollectionUtils.isEmpty(collectors)) {
+ return storageVolumeInfo;
+ }
+ collect(config, project, storageVolumeInfo);
+ return storageVolumeInfo;
+ }
+}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoCollector.java
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoCollector.java
new file mode 100644
index 0000000000..4218244b06
--- /dev/null
+++
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoCollector.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.cube.storage;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.KylinConfig;
+
+public interface StorageInfoCollector {
+
+ void collect(KylinConfig config, String project, StorageVolumeInfo
storageVolumeInfo) throws IOException;
+
+}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoEnum.java
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoEnum.java
new file mode 100644
index 0000000000..ac29378738
--- /dev/null
+++
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoEnum.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.cube.storage;
+
/**
 * Kinds of storage information that can be requested from {@code ProjectStorageInfoCollector}.
 */
public enum StorageInfoEnum {
    /** Size of layouts the index optimizer reports as garbage ({@code GarbageStorageCollector}). */
    GARBAGE_STORAGE,
    /** Size of the project's working directory ({@code TotalStorageCollector}). */
    TOTAL_STORAGE,
    /** Storage quota from the project config ({@code StorageQuotaCollector}). */
    STORAGE_QUOTA
}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageQuotaCollector.java
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageQuotaCollector.java
new file mode 100644
index 0000000000..951f896ef6
--- /dev/null
+++
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageQuotaCollector.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.cube.storage;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.annotation.Clarification;
+import org.apache.kylin.metadata.project.NProjectManager;
+
+@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise")
+public class StorageQuotaCollector implements StorageInfoCollector {
+
+ @Override
+ public void collect(KylinConfig config, String project, StorageVolumeInfo
storageVolumeInfo) {
+ config =
NProjectManager.getInstance(config).getProject(project).getConfig();
+ long quotaSize = config.getStorageQuotaSize();
+ storageVolumeInfo.setStorageQuotaSize(quotaSize);
+ }
+
+}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageVolumeInfo.java
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageVolumeInfo.java
new file mode 100644
index 0000000000..d77b18f18f
--- /dev/null
+++
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageVolumeInfo.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.metadata.cube.storage;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Maps;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.kylin.common.annotation.Clarification;
+
+@Getter
+@Setter
+@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise")
+public class StorageVolumeInfo {
+ private long storageQuotaSize = -1L;
+ private long totalStorageSize = -1L;
+ private long garbageStorageSize = -1L;
+ private Map<String, Set<Long>> garbageModelIndexMap = Maps.newHashMap();
+ private Map<StorageInfoEnum, Throwable> throwableMap = Maps.newHashMap();
+}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java
new file mode 100644
index 0000000000..e5101a9f1a
--- /dev/null
+++
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.cube.storage;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+
+public class TotalStorageCollector implements StorageInfoCollector {
+
+ @Override
+ public void collect(KylinConfig config, String project, StorageVolumeInfo
storageVolumeInfo) throws IOException {
+ String strPath = config.getWorkingDirectoryWithConfiguredFs(project);
+ Path path = new Path(strPath);
+ FileSystem fs =
path.getFileSystem(HadoopUtil.getCurrentConfiguration());
+ long totalStorageSize = 0L;
+ if (fs.exists(path)) {
+ totalStorageSize = HadoopUtil.getContentSummary(fs,
path).getLength();
+ }
+ storageVolumeInfo.setTotalStorageSize(totalStorageSize);
+ }
+
+}