This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push: new 205bf2c50f KYLIN-5217 Create a initial commit 205bf2c50f is described below commit 205bf2c50fc6416910fd83846cf62d86747f2330 Author: junqing.cai <junqing....@kyligence.io> AuthorDate: Tue Aug 9 14:20:14 2022 +0800 KYLIN-5217 Create a initial commit --- .../apache/kylin/job/dao/ExecutableOutputPO.java | 149 ------------ .../org/apache/kylin/job/dao/ExecutablePO.java | 152 ------------ .../org/apache/kylin/job/dao/JobStatistics.java | 77 ------ .../apache/kylin/job/dao/JobStatisticsBasic.java | 53 ----- .../apache/kylin/job/dao/JobStatisticsManager.java | 263 --------------------- .../org/apache/kylin/job/dao/NExecutableDao.java | 133 ----------- .../runners/AbstractDefaultSchedulerRunner.java | 93 -------- .../apache/kylin/job/runners/FetcherRunner.java | 241 ------------------- .../kylin/job/runners/ForkBasedJobRunner.java | 48 ---- .../kylin/job/runners/InMemoryJobRunner.java | 36 --- .../apache/kylin/job/runners/JobCheckRunner.java | 90 ------- .../org/apache/kylin/job/runners/JobRunner.java | 76 ------ .../apache/kylin/job/runners/JobRunnerFactory.java | 185 --------------- .../job/runners/LicenseCapacityCheckRunner.java | 63 ----- .../kylin/job/runners/QuotaStorageCheckRunner.java | 77 ------ .../cube/storage/GarbageStorageCollector.java | 87 ------- .../cube/storage/ProjectStorageInfoCollector.java | 79 ------- .../cube/storage/StorageInfoCollector.java | 29 --- .../metadata/cube/storage/StorageInfoEnum.java | 23 -- .../cube/storage/StorageQuotaCollector.java | 35 --- .../metadata/cube/storage/StorageVolumeInfo.java | 38 --- .../cube/storage/TotalStorageCollector.java | 42 ---- 22 files changed, 2069 deletions(-) diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java deleted file mode 100644 index a3b69ab095..0000000000 --- 
a/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.job.dao; - -import java.io.InputStream; -import java.io.Serializable; -import java.util.Map; - -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; - -import lombok.Getter; -import lombok.Setter; - -/** - * - */ -@Setter -@Getter -@SuppressWarnings("serial") -@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) -public class ExecutableOutputPO implements Serializable { - - @JsonIgnore - private InputStream contentStream; - - @JsonProperty("content") - private String content; - - @JsonProperty("log_path") - private String logPath; - - @JsonProperty("status") - private String status = "READY"; - - @JsonProperty("info") - private Map<String, String> info = Maps.newHashMap(); - - @JsonProperty("last_modified") - private long lastModified; - - @JsonProperty("createTime") - private long createTime = System.currentTimeMillis(); - - @JsonProperty("start_time") - private long startTime; - - @JsonProperty("end_time") - private long endTime; - - @JsonProperty("wait_time") - private long waitTime; - - @JsonProperty("duration") - private long duration; - - @JsonProperty("last_running_start_time") - private long lastRunningStartTime; - - @JsonProperty("is_resumable") - private boolean resumable = false; - - @JsonProperty("byte_size") - private long byteSize; - - @JsonProperty("failed_msg") - private String failedMsg; - - @JsonProperty("failed_step_id") - private String failedStepId; - - @JsonProperty("failed_segment_id") - private String failedSegmentId; - - @JsonProperty("failed_stack") - private String failedStack; - - @JsonProperty("failed_reason") - private String failedReason; - - 
public void addStartTime(long time) { - //when ready -> suicidal/discarded, the start time is 0 - if (startTime == 0) { - startTime = time; - } - endTime = 0; - } - - public void addEndTime(long time) { - Preconditions.checkArgument(startTime > 0L); - endTime = time; - } - - public void addDuration(long time) { - if (time != 0 && time > lastRunningStartTime) { - duration = duration + time - lastRunningStartTime; - } - } - - public void addLastRunningStartTime(long time) { - lastRunningStartTime = time; - } - - public void resetTime() { - createTime = System.currentTimeMillis(); - startTime = 0L; - endTime = 0L; - waitTime = 0L; - duration = 0L; - lastRunningStartTime = 0L; - } -} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java deleted file mode 100644 index b396efd047..0000000000 --- a/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.dao; - -import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_IDS_DELIMITER; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.commons.lang.StringUtils; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.persistence.RootPersistentEntity; -import org.apache.kylin.job.constant.ExecutableConstants; -import org.apache.kylin.job.execution.JobTypeEnum; -import org.apache.kylin.metadata.cube.model.NDataSegment; - -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import lombok.Getter; -import lombok.Setter; - -/** - */ -@Setter -@Getter -@SuppressWarnings("serial") -@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) -public class ExecutablePO extends RootPersistentEntity { - public static final int HIGHEST_PRIORITY = 0; - public static final int DEFAULT_PRIORITY = 3; - public static final 
int LOWEST_PRIORITY = 4; - @JsonProperty("name") - private String name; - - @JsonProperty("tasks") - private List<ExecutablePO> tasks; - - @JsonProperty("type") - private String type; - - @JsonProperty("handler_type") - private String handlerType; - - @JsonProperty("params") - private Map<String, String> params = Maps.newHashMap(); - - @JsonProperty("segments") - private Set<NDataSegment> segments = Sets.newHashSet(); - - @JsonProperty("job_type") - private JobTypeEnum jobType; - - @JsonProperty("data_range_start") - private long dataRangeStart; - - @JsonProperty("data_range_end") - private long dataRangeEnd; - - @JsonProperty("target_model") - private String targetModel; - - @JsonProperty("target_segments") - private List<String> targetSegments; - - @JsonProperty("output") - private ExecutableOutputPO output = new ExecutableOutputPO(); - - private String project; - - @JsonProperty("target_partitions") - private Set<Long> targetPartitions = Sets.newHashSet(); - - @JsonProperty("priority") - private int priority = DEFAULT_PRIORITY; - - @JsonProperty("tag") - private Object tag; - - @JsonProperty("stages_map") - private Map<String, List<ExecutablePO>> stagesMap; - - public void setPriority(int p) { - priority = isPriorityValid(p) ? 
p : DEFAULT_PRIORITY; - } - - @Override - public String getResourcePath() { - return concatResourcePath(getUuid(), project); - } - - public static String concatResourcePath(String name, String project) { - return new StringBuilder().append("/").append(project).append(ResourceStore.EXECUTABLE_JOB).append("/") - .append(name).toString(); - } - - public static boolean isPriorityValid(int priority) { - return priority >= HIGHEST_PRIORITY && priority <= LOWEST_PRIORITY; - } - - public static boolean isHigherPriority(int p1, int p2) { - return p1 < p2; - } - - public void addYarnApplicationJob(String appId) { - String oldAppIds = output.getInfo().getOrDefault(ExecutableConstants.YARN_APP_IDS, ""); - Set<String> appIds = new HashSet<>(Arrays.asList(oldAppIds.split(YARN_APP_IDS_DELIMITER))); - if (!appIds.contains(appId)) { - String newAppIds = oldAppIds + (StringUtils.isEmpty(oldAppIds) ? "" : YARN_APP_IDS_DELIMITER) + appId; - output.getInfo().put(ExecutableConstants.YARN_APP_IDS, newAppIds); - } - } -} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatistics.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatistics.java deleted file mode 100644 index 0b1928fbbf..0000000000 --- a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatistics.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.dao; - -import java.util.Map; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.Maps; - -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; - -@Getter -@Setter -@NoArgsConstructor -public class JobStatistics extends JobStatisticsBasic { - - @JsonProperty("date") - private long date; - @JsonProperty("model_stats") - private Map<String, JobStatisticsBasic> jobStatisticsByModels = Maps.newHashMap(); - - @Override - public String resourceName() { - return String.valueOf(date); - } - - public JobStatistics(long date, String model, long duration, long byteSize) { - this.date = date; - setCount(1); - setTotalDuration(duration); - setTotalByteSize(byteSize); - jobStatisticsByModels.put(model, new JobStatisticsBasic(duration, byteSize)); - } - - public JobStatistics(int count, long totalDuration, long totalByteSize) { - setCount(count); - setTotalDuration(totalDuration); - setTotalByteSize(totalByteSize); - } - - public JobStatistics(long date, long totalDuration, long totalByteSize) { - this.date = date; - setCount(1); - setTotalDuration(totalDuration); - setTotalByteSize(totalByteSize); - } - - public void update(String model, long duration, long byteSize, int deltaCount) { - super.update(duration, byteSize, deltaCount); - JobStatisticsBasic jobStatisticsByModel = jobStatisticsByModels.get(model); - if (jobStatisticsByModel == null) { - jobStatisticsByModel = new JobStatisticsBasic(duration, byteSize); - } else { - jobStatisticsByModel.update(duration, 
byteSize, deltaCount); - } - - jobStatisticsByModels.put(model, jobStatisticsByModel); - } -} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsBasic.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsBasic.java deleted file mode 100644 index c05743c3cf..0000000000 --- a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsBasic.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.job.dao; - -import org.apache.kylin.common.persistence.RootPersistentEntity; - -import com.fasterxml.jackson.annotation.JsonProperty; - -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; - -@Getter -@Setter -@NoArgsConstructor -@AllArgsConstructor -public class JobStatisticsBasic extends RootPersistentEntity { - @JsonProperty("count") - private int count; - @JsonProperty("total_duration") - private long totalDuration; - @JsonProperty("total_byte_size") - private long totalByteSize; - - public void update(long duration, long byteSize, int deltaCount) { - this.count += deltaCount; - this.totalDuration += duration; - this.totalByteSize += byteSize; - } - - public JobStatisticsBasic(long totalDuration, long totalByteSize) { - this.count = 1; - this.totalDuration = totalDuration; - this.totalByteSize = totalByteSize; - } -} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsManager.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsManager.java deleted file mode 100644 index 2f0a65bfd3..0000000000 --- a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsManager.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.dao; - -import java.time.DayOfWeek; -import java.time.Instant; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; -import java.time.temporal.TemporalAdjusters; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.TimeZone; -import java.util.stream.Collectors; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.metadata.cachesync.CachedCrudAssist; -import org.apache.kylin.metadata.model.NDataModel; -import org.apache.kylin.metadata.model.NDataModelManager; - -import com.google.common.collect.Maps; - -public class JobStatisticsManager { - - public static JobStatisticsManager getInstance(KylinConfig config, String project) { - return config.getManager(project, JobStatisticsManager.class); - } - - // called by reflection - static JobStatisticsManager newInstance(KylinConfig conf, String project) { - try { - String cls = JobStatisticsManager.class.getName(); - Class<? 
extends JobStatisticsManager> clz = ClassUtil.forName(cls, JobStatisticsManager.class); - return clz.getConstructor(KylinConfig.class, String.class).newInstance(conf, project); - } catch (Exception e) { - throw new RuntimeException("Failed to init DataModelManager from " + conf, e); - } - } - - // ============================================================================ - - private KylinConfig config; - private String project; - - private CachedCrudAssist<JobStatistics> crud; - - public JobStatisticsManager(KylinConfig config, String project) { - init(config, project); - } - - protected void init(KylinConfig cfg, final String project) { - this.config = cfg; - this.project = project; - final ResourceStore store = ResourceStore.getKylinMetaStore(this.config); - String resourceRootPath = "/" + this.project + ResourceStore.JOB_STATISTICS; - this.crud = new CachedCrudAssist<JobStatistics>(store, resourceRootPath, JobStatistics.class) { - @Override - protected JobStatistics initEntityAfterReload(JobStatistics jobStatistics, String resourceName) { - return jobStatistics; - } - }; - } - - public List<JobStatistics> getAll() { - return crud.listAll(); - } - - public JobStatistics updateStatistics(long date, String model, long duration, long byteSize, int deltaCount) { - JobStatistics jobStatistics = crud.get(String.valueOf(date)); - JobStatistics jobStatisticsToUpdate; - if (jobStatistics == null) { - jobStatisticsToUpdate = new JobStatistics(date, model, duration, byteSize); - return crud.save(jobStatisticsToUpdate); - } - - jobStatisticsToUpdate = crud.copyForWrite(jobStatistics); - jobStatisticsToUpdate.update(model, duration, byteSize, deltaCount); - return crud.save(jobStatisticsToUpdate); - } - - public JobStatistics updateStatistics(long date, long duration, long byteSize, int deltaCount) { - JobStatistics jobStatistics = crud.get(String.valueOf(date)); - JobStatistics jobStatisticsToUpdate; - if (jobStatistics == null) { - jobStatisticsToUpdate = new 
JobStatistics(date, duration, byteSize); - return crud.save(jobStatisticsToUpdate); - } - - jobStatisticsToUpdate = crud.copyForWrite(jobStatistics); - jobStatisticsToUpdate.update(duration, byteSize, deltaCount); - return crud.save(jobStatisticsToUpdate); - } - - public Pair<Integer, JobStatistics> getOverallJobStats(final long startTime, final long endTime) { - // filter - List<JobStatistics> filteredJobStats = getFilteredJobStatsByTime(crud.listAll(), startTime, endTime); - // aggregate all job stats - JobStatistics aggregatedStats = aggregateJobStats(filteredJobStats); - - return new Pair<>(aggregatedStats.getCount(), aggregatedStats); - } - - public Map<String, Integer> getJobCountByTime(final long startTime, final long endTime, - final String timeDimension) { - Map<String, Integer> result = Maps.newHashMap(); - aggregateJobStatsByTime(startTime, endTime, timeDimension).forEach((key, value) -> { - result.put(key, value.getCount()); - }); - return result; - } - - public Map<String, Integer> getJobCountByModel(long startTime, long endTime) { - Map<String, Integer> result = Maps.newHashMap(); - - aggregateStatsByModel(startTime, endTime).forEach((modelName, value) -> { - String modelAlias = getModelAlias(modelName); - if (modelAlias == null) - return; - result.put(modelAlias, value.getCount()); - }); - - return result; - } - - public Map<String, Double> getDurationPerByteByTime(final long startTime, final long endTime, - final String timeDimension) { - Map<String, JobStatisticsBasic> aggregateResult = aggregateJobStatsByTime(startTime, endTime, timeDimension); - return calculateDurationPerByte(aggregateResult); - } - - public Map<String, Double> getDurationPerByteByModel(long startTime, long endTime) { - Map<String, JobStatisticsBasic> transformedResult = Maps.newHashMap(); - - aggregateStatsByModel(startTime, endTime).forEach((modelName, value) -> { - String modelAlias = getModelAlias(modelName); - if (modelAlias == null) - return; - 
transformedResult.put(modelAlias, - new JobStatisticsBasic(value.getTotalDuration(), value.getTotalByteSize())); - }); - - return calculateDurationPerByte(transformedResult); - } - - private String getModelAlias(String modelId) { - NDataModelManager dataModelManager = NDataModelManager.getInstance(config, project); - NDataModel model = dataModelManager.getDataModelDesc(modelId); - if (model == null) - return null; - - return model.getAlias(); - } - - private JobStatistics aggregateJobStats(List<JobStatistics> jobStatisticsToAggregate) { - return jobStatisticsToAggregate.stream() - .reduce((x, y) -> new JobStatistics(x.getCount() + y.getCount(), - x.getTotalDuration() + y.getTotalDuration(), x.getTotalByteSize() + y.getTotalByteSize())) - .orElse(new JobStatistics()); - } - - // key is the date, value is the aggregated job stats - private Map<String, JobStatisticsBasic> aggregateJobStatsByTime(final long startTime, final long endTime, - final String timeDimension) { - Map<String, JobStatisticsBasic> result = Maps.newHashMap(); - - List<JobStatistics> qulifiedJobStats = getFilteredJobStatsByTime(crud.listAll(), startTime, endTime); - - long startDate = startTime; - while (startDate <= endTime) { - long nextDate = nextDate(startDate, timeDimension); - List<JobStatistics> list = getFilteredJobStatsByTime(qulifiedJobStats, startDate, nextDate); - result.put(formatDateTime(startDate), aggregateJobStats(list)); - startDate = nextDate; - } - - return result; - } - - // format epoch time to date string - private String formatDateTime(long time) { - ZoneId zoneId = TimeZone.getDefault().toZoneId(); - LocalDateTime localDateTime = Instant.ofEpochMilli(time).atZone(zoneId).toLocalDateTime(); - DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd", - Locale.getDefault(Locale.Category.FORMAT)); - return localDateTime.format(formatter); - } - - private long nextDate(final long date, final String timeDimension) { - ZoneId zoneId = 
TimeZone.getTimeZone(config.getTimeZone()).toZoneId(); - LocalDate localDate = Instant.ofEpochMilli(date).atZone(zoneId).toLocalDate(); - switch (timeDimension) { - case "day": - localDate = localDate.plusDays(1); - break; - case "week": - localDate = localDate.with(TemporalAdjusters.next(DayOfWeek.MONDAY)); - break; - case "month": - localDate = localDate.with(TemporalAdjusters.firstDayOfNextMonth()); - break; - default: - localDate = localDate.plusDays(1); - break; - } - - return localDate.atStartOfDay(zoneId).toInstant().toEpochMilli(); - } - - private Map<String, JobStatisticsBasic> aggregateStatsByModel(long startTime, long endTime) { - return getFilteredJobStatsByTime(crud.listAll(), startTime, endTime).stream() - .map(JobStatistics::getJobStatisticsByModels).reduce((x, y) -> { - // merge two maps - Map<String, JobStatisticsBasic> mergedMap = Maps.newHashMap(x); - y.forEach((k, v) -> mergedMap.merge(k, v, - (value1, value2) -> new JobStatisticsBasic(value1.getCount() + value2.getCount(), - value1.getTotalDuration() + value2.getTotalDuration(), - value1.getTotalByteSize() + value2.getTotalByteSize()))); - return mergedMap; - }).orElse(Maps.newHashMap()); - } - - private List<JobStatistics> getFilteredJobStatsByTime(final List<JobStatistics> list, final long startTime, - final long endTime) { - return list.stream() - .filter(singleStats -> singleStats.getDate() >= startTime && singleStats.getDate() < endTime) - .collect(Collectors.toList()); - } - - private Map<String, Double> calculateDurationPerByte(Map<String, JobStatisticsBasic> totalMetricMap) { - Map<String, Double> result = Maps.newHashMap(); - for (Map.Entry<String, JobStatisticsBasic> entry : totalMetricMap.entrySet()) { - double totalDuration = entry.getValue().getTotalDuration(); - double totalByteSize = entry.getValue().getTotalByteSize(); - if (totalByteSize == 0) - result.put(entry.getKey(), .0); - else { - result.put(entry.getKey(), totalDuration / totalByteSize); - } - } - return result; - } -} 
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java deleted file mode 100644 index 5c353d989c..0000000000 --- a/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.job.dao; - -import java.util.Comparator; -import java.util.List; -import java.util.function.Predicate; -import java.util.stream.Collectors; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.JsonSerializer; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.persistence.RootPersistentEntity; -import org.apache.kylin.common.persistence.Serializer; -import org.apache.kylin.common.util.JsonUtil; -import org.apache.kylin.metadata.cachesync.CachedCrudAssist; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - -import lombok.val; - -/** - */ -public class NExecutableDao { - - private static final Serializer<ExecutablePO> JOB_SERIALIZER = new JsonSerializer<>(ExecutablePO.class); - private static final Logger logger = LoggerFactory.getLogger(NExecutableDao.class); - - public static NExecutableDao getInstance(KylinConfig config, String project) { - return config.getManager(project, NExecutableDao.class); - } - - // called by reflection - static NExecutableDao newInstance(KylinConfig config, String project) { - return new NExecutableDao(config, project); - } - // ============================================================================ - - private String project; - - private KylinConfig config; - - private CachedCrudAssist<ExecutablePO> crud; - - private NExecutableDao(KylinConfig config, String project) { - logger.trace("Using metadata url: {}", config); - this.project = project; - this.config = config; - String resourceRootPath = "/" + project + ResourceStore.EXECUTE_RESOURCE_ROOT; - this.crud = new CachedCrudAssist<ExecutablePO>(getStore(), resourceRootPath, "", ExecutablePO.class) { - @Override - protected ExecutablePO initEntityAfterReload(ExecutablePO entity, String resourceName) { - entity.setProject(project); - return entity; - } - }; - } - 
- public List<ExecutablePO> getJobs() { - return crud.listAll(); - } - - public List<ExecutablePO> getPartialJobs(Predicate<String> predicate) { - return crud.listPartial(predicate); - } - - public List<ExecutablePO> getJobs(long timeStart, long timeEndExclusive) { - return crud.listAll().stream() - .filter(x -> x.getLastModified() >= timeStart && x.getLastModified() < timeEndExclusive) - .collect(Collectors.toList()); - } - - public List<String> getJobIds() { - return crud.listAll().stream().sorted(Comparator.comparing(ExecutablePO::getCreateTime)) - .sorted(Comparator.comparing(ExecutablePO::getPriority)).map(RootPersistentEntity::resourceName) - .collect(Collectors.toList()); - } - - public ExecutablePO getJobByUuid(String uuid) { - return crud.get(uuid); - } - - public ExecutablePO addJob(ExecutablePO job) { - if (getJobByUuid(job.getUuid()) != null) { - throw new IllegalArgumentException("job id:" + job.getUuid() + " already exists"); - } - crud.save(job); - return job; - } - - // for ut - @VisibleForTesting - public void deleteAllJob() { - getJobs().forEach(job -> deleteJob(job.getId())); - } - - public void deleteJob(String uuid) { - crud.delete(uuid); - } - - public void updateJob(String uuid, Predicate<ExecutablePO> updater) { - val job = getJobByUuid(uuid); - Preconditions.checkNotNull(job); - val copyForWrite = JsonUtil.copyBySerialization(job, JOB_SERIALIZER, null); - if (updater.test(copyForWrite)) { - crud.save(copyForWrite); - } - } - - private ResourceStore getStore() { - return ResourceStore.getKylinMetaStore(config); - } - -} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/AbstractDefaultSchedulerRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/AbstractDefaultSchedulerRunner.java deleted file mode 100644 index a8c6f3c973..0000000000 --- a/src/core-job/src/main/java/org/apache/kylin/job/runners/AbstractDefaultSchedulerRunner.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kylin.job.runners; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.logging.SetLogCategory; -import org.apache.kylin.common.persistence.transaction.UnitOfWork; -import org.apache.kylin.job.execution.ExecutableContext; -import org.apache.kylin.job.execution.NExecutableManager; -import org.apache.kylin.job.impl.threadpool.NDefaultScheduler; -import org.apache.kylin.metadata.epoch.EpochManager; -import org.apache.kylin.metadata.project.EnhancedUnitOfWork; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import lombok.SneakyThrows; - -public abstract class AbstractDefaultSchedulerRunner implements Runnable { - private static final Logger logger = LoggerFactory.getLogger(AbstractDefaultSchedulerRunner.class); - protected final NDefaultScheduler nDefaultScheduler; - - protected final ExecutableContext context; - - protected final String project; - - public AbstractDefaultSchedulerRunner(final NDefaultScheduler nDefaultScheduler) { - this.nDefaultScheduler = nDefaultScheduler; - this.context = nDefaultScheduler.getContext(); - this.project = nDefaultScheduler.getProject(); - } - - @SneakyThrows - private boolean checkEpochIdFailed() { - //check 
failed if isInterrupted - if (Thread.currentThread().isInterrupted()) { - throw new InterruptedException(); - } - - if (!KylinConfig.getInstanceFromEnv().isUTEnv() - && !EpochManager.getInstance().checkEpochId(context.getEpochId(), project)) { - nDefaultScheduler.forceShutdown(); - return true; - } - return false; - } - - // stop job if Storage Quota Limit reached - protected void stopJobIfSQLReached(String jobId) { - if (context.isReachQuotaLimit()) { - try { - EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { - if (context.isReachQuotaLimit()) { - NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).pauseJob(jobId); - logger.info("Job {} paused due to no available storage quota.", jobId); - logger.info("Please clean up low-efficient storage in time, " - + "increase the low-efficient storage threshold, " - + "or notify the administrator to increase the storage quota for this project."); - } - return null; - }, project, UnitOfWork.DEFAULT_MAX_RETRY, context.getEpochId(), jobId); - } catch (Exception e) { - logger.warn("[UNEXPECTED_THINGS_HAPPENED] project {} job {} failed to pause", project, jobId, e); - } - } - } - - @Override - public void run() { - try (SetLogCategory ignored = new SetLogCategory("schedule")) { - if (checkEpochIdFailed()) { - return; - } - doRun(); - } - } - - protected abstract void doRun(); -} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java deleted file mode 100644 index c9bb123a45..0000000000 --- a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kylin.job.runners; - -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.transaction.UnitOfWork; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.DefaultChainedExecutable; -import org.apache.kylin.job.execution.Executable; -import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.job.execution.NExecutableManager; -import org.apache.kylin.job.execution.Output; -import org.apache.kylin.job.impl.threadpool.NDefaultScheduler; -import org.apache.kylin.metadata.project.EnhancedUnitOfWork; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import lombok.val; - -public class FetcherRunner extends AbstractDefaultSchedulerRunner { - - private static final Logger logger = LoggerFactory.getLogger(FetcherRunner.class); - - private final ExecutorService jobPool; - - private final ScheduledExecutorService fetcherPool; - - public FetcherRunner(NDefaultScheduler nDefaultScheduler, ExecutorService jobPool, - ScheduledExecutorService fetcherPool) { - super(nDefaultScheduler); - this.jobPool = jobPool; - this.fetcherPool = fetcherPool; - } - - private boolean checkSuicide(String jobId) { - val 
executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project); - if (executableManager.getJob(jobId).getStatus().isFinalState()) { - return false; - } - return executableManager.getJob(jobId).checkSuicide(); - } - - private boolean markSuicideJob(String jobId) { - try { - if (checkSuicide(jobId)) { - return EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { - if (checkSuicide(jobId)) { - NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).suicideJob(jobId); - return true; - } - return false; - }, project, UnitOfWork.DEFAULT_MAX_RETRY, context.getEpochId(), jobId); - } - } catch (Exception e) { - logger.warn("[UNEXPECTED_THINGS_HAPPENED] project {} job {} should be suicidal but discard failed", project, - jobId, e); - } - return false; - } - - private boolean markErrorJob(String jobId) { - try { - return EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { - val manager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project); - manager.errorJob(jobId); - return true; - }, project, UnitOfWork.DEFAULT_MAX_RETRY, context.getEpochId(), jobId); - } catch (Exception e) { - logger.warn("[UNEXPECTED_THINGS_HAPPENED] project {} job {} should be error but mark failed", project, - jobId, e); - } - return false; - } - - @Override - public void doRun() { - try { - val executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project); - Map<String, Executable> runningJobs = context.getRunningJobs(); - - int nRunning = 0; - int nReady = 0; - int nStopped = 0; - int nOthers = 0; - int nError = 0; - int nDiscarded = 0; - int nSucceed = 0; - int nSuicidal = 0; - for (final String id : executableManager.getJobs()) { - if (markSuicideJob(id)) { - nSuicidal++; - continue; - } - - if (runningJobs.containsKey(id)) { - - // this is very important to prevent from same job being scheduled at same time. - // e.g. 
when a job is restarted, the old job may still be running (even if we tried to interrupt it) - // until the old job is finished, the new job should not start - nRunning++; - continue; - } - - final Output output = executableManager.getOutput(id); - - switch (output.getState()) { - case READY: - nReady++; - if (isJobPoolFull()) { - break; - } - - if (context.isReachQuotaLimit()) { - stopJobIfSQLReached(id); - break; - } - - logger.info("fetcher schedule {} ", id); - scheduleJob(id); - break; - case DISCARDED: - nDiscarded++; - break; - case ERROR: - nError++; - break; - case SUCCEED: - nSucceed++; - break; - case PAUSED: - nStopped++; - break; - case SUICIDAL: - nSuicidal++; - break; - default: - if (allSubTasksSuccess(id)) { - logger.info("All sub tasks are successful, reschedule job {}", id); - scheduleJob(id); - break; - } - logger.warn("Unexpected status for {} <{}>", id, output.getState()); - if (markErrorJob(id)) { - nError++; - } else { - nOthers++; - } - break; - } - } - - logger.info( - "Job Status in project {}: {} should running, {} actual running, {} stopped, {} ready, {} already succeed, {} error, {} discarded, {} suicidal, {} others", - project, nRunning, runningJobs.size(), nStopped, nReady, nSucceed, nError, nDiscarded, nSuicidal, - nOthers); - } catch (Exception e) { - logger.warn("Job Fetcher caught a exception ", e); - } - } - - private boolean allSubTasksSuccess(String id) { - val executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project); - - // check special case, all sub task success, show make current job to success - AbstractExecutable job = executableManager.getJob(id); - if (job instanceof DefaultChainedExecutable) { - return ((DefaultChainedExecutable) job).getTasks().stream() - .allMatch(abstractExecutable -> abstractExecutable.getStatus() == ExecutableState.SUCCEED); - } - - return false; - } - - private void scheduleJob(String id) { - AbstractExecutable executable = null; - String jobDesc = null; - - 
boolean memoryLock = false; - int useMemoryCapacity = 0; - try { - val config = KylinConfig.getInstanceFromEnv(); - val executableManager = NExecutableManager.getInstance(config, project); - executable = executableManager.getJob(id); - if (!config.getDeployMode().equals("cluster")) { - useMemoryCapacity = executable.computeStepDriverMemory(); - } - memoryLock = NDefaultScheduler.getMemoryRemaining().tryAcquire(useMemoryCapacity); - if (memoryLock) { - jobDesc = executable.toString(); - logger.info("{} prepare to schedule", jobDesc); - context.addRunningJob(executable); - jobPool.execute(new JobRunner(nDefaultScheduler, executable, this)); - logger.info("{} scheduled", jobDesc); - } else { - logger.info("memory is not enough, remaining: {} MB", - NDefaultScheduler.getMemoryRemaining().availablePermits()); - } - } catch (Exception ex) { - if (executable != null) { - context.removeRunningJob(executable); - if (memoryLock) { - // may release twice when exception raise after jobPool execute executable - NDefaultScheduler.getMemoryRemaining().release(useMemoryCapacity); - } - } - logger.warn("{} fail to schedule", jobDesc, ex); - } - } - - private boolean isJobPoolFull() { - if (context.getRunningJobs().size() >= nDefaultScheduler.getJobEngineConfig().getMaxConcurrentJobLimit()) { - logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time."); - return true; - } - return false; - } - - void scheduleNext() { - fetcherPool.schedule(this, 0, TimeUnit.SECONDS); - } -} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/ForkBasedJobRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/ForkBasedJobRunner.java deleted file mode 100644 index fc29a5ebb4..0000000000 --- a/src/core-job/src/main/java/org/apache/kylin/job/runners/ForkBasedJobRunner.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kylin.job.runners; - -import java.util.List; -import java.util.Locale; -import java.util.Map; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.BufferedLogger; -import org.apache.kylin.common.util.CliCommandExecutor; -import org.apache.kylin.common.util.ExecutableApplication; - -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class ForkBasedJobRunner extends JobRunnerFactory.AbstractJobRunner { - - private final CliCommandExecutor cliExecutor = new CliCommandExecutor(); - - public ForkBasedJobRunner(KylinConfig kylinConfig, String project, List<String> originResources) { - super(kylinConfig, project, originResources); - } - - @Override - protected void doExecute(ExecutableApplication app, Map<String, String> args) throws Exception { - String finalCommand = String.format(Locale.ROOT, "bash -x %s/sbin/bootstrap.sh %s %s 2>>%s", - KylinConfig.getKylinHome(), app.getClass().getName(), formatArgs(args), getJobTmpDir() + "/job.log"); - log.info("Try to execute {}", finalCommand); - cliExecutor.execute(finalCommand, new BufferedLogger(log), jobId); - } - -} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/InMemoryJobRunner.java 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/InMemoryJobRunner.java deleted file mode 100644 index 119c3313e3..0000000000 --- a/src/core-job/src/main/java/org/apache/kylin/job/runners/InMemoryJobRunner.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kylin.job.runners; - -import java.util.List; -import java.util.Map; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.ExecutableApplication; - -public class InMemoryJobRunner extends JobRunnerFactory.AbstractJobRunner { - - public InMemoryJobRunner(KylinConfig config, String project, List<String> resources) { - super(config, project, resources); - } - - @Override - public void doExecute(ExecutableApplication app, Map<String, String> args) throws Exception { - app.execute(formatArgs(args).split(" ")); - } -} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java deleted file mode 100644 index 195a244900..0000000000 --- a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kylin.job.runners; - -import java.util.Map; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.transaction.UnitOfWork; -import org.apache.kylin.job.execution.Executable; -import org.apache.kylin.job.execution.NExecutableManager; -import org.apache.kylin.job.impl.threadpool.NDefaultScheduler; -import org.apache.kylin.metadata.project.EnhancedUnitOfWork; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import lombok.val; - -public class JobCheckRunner extends AbstractDefaultSchedulerRunner { - - private static final Logger logger = LoggerFactory.getLogger(JobCheckRunner.class); - - public JobCheckRunner(NDefaultScheduler nDefaultScheduler) { - super(nDefaultScheduler); - } - - private boolean checkTimeoutIfNeeded(String jobId, Long startTime) { - Integer timeOutMinute = KylinConfig.getInstanceFromEnv().getSchedulerJobTimeOutMinute(); - if (timeOutMinute == 0) { - return false; - } - val executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project); - if (executableManager.getJob(jobId).getStatus().isFinalState()) { - return false; - } - long duration = System.currentTimeMillis() - startTime; - long durationMins = Math.toIntExact(duration / (60 * 1000)); - return durationMins >= timeOutMinute; - } - - private boolean discardTimeoutJob(String jobId, Long startTime) { - try { - if (checkTimeoutIfNeeded(jobId, startTime)) { - return EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { - if (checkTimeoutIfNeeded(jobId, startTime)) { - logger.error("project {} job {} running timeout.", project, jobId); - NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).errorJob(jobId); - return true; - } - return false; - }, project, UnitOfWork.DEFAULT_MAX_RETRY, context.getEpochId(), jobId); - } - } catch (Exception e) { - logger.warn("[UNEXPECTED_THINGS_HAPPENED] project " + project + " job " + jobId - + " should be timeout but discard failed", e); 
- } - return false; - } - - @Override - protected void doRun() { - - logger.info("start check project {} job pool.", project); - - val executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project); - Map<String, Executable> runningJobs = context.getRunningJobs(); - Map<String, Long> runningJobInfos = context.getRunningJobInfos(); - for (final String id : executableManager.getJobs()) { - if (runningJobs.containsKey(id)) { - discardTimeoutJob(id, runningJobInfos.get(id)); - stopJobIfSQLReached(id); - } - } - - } -} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java deleted file mode 100644 index 3e839f57f4..0000000000 --- a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kylin.job.runners; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.SetThreadName; -import org.apache.kylin.job.exception.ExecuteException; -import org.apache.kylin.job.exception.JobStoppedNonVoluntarilyException; -import org.apache.kylin.job.exception.JobStoppedVoluntarilyException; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.impl.threadpool.NDefaultScheduler; -import org.apache.kylin.common.logging.SetLogCategory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import lombok.val; -import lombok.var; - -public class JobRunner extends AbstractDefaultSchedulerRunner { - private static final Logger logger = LoggerFactory.getLogger(JobRunner.class); - - private final AbstractExecutable executable; - - private final FetcherRunner fetcherRunner; - - public JobRunner(NDefaultScheduler nDefaultScheduler, AbstractExecutable executable, FetcherRunner fetcherRunner) { - super(nDefaultScheduler); - this.fetcherRunner = fetcherRunner; - this.executable = executable; - } - - @Override - protected void doRun() { - //only the first 8 chars of the job uuid - val jobIdSimple = executable.getId().substring(0, 8); - try (SetThreadName ignored = new SetThreadName("JobWorker(project:%s,jobid:%s)", project, jobIdSimple); - SetLogCategory logCategory = new SetLogCategory("schedule")) { - executable.execute(context); - // trigger the next step asap - fetcherRunner.scheduleNext(); - } catch (JobStoppedVoluntarilyException | JobStoppedNonVoluntarilyException e) { - logger.info("Job quits either voluntarily or non-voluntarily,job: {}", jobIdSimple, e); - } catch (ExecuteException e) { - logger.error("ExecuteException occurred while job: " + executable.getId(), e); - } catch (Exception e) { - logger.error("unknown error execute job: " + executable.getId(), e); - } finally { - context.removeRunningJob(executable); - val config = KylinConfig.getInstanceFromEnv(); - var 
usingMemory = 0; - if (!config.getDeployMode().equals("cluster")) { - usingMemory = executable.computeStepDriverMemory(); - } - logger.info("Before job:{} global memory release {}", jobIdSimple, - NDefaultScheduler.getMemoryRemaining().availablePermits()); - NDefaultScheduler.getMemoryRemaining().release(usingMemory); - logger.info("After job:{} global memory release {}", jobIdSimple, - NDefaultScheduler.getMemoryRemaining().availablePermits()); - } - } -} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunnerFactory.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunnerFactory.java deleted file mode 100644 index 0713558d35..0000000000 --- a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunnerFactory.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kylin.job.runners; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Properties; -import java.util.function.Consumer; -import java.util.stream.Collectors; - -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.NotImplementedException; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.fs.Path; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.KylinConfigBase; -import org.apache.kylin.common.persistence.RawResource; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.util.ExecutableApplication; -import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.common.util.ZipFileUtils; -import org.apache.kylin.common.persistence.metadata.MetadataStore; -import org.apache.kylin.common.persistence.transaction.UnitOfWorkParams; -import org.apache.kylin.metadata.project.EnhancedUnitOfWork; - -import com.google.common.collect.Maps; - -import lombok.RequiredArgsConstructor; -import lombok.val; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class JobRunnerFactory { - - private JobRunnerFactory() { - // Just implement it - } - - public static AbstractJobRunner createRunner(KylinConfig config, String type, String project, - List<String> resources) { - switch (type) { - case "fork": - return new ForkBasedJobRunner(config, project, resources); - case "in-memory": - return new InMemoryJobRunner(config, project, resources); - default: - throw new NotImplementedException("Runner type " + type + " not implement"); - } - } - - @RequiredArgsConstructor - public abstract static class AbstractJobRunner { - - protected final KylinConfig kylinConfig; - protected final String project; - protected final List<String> originResources; - private Consumer<Properties> configUpdater; - - 
protected String metaDumpUrl; - protected String jobId; - - public String prepareEnv(String jobId) throws IOException { - this.jobId = jobId; - val jobTmpDir = getJobTmpDir(); - FileUtils.forceMkdir(new File(jobTmpDir)); - - metaDumpUrl = kylinConfig.getMetadataUrlPrefix() + "@hdfs,path=file://" + jobTmpDir + "/meta"; - return jobTmpDir; - } - - public void start(ExecutableApplication app, Map<String, String> args) throws Exception { - attachMetadataAndKylinProps(false); - args.put("meta", metaDumpUrl); - args.put("metaOutput", metaDumpUrl + "_output"); - doExecute(app, args); - uploadJobLog(); - } - - public void cleanupEnv() { - FileUtils.deleteQuietly(new File(getJobTmpDir())); - } - - protected abstract void doExecute(ExecutableApplication app, Map<String, String> args) throws Exception; - - protected String formatArgs(Map<String, String> args) { - return args.entrySet().stream().map(entry -> "--" + entry.getKey() + "=" + entry.getValue()) - .collect(Collectors.joining(" ")); - } - - public String getJobTmpDir() { - return KylinConfigBase.getKylinHome() + "/tmp/" + jobId; - } - - public void setConfigUpdater(Consumer<Properties> consumer) { - this.configUpdater = consumer; - } - - protected boolean uploadJobLog() { - val jobContentZip = getJobTmpDir() + ".zip"; - try { - ZipFileUtils.compressZipFile(getJobTmpDir(), jobContentZip); - val jobDir = kylinConfig.getJobTmpDir(project, true); - val fs = HadoopUtil.getFileSystem(jobDir); - - try (val in = new FileInputStream(new File(jobContentZip)); - val out = fs.create(new Path(jobDir + jobId + ".zip"), true)) { - IOUtils.copy(in, out); - } - return true; - } catch (IOException e) { - log.warn("Upload Job Evidence failed {}", jobId, e); - } finally { - FileUtils.deleteQuietly(new File(jobContentZip)); - } - return false; - } - - protected void attachMetadataAndKylinProps(boolean kylinPropsOnly) throws IOException { - if (StringUtils.isEmpty(metaDumpUrl)) { - throw new RuntimeException("Missing metaUrl"); - } - - 
File tmpDir = File.createTempFile("kylin_job_meta", ""); - try { - org.apache.commons.io.FileUtils.forceDelete(tmpDir); // we need a directory, so delete the file first - - Properties props = kylinConfig.exportToProperties(); - props.setProperty("kylin.metadata.url", metaDumpUrl); - if (configUpdater != null) { - configUpdater.accept(props); - } - - if (kylinPropsOnly) { - ResourceStore.dumpKylinProps(tmpDir, props); - } else { - // The way of Updating metadata is CopyOnWrite. So it is safe to use Reference in the value. - Map<String, RawResource> dumpMap = EnhancedUnitOfWork.doInTransactionWithCheckAndRetry( - UnitOfWorkParams.<Map> builder().readonly(true).unitName(project).processor(() -> { - Map<String, RawResource> retMap = Maps.newHashMap(); - for (String resPath : originResources) { - ResourceStore resourceStore = ResourceStore.getKylinMetaStore(kylinConfig); - RawResource rawResource = resourceStore.getResource(resPath); - retMap.put(resPath, rawResource); - } - return retMap; - }).build()); - - if (Objects.isNull(dumpMap) || dumpMap.isEmpty()) { - return; - } - // dump metadata - ResourceStore.dumpResourceMaps(kylinConfig, tmpDir, dumpMap, props); - } - - // copy metadata to target metaUrl - KylinConfig dstConfig = KylinConfig.createKylinConfig(props); - MetadataStore.createMetadataStore(dstConfig).uploadFromFile(tmpDir); - // clean up - log.debug("Copied metadata to the target metaUrl, delete the temp dir: {}", tmpDir); - } finally { - FileUtils.forceDelete(tmpDir); - } - } - } -} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/LicenseCapacityCheckRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/LicenseCapacityCheckRunner.java deleted file mode 100644 index 1b51dd2c3b..0000000000 --- a/src/core-job/src/main/java/org/apache/kylin/job/runners/LicenseCapacityCheckRunner.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kylin.job.runners; - -import static org.apache.kylin.common.exception.CommonErrorCode.LICENSE_OVER_CAPACITY; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.annotation.Clarification; -import org.apache.kylin.common.exception.KylinException; -import org.apache.kylin.job.impl.threadpool.NDefaultScheduler; -import org.apache.kylin.metadata.sourceusage.SourceUsageManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import lombok.val; - -@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise") -public class LicenseCapacityCheckRunner extends AbstractDefaultSchedulerRunner { - private static final Logger logger = LoggerFactory.getLogger(LicenseCapacityCheckRunner.class); - - public LicenseCapacityCheckRunner(NDefaultScheduler nDefaultScheduler) { - super(nDefaultScheduler); - } - - @Override - protected void doRun() { - logger.info("start check license capacity for project {}", project); - context.setLicenseOverCapacity(isLicenseOverCapacity()); - } - - private boolean isLicenseOverCapacity() { - val sourceUsageManager = SourceUsageManager.getInstance(KylinConfig.getInstanceFromEnv()); - - try { - sourceUsageManager.checkIsOverCapacity(project); - } catch (KylinException e) { - if 
(LICENSE_OVER_CAPACITY.toErrorCode() == e.getErrorCode()) { - logger.warn("Source usage over capacity, no job will be scheduled.", e); - return true; - } - } catch (Throwable e) { - logger.warn("Check source usage over capacity failed.", e); - } - - // not over capacity - return false; - } -} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/QuotaStorageCheckRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/QuotaStorageCheckRunner.java deleted file mode 100644 index c7f3140671..0000000000 --- a/src/core-job/src/main/java/org/apache/kylin/job/runners/QuotaStorageCheckRunner.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kylin.job.runners; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.annotation.Clarification; -import org.apache.kylin.job.impl.threadpool.NDefaultScheduler; -import org.apache.kylin.metadata.cube.storage.ProjectStorageInfoCollector; -import org.apache.kylin.metadata.cube.storage.StorageInfoEnum; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -import lombok.val; -import lombok.var; - -@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise") -public class QuotaStorageCheckRunner extends AbstractDefaultSchedulerRunner { - private static final Logger logger = LoggerFactory.getLogger(QuotaStorageCheckRunner.class); - - private final ProjectStorageInfoCollector collector; - - public QuotaStorageCheckRunner(NDefaultScheduler nDefaultScheduler) { - super(nDefaultScheduler); - - val storageInfoEnumList = Lists.newArrayList(StorageInfoEnum.STORAGE_QUOTA, StorageInfoEnum.TOTAL_STORAGE); - collector = new ProjectStorageInfoCollector(storageInfoEnumList); - } - - @Override - protected void doRun() { - logger.info("start check project {} storage quota.", nDefaultScheduler.getProject()); - context.setReachQuotaLimit(reachStorageQuota()); - } - - private boolean reachStorageQuota() { - var storageVolumeInfo = collector.getStorageVolumeInfo(KylinConfig.getInstanceFromEnv(), - nDefaultScheduler.getProject()); - var totalSize = storageVolumeInfo.getTotalStorageSize(); - int retryCount = 3; - while (retryCount-- > 0 && totalSize < 0) { - storageVolumeInfo = collector.getStorageVolumeInfo(KylinConfig.getInstanceFromEnv(), - nDefaultScheduler.getProject()); - totalSize = storageVolumeInfo.getTotalStorageSize(); - } - val storageQuotaSize = storageVolumeInfo.getStorageQuotaSize(); - if (totalSize < 0) { - logger.error( - "Project '{}' : an exception occurs when getting storage volume info, no job will be scheduled!!! 
The error info : {}", - nDefaultScheduler.getProject(), - storageVolumeInfo.getThrowableMap().get(StorageInfoEnum.TOTAL_STORAGE)); - return true; - } - if (totalSize >= storageQuotaSize) { - logger.info("Project '{}' reach storage quota, no job will be scheduled!!!", - nDefaultScheduler.getProject()); - return true; - } - return false; - } -} diff --git a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java deleted file mode 100644 index 5fb6ecc8ff..0000000000 --- a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.metadata.cube.storage; - -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.apache.kylin.metadata.cube.model.NDataLayout; -import org.apache.kylin.metadata.cube.model.NDataSegment; -import org.apache.kylin.metadata.cube.model.NDataflow; -import org.apache.kylin.metadata.cube.model.NDataflowManager; -import org.apache.kylin.metadata.cube.optimization.IndexOptimizer; -import org.apache.kylin.metadata.cube.optimization.IndexOptimizerFactory; -import org.apache.kylin.metadata.model.NDataModel; - -import com.google.common.collect.Maps; - -import lombok.val; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class GarbageStorageCollector implements StorageInfoCollector { - - @Override - public void collect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) { - Map<String, Set<Long>> garbageIndexMap = Maps.newHashMap(); - long storageSize = 0L; - - for (val model : getModels(project)) { - val dataflow = getDataflow(model).copy(); - - final IndexOptimizer indexOptimizer = IndexOptimizerFactory.getOptimizer(dataflow, false); - val garbageLayouts = indexOptimizer.getGarbageLayoutMap(dataflow).keySet(); - if (CollectionUtils.isNotEmpty(garbageLayouts)) { - storageSize += calculateLayoutSize(garbageLayouts, dataflow); - garbageIndexMap.put(model.getId(), garbageLayouts); - } - } - - storageVolumeInfo.setGarbageModelIndexMap(garbageIndexMap); - storageVolumeInfo.setGarbageStorageSize(storageSize); - } - - private List<NDataModel> getModels(String project) { - val dataflowManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project); - return dataflowManager.listUnderliningDataModels(); - } - - private NDataflow getDataflow(NDataModel model) { - val dataflowManager = 
NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), model.getProject()); - return dataflowManager.getDataflow(model.getUuid()); - } - - private long calculateLayoutSize(Set<Long> cuboidLayoutIdSet, NDataflow dataflow) { - long cuboidLayoutSize = 0L; - for (NDataSegment segment : dataflow.getSegments(SegmentStatusEnum.READY, SegmentStatusEnum.WARNING)) { - for (Long cuboidLayoutId : cuboidLayoutIdSet) { - NDataLayout dataCuboid = segment.getSegDetails().getLayoutById(cuboidLayoutId); - if (dataCuboid != null) { - cuboidLayoutSize += dataCuboid.getByteSize(); - } - } - } - return cuboidLayoutSize; - } - -} diff --git a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollector.java b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollector.java deleted file mode 100644 index 6a5f4d7c56..0000000000 --- a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollector.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kylin.metadata.cube.storage; - -import java.util.List; - -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.kylin.common.KylinConfig; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; - -public class ProjectStorageInfoCollector { - - private List<StorageInfoCollector> collectors = Lists.newArrayList(); - - private static final ImmutableMap<Class, StorageInfoEnum> collectorType = ImmutableMap - .<Class, StorageInfoEnum> builder().put(GarbageStorageCollector.class, StorageInfoEnum.GARBAGE_STORAGE) - .put(TotalStorageCollector.class, StorageInfoEnum.TOTAL_STORAGE) - .put(StorageQuotaCollector.class, StorageInfoEnum.STORAGE_QUOTA).build(); - - public ProjectStorageInfoCollector(List<StorageInfoEnum> storageInfoList) { - if (CollectionUtils.isNotEmpty(storageInfoList)) { - storageInfoList.forEach(si -> addCollectors(si)); - } - } - - private void collect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) { - for (StorageInfoCollector collector : collectors) { - try { - collector.collect(config, project, storageVolumeInfo); - } catch (Exception e) { - storageVolumeInfo.getThrowableMap().put(collectorType.get(collector.getClass()), e); - } - } - } - - private void addCollectors(StorageInfoEnum storageInfoEnum) { - switch (storageInfoEnum) { - case GARBAGE_STORAGE: - collectors.add(new GarbageStorageCollector()); - break; - case TOTAL_STORAGE: - collectors.add(new TotalStorageCollector()); - break; - case STORAGE_QUOTA: - collectors.add(new StorageQuotaCollector()); - break; - default: - break; - } - } - - public StorageVolumeInfo getStorageVolumeInfo(KylinConfig config, String project) { - StorageVolumeInfo storageVolumeInfo = new StorageVolumeInfo(); - if (StringUtils.isBlank(project) || CollectionUtils.isEmpty(collectors)) { - return storageVolumeInfo; - } - collect(config, project, storageVolumeInfo); - return 
storageVolumeInfo; - } -} diff --git a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoCollector.java b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoCollector.java deleted file mode 100644 index 4218244b06..0000000000 --- a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoCollector.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.metadata.cube.storage; - -import java.io.IOException; - -import org.apache.kylin.common.KylinConfig; - -public interface StorageInfoCollector { - - void collect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) throws IOException; - -} diff --git a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoEnum.java b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoEnum.java deleted file mode 100644 index ac29378738..0000000000 --- a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoEnum.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
/**
 * Kinds of storage information that can be requested from {@code ProjectStorageInfoCollector}.
 * Each value selects one collector implementation.
 */
public enum StorageInfoEnum {
    /** Size and layout ids of garbage indexes; handled by {@code GarbageStorageCollector}. */
    GARBAGE_STORAGE,

    /** Total storage size of the project; handled by {@code TotalStorageCollector}. */
    TOTAL_STORAGE,

    /** Storage quota of the project; handled by {@code StorageQuotaCollector}. */
    STORAGE_QUOTA
}
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.metadata.cube.storage; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.annotation.Clarification; -import org.apache.kylin.metadata.project.NProjectManager; - -@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise") -public class StorageQuotaCollector implements StorageInfoCollector { - - @Override - public void collect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) { - config = NProjectManager.getInstance(config).getProject(project).getConfig(); - long quotaSize = config.getStorageQuotaSize(); - storageVolumeInfo.setStorageQuotaSize(quotaSize); - } - -} diff --git a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageVolumeInfo.java b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageVolumeInfo.java deleted file mode 100644 index d77b18f18f..0000000000 --- a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageVolumeInfo.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * Mutable snapshot of a project's storage figures, filled in by the storage collectors.
 * Getters and setters for every field are generated by Lombok.
 */
@Getter
@Setter
@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise")
public class StorageVolumeInfo {
    // Storage quota of the project; stays -1 until a collector sets it.
    private long storageQuotaSize = -1L;
    // Total storage size of the project; stays -1 until a collector sets it.
    private long totalStorageSize = -1L;
    // Summed size of garbage layouts; stays -1 until a collector sets it.
    private long garbageStorageSize = -1L;
    // Garbage layout ids, keyed by model id.
    private Map<String, Set<Long>> garbageModelIndexMap = Maps.newHashMap();
    // Exceptions thrown while collecting, keyed by the kind of information that failed.
    private Map<StorageInfoEnum, Throwable> throwableMap = Maps.newHashMap();
}
- */ - -package org.apache.kylin.metadata.cube.storage; - -import java.io.IOException; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.HadoopUtil; - -public class TotalStorageCollector implements StorageInfoCollector { - - @Override - public void collect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) throws IOException { - String strPath = config.getWorkingDirectoryWithConfiguredFs(project); - Path path = new Path(strPath); - FileSystem fs = path.getFileSystem(HadoopUtil.getCurrentConfiguration()); - long totalStorageSize = 0L; - if (fs.exists(path)) { - totalStorageSize = HadoopUtil.getContentSummary(fs, path).getLength(); - } - storageVolumeInfo.setTotalStorageSize(totalStorageSize); - } - -}