This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new 205bf2c50f KYLIN-5217 Create a initial commit
205bf2c50f is described below
commit 205bf2c50fc6416910fd83846cf62d86747f2330
Author: junqing.cai <[email protected]>
AuthorDate: Tue Aug 9 14:20:14 2022 +0800
KYLIN-5217 Create a initial commit
---
.../apache/kylin/job/dao/ExecutableOutputPO.java | 149 ------------
.../org/apache/kylin/job/dao/ExecutablePO.java | 152 ------------
.../org/apache/kylin/job/dao/JobStatistics.java | 77 ------
.../apache/kylin/job/dao/JobStatisticsBasic.java | 53 -----
.../apache/kylin/job/dao/JobStatisticsManager.java | 263 ---------------------
.../org/apache/kylin/job/dao/NExecutableDao.java | 133 -----------
.../runners/AbstractDefaultSchedulerRunner.java | 93 --------
.../apache/kylin/job/runners/FetcherRunner.java | 241 -------------------
.../kylin/job/runners/ForkBasedJobRunner.java | 48 ----
.../kylin/job/runners/InMemoryJobRunner.java | 36 ---
.../apache/kylin/job/runners/JobCheckRunner.java | 90 -------
.../org/apache/kylin/job/runners/JobRunner.java | 76 ------
.../apache/kylin/job/runners/JobRunnerFactory.java | 185 ---------------
.../job/runners/LicenseCapacityCheckRunner.java | 63 -----
.../kylin/job/runners/QuotaStorageCheckRunner.java | 77 ------
.../cube/storage/GarbageStorageCollector.java | 87 -------
.../cube/storage/ProjectStorageInfoCollector.java | 79 -------
.../cube/storage/StorageInfoCollector.java | 29 ---
.../metadata/cube/storage/StorageInfoEnum.java | 23 --
.../cube/storage/StorageQuotaCollector.java | 35 ---
.../metadata/cube/storage/StorageVolumeInfo.java | 38 ---
.../cube/storage/TotalStorageCollector.java | 42 ----
22 files changed, 2069 deletions(-)
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
deleted file mode 100644
index a3b69ab095..0000000000
--- a/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.dao;
-
-import java.io.InputStream;
-import java.io.Serializable;
-import java.util.Map;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-
-import lombok.Getter;
-import lombok.Setter;
-
-/**
- *
- */
-@Setter
-@Getter
-@SuppressWarnings("serial")
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class ExecutableOutputPO implements Serializable {
-
- @JsonIgnore
- private InputStream contentStream;
-
- @JsonProperty("content")
- private String content;
-
- @JsonProperty("log_path")
- private String logPath;
-
- @JsonProperty("status")
- private String status = "READY";
-
- @JsonProperty("info")
- private Map<String, String> info = Maps.newHashMap();
-
- @JsonProperty("last_modified")
- private long lastModified;
-
- @JsonProperty("createTime")
- private long createTime = System.currentTimeMillis();
-
- @JsonProperty("start_time")
- private long startTime;
-
- @JsonProperty("end_time")
- private long endTime;
-
- @JsonProperty("wait_time")
- private long waitTime;
-
- @JsonProperty("duration")
- private long duration;
-
- @JsonProperty("last_running_start_time")
- private long lastRunningStartTime;
-
- @JsonProperty("is_resumable")
- private boolean resumable = false;
-
- @JsonProperty("byte_size")
- private long byteSize;
-
- @JsonProperty("failed_msg")
- private String failedMsg;
-
- @JsonProperty("failed_step_id")
- private String failedStepId;
-
- @JsonProperty("failed_segment_id")
- private String failedSegmentId;
-
- @JsonProperty("failed_stack")
- private String failedStack;
-
- @JsonProperty("failed_reason")
- private String failedReason;
-
- public void addStartTime(long time) {
- //when ready -> suicidal/discarded, the start time is 0
- if (startTime == 0) {
- startTime = time;
- }
- endTime = 0;
- }
-
- public void addEndTime(long time) {
- Preconditions.checkArgument(startTime > 0L);
- endTime = time;
- }
-
- public void addDuration(long time) {
- if (time != 0 && time > lastRunningStartTime) {
- duration = duration + time - lastRunningStartTime;
- }
- }
-
- public void addLastRunningStartTime(long time) {
- lastRunningStartTime = time;
- }
-
- public void resetTime() {
- createTime = System.currentTimeMillis();
- startTime = 0L;
- endTime = 0L;
- waitTime = 0L;
- duration = 0L;
- lastRunningStartTime = 0L;
- }
-}
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
deleted file mode 100644
index b396efd047..0000000000
--- a/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.dao;
-
-import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_IDS_DELIMITER;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.JobTypeEnum;
-import org.apache.kylin.metadata.cube.model.NDataSegment;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import lombok.Getter;
-import lombok.Setter;
-
-/**
- */
-@Setter
-@Getter
-@SuppressWarnings("serial")
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class ExecutablePO extends RootPersistentEntity {
- public static final int HIGHEST_PRIORITY = 0;
- public static final int DEFAULT_PRIORITY = 3;
- public static final int LOWEST_PRIORITY = 4;
- @JsonProperty("name")
- private String name;
-
- @JsonProperty("tasks")
- private List<ExecutablePO> tasks;
-
- @JsonProperty("type")
- private String type;
-
- @JsonProperty("handler_type")
- private String handlerType;
-
- @JsonProperty("params")
- private Map<String, String> params = Maps.newHashMap();
-
- @JsonProperty("segments")
- private Set<NDataSegment> segments = Sets.newHashSet();
-
- @JsonProperty("job_type")
- private JobTypeEnum jobType;
-
- @JsonProperty("data_range_start")
- private long dataRangeStart;
-
- @JsonProperty("data_range_end")
- private long dataRangeEnd;
-
- @JsonProperty("target_model")
- private String targetModel;
-
- @JsonProperty("target_segments")
- private List<String> targetSegments;
-
- @JsonProperty("output")
- private ExecutableOutputPO output = new ExecutableOutputPO();
-
- private String project;
-
- @JsonProperty("target_partitions")
- private Set<Long> targetPartitions = Sets.newHashSet();
-
- @JsonProperty("priority")
- private int priority = DEFAULT_PRIORITY;
-
- @JsonProperty("tag")
- private Object tag;
-
- @JsonProperty("stages_map")
- private Map<String, List<ExecutablePO>> stagesMap;
-
- public void setPriority(int p) {
- priority = isPriorityValid(p) ? p : DEFAULT_PRIORITY;
- }
-
- @Override
- public String getResourcePath() {
- return concatResourcePath(getUuid(), project);
- }
-
- public static String concatResourcePath(String name, String project) {
- return new StringBuilder().append("/").append(project).append(ResourceStore.EXECUTABLE_JOB).append("/")
- .append(name).toString();
- }
-
- public static boolean isPriorityValid(int priority) {
- return priority >= HIGHEST_PRIORITY && priority <= LOWEST_PRIORITY;
- }
-
- public static boolean isHigherPriority(int p1, int p2) {
- return p1 < p2;
- }
-
- public void addYarnApplicationJob(String appId) {
- String oldAppIds =
output.getInfo().getOrDefault(ExecutableConstants.YARN_APP_IDS, "");
- Set<String> appIds = new
HashSet<>(Arrays.asList(oldAppIds.split(YARN_APP_IDS_DELIMITER)));
- if (!appIds.contains(appId)) {
- String newAppIds = oldAppIds + (StringUtils.isEmpty(oldAppIds) ?
"" : YARN_APP_IDS_DELIMITER) + appId;
- output.getInfo().put(ExecutableConstants.YARN_APP_IDS, newAppIds);
- }
- }
-}
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatistics.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatistics.java
deleted file mode 100644
index 0b1928fbbf..0000000000
--- a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatistics.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.dao;
-
-import java.util.Map;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Maps;
-
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-
-@Getter
-@Setter
-@NoArgsConstructor
-public class JobStatistics extends JobStatisticsBasic {
-
- @JsonProperty("date")
- private long date;
- @JsonProperty("model_stats")
- private Map<String, JobStatisticsBasic> jobStatisticsByModels =
Maps.newHashMap();
-
- @Override
- public String resourceName() {
- return String.valueOf(date);
- }
-
- public JobStatistics(long date, String model, long duration, long
byteSize) {
- this.date = date;
- setCount(1);
- setTotalDuration(duration);
- setTotalByteSize(byteSize);
- jobStatisticsByModels.put(model, new JobStatisticsBasic(duration,
byteSize));
- }
-
- public JobStatistics(int count, long totalDuration, long totalByteSize) {
- setCount(count);
- setTotalDuration(totalDuration);
- setTotalByteSize(totalByteSize);
- }
-
- public JobStatistics(long date, long totalDuration, long totalByteSize) {
- this.date = date;
- setCount(1);
- setTotalDuration(totalDuration);
- setTotalByteSize(totalByteSize);
- }
-
- public void update(String model, long duration, long byteSize, int
deltaCount) {
- super.update(duration, byteSize, deltaCount);
- JobStatisticsBasic jobStatisticsByModel =
jobStatisticsByModels.get(model);
- if (jobStatisticsByModel == null) {
- jobStatisticsByModel = new JobStatisticsBasic(duration, byteSize);
- } else {
- jobStatisticsByModel.update(duration, byteSize, deltaCount);
- }
-
- jobStatisticsByModels.put(model, jobStatisticsByModel);
- }
-}
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsBasic.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsBasic.java
deleted file mode 100644
index c05743c3cf..0000000000
--- a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsBasic.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.dao;
-
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-
-@Getter
-@Setter
-@NoArgsConstructor
-@AllArgsConstructor
-public class JobStatisticsBasic extends RootPersistentEntity {
- @JsonProperty("count")
- private int count;
- @JsonProperty("total_duration")
- private long totalDuration;
- @JsonProperty("total_byte_size")
- private long totalByteSize;
-
- public void update(long duration, long byteSize, int deltaCount) {
- this.count += deltaCount;
- this.totalDuration += duration;
- this.totalByteSize += byteSize;
- }
-
- public JobStatisticsBasic(long totalDuration, long totalByteSize) {
- this.count = 1;
- this.totalDuration = totalDuration;
- this.totalByteSize = totalByteSize;
- }
-}
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsManager.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsManager.java
deleted file mode 100644
index 2f0a65bfd3..0000000000
--- a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsManager.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.dao;
-
-import java.time.DayOfWeek;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.time.temporal.TemporalAdjusters;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.TimeZone;
-import java.util.stream.Collectors;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
-import org.apache.kylin.metadata.model.NDataModel;
-import org.apache.kylin.metadata.model.NDataModelManager;
-
-import com.google.common.collect.Maps;
-
-public class JobStatisticsManager {
-
- public static JobStatisticsManager getInstance(KylinConfig config, String
project) {
- return config.getManager(project, JobStatisticsManager.class);
- }
-
- // called by reflection
- static JobStatisticsManager newInstance(KylinConfig conf, String project) {
- try {
- String cls = JobStatisticsManager.class.getName();
- Class<? extends JobStatisticsManager> clz = ClassUtil.forName(cls,
JobStatisticsManager.class);
- return clz.getConstructor(KylinConfig.class,
String.class).newInstance(conf, project);
- } catch (Exception e) {
- throw new RuntimeException("Failed to init DataModelManager from "
+ conf, e);
- }
- }
-
- // ============================================================================
-
- private KylinConfig config;
- private String project;
-
- private CachedCrudAssist<JobStatistics> crud;
-
- public JobStatisticsManager(KylinConfig config, String project) {
- init(config, project);
- }
-
- protected void init(KylinConfig cfg, final String project) {
- this.config = cfg;
- this.project = project;
- final ResourceStore store =
ResourceStore.getKylinMetaStore(this.config);
- String resourceRootPath = "/" + this.project +
ResourceStore.JOB_STATISTICS;
- this.crud = new CachedCrudAssist<JobStatistics>(store,
resourceRootPath, JobStatistics.class) {
- @Override
- protected JobStatistics initEntityAfterReload(JobStatistics
jobStatistics, String resourceName) {
- return jobStatistics;
- }
- };
- }
-
- public List<JobStatistics> getAll() {
- return crud.listAll();
- }
-
- public JobStatistics updateStatistics(long date, String model, long
duration, long byteSize, int deltaCount) {
- JobStatistics jobStatistics = crud.get(String.valueOf(date));
- JobStatistics jobStatisticsToUpdate;
- if (jobStatistics == null) {
- jobStatisticsToUpdate = new JobStatistics(date, model, duration,
byteSize);
- return crud.save(jobStatisticsToUpdate);
- }
-
- jobStatisticsToUpdate = crud.copyForWrite(jobStatistics);
- jobStatisticsToUpdate.update(model, duration, byteSize, deltaCount);
- return crud.save(jobStatisticsToUpdate);
- }
-
- public JobStatistics updateStatistics(long date, long duration, long
byteSize, int deltaCount) {
- JobStatistics jobStatistics = crud.get(String.valueOf(date));
- JobStatistics jobStatisticsToUpdate;
- if (jobStatistics == null) {
- jobStatisticsToUpdate = new JobStatistics(date, duration,
byteSize);
- return crud.save(jobStatisticsToUpdate);
- }
-
- jobStatisticsToUpdate = crud.copyForWrite(jobStatistics);
- jobStatisticsToUpdate.update(duration, byteSize, deltaCount);
- return crud.save(jobStatisticsToUpdate);
- }
-
- public Pair<Integer, JobStatistics> getOverallJobStats(final long
startTime, final long endTime) {
- // filter
- List<JobStatistics> filteredJobStats =
getFilteredJobStatsByTime(crud.listAll(), startTime, endTime);
- // aggregate all job stats
- JobStatistics aggregatedStats = aggregateJobStats(filteredJobStats);
-
- return new Pair<>(aggregatedStats.getCount(), aggregatedStats);
- }
-
- public Map<String, Integer> getJobCountByTime(final long startTime, final
long endTime,
- final String timeDimension) {
- Map<String, Integer> result = Maps.newHashMap();
- aggregateJobStatsByTime(startTime, endTime,
timeDimension).forEach((key, value) -> {
- result.put(key, value.getCount());
- });
- return result;
- }
-
- public Map<String, Integer> getJobCountByModel(long startTime, long
endTime) {
- Map<String, Integer> result = Maps.newHashMap();
-
- aggregateStatsByModel(startTime, endTime).forEach((modelName, value)
-> {
- String modelAlias = getModelAlias(modelName);
- if (modelAlias == null)
- return;
- result.put(modelAlias, value.getCount());
- });
-
- return result;
- }
-
- public Map<String, Double> getDurationPerByteByTime(final long startTime,
final long endTime,
- final String timeDimension) {
- Map<String, JobStatisticsBasic> aggregateResult =
aggregateJobStatsByTime(startTime, endTime, timeDimension);
- return calculateDurationPerByte(aggregateResult);
- }
-
- public Map<String, Double> getDurationPerByteByModel(long startTime, long
endTime) {
- Map<String, JobStatisticsBasic> transformedResult = Maps.newHashMap();
-
- aggregateStatsByModel(startTime, endTime).forEach((modelName, value)
-> {
- String modelAlias = getModelAlias(modelName);
- if (modelAlias == null)
- return;
- transformedResult.put(modelAlias,
- new JobStatisticsBasic(value.getTotalDuration(),
value.getTotalByteSize()));
- });
-
- return calculateDurationPerByte(transformedResult);
- }
-
- private String getModelAlias(String modelId) {
- NDataModelManager dataModelManager =
NDataModelManager.getInstance(config, project);
- NDataModel model = dataModelManager.getDataModelDesc(modelId);
- if (model == null)
- return null;
-
- return model.getAlias();
- }
-
- private JobStatistics aggregateJobStats(List<JobStatistics>
jobStatisticsToAggregate) {
- return jobStatisticsToAggregate.stream()
- .reduce((x, y) -> new JobStatistics(x.getCount() +
y.getCount(),
- x.getTotalDuration() + y.getTotalDuration(),
x.getTotalByteSize() + y.getTotalByteSize()))
- .orElse(new JobStatistics());
- }
-
- // key is the date, value is the aggregated job stats
- private Map<String, JobStatisticsBasic> aggregateJobStatsByTime(final long
startTime, final long endTime,
- final String timeDimension) {
- Map<String, JobStatisticsBasic> result = Maps.newHashMap();
-
- List<JobStatistics> qulifiedJobStats =
getFilteredJobStatsByTime(crud.listAll(), startTime, endTime);
-
- long startDate = startTime;
- while (startDate <= endTime) {
- long nextDate = nextDate(startDate, timeDimension);
- List<JobStatistics> list =
getFilteredJobStatsByTime(qulifiedJobStats, startDate, nextDate);
- result.put(formatDateTime(startDate), aggregateJobStats(list));
- startDate = nextDate;
- }
-
- return result;
- }
-
- // format epoch time to date string
- private String formatDateTime(long time) {
- ZoneId zoneId = TimeZone.getDefault().toZoneId();
- LocalDateTime localDateTime =
Instant.ofEpochMilli(time).atZone(zoneId).toLocalDateTime();
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd",
- Locale.getDefault(Locale.Category.FORMAT));
- return localDateTime.format(formatter);
- }
-
- private long nextDate(final long date, final String timeDimension) {
- ZoneId zoneId = TimeZone.getTimeZone(config.getTimeZone()).toZoneId();
- LocalDate localDate =
Instant.ofEpochMilli(date).atZone(zoneId).toLocalDate();
- switch (timeDimension) {
- case "day":
- localDate = localDate.plusDays(1);
- break;
- case "week":
- localDate =
localDate.with(TemporalAdjusters.next(DayOfWeek.MONDAY));
- break;
- case "month":
- localDate =
localDate.with(TemporalAdjusters.firstDayOfNextMonth());
- break;
- default:
- localDate = localDate.plusDays(1);
- break;
- }
-
- return localDate.atStartOfDay(zoneId).toInstant().toEpochMilli();
- }
-
- private Map<String, JobStatisticsBasic> aggregateStatsByModel(long
startTime, long endTime) {
- return getFilteredJobStatsByTime(crud.listAll(), startTime,
endTime).stream()
- .map(JobStatistics::getJobStatisticsByModels).reduce((x, y) ->
{
- // merge two maps
- Map<String, JobStatisticsBasic> mergedMap =
Maps.newHashMap(x);
- y.forEach((k, v) -> mergedMap.merge(k, v,
- (value1, value2) -> new
JobStatisticsBasic(value1.getCount() + value2.getCount(),
- value1.getTotalDuration() +
value2.getTotalDuration(),
- value1.getTotalByteSize() +
value2.getTotalByteSize())));
- return mergedMap;
- }).orElse(Maps.newHashMap());
- }
-
- private List<JobStatistics> getFilteredJobStatsByTime(final
List<JobStatistics> list, final long startTime,
- final long endTime) {
- return list.stream()
- .filter(singleStats -> singleStats.getDate() >= startTime &&
singleStats.getDate() < endTime)
- .collect(Collectors.toList());
- }
-
- private Map<String, Double> calculateDurationPerByte(Map<String,
JobStatisticsBasic> totalMetricMap) {
- Map<String, Double> result = Maps.newHashMap();
- for (Map.Entry<String, JobStatisticsBasic> entry :
totalMetricMap.entrySet()) {
- double totalDuration = entry.getValue().getTotalDuration();
- double totalByteSize = entry.getValue().getTotalByteSize();
- if (totalByteSize == 0)
- result.put(entry.getKey(), .0);
- else {
- result.put(entry.getKey(), totalDuration / totalByteSize);
- }
- }
- return result;
- }
-}
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java
deleted file mode 100644
index 5c353d989c..0000000000
--- a/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.dao;
-
-import java.util.Comparator;
-import java.util.List;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-import lombok.val;
-
-/**
- */
-public class NExecutableDao {
-
- private static final Serializer<ExecutablePO> JOB_SERIALIZER = new
JsonSerializer<>(ExecutablePO.class);
- private static final Logger logger =
LoggerFactory.getLogger(NExecutableDao.class);
-
- public static NExecutableDao getInstance(KylinConfig config, String
project) {
- return config.getManager(project, NExecutableDao.class);
- }
-
- // called by reflection
- static NExecutableDao newInstance(KylinConfig config, String project) {
- return new NExecutableDao(config, project);
- }
- // ============================================================================
-
- private String project;
-
- private KylinConfig config;
-
- private CachedCrudAssist<ExecutablePO> crud;
-
- private NExecutableDao(KylinConfig config, String project) {
- logger.trace("Using metadata url: {}", config);
- this.project = project;
- this.config = config;
- String resourceRootPath = "/" + project +
ResourceStore.EXECUTE_RESOURCE_ROOT;
- this.crud = new CachedCrudAssist<ExecutablePO>(getStore(),
resourceRootPath, "", ExecutablePO.class) {
- @Override
- protected ExecutablePO initEntityAfterReload(ExecutablePO entity,
String resourceName) {
- entity.setProject(project);
- return entity;
- }
- };
- }
-
- public List<ExecutablePO> getJobs() {
- return crud.listAll();
- }
-
- public List<ExecutablePO> getPartialJobs(Predicate<String> predicate) {
- return crud.listPartial(predicate);
- }
-
- public List<ExecutablePO> getJobs(long timeStart, long timeEndExclusive) {
- return crud.listAll().stream()
- .filter(x -> x.getLastModified() >= timeStart &&
x.getLastModified() < timeEndExclusive)
- .collect(Collectors.toList());
- }
-
- public List<String> getJobIds() {
- return crud.listAll().stream().sorted(Comparator.comparing(ExecutablePO::getCreateTime))
- .sorted(Comparator.comparing(ExecutablePO::getPriority)).map(RootPersistentEntity::resourceName)
- .collect(Collectors.toList());
- }
-
- public ExecutablePO getJobByUuid(String uuid) {
- return crud.get(uuid);
- }
-
- public ExecutablePO addJob(ExecutablePO job) {
- if (getJobByUuid(job.getUuid()) != null) {
- throw new IllegalArgumentException("job id:" + job.getUuid() + " already exists");
- }
- crud.save(job);
- return job;
- }
-
- // for ut
- @VisibleForTesting
- public void deleteAllJob() {
- getJobs().forEach(job -> deleteJob(job.getId()));
- }
-
- public void deleteJob(String uuid) {
- crud.delete(uuid);
- }
-
- public void updateJob(String uuid, Predicate<ExecutablePO> updater) {
- val job = getJobByUuid(uuid);
- Preconditions.checkNotNull(job);
- val copyForWrite = JsonUtil.copyBySerialization(job, JOB_SERIALIZER,
null);
- if (updater.test(copyForWrite)) {
- crud.save(copyForWrite);
- }
- }
-
- private ResourceStore getStore() {
- return ResourceStore.getKylinMetaStore(config);
- }
-
-}
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/AbstractDefaultSchedulerRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/AbstractDefaultSchedulerRunner.java
deleted file mode 100644
index a8c6f3c973..0000000000
--- a/src/core-job/src/main/java/org/apache/kylin/job/runners/AbstractDefaultSchedulerRunner.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.job.runners;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.logging.SetLogCategory;
-import org.apache.kylin.common.persistence.transaction.UnitOfWork;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.NExecutableManager;
-import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
-import org.apache.kylin.metadata.epoch.EpochManager;
-import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import lombok.SneakyThrows;
-
-public abstract class AbstractDefaultSchedulerRunner implements Runnable {
- private static final Logger logger =
LoggerFactory.getLogger(AbstractDefaultSchedulerRunner.class);
- protected final NDefaultScheduler nDefaultScheduler;
-
- protected final ExecutableContext context;
-
- protected final String project;
-
- public AbstractDefaultSchedulerRunner(final NDefaultScheduler
nDefaultScheduler) {
- this.nDefaultScheduler = nDefaultScheduler;
- this.context = nDefaultScheduler.getContext();
- this.project = nDefaultScheduler.getProject();
- }
-
- @SneakyThrows
- private boolean checkEpochIdFailed() {
- //check failed if isInterrupted
- if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedException();
- }
-
- if (!KylinConfig.getInstanceFromEnv().isUTEnv()
- &&
!EpochManager.getInstance().checkEpochId(context.getEpochId(), project)) {
- nDefaultScheduler.forceShutdown();
- return true;
- }
- return false;
- }
-
- // stop job if Storage Quota Limit reached
- protected void stopJobIfSQLReached(String jobId) {
- if (context.isReachQuotaLimit()) {
- try {
- EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
- if (context.isReachQuotaLimit()) {
-
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(),
project).pauseJob(jobId);
- logger.info("Job {} paused due to no available storage
quota.", jobId);
- logger.info("Please clean up low-efficient storage in
time, "
- + "increase the low-efficient storage
threshold, "
- + "or notify the administrator to increase the
storage quota for this project.");
- }
- return null;
- }, project, UnitOfWork.DEFAULT_MAX_RETRY,
context.getEpochId(), jobId);
- } catch (Exception e) {
- logger.warn("[UNEXPECTED_THINGS_HAPPENED] project {} job {}
failed to pause", project, jobId, e);
- }
- }
- }
-
- @Override
- public void run() {
- try (SetLogCategory ignored = new SetLogCategory("schedule")) {
- if (checkEpochIdFailed()) {
- return;
- }
- doRun();
- }
- }
-
- protected abstract void doRun();
-}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
deleted file mode 100644
index c9bb123a45..0000000000
--- a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.job.runners;
-
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.transaction.UnitOfWork;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.NExecutableManager;
-import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
-import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import lombok.val;
-
-public class FetcherRunner extends AbstractDefaultSchedulerRunner {
-
- private static final Logger logger =
LoggerFactory.getLogger(FetcherRunner.class);
-
- private final ExecutorService jobPool;
-
- private final ScheduledExecutorService fetcherPool;
-
- public FetcherRunner(NDefaultScheduler nDefaultScheduler, ExecutorService
jobPool,
- ScheduledExecutorService fetcherPool) {
- super(nDefaultScheduler);
- this.jobPool = jobPool;
- this.fetcherPool = fetcherPool;
- }
-
- private boolean checkSuicide(String jobId) {
- val executableManager =
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
- if (executableManager.getJob(jobId).getStatus().isFinalState()) {
- return false;
- }
- return executableManager.getJob(jobId).checkSuicide();
- }
-
- private boolean markSuicideJob(String jobId) {
- try {
- if (checkSuicide(jobId)) {
- return EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(()
-> {
- if (checkSuicide(jobId)) {
-
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(),
project).suicideJob(jobId);
- return true;
- }
- return false;
- }, project, UnitOfWork.DEFAULT_MAX_RETRY,
context.getEpochId(), jobId);
- }
- } catch (Exception e) {
- logger.warn("[UNEXPECTED_THINGS_HAPPENED] project {} job {} should
be suicidal but discard failed", project,
- jobId, e);
- }
- return false;
- }
-
- private boolean markErrorJob(String jobId) {
- try {
- return EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
- val manager =
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
- manager.errorJob(jobId);
- return true;
- }, project, UnitOfWork.DEFAULT_MAX_RETRY, context.getEpochId(),
jobId);
- } catch (Exception e) {
- logger.warn("[UNEXPECTED_THINGS_HAPPENED] project {} job {} should
be error but mark failed", project,
- jobId, e);
- }
- return false;
- }
-
- @Override
- public void doRun() {
- try {
- val executableManager =
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
- Map<String, Executable> runningJobs = context.getRunningJobs();
-
- int nRunning = 0;
- int nReady = 0;
- int nStopped = 0;
- int nOthers = 0;
- int nError = 0;
- int nDiscarded = 0;
- int nSucceed = 0;
- int nSuicidal = 0;
- for (final String id : executableManager.getJobs()) {
- if (markSuicideJob(id)) {
- nSuicidal++;
- continue;
- }
-
- if (runningJobs.containsKey(id)) {
-
- // this is very important to prevent from same job being
scheduled at same time.
- // e.g. when a job is restarted, the old job may still be
running (even if we tried to interrupt it)
- // until the old job is finished, the new job should not
start
- nRunning++;
- continue;
- }
-
- final Output output = executableManager.getOutput(id);
-
- switch (output.getState()) {
- case READY:
- nReady++;
- if (isJobPoolFull()) {
- break;
- }
-
- if (context.isReachQuotaLimit()) {
- stopJobIfSQLReached(id);
- break;
- }
-
- logger.info("fetcher schedule {} ", id);
- scheduleJob(id);
- break;
- case DISCARDED:
- nDiscarded++;
- break;
- case ERROR:
- nError++;
- break;
- case SUCCEED:
- nSucceed++;
- break;
- case PAUSED:
- nStopped++;
- break;
- case SUICIDAL:
- nSuicidal++;
- break;
- default:
- if (allSubTasksSuccess(id)) {
- logger.info("All sub tasks are successful, reschedule
job {}", id);
- scheduleJob(id);
- break;
- }
- logger.warn("Unexpected status for {} <{}>", id,
output.getState());
- if (markErrorJob(id)) {
- nError++;
- } else {
- nOthers++;
- }
- break;
- }
- }
-
- logger.info(
- "Job Status in project {}: {} should running, {} actual
running, {} stopped, {} ready, {} already succeed, {} error, {} discarded, {}
suicidal, {} others",
- project, nRunning, runningJobs.size(), nStopped, nReady,
nSucceed, nError, nDiscarded, nSuicidal,
- nOthers);
- } catch (Exception e) {
- logger.warn("Job Fetcher caught a exception ", e);
- }
- }
-
- private boolean allSubTasksSuccess(String id) {
- val executableManager =
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
-
- // check special case, all sub task success, show make current job to
success
- AbstractExecutable job = executableManager.getJob(id);
- if (job instanceof DefaultChainedExecutable) {
- return ((DefaultChainedExecutable) job).getTasks().stream()
- .allMatch(abstractExecutable ->
abstractExecutable.getStatus() == ExecutableState.SUCCEED);
- }
-
- return false;
- }
-
- private void scheduleJob(String id) {
- AbstractExecutable executable = null;
- String jobDesc = null;
-
- boolean memoryLock = false;
- int useMemoryCapacity = 0;
- try {
- val config = KylinConfig.getInstanceFromEnv();
- val executableManager = NExecutableManager.getInstance(config,
project);
- executable = executableManager.getJob(id);
- if (!config.getDeployMode().equals("cluster")) {
- useMemoryCapacity = executable.computeStepDriverMemory();
- }
- memoryLock =
NDefaultScheduler.getMemoryRemaining().tryAcquire(useMemoryCapacity);
- if (memoryLock) {
- jobDesc = executable.toString();
- logger.info("{} prepare to schedule", jobDesc);
- context.addRunningJob(executable);
- jobPool.execute(new JobRunner(nDefaultScheduler, executable,
this));
- logger.info("{} scheduled", jobDesc);
- } else {
- logger.info("memory is not enough, remaining: {} MB",
-
NDefaultScheduler.getMemoryRemaining().availablePermits());
- }
- } catch (Exception ex) {
- if (executable != null) {
- context.removeRunningJob(executable);
- if (memoryLock) {
- // may release twice when exception raise after jobPool
execute executable
-
NDefaultScheduler.getMemoryRemaining().release(useMemoryCapacity);
- }
- }
- logger.warn("{} fail to schedule", jobDesc, ex);
- }
- }
-
- private boolean isJobPoolFull() {
- if (context.getRunningJobs().size() >=
nDefaultScheduler.getJobEngineConfig().getMaxConcurrentJobLimit()) {
- logger.warn("There are too many jobs running, Job Fetch will wait
until next schedule time.");
- return true;
- }
- return false;
- }
-
- void scheduleNext() {
- fetcherPool.schedule(this, 0, TimeUnit.SECONDS);
- }
-}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/runners/ForkBasedJobRunner.java
b/src/core-job/src/main/java/org/apache/kylin/job/runners/ForkBasedJobRunner.java
deleted file mode 100644
index fc29a5ebb4..0000000000
---
a/src/core-job/src/main/java/org/apache/kylin/job/runners/ForkBasedJobRunner.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.job.runners;
-
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.BufferedLogger;
-import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.common.util.ExecutableApplication;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class ForkBasedJobRunner extends JobRunnerFactory.AbstractJobRunner {
-
- private final CliCommandExecutor cliExecutor = new CliCommandExecutor();
-
- public ForkBasedJobRunner(KylinConfig kylinConfig, String project,
List<String> originResources) {
- super(kylinConfig, project, originResources);
- }
-
- @Override
- protected void doExecute(ExecutableApplication app, Map<String, String>
args) throws Exception {
- String finalCommand = String.format(Locale.ROOT, "bash -x
%s/sbin/bootstrap.sh %s %s 2>>%s",
- KylinConfig.getKylinHome(), app.getClass().getName(),
formatArgs(args), getJobTmpDir() + "/job.log");
- log.info("Try to execute {}", finalCommand);
- cliExecutor.execute(finalCommand, new BufferedLogger(log), jobId);
- }
-
-}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/runners/InMemoryJobRunner.java
b/src/core-job/src/main/java/org/apache/kylin/job/runners/InMemoryJobRunner.java
deleted file mode 100644
index 119c3313e3..0000000000
---
a/src/core-job/src/main/java/org/apache/kylin/job/runners/InMemoryJobRunner.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.job.runners;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ExecutableApplication;
-
-public class InMemoryJobRunner extends JobRunnerFactory.AbstractJobRunner {
-
- public InMemoryJobRunner(KylinConfig config, String project, List<String>
resources) {
- super(config, project, resources);
- }
-
- @Override
- public void doExecute(ExecutableApplication app, Map<String, String> args)
throws Exception {
- app.execute(formatArgs(args).split(" "));
- }
-}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java
b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java
deleted file mode 100644
index 195a244900..0000000000
---
a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.job.runners;
-
-import java.util.Map;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.transaction.UnitOfWork;
-import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.execution.NExecutableManager;
-import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
-import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import lombok.val;
-
-public class JobCheckRunner extends AbstractDefaultSchedulerRunner {
-
- private static final Logger logger =
LoggerFactory.getLogger(JobCheckRunner.class);
-
- public JobCheckRunner(NDefaultScheduler nDefaultScheduler) {
- super(nDefaultScheduler);
- }
-
- private boolean checkTimeoutIfNeeded(String jobId, Long startTime) {
- Integer timeOutMinute =
KylinConfig.getInstanceFromEnv().getSchedulerJobTimeOutMinute();
- if (timeOutMinute == 0) {
- return false;
- }
- val executableManager =
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
- if (executableManager.getJob(jobId).getStatus().isFinalState()) {
- return false;
- }
- long duration = System.currentTimeMillis() - startTime;
- long durationMins = Math.toIntExact(duration / (60 * 1000));
- return durationMins >= timeOutMinute;
- }
-
- private boolean discardTimeoutJob(String jobId, Long startTime) {
- try {
- if (checkTimeoutIfNeeded(jobId, startTime)) {
- return EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(()
-> {
- if (checkTimeoutIfNeeded(jobId, startTime)) {
- logger.error("project {} job {} running timeout.",
project, jobId);
-
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(),
project).errorJob(jobId);
- return true;
- }
- return false;
- }, project, UnitOfWork.DEFAULT_MAX_RETRY,
context.getEpochId(), jobId);
- }
- } catch (Exception e) {
- logger.warn("[UNEXPECTED_THINGS_HAPPENED] project " + project + "
job " + jobId
- + " should be timeout but discard failed", e);
- }
- return false;
- }
-
- @Override
- protected void doRun() {
-
- logger.info("start check project {} job pool.", project);
-
- val executableManager =
NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
- Map<String, Executable> runningJobs = context.getRunningJobs();
- Map<String, Long> runningJobInfos = context.getRunningJobInfos();
- for (final String id : executableManager.getJobs()) {
- if (runningJobs.containsKey(id)) {
- discardTimeoutJob(id, runningJobInfos.get(id));
- stopJobIfSQLReached(id);
- }
- }
-
- }
-}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
deleted file mode 100644
index 3e839f57f4..0000000000
--- a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.job.runners;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.SetThreadName;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.exception.JobStoppedNonVoluntarilyException;
-import org.apache.kylin.job.exception.JobStoppedVoluntarilyException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
-import org.apache.kylin.common.logging.SetLogCategory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import lombok.val;
-import lombok.var;
-
-public class JobRunner extends AbstractDefaultSchedulerRunner {
- private static final Logger logger =
LoggerFactory.getLogger(JobRunner.class);
-
- private final AbstractExecutable executable;
-
- private final FetcherRunner fetcherRunner;
-
- public JobRunner(NDefaultScheduler nDefaultScheduler, AbstractExecutable
executable, FetcherRunner fetcherRunner) {
- super(nDefaultScheduler);
- this.fetcherRunner = fetcherRunner;
- this.executable = executable;
- }
-
- @Override
- protected void doRun() {
- //only the first 8 chars of the job uuid
- val jobIdSimple = executable.getId().substring(0, 8);
- try (SetThreadName ignored = new
SetThreadName("JobWorker(project:%s,jobid:%s)", project, jobIdSimple);
- SetLogCategory logCategory = new SetLogCategory("schedule")) {
- executable.execute(context);
- // trigger the next step asap
- fetcherRunner.scheduleNext();
- } catch (JobStoppedVoluntarilyException |
JobStoppedNonVoluntarilyException e) {
- logger.info("Job quits either voluntarily or non-voluntarily,job:
{}", jobIdSimple, e);
- } catch (ExecuteException e) {
- logger.error("ExecuteException occurred while job: " +
executable.getId(), e);
- } catch (Exception e) {
- logger.error("unknown error execute job: " + executable.getId(),
e);
- } finally {
- context.removeRunningJob(executable);
- val config = KylinConfig.getInstanceFromEnv();
- var usingMemory = 0;
- if (!config.getDeployMode().equals("cluster")) {
- usingMemory = executable.computeStepDriverMemory();
- }
- logger.info("Before job:{} global memory release {}", jobIdSimple,
- NDefaultScheduler.getMemoryRemaining().availablePermits());
- NDefaultScheduler.getMemoryRemaining().release(usingMemory);
- logger.info("After job:{} global memory release {}", jobIdSimple,
- NDefaultScheduler.getMemoryRemaining().availablePermits());
- }
- }
-}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunnerFactory.java
b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunnerFactory.java
deleted file mode 100644
index 0713558d35..0000000000
---
a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunnerFactory.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.job.runners;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.KylinConfigBase;
-import org.apache.kylin.common.persistence.RawResource;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.ExecutableApplication;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.ZipFileUtils;
-import org.apache.kylin.common.persistence.metadata.MetadataStore;
-import org.apache.kylin.common.persistence.transaction.UnitOfWorkParams;
-import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
-
-import com.google.common.collect.Maps;
-
-import lombok.RequiredArgsConstructor;
-import lombok.val;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class JobRunnerFactory {
-
- private JobRunnerFactory() {
- // Just implement it
- }
-
- public static AbstractJobRunner createRunner(KylinConfig config, String
type, String project,
- List<String> resources) {
- switch (type) {
- case "fork":
- return new ForkBasedJobRunner(config, project, resources);
- case "in-memory":
- return new InMemoryJobRunner(config, project, resources);
- default:
- throw new NotImplementedException("Runner type " + type + " not
implement");
- }
- }
-
- @RequiredArgsConstructor
- public abstract static class AbstractJobRunner {
-
- protected final KylinConfig kylinConfig;
- protected final String project;
- protected final List<String> originResources;
- private Consumer<Properties> configUpdater;
-
- protected String metaDumpUrl;
- protected String jobId;
-
- public String prepareEnv(String jobId) throws IOException {
- this.jobId = jobId;
- val jobTmpDir = getJobTmpDir();
- FileUtils.forceMkdir(new File(jobTmpDir));
-
- metaDumpUrl = kylinConfig.getMetadataUrlPrefix() +
"@hdfs,path=file://" + jobTmpDir + "/meta";
- return jobTmpDir;
- }
-
- public void start(ExecutableApplication app, Map<String, String> args)
throws Exception {
- attachMetadataAndKylinProps(false);
- args.put("meta", metaDumpUrl);
- args.put("metaOutput", metaDumpUrl + "_output");
- doExecute(app, args);
- uploadJobLog();
- }
-
- public void cleanupEnv() {
- FileUtils.deleteQuietly(new File(getJobTmpDir()));
- }
-
- protected abstract void doExecute(ExecutableApplication app,
Map<String, String> args) throws Exception;
-
- protected String formatArgs(Map<String, String> args) {
- return args.entrySet().stream().map(entry -> "--" + entry.getKey()
+ "=" + entry.getValue())
- .collect(Collectors.joining(" "));
- }
-
- public String getJobTmpDir() {
- return KylinConfigBase.getKylinHome() + "/tmp/" + jobId;
- }
-
- public void setConfigUpdater(Consumer<Properties> consumer) {
- this.configUpdater = consumer;
- }
-
- protected boolean uploadJobLog() {
- val jobContentZip = getJobTmpDir() + ".zip";
- try {
- ZipFileUtils.compressZipFile(getJobTmpDir(), jobContentZip);
- val jobDir = kylinConfig.getJobTmpDir(project, true);
- val fs = HadoopUtil.getFileSystem(jobDir);
-
- try (val in = new FileInputStream(new File(jobContentZip));
- val out = fs.create(new Path(jobDir + jobId + ".zip"),
true)) {
- IOUtils.copy(in, out);
- }
- return true;
- } catch (IOException e) {
- log.warn("Upload Job Evidence failed {}", jobId, e);
- } finally {
- FileUtils.deleteQuietly(new File(jobContentZip));
- }
- return false;
- }
-
- protected void attachMetadataAndKylinProps(boolean kylinPropsOnly)
throws IOException {
- if (StringUtils.isEmpty(metaDumpUrl)) {
- throw new RuntimeException("Missing metaUrl");
- }
-
- File tmpDir = File.createTempFile("kylin_job_meta", "");
- try {
- org.apache.commons.io.FileUtils.forceDelete(tmpDir); // we
need a directory, so delete the file first
-
- Properties props = kylinConfig.exportToProperties();
- props.setProperty("kylin.metadata.url", metaDumpUrl);
- if (configUpdater != null) {
- configUpdater.accept(props);
- }
-
- if (kylinPropsOnly) {
- ResourceStore.dumpKylinProps(tmpDir, props);
- } else {
- // The way of Updating metadata is CopyOnWrite. So it is
safe to use Reference in the value.
- Map<String, RawResource> dumpMap =
EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(
- UnitOfWorkParams.<Map>
builder().readonly(true).unitName(project).processor(() -> {
- Map<String, RawResource> retMap =
Maps.newHashMap();
- for (String resPath : originResources) {
- ResourceStore resourceStore =
ResourceStore.getKylinMetaStore(kylinConfig);
- RawResource rawResource =
resourceStore.getResource(resPath);
- retMap.put(resPath, rawResource);
- }
- return retMap;
- }).build());
-
- if (Objects.isNull(dumpMap) || dumpMap.isEmpty()) {
- return;
- }
- // dump metadata
- ResourceStore.dumpResourceMaps(kylinConfig, tmpDir,
dumpMap, props);
- }
-
- // copy metadata to target metaUrl
- KylinConfig dstConfig = KylinConfig.createKylinConfig(props);
-
MetadataStore.createMetadataStore(dstConfig).uploadFromFile(tmpDir);
- // clean up
- log.debug("Copied metadata to the target metaUrl, delete the
temp dir: {}", tmpDir);
- } finally {
- FileUtils.forceDelete(tmpDir);
- }
- }
- }
-}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/runners/LicenseCapacityCheckRunner.java
b/src/core-job/src/main/java/org/apache/kylin/job/runners/LicenseCapacityCheckRunner.java
deleted file mode 100644
index 1b51dd2c3b..0000000000
---
a/src/core-job/src/main/java/org/apache/kylin/job/runners/LicenseCapacityCheckRunner.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.job.runners;
-
-import static
org.apache.kylin.common.exception.CommonErrorCode.LICENSE_OVER_CAPACITY;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.annotation.Clarification;
-import org.apache.kylin.common.exception.KylinException;
-import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
-import org.apache.kylin.metadata.sourceusage.SourceUsageManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import lombok.val;
-
-@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise")
-public class LicenseCapacityCheckRunner extends AbstractDefaultSchedulerRunner
{
- private static final Logger logger =
LoggerFactory.getLogger(LicenseCapacityCheckRunner.class);
-
- public LicenseCapacityCheckRunner(NDefaultScheduler nDefaultScheduler) {
- super(nDefaultScheduler);
- }
-
- @Override
- protected void doRun() {
- logger.info("start check license capacity for project {}", project);
- context.setLicenseOverCapacity(isLicenseOverCapacity());
- }
-
- private boolean isLicenseOverCapacity() {
- val sourceUsageManager =
SourceUsageManager.getInstance(KylinConfig.getInstanceFromEnv());
-
- try {
- sourceUsageManager.checkIsOverCapacity(project);
- } catch (KylinException e) {
- if (LICENSE_OVER_CAPACITY.toErrorCode() == e.getErrorCode()) {
- logger.warn("Source usage over capacity, no job will be
scheduled.", e);
- return true;
- }
- } catch (Throwable e) {
- logger.warn("Check source usage over capacity failed.", e);
- }
-
- // not over capacity
- return false;
- }
-}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/job/runners/QuotaStorageCheckRunner.java
b/src/core-job/src/main/java/org/apache/kylin/job/runners/QuotaStorageCheckRunner.java
deleted file mode 100644
index c7f3140671..0000000000
---
a/src/core-job/src/main/java/org/apache/kylin/job/runners/QuotaStorageCheckRunner.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.job.runners;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.annotation.Clarification;
-import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
-import org.apache.kylin.metadata.cube.storage.ProjectStorageInfoCollector;
-import org.apache.kylin.metadata.cube.storage.StorageInfoEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-import lombok.val;
-import lombok.var;
-
-@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise")
-public class QuotaStorageCheckRunner extends AbstractDefaultSchedulerRunner {
- private static final Logger logger =
LoggerFactory.getLogger(QuotaStorageCheckRunner.class);
-
- private final ProjectStorageInfoCollector collector;
-
- public QuotaStorageCheckRunner(NDefaultScheduler nDefaultScheduler) {
- super(nDefaultScheduler);
-
- val storageInfoEnumList =
Lists.newArrayList(StorageInfoEnum.STORAGE_QUOTA,
StorageInfoEnum.TOTAL_STORAGE);
- collector = new ProjectStorageInfoCollector(storageInfoEnumList);
- }
-
- @Override
- protected void doRun() {
- logger.info("start check project {} storage quota.",
nDefaultScheduler.getProject());
- context.setReachQuotaLimit(reachStorageQuota());
- }
-
- private boolean reachStorageQuota() {
- var storageVolumeInfo =
collector.getStorageVolumeInfo(KylinConfig.getInstanceFromEnv(),
- nDefaultScheduler.getProject());
- var totalSize = storageVolumeInfo.getTotalStorageSize();
- int retryCount = 3;
- while (retryCount-- > 0 && totalSize < 0) {
- storageVolumeInfo =
collector.getStorageVolumeInfo(KylinConfig.getInstanceFromEnv(),
- nDefaultScheduler.getProject());
- totalSize = storageVolumeInfo.getTotalStorageSize();
- }
- val storageQuotaSize = storageVolumeInfo.getStorageQuotaSize();
- if (totalSize < 0) {
- logger.error(
- "Project '{}' : an exception occurs when getting storage
volume info, no job will be scheduled!!! The error info : {}",
- nDefaultScheduler.getProject(),
-
storageVolumeInfo.getThrowableMap().get(StorageInfoEnum.TOTAL_STORAGE));
- return true;
- }
- if (totalSize >= storageQuotaSize) {
- logger.info("Project '{}' reach storage quota, no job will be
scheduled!!!",
- nDefaultScheduler.getProject());
- return true;
- }
- return false;
- }
-}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java
deleted file mode 100644
index 5fb6ecc8ff..0000000000
---
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.cube.storage;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.cube.model.NDataLayout;
-import org.apache.kylin.metadata.cube.model.NDataSegment;
-import org.apache.kylin.metadata.cube.model.NDataflow;
-import org.apache.kylin.metadata.cube.model.NDataflowManager;
-import org.apache.kylin.metadata.cube.optimization.IndexOptimizer;
-import org.apache.kylin.metadata.cube.optimization.IndexOptimizerFactory;
-import org.apache.kylin.metadata.model.NDataModel;
-
-import com.google.common.collect.Maps;
-
-import lombok.val;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class GarbageStorageCollector implements StorageInfoCollector {
-
- @Override
- public void collect(KylinConfig config, String project, StorageVolumeInfo
storageVolumeInfo) {
- Map<String, Set<Long>> garbageIndexMap = Maps.newHashMap();
- long storageSize = 0L;
-
- for (val model : getModels(project)) {
- val dataflow = getDataflow(model).copy();
-
- final IndexOptimizer indexOptimizer =
IndexOptimizerFactory.getOptimizer(dataflow, false);
- val garbageLayouts =
indexOptimizer.getGarbageLayoutMap(dataflow).keySet();
- if (CollectionUtils.isNotEmpty(garbageLayouts)) {
- storageSize += calculateLayoutSize(garbageLayouts, dataflow);
- garbageIndexMap.put(model.getId(), garbageLayouts);
- }
- }
-
- storageVolumeInfo.setGarbageModelIndexMap(garbageIndexMap);
- storageVolumeInfo.setGarbageStorageSize(storageSize);
- }
-
- private List<NDataModel> getModels(String project) {
- val dataflowManager =
NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
- return dataflowManager.listUnderliningDataModels();
- }
-
- private NDataflow getDataflow(NDataModel model) {
- val dataflowManager =
NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(),
model.getProject());
- return dataflowManager.getDataflow(model.getUuid());
- }
-
- private long calculateLayoutSize(Set<Long> cuboidLayoutIdSet, NDataflow
dataflow) {
- long cuboidLayoutSize = 0L;
- for (NDataSegment segment :
dataflow.getSegments(SegmentStatusEnum.READY, SegmentStatusEnum.WARNING)) {
- for (Long cuboidLayoutId : cuboidLayoutIdSet) {
- NDataLayout dataCuboid =
segment.getSegDetails().getLayoutById(cuboidLayoutId);
- if (dataCuboid != null) {
- cuboidLayoutSize += dataCuboid.getByteSize();
- }
- }
- }
- return cuboidLayoutSize;
- }
-
-}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollector.java
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollector.java
deleted file mode 100644
index 6a5f4d7c56..0000000000
---
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollector.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.cube.storage;
-
-import java.util.List;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
-public class ProjectStorageInfoCollector {
-
- private List<StorageInfoCollector> collectors = Lists.newArrayList();
-
- private static final ImmutableMap<Class, StorageInfoEnum> collectorType =
ImmutableMap
- .<Class, StorageInfoEnum>
builder().put(GarbageStorageCollector.class, StorageInfoEnum.GARBAGE_STORAGE)
- .put(TotalStorageCollector.class, StorageInfoEnum.TOTAL_STORAGE)
- .put(StorageQuotaCollector.class,
StorageInfoEnum.STORAGE_QUOTA).build();
-
- public ProjectStorageInfoCollector(List<StorageInfoEnum> storageInfoList) {
- if (CollectionUtils.isNotEmpty(storageInfoList)) {
- storageInfoList.forEach(si -> addCollectors(si));
- }
- }
-
- private void collect(KylinConfig config, String project, StorageVolumeInfo
storageVolumeInfo) {
- for (StorageInfoCollector collector : collectors) {
- try {
- collector.collect(config, project, storageVolumeInfo);
- } catch (Exception e) {
-
storageVolumeInfo.getThrowableMap().put(collectorType.get(collector.getClass()),
e);
- }
- }
- }
-
- private void addCollectors(StorageInfoEnum storageInfoEnum) {
- switch (storageInfoEnum) {
- case GARBAGE_STORAGE:
- collectors.add(new GarbageStorageCollector());
- break;
- case TOTAL_STORAGE:
- collectors.add(new TotalStorageCollector());
- break;
- case STORAGE_QUOTA:
- collectors.add(new StorageQuotaCollector());
- break;
- default:
- break;
- }
- }
-
- public StorageVolumeInfo getStorageVolumeInfo(KylinConfig config, String
project) {
- StorageVolumeInfo storageVolumeInfo = new StorageVolumeInfo();
- if (StringUtils.isBlank(project) ||
CollectionUtils.isEmpty(collectors)) {
- return storageVolumeInfo;
- }
- collect(config, project, storageVolumeInfo);
- return storageVolumeInfo;
- }
-}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoCollector.java
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoCollector.java
deleted file mode 100644
index 4218244b06..0000000000
---
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoCollector.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.cube.storage;
-
-import java.io.IOException;
-
-import org.apache.kylin.common.KylinConfig;
-
-public interface StorageInfoCollector {
-
- void collect(KylinConfig config, String project, StorageVolumeInfo
storageVolumeInfo) throws IOException;
-
-}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoEnum.java
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoEnum.java
deleted file mode 100644
index ac29378738..0000000000
---
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoEnum.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.cube.storage;
-
-public enum StorageInfoEnum {
- GARBAGE_STORAGE, TOTAL_STORAGE, STORAGE_QUOTA
-}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageQuotaCollector.java
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageQuotaCollector.java
deleted file mode 100644
index 951f896ef6..0000000000
---
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageQuotaCollector.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.cube.storage;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.annotation.Clarification;
-import org.apache.kylin.metadata.project.NProjectManager;
-
-@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise")
-public class StorageQuotaCollector implements StorageInfoCollector {
-
- @Override
- public void collect(KylinConfig config, String project, StorageVolumeInfo
storageVolumeInfo) {
- config =
NProjectManager.getInstance(config).getProject(project).getConfig();
- long quotaSize = config.getStorageQuotaSize();
- storageVolumeInfo.setStorageQuotaSize(quotaSize);
- }
-
-}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageVolumeInfo.java
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageVolumeInfo.java
deleted file mode 100644
index d77b18f18f..0000000000
---
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageVolumeInfo.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.metadata.cube.storage;
-
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.collect.Maps;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.apache.kylin.common.annotation.Clarification;
-
-@Getter
-@Setter
-@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise")
-public class StorageVolumeInfo {
- private long storageQuotaSize = -1L;
- private long totalStorageSize = -1L;
- private long garbageStorageSize = -1L;
- private Map<String, Set<Long>> garbageModelIndexMap = Maps.newHashMap();
- private Map<StorageInfoEnum, Throwable> throwableMap = Maps.newHashMap();
-}
diff --git
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java
b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java
deleted file mode 100644
index e5101a9f1a..0000000000
---
a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.metadata.cube.storage;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
-
-public class TotalStorageCollector implements StorageInfoCollector {
-
- @Override
- public void collect(KylinConfig config, String project, StorageVolumeInfo
storageVolumeInfo) throws IOException {
- String strPath = config.getWorkingDirectoryWithConfiguredFs(project);
- Path path = new Path(strPath);
- FileSystem fs =
path.getFileSystem(HadoopUtil.getCurrentConfiguration());
- long totalStorageSize = 0L;
- if (fs.exists(path)) {
- totalStorageSize = HadoopUtil.getContentSummary(fs,
path).getLength();
- }
- storageVolumeInfo.setTotalStorageSize(totalStorageSize);
- }
-
-}