This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push: new c8d73369e4 KYLIN-5217 Create a initial commit c8d73369e4 is described below commit c8d73369e47013a564b263420d4250d87246e7f6 Author: junqing.cai <junqing....@kyligence.io> AuthorDate: Tue Aug 9 14:27:42 2022 +0800 KYLIN-5217 Create a initial commit --- .../apache/kylin/job/dao/ExecutableOutputPO.java | 149 ++++++++++++ .../org/apache/kylin/job/dao/ExecutablePO.java | 152 ++++++++++++ .../org/apache/kylin/job/dao/JobStatistics.java | 77 ++++++ .../apache/kylin/job/dao/JobStatisticsBasic.java | 53 +++++ .../apache/kylin/job/dao/JobStatisticsManager.java | 263 +++++++++++++++++++++ .../org/apache/kylin/job/dao/NExecutableDao.java | 133 +++++++++++ .../runners/AbstractDefaultSchedulerRunner.java | 93 ++++++++ .../apache/kylin/job/runners/FetcherRunner.java | 241 +++++++++++++++++++ .../kylin/job/runners/ForkBasedJobRunner.java | 48 ++++ .../kylin/job/runners/InMemoryJobRunner.java | 36 +++ .../apache/kylin/job/runners/JobCheckRunner.java | 90 +++++++ .../org/apache/kylin/job/runners/JobRunner.java | 76 ++++++ .../apache/kylin/job/runners/JobRunnerFactory.java | 185 +++++++++++++++ .../job/runners/LicenseCapacityCheckRunner.java | 63 +++++ .../kylin/job/runners/QuotaStorageCheckRunner.java | 77 ++++++ .../cube/storage/GarbageStorageCollector.java | 87 +++++++ .../cube/storage/ProjectStorageInfoCollector.java | 79 +++++++ .../cube/storage/StorageInfoCollector.java | 29 +++ .../metadata/cube/storage/StorageInfoEnum.java | 23 ++ .../cube/storage/StorageQuotaCollector.java | 35 +++ .../metadata/cube/storage/StorageVolumeInfo.java | 38 +++ .../cube/storage/TotalStorageCollector.java | 42 ++++ 22 files changed, 2069 insertions(+) diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java new file mode 100644 index 0000000000..a3b69ab095 --- /dev/null +++ 
b/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.job.dao; + +import java.io.InputStream; +import java.io.Serializable; +import java.util.Map; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +import lombok.Getter; +import lombok.Setter; + +/** + * + */ +@Setter +@Getter +@SuppressWarnings("serial") +@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) +public class ExecutableOutputPO implements Serializable { + + @JsonIgnore + private InputStream contentStream; + + @JsonProperty("content") + private String content; + + @JsonProperty("log_path") + private String logPath; + + @JsonProperty("status") + private String status = "READY"; + + @JsonProperty("info") + private Map<String, String> info = Maps.newHashMap(); + + @JsonProperty("last_modified") + private long lastModified; + + @JsonProperty("createTime") + private long createTime = System.currentTimeMillis(); + + @JsonProperty("start_time") + private long startTime; + + @JsonProperty("end_time") + private long endTime; + + @JsonProperty("wait_time") + private long waitTime; + + @JsonProperty("duration") + private long duration; + + @JsonProperty("last_running_start_time") + private long lastRunningStartTime; + + @JsonProperty("is_resumable") + private boolean resumable = false; + + @JsonProperty("byte_size") + private long byteSize; + + @JsonProperty("failed_msg") + private String failedMsg; + + @JsonProperty("failed_step_id") + private String failedStepId; + + @JsonProperty("failed_segment_id") + private String failedSegmentId; + + @JsonProperty("failed_stack") + private String failedStack; + + @JsonProperty("failed_reason") + private String failedReason; + + 
public void addStartTime(long time) { + //when ready -> suicidal/discarded, the start time is 0 + if (startTime == 0) { + startTime = time; + } + endTime = 0; + } + + public void addEndTime(long time) { + Preconditions.checkArgument(startTime > 0L); + endTime = time; + } + + public void addDuration(long time) { + if (time != 0 && time > lastRunningStartTime) { + duration = duration + time - lastRunningStartTime; + } + } + + public void addLastRunningStartTime(long time) { + lastRunningStartTime = time; + } + + public void resetTime() { + createTime = System.currentTimeMillis(); + startTime = 0L; + endTime = 0L; + waitTime = 0L; + duration = 0L; + lastRunningStartTime = 0L; + } +} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java new file mode 100644 index 0000000000..b396efd047 --- /dev/null +++ b/src/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.job.dao; + +import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_IDS_DELIMITER; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang.StringUtils; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.persistence.RootPersistentEntity; +import org.apache.kylin.job.constant.ExecutableConstants; +import org.apache.kylin.job.execution.JobTypeEnum; +import org.apache.kylin.metadata.cube.model.NDataSegment; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import lombok.Getter; +import lombok.Setter; + +/** + */ +@Setter +@Getter +@SuppressWarnings("serial") +@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) +public class ExecutablePO extends RootPersistentEntity { + public static final int HIGHEST_PRIORITY = 0; + public static final int DEFAULT_PRIORITY = 3; + public static final 
int LOWEST_PRIORITY = 4; + @JsonProperty("name") + private String name; + + @JsonProperty("tasks") + private List<ExecutablePO> tasks; + + @JsonProperty("type") + private String type; + + @JsonProperty("handler_type") + private String handlerType; + + @JsonProperty("params") + private Map<String, String> params = Maps.newHashMap(); + + @JsonProperty("segments") + private Set<NDataSegment> segments = Sets.newHashSet(); + + @JsonProperty("job_type") + private JobTypeEnum jobType; + + @JsonProperty("data_range_start") + private long dataRangeStart; + + @JsonProperty("data_range_end") + private long dataRangeEnd; + + @JsonProperty("target_model") + private String targetModel; + + @JsonProperty("target_segments") + private List<String> targetSegments; + + @JsonProperty("output") + private ExecutableOutputPO output = new ExecutableOutputPO(); + + private String project; + + @JsonProperty("target_partitions") + private Set<Long> targetPartitions = Sets.newHashSet(); + + @JsonProperty("priority") + private int priority = DEFAULT_PRIORITY; + + @JsonProperty("tag") + private Object tag; + + @JsonProperty("stages_map") + private Map<String, List<ExecutablePO>> stagesMap; + + public void setPriority(int p) { + priority = isPriorityValid(p) ? 
p : DEFAULT_PRIORITY; + } + + @Override + public String getResourcePath() { + return concatResourcePath(getUuid(), project); + } + + public static String concatResourcePath(String name, String project) { + return new StringBuilder().append("/").append(project).append(ResourceStore.EXECUTABLE_JOB).append("/") + .append(name).toString(); + } + + public static boolean isPriorityValid(int priority) { + return priority >= HIGHEST_PRIORITY && priority <= LOWEST_PRIORITY; + } + + public static boolean isHigherPriority(int p1, int p2) { + return p1 < p2; + } + + public void addYarnApplicationJob(String appId) { + String oldAppIds = output.getInfo().getOrDefault(ExecutableConstants.YARN_APP_IDS, ""); + Set<String> appIds = new HashSet<>(Arrays.asList(oldAppIds.split(YARN_APP_IDS_DELIMITER))); + if (!appIds.contains(appId)) { + String newAppIds = oldAppIds + (StringUtils.isEmpty(oldAppIds) ? "" : YARN_APP_IDS_DELIMITER) + appId; + output.getInfo().put(ExecutableConstants.YARN_APP_IDS, newAppIds); + } + } +} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatistics.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatistics.java new file mode 100644 index 0000000000..0b1928fbbf --- /dev/null +++ b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatistics.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.job.dao; + +import java.util.Map; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Maps; + +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Getter +@Setter +@NoArgsConstructor +public class JobStatistics extends JobStatisticsBasic { + + @JsonProperty("date") + private long date; + @JsonProperty("model_stats") + private Map<String, JobStatisticsBasic> jobStatisticsByModels = Maps.newHashMap(); + + @Override + public String resourceName() { + return String.valueOf(date); + } + + public JobStatistics(long date, String model, long duration, long byteSize) { + this.date = date; + setCount(1); + setTotalDuration(duration); + setTotalByteSize(byteSize); + jobStatisticsByModels.put(model, new JobStatisticsBasic(duration, byteSize)); + } + + public JobStatistics(int count, long totalDuration, long totalByteSize) { + setCount(count); + setTotalDuration(totalDuration); + setTotalByteSize(totalByteSize); + } + + public JobStatistics(long date, long totalDuration, long totalByteSize) { + this.date = date; + setCount(1); + setTotalDuration(totalDuration); + setTotalByteSize(totalByteSize); + } + + public void update(String model, long duration, long byteSize, int deltaCount) { + super.update(duration, byteSize, deltaCount); + JobStatisticsBasic jobStatisticsByModel = jobStatisticsByModels.get(model); + if (jobStatisticsByModel == null) { + jobStatisticsByModel = new JobStatisticsBasic(duration, byteSize); + } else { + jobStatisticsByModel.update(duration, 
byteSize, deltaCount); + } + + jobStatisticsByModels.put(model, jobStatisticsByModel); + } +} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsBasic.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsBasic.java new file mode 100644 index 0000000000..c05743c3cf --- /dev/null +++ b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsBasic.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.job.dao; + +import org.apache.kylin.common.persistence.RootPersistentEntity; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +public class JobStatisticsBasic extends RootPersistentEntity { + @JsonProperty("count") + private int count; + @JsonProperty("total_duration") + private long totalDuration; + @JsonProperty("total_byte_size") + private long totalByteSize; + + public void update(long duration, long byteSize, int deltaCount) { + this.count += deltaCount; + this.totalDuration += duration; + this.totalByteSize += byteSize; + } + + public JobStatisticsBasic(long totalDuration, long totalByteSize) { + this.count = 1; + this.totalDuration = totalDuration; + this.totalByteSize = totalByteSize; + } +} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsManager.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsManager.java new file mode 100644 index 0000000000..2f0a65bfd3 --- /dev/null +++ b/src/core-job/src/main/java/org/apache/kylin/job/dao/JobStatisticsManager.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.job.dao; + +import java.time.DayOfWeek; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.temporal.TemporalAdjusters; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.TimeZone; +import java.util.stream.Collectors; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.metadata.cachesync.CachedCrudAssist; +import org.apache.kylin.metadata.model.NDataModel; +import org.apache.kylin.metadata.model.NDataModelManager; + +import com.google.common.collect.Maps; + +public class JobStatisticsManager { + + public static JobStatisticsManager getInstance(KylinConfig config, String project) { + return config.getManager(project, JobStatisticsManager.class); + } + + // called by reflection + static JobStatisticsManager newInstance(KylinConfig conf, String project) { + try { + String cls = JobStatisticsManager.class.getName(); + Class<? 
extends JobStatisticsManager> clz = ClassUtil.forName(cls, JobStatisticsManager.class); + return clz.getConstructor(KylinConfig.class, String.class).newInstance(conf, project); + } catch (Exception e) { + throw new RuntimeException("Failed to init DataModelManager from " + conf, e); + } + } + + // ============================================================================ + + private KylinConfig config; + private String project; + + private CachedCrudAssist<JobStatistics> crud; + + public JobStatisticsManager(KylinConfig config, String project) { + init(config, project); + } + + protected void init(KylinConfig cfg, final String project) { + this.config = cfg; + this.project = project; + final ResourceStore store = ResourceStore.getKylinMetaStore(this.config); + String resourceRootPath = "/" + this.project + ResourceStore.JOB_STATISTICS; + this.crud = new CachedCrudAssist<JobStatistics>(store, resourceRootPath, JobStatistics.class) { + @Override + protected JobStatistics initEntityAfterReload(JobStatistics jobStatistics, String resourceName) { + return jobStatistics; + } + }; + } + + public List<JobStatistics> getAll() { + return crud.listAll(); + } + + public JobStatistics updateStatistics(long date, String model, long duration, long byteSize, int deltaCount) { + JobStatistics jobStatistics = crud.get(String.valueOf(date)); + JobStatistics jobStatisticsToUpdate; + if (jobStatistics == null) { + jobStatisticsToUpdate = new JobStatistics(date, model, duration, byteSize); + return crud.save(jobStatisticsToUpdate); + } + + jobStatisticsToUpdate = crud.copyForWrite(jobStatistics); + jobStatisticsToUpdate.update(model, duration, byteSize, deltaCount); + return crud.save(jobStatisticsToUpdate); + } + + public JobStatistics updateStatistics(long date, long duration, long byteSize, int deltaCount) { + JobStatistics jobStatistics = crud.get(String.valueOf(date)); + JobStatistics jobStatisticsToUpdate; + if (jobStatistics == null) { + jobStatisticsToUpdate = new 
JobStatistics(date, duration, byteSize); + return crud.save(jobStatisticsToUpdate); + } + + jobStatisticsToUpdate = crud.copyForWrite(jobStatistics); + jobStatisticsToUpdate.update(duration, byteSize, deltaCount); + return crud.save(jobStatisticsToUpdate); + } + + public Pair<Integer, JobStatistics> getOverallJobStats(final long startTime, final long endTime) { + // filter + List<JobStatistics> filteredJobStats = getFilteredJobStatsByTime(crud.listAll(), startTime, endTime); + // aggregate all job stats + JobStatistics aggregatedStats = aggregateJobStats(filteredJobStats); + + return new Pair<>(aggregatedStats.getCount(), aggregatedStats); + } + + public Map<String, Integer> getJobCountByTime(final long startTime, final long endTime, + final String timeDimension) { + Map<String, Integer> result = Maps.newHashMap(); + aggregateJobStatsByTime(startTime, endTime, timeDimension).forEach((key, value) -> { + result.put(key, value.getCount()); + }); + return result; + } + + public Map<String, Integer> getJobCountByModel(long startTime, long endTime) { + Map<String, Integer> result = Maps.newHashMap(); + + aggregateStatsByModel(startTime, endTime).forEach((modelName, value) -> { + String modelAlias = getModelAlias(modelName); + if (modelAlias == null) + return; + result.put(modelAlias, value.getCount()); + }); + + return result; + } + + public Map<String, Double> getDurationPerByteByTime(final long startTime, final long endTime, + final String timeDimension) { + Map<String, JobStatisticsBasic> aggregateResult = aggregateJobStatsByTime(startTime, endTime, timeDimension); + return calculateDurationPerByte(aggregateResult); + } + + public Map<String, Double> getDurationPerByteByModel(long startTime, long endTime) { + Map<String, JobStatisticsBasic> transformedResult = Maps.newHashMap(); + + aggregateStatsByModel(startTime, endTime).forEach((modelName, value) -> { + String modelAlias = getModelAlias(modelName); + if (modelAlias == null) + return; + 
transformedResult.put(modelAlias, + new JobStatisticsBasic(value.getTotalDuration(), value.getTotalByteSize())); + }); + + return calculateDurationPerByte(transformedResult); + } + + private String getModelAlias(String modelId) { + NDataModelManager dataModelManager = NDataModelManager.getInstance(config, project); + NDataModel model = dataModelManager.getDataModelDesc(modelId); + if (model == null) + return null; + + return model.getAlias(); + } + + private JobStatistics aggregateJobStats(List<JobStatistics> jobStatisticsToAggregate) { + return jobStatisticsToAggregate.stream() + .reduce((x, y) -> new JobStatistics(x.getCount() + y.getCount(), + x.getTotalDuration() + y.getTotalDuration(), x.getTotalByteSize() + y.getTotalByteSize())) + .orElse(new JobStatistics()); + } + + // key is the date, value is the aggregated job stats + private Map<String, JobStatisticsBasic> aggregateJobStatsByTime(final long startTime, final long endTime, + final String timeDimension) { + Map<String, JobStatisticsBasic> result = Maps.newHashMap(); + + List<JobStatistics> qulifiedJobStats = getFilteredJobStatsByTime(crud.listAll(), startTime, endTime); + + long startDate = startTime; + while (startDate <= endTime) { + long nextDate = nextDate(startDate, timeDimension); + List<JobStatistics> list = getFilteredJobStatsByTime(qulifiedJobStats, startDate, nextDate); + result.put(formatDateTime(startDate), aggregateJobStats(list)); + startDate = nextDate; + } + + return result; + } + + // format epoch time to date string + private String formatDateTime(long time) { + ZoneId zoneId = TimeZone.getDefault().toZoneId(); + LocalDateTime localDateTime = Instant.ofEpochMilli(time).atZone(zoneId).toLocalDateTime(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd", + Locale.getDefault(Locale.Category.FORMAT)); + return localDateTime.format(formatter); + } + + private long nextDate(final long date, final String timeDimension) { + ZoneId zoneId = 
TimeZone.getTimeZone(config.getTimeZone()).toZoneId(); + LocalDate localDate = Instant.ofEpochMilli(date).atZone(zoneId).toLocalDate(); + switch (timeDimension) { + case "day": + localDate = localDate.plusDays(1); + break; + case "week": + localDate = localDate.with(TemporalAdjusters.next(DayOfWeek.MONDAY)); + break; + case "month": + localDate = localDate.with(TemporalAdjusters.firstDayOfNextMonth()); + break; + default: + localDate = localDate.plusDays(1); + break; + } + + return localDate.atStartOfDay(zoneId).toInstant().toEpochMilli(); + } + + private Map<String, JobStatisticsBasic> aggregateStatsByModel(long startTime, long endTime) { + return getFilteredJobStatsByTime(crud.listAll(), startTime, endTime).stream() + .map(JobStatistics::getJobStatisticsByModels).reduce((x, y) -> { + // merge two maps + Map<String, JobStatisticsBasic> mergedMap = Maps.newHashMap(x); + y.forEach((k, v) -> mergedMap.merge(k, v, + (value1, value2) -> new JobStatisticsBasic(value1.getCount() + value2.getCount(), + value1.getTotalDuration() + value2.getTotalDuration(), + value1.getTotalByteSize() + value2.getTotalByteSize()))); + return mergedMap; + }).orElse(Maps.newHashMap()); + } + + private List<JobStatistics> getFilteredJobStatsByTime(final List<JobStatistics> list, final long startTime, + final long endTime) { + return list.stream() + .filter(singleStats -> singleStats.getDate() >= startTime && singleStats.getDate() < endTime) + .collect(Collectors.toList()); + } + + private Map<String, Double> calculateDurationPerByte(Map<String, JobStatisticsBasic> totalMetricMap) { + Map<String, Double> result = Maps.newHashMap(); + for (Map.Entry<String, JobStatisticsBasic> entry : totalMetricMap.entrySet()) { + double totalDuration = entry.getValue().getTotalDuration(); + double totalByteSize = entry.getValue().getTotalByteSize(); + if (totalByteSize == 0) + result.put(entry.getKey(), .0); + else { + result.put(entry.getKey(), totalDuration / totalByteSize); + } + } + return result; + } +} 
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java new file mode 100644 index 0000000000..5c353d989c --- /dev/null +++ b/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.job.dao; + +import java.util.Comparator; +import java.util.List; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.JsonSerializer; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.persistence.RootPersistentEntity; +import org.apache.kylin.common.persistence.Serializer; +import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.metadata.cachesync.CachedCrudAssist; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import lombok.val; + +/** + */ +public class NExecutableDao { + + private static final Serializer<ExecutablePO> JOB_SERIALIZER = new JsonSerializer<>(ExecutablePO.class); + private static final Logger logger = LoggerFactory.getLogger(NExecutableDao.class); + + public static NExecutableDao getInstance(KylinConfig config, String project) { + return config.getManager(project, NExecutableDao.class); + } + + // called by reflection + static NExecutableDao newInstance(KylinConfig config, String project) { + return new NExecutableDao(config, project); + } + // ============================================================================ + + private String project; + + private KylinConfig config; + + private CachedCrudAssist<ExecutablePO> crud; + + private NExecutableDao(KylinConfig config, String project) { + logger.trace("Using metadata url: {}", config); + this.project = project; + this.config = config; + String resourceRootPath = "/" + project + ResourceStore.EXECUTE_RESOURCE_ROOT; + this.crud = new CachedCrudAssist<ExecutablePO>(getStore(), resourceRootPath, "", ExecutablePO.class) { + @Override + protected ExecutablePO initEntityAfterReload(ExecutablePO entity, String resourceName) { + entity.setProject(project); + return entity; + } + }; + } + 
+ public List<ExecutablePO> getJobs() { + return crud.listAll(); + } + + public List<ExecutablePO> getPartialJobs(Predicate<String> predicate) { + return crud.listPartial(predicate); + } + + public List<ExecutablePO> getJobs(long timeStart, long timeEndExclusive) { + return crud.listAll().stream() + .filter(x -> x.getLastModified() >= timeStart && x.getLastModified() < timeEndExclusive) + .collect(Collectors.toList()); + } + + public List<String> getJobIds() { + return crud.listAll().stream().sorted(Comparator.comparing(ExecutablePO::getCreateTime)) + .sorted(Comparator.comparing(ExecutablePO::getPriority)).map(RootPersistentEntity::resourceName) + .collect(Collectors.toList()); + } + + public ExecutablePO getJobByUuid(String uuid) { + return crud.get(uuid); + } + + public ExecutablePO addJob(ExecutablePO job) { + if (getJobByUuid(job.getUuid()) != null) { + throw new IllegalArgumentException("job id:" + job.getUuid() + " already exists"); + } + crud.save(job); + return job; + } + + // for ut + @VisibleForTesting + public void deleteAllJob() { + getJobs().forEach(job -> deleteJob(job.getId())); + } + + public void deleteJob(String uuid) { + crud.delete(uuid); + } + + public void updateJob(String uuid, Predicate<ExecutablePO> updater) { + val job = getJobByUuid(uuid); + Preconditions.checkNotNull(job); + val copyForWrite = JsonUtil.copyBySerialization(job, JOB_SERIALIZER, null); + if (updater.test(copyForWrite)) { + crud.save(copyForWrite); + } + } + + private ResourceStore getStore() { + return ResourceStore.getKylinMetaStore(config); + } + +} diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/AbstractDefaultSchedulerRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/AbstractDefaultSchedulerRunner.java new file mode 100644 index 0000000000..a8c6f3c973 --- /dev/null +++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/AbstractDefaultSchedulerRunner.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.logging.SetLogCategory;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.NExecutableManager;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.metadata.epoch.EpochManager;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.SneakyThrows;
+
+/**
+ * Common base of the periodic tasks of an {@link NDefaultScheduler}. Each {@link #run()} gives up when the
+ * thread is interrupted or this node has lost the project's epoch, and otherwise delegates to {@link #doRun()}
+ * under the "schedule" log category.
+ */
+public abstract class AbstractDefaultSchedulerRunner implements Runnable {
+    private static final Logger logger = LoggerFactory.getLogger(AbstractDefaultSchedulerRunner.class);
+
+    protected final NDefaultScheduler nDefaultScheduler;
+
+    protected final ExecutableContext context;
+
+    protected final String project;
+
+    public AbstractDefaultSchedulerRunner(final NDefaultScheduler nDefaultScheduler) {
+        this.nDefaultScheduler = nDefaultScheduler;
+        this.context = nDefaultScheduler.getContext();
+        this.project = nDefaultScheduler.getProject();
+    }
+
+    /**
+     * Decides whether this round must be abandoned. Sneakily throws {@link InterruptedException} when the
+     * current thread has been interrupted ({@code isInterrupted()} leaves the interrupt flag set).
+     *
+     * @return {@code true} if this node no longer holds the epoch of the project, after force-shutting-down the
+     *         scheduler; always {@code false} in UT environments
+     */
+    @SneakyThrows
+    private boolean checkEpochIdFailed() {
+        if (Thread.currentThread().isInterrupted()) {
+            throw new InterruptedException();
+        }
+        final boolean epochLost = !KylinConfig.getInstanceFromEnv().isUTEnv()
+                && !EpochManager.getInstance().checkEpochId(context.getEpochId(), project);
+        if (epochLost) {
+            nDefaultScheduler.forceShutdown();
+        }
+        return epochLost;
+    }
+
+    /**
+     * Pauses {@code jobId} when the project has reached its storage quota (SQL = Storage Quota Limit).
+     * Failures are logged, never thrown.
+     */
+    protected void stopJobIfSQLReached(String jobId) {
+        if (!context.isReachQuotaLimit()) {
+            return;
+        }
+        try {
+            EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+                pauseJobIfStillOverQuota(jobId);
+                return null;
+            }, project, UnitOfWork.DEFAULT_MAX_RETRY, context.getEpochId(), jobId);
+        } catch (Exception e) {
+            logger.warn("[UNEXPECTED_THINGS_HAPPENED] project {} job {} failed to pause", project, jobId, e);
+        }
+    }
+
+    /** Transaction body: re-reads the quota flag, then pauses the job and tells the user what to do. */
+    private void pauseJobIfStillOverQuota(String jobId) {
+        if (!context.isReachQuotaLimit()) {
+            return;
+        }
+        NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).pauseJob(jobId);
+        logger.info("Job {} paused due to no available storage quota.", jobId);
+        logger.info("Please clean up low-efficient storage in time, "
+                + "increase the low-efficient storage threshold, "
+                + "or notify the administrator to increase the storage quota for this project.");
+    }
+
+    @Override
+    public void run() {
+        try (SetLogCategory ignored = new SetLogCategory("schedule")) {
+            if (!checkEpochIdFailed()) {
+                doRun();
+            }
+        }
+    }
+
+    /** Performs one round of this runner's actual work. */
+    protected abstract void doRun();
+}
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
new file mode 100644
index 0000000000..c9bb123a45
--- /dev/null
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.NExecutableManager;
+import org.apache.kylin.job.execution.Output;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.val;
+
+/**
+ * Scans all jobs of the project in each round, tallies them by state and hands READY jobs to {@code jobPool}
+ * while the concurrency limit, the storage quota and the driver-memory permits allow it.
+ */
+public class FetcherRunner extends AbstractDefaultSchedulerRunner {
+
+    private static final Logger logger = LoggerFactory.getLogger(FetcherRunner.class);
+
+    private final ExecutorService jobPool;
+
+    private final ScheduledExecutorService fetcherPool;
+
+    public FetcherRunner(NDefaultScheduler nDefaultScheduler, ExecutorService jobPool,
+            ScheduledExecutorService fetcherPool) {
+        super(nDefaultScheduler);
+        this.jobPool = jobPool;
+        this.fetcherPool = fetcherPool;
+    }
+
+    /** @return {@code true} if the job is not in a final state and its own {@code checkSuicide()} says so. */
+    private boolean checkSuicide(String jobId) {
+        val executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+        // load the job once and evaluate both conditions on the same object
+        AbstractExecutable job = executableManager.getJob(jobId);
+        if (job.getStatus().isFinalState()) {
+            return false;
+        }
+        return job.checkSuicide();
+    }
+
+    /**
+     * Marks the job as suicidal inside a transaction, re-checking the condition there.
+     *
+     * @return {@code true} if the job was marked; failures are logged and reported as {@code false}
+     */
+    private boolean markSuicideJob(String jobId) {
+        try {
+            if (checkSuicide(jobId)) {
+                return EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+                    if (checkSuicide(jobId)) {
+                        NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).suicideJob(jobId);
+                        return true;
+                    }
+                    return false;
+                }, project, UnitOfWork.DEFAULT_MAX_RETRY, context.getEpochId(), jobId);
+            }
+        } catch (Exception e) {
+            logger.warn("[UNEXPECTED_THINGS_HAPPENED] project {} job {} should be suicidal but discard failed", project,
+                    jobId, e);
+        }
+        return false;
+    }
+
+    /**
+     * Moves the job to ERROR inside a transaction.
+     *
+     * @return {@code true} on success; failures are logged and reported as {@code false}
+     */
+    private boolean markErrorJob(String jobId) {
+        try {
+            return EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+                val manager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+                manager.errorJob(jobId);
+                return true;
+            }, project, UnitOfWork.DEFAULT_MAX_RETRY, context.getEpochId(), jobId);
+        } catch (Exception e) {
+            logger.warn("[UNEXPECTED_THINGS_HAPPENED] project {} job {} should be error but mark failed", project,
+                    jobId, e);
+        }
+        return false;
+    }
+
+    /**
+     * One fetch round: marks suicidal jobs, skips jobs that are already running, schedules READY jobs while
+     * capacity allows, and logs a per-state summary. Any exception is logged and ends the round.
+     */
+    @Override
+    public void doRun() {
+        try {
+            val executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+            Map<String, Executable> runningJobs = context.getRunningJobs();
+
+            int nRunning = 0;
+            int nReady = 0;
+            int nStopped = 0;
+            int nOthers = 0;
+            int nError = 0;
+            int nDiscarded = 0;
+            int nSucceed = 0;
+            int nSuicidal = 0;
+            for (final String id : executableManager.getJobs()) {
+                if (markSuicideJob(id)) {
+                    nSuicidal++;
+                    continue;
+                }
+
+                if (runningJobs.containsKey(id)) {
+                    // this is very important to prevent the same job from being scheduled twice at the same time.
+                    // e.g. when a job is restarted, the old job may still be running (even if we tried to interrupt it)
+                    // until the old job is finished, the new job should not start
+                    nRunning++;
+                    continue;
+                }
+
+                final Output output = executableManager.getOutput(id);
+
+                switch (output.getState()) {
+                case READY:
+                    nReady++;
+                    if (isJobPoolFull()) {
+                        break;
+                    }
+
+                    if (context.isReachQuotaLimit()) {
+                        stopJobIfSQLReached(id);
+                        break;
+                    }
+
+                    logger.info("fetcher schedule {} ", id);
+                    scheduleJob(id);
+                    break;
+                case DISCARDED:
+                    nDiscarded++;
+                    break;
+                case ERROR:
+                    nError++;
+                    break;
+                case SUCCEED:
+                    nSucceed++;
+                    break;
+                case PAUSED:
+                    nStopped++;
+                    break;
+                case SUICIDAL:
+                    nSuicidal++;
+                    break;
+                default:
+                    if (allSubTasksSuccess(id)) {
+                        logger.info("All sub tasks are successful, reschedule job {}", id);
+                        scheduleJob(id);
+                        break;
+                    }
+                    logger.warn("Unexpected status for {} <{}>", id, output.getState());
+                    if (markErrorJob(id)) {
+                        nError++;
+                    } else {
+                        nOthers++;
+                    }
+                    break;
+                }
+            }
+
+            logger.info(
+                    "Job Status in project {}: {} should running, {} actual running, {} stopped, {} ready, {} already succeed, {} error, {} discarded, {} suicidal, {} others",
+                    project, nRunning, runningJobs.size(), nStopped, nReady, nSucceed, nError, nDiscarded, nSuicidal,
+                    nOthers);
+        } catch (Exception e) {
+            logger.warn("Job Fetcher caught a exception ", e);
+        }
+    }
+
+    /** @return {@code true} if the job is a {@link DefaultChainedExecutable} whose sub tasks all SUCCEED. */
+    private boolean allSubTasksSuccess(String id) {
+        val executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+
+        // special case: every sub task has already succeeded, so the job itself is rescheduled by the caller
+        AbstractExecutable job = executableManager.getJob(id);
+        if (job instanceof DefaultChainedExecutable) {
+            return ((DefaultChainedExecutable) job).getTasks().stream()
+                    .allMatch(abstractExecutable -> abstractExecutable.getStatus() == ExecutableState.SUCCEED);
+        }
+
+        return false;
+    }
+
+    /**
+     * Reserves driver memory (outside cluster deploy mode), registers the job as running and submits a
+     * {@link JobRunner} to {@code jobPool}. Once the submission succeeds, the JobRunner owns the running-job
+     * entry and the memory permits and gives both back when it finishes. If scheduling fails before that
+     * point, both are rolled back here. Failures are logged, never thrown.
+     */
+    private void scheduleJob(String id) {
+        AbstractExecutable executable = null;
+        String jobDesc = null;
+
+        boolean memoryLock = false;
+        boolean submitted = false;
+        int useMemoryCapacity = 0;
+        try {
+            val config = KylinConfig.getInstanceFromEnv();
+            val executableManager = NExecutableManager.getInstance(config, project);
+            executable = executableManager.getJob(id);
+            if (!config.getDeployMode().equals("cluster")) {
+                useMemoryCapacity = executable.computeStepDriverMemory();
+            }
+            memoryLock = NDefaultScheduler.getMemoryRemaining().tryAcquire(useMemoryCapacity);
+            if (memoryLock) {
+                jobDesc = executable.toString();
+                logger.info("{} prepare to schedule", jobDesc);
+                context.addRunningJob(executable);
+                jobPool.execute(new JobRunner(nDefaultScheduler, executable, this));
+                // from here on, rolling back would drop a job that is really running and double-release memory
+                submitted = true;
+                logger.info("{} scheduled", jobDesc);
+            } else {
+                logger.info("memory is not enough, remaining: {} MB",
+                        NDefaultScheduler.getMemoryRemaining().availablePermits());
+            }
+        } catch (Exception ex) {
+            if (executable != null && !submitted) {
+                context.removeRunningJob(executable);
+                if (memoryLock) {
+                    NDefaultScheduler.getMemoryRemaining().release(useMemoryCapacity);
+                }
+            }
+            logger.warn("{} fail to schedule", jobDesc, ex);
+        }
+    }
+
+    /** @return {@code true} (and logs a warning) when the running jobs reach the max concurrent job limit. */
+    private boolean isJobPoolFull() {
+        if (context.getRunningJobs().size() >= nDefaultScheduler.getJobEngineConfig().getMaxConcurrentJobLimit()) {
+            logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time.");
+            return true;
+        }
+        return false;
+    }
+
+    /** Queues an immediate extra fetch round on {@code fetcherPool}. */
+    void scheduleNext() {
+        fetcherPool.schedule(this, 0, TimeUnit.SECONDS);
+    }
+}
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/ForkBasedJobRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/ForkBasedJobRunner.java
new file mode 100644
index 0000000000..fc29a5ebb4
--- /dev/null
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/ForkBasedJobRunner.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BufferedLogger;
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.ExecutableApplication;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Runs the application in a child process by invoking {@code $KYLIN_HOME/sbin/bootstrap.sh} through bash,
+ * appending the child's stderr to {@code <job tmp dir>/job.log}.
+ */
+@Slf4j
+public class ForkBasedJobRunner extends JobRunnerFactory.AbstractJobRunner {
+
+    private final CliCommandExecutor cliExecutor = new CliCommandExecutor();
+
+    public ForkBasedJobRunner(KylinConfig kylinConfig, String project, List<String> originResources) {
+        super(kylinConfig, project, originResources);
+    }
+
+    @Override
+    protected void doExecute(ExecutableApplication app, Map<String, String> args) throws Exception {
+        final String kylinHome = KylinConfig.getKylinHome();
+        final String mainClass = app.getClass().getName();
+        // NOTE(review): formatArgs joins "--k=v" pairs without quoting, so a value containing spaces or shell
+        // metacharacters would be split/interpreted by bash -- confirm callers only pass safe values
+        final String cliArgs = formatArgs(args);
+        final String stderrLog = getJobTmpDir() + "/job.log";
+        final String command = String.format(Locale.ROOT, "bash -x %s/sbin/bootstrap.sh %s %s 2>>%s", kylinHome,
+                mainClass, cliArgs, stderrLog);
+        log.info("Try to execute {}", command);
+        cliExecutor.execute(command, new BufferedLogger(log), jobId);
+    }
+
+}
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/InMemoryJobRunner.java
b/src/core-job/src/main/java/org/apache/kylin/job/runners/InMemoryJobRunner.java
new file mode 100644
index 0000000000..119c3313e3
--- /dev/null
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/InMemoryJobRunner.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ExecutableApplication;
+
+/** Runs the application directly inside the current JVM. */
+public class InMemoryJobRunner extends JobRunnerFactory.AbstractJobRunner {
+
+    public InMemoryJobRunner(KylinConfig config, String project, List<String> resources) {
+        super(config, project, resources);
+    }
+
+    /**
+     * Calls {@code app.execute} with one {@code --key=value} element per entry of {@code args} (the same element
+     * format as {@code formatArgs}). The array is built per entry instead of splitting the joined string on
+     * spaces, so a value containing a space stays one argument and an empty map yields an empty array.
+     */
+    @Override
+    public void doExecute(ExecutableApplication app, Map<String, String> args) throws Exception {
+        String[] appArgs = args.entrySet().stream()
+                .map(entry -> "--" + entry.getKey() + "=" + entry.getValue())
+                .toArray(String[]::new);
+        app.execute(appArgs);
+    }
+}
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java
new file mode 100644
index 0000000000..195a244900
--- /dev/null
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobCheckRunner.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import java.util.Map;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.NExecutableManager;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.val;
+
+/**
+ * Periodically inspects the running jobs of the project: jobs that exceeded the scheduler timeout are moved
+ * to ERROR, and jobs are paused when the storage quota is reached.
+ */
+public class JobCheckRunner extends AbstractDefaultSchedulerRunner {
+
+    private static final Logger logger = LoggerFactory.getLogger(JobCheckRunner.class);
+
+    public JobCheckRunner(NDefaultScheduler nDefaultScheduler) {
+        super(nDefaultScheduler);
+    }
+
+    /**
+     * @param startTime value recorded in {@code context.getRunningJobInfos()}; it is subtracted from
+     *                  {@code System.currentTimeMillis()}, so it is expected to be epoch milliseconds. It may be
+     *                  {@code null} when no info was recorded for the job.
+     * @return {@code true} if the timeout is enabled, the job is not in a final state and it has been running
+     *         for at least the configured number of minutes
+     */
+    private boolean checkTimeoutIfNeeded(String jobId, Long startTime) {
+        int timeOutMinute = KylinConfig.getInstanceFromEnv().getSchedulerJobTimeOutMinute();
+        // a non-positive timeout disables the check (a negative one used to time out every job); a missing start
+        // time used to throw an NPE while unboxing, which was then reported as "discard failed"
+        if (timeOutMinute <= 0 || startTime == null) {
+            return false;
+        }
+        val executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+        if (executableManager.getJob(jobId).getStatus().isFinalState()) {
+            return false;
+        }
+        long durationMins = (System.currentTimeMillis() - startTime) / (60 * 1000);
+        return durationMins >= timeOutMinute;
+    }
+
+    /**
+     * Moves a timed-out job to ERROR (via {@code errorJob}) inside a transaction, re-checking the timeout there.
+     *
+     * @return {@code true} if the job was marked; failures are logged and reported as {@code false}
+     */
+    private boolean discardTimeoutJob(String jobId, Long startTime) {
+        try {
+            if (checkTimeoutIfNeeded(jobId, startTime)) {
+                return EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+                    if (checkTimeoutIfNeeded(jobId, startTime)) {
+                        logger.error("project {} job {} running timeout.", project, jobId);
+                        NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project).errorJob(jobId);
+                        return true;
+                    }
+                    return false;
+                }, project, UnitOfWork.DEFAULT_MAX_RETRY, context.getEpochId(), jobId);
+            }
+        } catch (Exception e) {
+            logger.warn("[UNEXPECTED_THINGS_HAPPENED] project {} job {} should be timeout but discard failed", project,
+                    jobId, e);
+        }
+        return false;
+    }
+
+    /** Applies the timeout and storage-quota checks to every job of the project that is currently running. */
+    @Override
+    protected void doRun() {
+        logger.info("start check project {} job pool.", project);
+
+        val executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+        Map<String, Executable> runningJobs = context.getRunningJobs();
+        Map<String, Long> runningJobInfos = context.getRunningJobInfos();
+        for (final String id : executableManager.getJobs()) {
+            if (runningJobs.containsKey(id)) {
+                discardTimeoutJob(id, runningJobInfos.get(id));
+                stopJobIfSQLReached(id);
+            }
+        }
+    }
+}
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
new file mode 100644
index 0000000000..3e839f57f4
--- /dev/null
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunner.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.SetThreadName;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.exception.JobStoppedNonVoluntarilyException;
+import org.apache.kylin.job.exception.JobStoppedVoluntarilyException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.common.logging.SetLogCategory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.val;
+import lombok.var;
+
+/**
+ * Executes one job handed over by {@link FetcherRunner}. Whatever the outcome, it removes the job from the
+ * running-job set and releases the driver-memory permits it reserved (outside cluster deploy mode).
+ */
+public class JobRunner extends AbstractDefaultSchedulerRunner {
+    private static final Logger logger = LoggerFactory.getLogger(JobRunner.class);
+
+    /** Length of the job-id prefix used in the worker thread name and log lines. */
+    private static final int SHORT_JOB_ID_LENGTH = 8;
+
+    private final AbstractExecutable executable;
+
+    private final FetcherRunner fetcherRunner;
+
+    public JobRunner(NDefaultScheduler nDefaultScheduler, AbstractExecutable executable, FetcherRunner fetcherRunner) {
+        super(nDefaultScheduler);
+        this.fetcherRunner = fetcherRunner;
+        this.executable = executable;
+    }
+
+    @Override
+    protected void doRun() {
+        // only the first 8 chars of the job uuid. This runs before the try/finally, so it must not throw:
+        // a failure here would leave the job in the running set and leak its memory permits
+        val jobIdSimple = shortJobId(executable.getId());
+        try (SetThreadName ignored = new SetThreadName("JobWorker(project:%s,jobid:%s)", project, jobIdSimple);
+                SetLogCategory logCategory = new SetLogCategory("schedule")) {
+            executable.execute(context);
+            // trigger the next step asap
+            fetcherRunner.scheduleNext();
+        } catch (JobStoppedVoluntarilyException | JobStoppedNonVoluntarilyException e) {
+            logger.info("Job quits either voluntarily or non-voluntarily,job: {}", jobIdSimple, e);
+        } catch (ExecuteException e) {
+            logger.error("ExecuteException occurred while job: {}", executable.getId(), e);
+        } catch (Exception e) {
+            logger.error("unknown error execute job: {}", executable.getId(), e);
+        } finally {
+            context.removeRunningJob(executable);
+            val config = KylinConfig.getInstanceFromEnv();
+            // must mirror the amount FetcherRunner reserved: nothing in cluster mode, the step driver memory otherwise
+            var usingMemory = 0;
+            if (!config.getDeployMode().equals("cluster")) {
+                usingMemory = executable.computeStepDriverMemory();
+            }
+            logger.info("Before job:{} global memory release {}", jobIdSimple,
+                    NDefaultScheduler.getMemoryRemaining().availablePermits());
+            NDefaultScheduler.getMemoryRemaining().release(usingMemory);
+            logger.info("After job:{} global memory release {}", jobIdSimple,
+                    NDefaultScheduler.getMemoryRemaining().availablePermits());
+        }
+    }
+
+    /** Returns the first {@value #SHORT_JOB_ID_LENGTH} chars of {@code jobId}, or the id itself if shorter/null. */
+    private static String shortJobId(String jobId) {
+        if (jobId == null || jobId.length() <= SHORT_JOB_ID_LENGTH) {
+            return jobId;
+        }
+        return jobId.substring(0, SHORT_JOB_ID_LENGTH);
+    }
+}
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunnerFactory.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunnerFactory.java
new file mode 100644
index 0000000000..0713558d35
--- /dev/null
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/JobRunnerFactory.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigBase;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ExecutableApplication;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.ZipFileUtils;
+import org.apache.kylin.common.persistence.metadata.MetadataStore;
+import org.apache.kylin.common.persistence.transaction.UnitOfWorkParams;
+import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+
+import com.google.common.collect.Maps;
+
+import lombok.RequiredArgsConstructor;
+import lombok.val;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class JobRunnerFactory {
+
+    private JobRunnerFactory() {
+        // static factory only; never instantiated
+    }
+
+    /**
+     * Creates the runner for {@code type}: {@code "fork"} -> {@link ForkBasedJobRunner},
+     * {@code "in-memory"} -> {@link InMemoryJobRunner}.
+     *
+     * @throws NotImplementedException for any other type
+     */
+    public static AbstractJobRunner createRunner(KylinConfig config, String type, String project,
+            List<String> resources) {
+        switch (type) {
+        case "fork":
+            return new ForkBasedJobRunner(config, project, resources);
+        case "in-memory":
+            return new InMemoryJobRunner(config, project, resources);
+        default:
+            throw new NotImplementedException("Runner type " + type + " not implement");
+        }
+    }
+
+    /**
+     * Base class of the job runners. It prepares a per-job temp dir, dumps the required metadata to
+     * {@link #metaDumpUrl}, runs the application and uploads the job temp dir as a zip.
+     */
+    @RequiredArgsConstructor
+    public abstract static class AbstractJobRunner {
+
+        protected final KylinConfig kylinConfig;
+        protected final String project;
+        protected final List<String> originResources;
+        private Consumer<Properties> configUpdater;
+
+        protected String metaDumpUrl;
+        protected String jobId;
+
+        /**
+         * Creates {@code $KYLIN_HOME/tmp/<jobId>} and points {@link #metaDumpUrl} at its {@code meta} sub dir.
+         *
+         * @return the job temp dir
+         */
+        public String prepareEnv(String jobId) throws IOException {
+            this.jobId = jobId;
+            val jobTmpDir = getJobTmpDir();
+            FileUtils.forceMkdir(new File(jobTmpDir));
+
+            metaDumpUrl = kylinConfig.getMetadataUrlPrefix() + "@hdfs,path=file://" + jobTmpDir + "/meta";
+            return jobTmpDir;
+        }
+
+        /**
+         * Dumps the metadata and adds {@code meta}/{@code metaOutput} entries to {@code args} (the caller's map
+         * is modified). It then runs the application and uploads the job temp dir. The upload only happens when
+         * {@code doExecute} returns normally. Requires {@link #prepareEnv(String)} to have been called first.
+         */
+        public void start(ExecutableApplication app, Map<String, String> args) throws Exception {
+            attachMetadataAndKylinProps(false);
+            args.put("meta", metaDumpUrl);
+            args.put("metaOutput", metaDumpUrl + "_output");
+            doExecute(app, args);
+            uploadJobLog();
+        }
+
+        /** Deletes the job temp dir, ignoring failures. */
+        public void cleanupEnv() {
+            FileUtils.deleteQuietly(new File(getJobTmpDir()));
+        }
+
+        protected abstract void doExecute(ExecutableApplication app, Map<String, String> args) throws Exception;
+
+        /** Joins {@code args} as space-separated {@code --key=value} tokens; values are not quoted or escaped. */
+        protected String formatArgs(Map<String, String> args) {
+            return args.entrySet().stream().map(entry -> "--" + entry.getKey() + "=" + entry.getValue())
+                    .collect(Collectors.joining(" "));
+        }
+
+        public String getJobTmpDir() {
+            return KylinConfigBase.getKylinHome() + "/tmp/" + jobId;
+        }
+
+        public void setConfigUpdater(Consumer<Properties> consumer) {
+            this.configUpdater = consumer;
+        }
+
+        /**
+         * Zips the job temp dir and copies it to {@code <project job tmp dir><jobId>.zip} on that dir's file
+         * system. The local zip is deleted afterwards in every case.
+         *
+         * @return {@code true} on success, {@code false} if an {@link IOException} occurred (logged)
+         */
+        protected boolean uploadJobLog() {
+            val jobContentZip = getJobTmpDir() + ".zip";
+            try {
+                ZipFileUtils.compressZipFile(getJobTmpDir(), jobContentZip);
+                val jobDir = kylinConfig.getJobTmpDir(project, true);
+                val fs = HadoopUtil.getFileSystem(jobDir);
+
+                try (val in = new FileInputStream(new File(jobContentZip));
+                        val out = fs.create(new Path(jobDir + jobId + ".zip"), true)) {
+                    IOUtils.copy(in, out);
+                }
+                return true;
+            } catch (IOException e) {
+                log.warn("Upload Job Evidence failed {}", jobId, e);
+            } finally {
+                FileUtils.deleteQuietly(new File(jobContentZip));
+            }
+            return false;
+        }
+
+        /**
+         * Writes the Kylin properties (and, unless {@code kylinPropsOnly}, the {@link #originResources}) into a
+         * local temp dir and uploads that dir to {@link #metaDumpUrl}. Returns without uploading when none of the
+         * resources could be collected.
+         *
+         * @throws IllegalStateException if {@link #metaDumpUrl} has not been prepared
+         */
+        protected void attachMetadataAndKylinProps(boolean kylinPropsOnly) throws IOException {
+            if (StringUtils.isEmpty(metaDumpUrl)) {
+                throw new IllegalStateException("Missing metaUrl");
+            }
+
+            File tmpDir = File.createTempFile("kylin_job_meta", "");
+            try {
+                FileUtils.forceDelete(tmpDir); // we need a directory, so delete the file first
+
+                Properties props = kylinConfig.exportToProperties();
+                props.setProperty("kylin.metadata.url", metaDumpUrl);
+                if (configUpdater != null) {
+                    configUpdater.accept(props);
+                }
+
+                if (kylinPropsOnly) {
+                    ResourceStore.dumpKylinProps(tmpDir, props);
+                } else {
+                    // The way of Updating metadata is CopyOnWrite. So it is safe to use Reference in the value.
+                    Map<String, RawResource> dumpMap = EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(
+                            UnitOfWorkParams.<Map> builder().readonly(true).unitName(project).processor(() -> {
+                                Map<String, RawResource> retMap = Maps.newHashMap();
+                                ResourceStore resourceStore = ResourceStore.getKylinMetaStore(kylinConfig);
+                                for (String resPath : originResources) {
+                                    retMap.put(resPath, resourceStore.getResource(resPath));
+                                }
+                                return retMap;
+                            }).build());
+
+                    if (Objects.isNull(dumpMap) || dumpMap.isEmpty()) {
+                        // nothing to dump: tmpDir was never re-created, which the finally block tolerates
+                        return;
+                    }
+                    // dump metadata
+                    ResourceStore.dumpResourceMaps(kylinConfig, tmpDir, dumpMap, props);
+                }
+
+                // copy metadata to target metaUrl
+                KylinConfig dstConfig = KylinConfig.createKylinConfig(props);
+                MetadataStore.createMetadataStore(dstConfig).uploadFromFile(tmpDir);
+                // clean up
+                log.debug("Copied metadata to the target metaUrl, delete the temp dir: {}", tmpDir);
+            } finally {
+                // forceDelete throws FileNotFoundException for a missing path. Without this check, the early return
+                // above (or an exception raised before anything was written) would fail or mask the real error.
+                if (tmpDir.exists()) {
+                    FileUtils.forceDelete(tmpDir);
+                }
+            }
+        }
+    }
+}
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/LicenseCapacityCheckRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/LicenseCapacityCheckRunner.java
new file mode 100644
index 0000000000..1b51dd2c3b
--- /dev/null
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/LicenseCapacityCheckRunner.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.job.runners;
+
+import static org.apache.kylin.common.exception.CommonErrorCode.LICENSE_OVER_CAPACITY;
+
+import java.util.Objects;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.annotation.Clarification;
+import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
+import org.apache.kylin.metadata.sourceusage.SourceUsageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import lombok.val;
+
+/** Periodically records in the job context whether the project's source usage exceeds the license capacity. */
+@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise")
+public class LicenseCapacityCheckRunner extends AbstractDefaultSchedulerRunner {
+    private static final Logger logger = LoggerFactory.getLogger(LicenseCapacityCheckRunner.class);
+
+    public LicenseCapacityCheckRunner(NDefaultScheduler nDefaultScheduler) {
+        super(nDefaultScheduler);
+    }
+
+    @Override
+    protected void doRun() {
+        logger.info("start check license capacity for project {}", project);
+        context.setLicenseOverCapacity(isLicenseOverCapacity());
+    }
+
+    /**
+     * @return {@code true} only when {@code checkIsOverCapacity} throws a {@link KylinException} carrying
+     *         LICENSE_OVER_CAPACITY; any other failure is logged and treated as "not over capacity"
+     */
+    private boolean isLicenseOverCapacity() {
+        val sourceUsageManager = SourceUsageManager.getInstance(KylinConfig.getInstanceFromEnv());
+
+        try {
+            sourceUsageManager.checkIsOverCapacity(project);
+        } catch (KylinException e) {
+            // compare by value (null-safe) rather than by reference
+            if (Objects.equals(LICENSE_OVER_CAPACITY.toErrorCode(), e.getErrorCode())) {
+                logger.warn("Source usage over capacity, no job will be scheduled.", e);
+                return true;
+            }
+            // any other KylinException used to be dropped without a trace
+            logger.warn("Check source usage over capacity failed.", e);
+        } catch (Throwable e) {
+            logger.warn("Check source usage over capacity failed.", e);
+        }
+
+        // not over capacity
+        return false;
+    }
+}
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/QuotaStorageCheckRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/QuotaStorageCheckRunner.java
new file mode 100644
index 0000000000..c7f3140671
--- /dev/null
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/QuotaStorageCheckRunner.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.kylin.job.runners; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.annotation.Clarification; +import org.apache.kylin.job.impl.threadpool.NDefaultScheduler; +import org.apache.kylin.metadata.cube.storage.ProjectStorageInfoCollector; +import org.apache.kylin.metadata.cube.storage.StorageInfoEnum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +import lombok.val; +import lombok.var; + +@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise") +public class QuotaStorageCheckRunner extends AbstractDefaultSchedulerRunner { + private static final Logger logger = LoggerFactory.getLogger(QuotaStorageCheckRunner.class); + + private final ProjectStorageInfoCollector collector; + + public QuotaStorageCheckRunner(NDefaultScheduler nDefaultScheduler) { + super(nDefaultScheduler); + + val storageInfoEnumList = Lists.newArrayList(StorageInfoEnum.STORAGE_QUOTA, StorageInfoEnum.TOTAL_STORAGE); + collector = new ProjectStorageInfoCollector(storageInfoEnumList); + } + + @Override + protected void doRun() { + logger.info("start check project {} storage quota.", nDefaultScheduler.getProject()); + context.setReachQuotaLimit(reachStorageQuota()); + } + + private boolean reachStorageQuota() { + var storageVolumeInfo = collector.getStorageVolumeInfo(KylinConfig.getInstanceFromEnv(), + nDefaultScheduler.getProject()); + var totalSize = storageVolumeInfo.getTotalStorageSize(); + int retryCount = 3; + while (retryCount-- > 0 && totalSize < 0) { + storageVolumeInfo = collector.getStorageVolumeInfo(KylinConfig.getInstanceFromEnv(), + nDefaultScheduler.getProject()); + totalSize = storageVolumeInfo.getTotalStorageSize(); + } + val storageQuotaSize = storageVolumeInfo.getStorageQuotaSize(); + if (totalSize < 0) { + logger.error( + "Project '{}' : an exception occurs when getting storage volume info, no job will be scheduled!!! 
The error info : {}", + nDefaultScheduler.getProject(), + storageVolumeInfo.getThrowableMap().get(StorageInfoEnum.TOTAL_STORAGE)); + return true; + } + if (totalSize >= storageQuotaSize) { + logger.info("Project '{}' reach storage quota, no job will be scheduled!!!", + nDefaultScheduler.getProject()); + return true; + } + return false; + } +} diff --git a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java new file mode 100644 index 0000000000..5fb6ecc8ff --- /dev/null +++ b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.metadata.cube.storage; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.cube.model.NDataLayout; +import org.apache.kylin.metadata.cube.model.NDataSegment; +import org.apache.kylin.metadata.cube.model.NDataflow; +import org.apache.kylin.metadata.cube.model.NDataflowManager; +import org.apache.kylin.metadata.cube.optimization.IndexOptimizer; +import org.apache.kylin.metadata.cube.optimization.IndexOptimizerFactory; +import org.apache.kylin.metadata.model.NDataModel; + +import com.google.common.collect.Maps; + +import lombok.val; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class GarbageStorageCollector implements StorageInfoCollector { + + @Override + public void collect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) { + Map<String, Set<Long>> garbageIndexMap = Maps.newHashMap(); + long storageSize = 0L; + + for (val model : getModels(project)) { + val dataflow = getDataflow(model).copy(); + + final IndexOptimizer indexOptimizer = IndexOptimizerFactory.getOptimizer(dataflow, false); + val garbageLayouts = indexOptimizer.getGarbageLayoutMap(dataflow).keySet(); + if (CollectionUtils.isNotEmpty(garbageLayouts)) { + storageSize += calculateLayoutSize(garbageLayouts, dataflow); + garbageIndexMap.put(model.getId(), garbageLayouts); + } + } + + storageVolumeInfo.setGarbageModelIndexMap(garbageIndexMap); + storageVolumeInfo.setGarbageStorageSize(storageSize); + } + + private List<NDataModel> getModels(String project) { + val dataflowManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project); + return dataflowManager.listUnderliningDataModels(); + } + + private NDataflow getDataflow(NDataModel model) { + val dataflowManager = 
NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), model.getProject()); + return dataflowManager.getDataflow(model.getUuid()); + } + + private long calculateLayoutSize(Set<Long> cuboidLayoutIdSet, NDataflow dataflow) { + long cuboidLayoutSize = 0L; + for (NDataSegment segment : dataflow.getSegments(SegmentStatusEnum.READY, SegmentStatusEnum.WARNING)) { + for (Long cuboidLayoutId : cuboidLayoutIdSet) { + NDataLayout dataCuboid = segment.getSegDetails().getLayoutById(cuboidLayoutId); + if (dataCuboid != null) { + cuboidLayoutSize += dataCuboid.getByteSize(); + } + } + } + return cuboidLayoutSize; + } + +} diff --git a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollector.java b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollector.java new file mode 100644 index 0000000000..6a5f4d7c56 --- /dev/null +++ b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollector.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.metadata.cube.storage; + +import java.util.List; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.kylin.common.KylinConfig; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +public class ProjectStorageInfoCollector { + + private List<StorageInfoCollector> collectors = Lists.newArrayList(); + + private static final ImmutableMap<Class, StorageInfoEnum> collectorType = ImmutableMap + .<Class, StorageInfoEnum> builder().put(GarbageStorageCollector.class, StorageInfoEnum.GARBAGE_STORAGE) + .put(TotalStorageCollector.class, StorageInfoEnum.TOTAL_STORAGE) + .put(StorageQuotaCollector.class, StorageInfoEnum.STORAGE_QUOTA).build(); + + public ProjectStorageInfoCollector(List<StorageInfoEnum> storageInfoList) { + if (CollectionUtils.isNotEmpty(storageInfoList)) { + storageInfoList.forEach(si -> addCollectors(si)); + } + } + + private void collect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) { + for (StorageInfoCollector collector : collectors) { + try { + collector.collect(config, project, storageVolumeInfo); + } catch (Exception e) { + storageVolumeInfo.getThrowableMap().put(collectorType.get(collector.getClass()), e); + } + } + } + + private void addCollectors(StorageInfoEnum storageInfoEnum) { + switch (storageInfoEnum) { + case GARBAGE_STORAGE: + collectors.add(new GarbageStorageCollector()); + break; + case TOTAL_STORAGE: + collectors.add(new TotalStorageCollector()); + break; + case STORAGE_QUOTA: + collectors.add(new StorageQuotaCollector()); + break; + default: + break; + } + } + + public StorageVolumeInfo getStorageVolumeInfo(KylinConfig config, String project) { + StorageVolumeInfo storageVolumeInfo = new StorageVolumeInfo(); + if (StringUtils.isBlank(project) || CollectionUtils.isEmpty(collectors)) { + return storageVolumeInfo; + } + collect(config, project, storageVolumeInfo); + return 
storageVolumeInfo; + } +} diff --git a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoCollector.java b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoCollector.java new file mode 100644 index 0000000000..4218244b06 --- /dev/null +++ b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoCollector.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.metadata.cube.storage; + +import java.io.IOException; + +import org.apache.kylin.common.KylinConfig; + +public interface StorageInfoCollector { + + void collect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) throws IOException; + +} diff --git a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoEnum.java b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoEnum.java new file mode 100644 index 0000000000..ac29378738 --- /dev/null +++ b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageInfoEnum.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.metadata.cube.storage; + +public enum StorageInfoEnum { + GARBAGE_STORAGE, TOTAL_STORAGE, STORAGE_QUOTA +} diff --git a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageQuotaCollector.java b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageQuotaCollector.java new file mode 100644 index 0000000000..951f896ef6 --- /dev/null +++ b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageQuotaCollector.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.metadata.cube.storage; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.annotation.Clarification; +import org.apache.kylin.metadata.project.NProjectManager; + +@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise") +public class StorageQuotaCollector implements StorageInfoCollector { + + @Override + public void collect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) { + config = NProjectManager.getInstance(config).getProject(project).getConfig(); + long quotaSize = config.getStorageQuotaSize(); + storageVolumeInfo.setStorageQuotaSize(quotaSize); + } + +} diff --git a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageVolumeInfo.java b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageVolumeInfo.java new file mode 100644 index 0000000000..d77b18f18f --- /dev/null +++ b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/StorageVolumeInfo.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kylin.metadata.cube.storage; + +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.Maps; + +import lombok.Getter; +import lombok.Setter; +import org.apache.kylin.common.annotation.Clarification; + +@Getter +@Setter +@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise") +public class StorageVolumeInfo { + private long storageQuotaSize = -1L; + private long totalStorageSize = -1L; + private long garbageStorageSize = -1L; + private Map<String, Set<Long>> garbageModelIndexMap = Maps.newHashMap(); + private Map<StorageInfoEnum, Throwable> throwableMap = Maps.newHashMap(); +} diff --git a/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java new file mode 100644 index 0000000000..e5101a9f1a --- /dev/null +++ b/src/core-job/src/main/java/org/apache/kylin/metadata/cube/storage/TotalStorageCollector.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.metadata.cube.storage; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; + +public class TotalStorageCollector implements StorageInfoCollector { + + @Override + public void collect(KylinConfig config, String project, StorageVolumeInfo storageVolumeInfo) throws IOException { + String strPath = config.getWorkingDirectoryWithConfiguredFs(project); + Path path = new Path(strPath); + FileSystem fs = path.getFileSystem(HadoopUtil.getCurrentConfiguration()); + long totalStorageSize = 0L; + if (fs.exists(path)) { + totalStorageSize = HadoopUtil.getContentSummary(fs, path).getLength(); + } + storageVolumeInfo.setTotalStorageSize(totalStorageSize); + } + +}