This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 0dc4c06641966e4fb2975f1ff738de65ff5d846c
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Tue Mar 23 17:52:42 2021 +0800

    KYLIN-4966 Refresh the existing segment according to the new cuboid list in kylin4
---
 .../apache/kylin/cube/model/CubeBuildTypeEnum.java |   5 +
 .../kylin/job/constant/ExecutableConstants.java    |   1 +
 .../org/apache/kylin/metadata/model/TableDesc.java |   1 -
 .../spark/SparkBatchCubingEngineParquet.java       |   3 +-
 .../engine/spark/job/FilterRecommendCuboidJob.java | 104 ++++++
 .../kylin/engine/spark/job/JobStepFactory.java     |  10 +-
 .../apache/kylin/engine/spark/job/JobStepType.java |   4 +-
 .../engine/spark/job/NResourceDetectStep.java      |  40 +-
 .../NSparkBatchOptimizeJobCheckpointBuilder.java   |  88 +++++
 .../spark/job/NSparkCleanupHdfsStorageStep.java    |  90 +++++
 .../kylin/engine/spark/job/NSparkCubingUtil.java   |   7 +-
 ...esourceDetectStep.java => NSparkLocalStep.java} |  40 +-
 .../engine/spark/job/NSparkOptimizingJob.java      | 108 ++++++
 .../engine/spark/job/NSparkOptimizingStep.java     |  94 +++++
 .../job/NSparkUpdateCubeInfoAfterOptimizeStep.java |  66 ++++
 .../kylin/engine/spark/job/OptimizeBuildJob.java}  | 411 +++++++++------------
 .../job/ResourceDetectBeforeOptimizingJob.java     | 108 ++++++
 .../engine/spark/utils/UpdateMetadataUtil.java     |  84 ++++-
 .../kylin/engine/spark/job/BuildJobInfos.scala     |  39 ++
 .../kylin/engine/spark/job/CubeBuildJob.java       |  16 +-
 .../engine/spark/job/CuboidStatisticsJob.scala     |   9 +
 .../kylin/engine/spark/job/LogJobInfoUtils.scala   |  32 ++
 .../engine/spark/job/ParentSourceChooser.scala     |  33 +-
 .../spark/job/ResourceDetectBeforeCubingJob.java   |   8 +-
 .../engine/spark/metadata/cube/ManagerHub.java     |  13 +-
 .../kylin/engine/spark/metadata/MetaData.scala     |   4 +
 .../engine/spark/metadata/MetadataConverter.scala  |  21 +-
 .../kylin/engine/spark2/NOptimizeJobTest.java      | 151 ++++++++
 .../kylin/rest/controller/CubeController.java      |   1 +
 .../org/apache/kylin/rest/service/JobService.java  |  13 +-
 30 files changed, 1248 insertions(+), 356 deletions(-)

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java
index 6a14025..5b502b1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java
@@ -38,6 +38,11 @@ public enum CubeBuildTypeEnum {
     REFRESH,
 
     /**
+     * optimize segments
+     */
+    OPTIMIZE,
+
+    /**
      * checkpoint for set of other jobs
      */
     CHECKPOINT
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 5ef7a69..0d5e482 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -94,5 +94,6 @@ public final class ExecutableConstants {
     public static final String FLINK_SPECIFIC_CONFIG_NAME_MERGE_DICTIONARY = "mergedict";
 
     //kylin on parquetv2
     public static final String STEP_NAME_DETECT_RESOURCE = "Detect Resource";
+    public static final String STEP_NAME_BUILD_CUBOID_FROM_PARENT_CUBOID = "Build recommend cuboid from parent cuboid";
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index 588cf1e..8124875 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -26,7 +26,6 @@ import java.util.Locale;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.StringSplitter;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java
index ad8e68b..b6f4bb8 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.engine.spark;
 
+import org.apache.kylin.engine.spark.job.NSparkOptimizingJob;
 import org.apache.kylin.engine.spark.job.NSparkCubingJob;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -50,7 +51,7 @@ public class SparkBatchCubingEngineParquet implements IBatchCubingEngine {
 
     @Override
     public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter) {
-        return null;
+        return NSparkOptimizingJob.optimize(optimizeSegment, submitter);
     }
 
     @Override
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/FilterRecommendCuboidJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/FilterRecommendCuboidJob.java
new file mode 100644
index 0000000..b9c46d2
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/FilterRecommendCuboidJob.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.kylin.engine.spark.job; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.CuboidModeEnum; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.engine.spark.application.SparkApplication; +import org.apache.kylin.engine.spark.metadata.cube.PathManager; +import org.apache.kylin.shaded.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.Set; + +public class FilterRecommendCuboidJob extends SparkApplication { + protected static final Logger logger = LoggerFactory.getLogger(FilterRecommendCuboidJob.class); + + private long baseCuboid; + private Set<Long> recommendCuboids; + + private FileSystem fs = HadoopUtil.getWorkingFileSystem(); + private Configuration conf = HadoopUtil.getCurrentConfiguration(); + + public FilterRecommendCuboidJob() { + + } + + public String getCuboidRootPath(CubeSegment segment) { + return PathManager.getSegmentParquetStoragePath(segment.getCubeInstance(), segment.getName(), + segment.getStorageLocationIdentifier()); + } + + @Override + protected void doExecute() throws Exception { + infos.clearReusedCuboids(); + final CubeManager mgr = CubeManager.getInstance(config); + final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams())).latestCopyForWrite(); + final CubeSegment optimizeSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); + + CubeSegment oldSegment = optimizeSegment.getCubeInstance().getOriginalSegmentToOptimize(optimizeSegment); + Preconditions.checkNotNull(oldSegment, + "cannot find the original segment to be optimized by " + optimizeSegment); + + infos.recordReusedCuboids(Collections.singleton(cube.getCuboidsByMode(CuboidModeEnum.RECOMMEND_EXISTING))); + + baseCuboid = cube.getCuboidScheduler().getBaseCuboidId(); + recommendCuboids = cube.getCuboidsRecommend(); + + Preconditions.checkNotNull(recommendCuboids, "The recommend cuboid map could not be null"); + + Path originalCuboidPath = new Path(getCuboidRootPath(oldSegment)); + + try { + for (FileStatus cuboid : fs.listStatus(originalCuboidPath)) { + String cuboidId = cuboid.getPath().getName(); + if (cuboidId.equals(String.valueOf(baseCuboid)) || recommendCuboids.contains(Long.valueOf(cuboidId))) { + Path optimizeCuboidPath = new Path(getCuboidRootPath(optimizeSegment) + "/" + cuboidId); + FileUtil.copy(fs, cuboid.getPath(), fs, optimizeCuboidPath, false, true, conf); + logger.info("Copy cuboid {} storage from original segment to optimized segment", cuboidId); + } + } + } catch (IOException e) { + logger.error("Failed to filter cuboid", e); + throw e; + } + } + + public static void main(String[] args) { + FilterRecommendCuboidJob filterRecommendCuboidJob = new FilterRecommendCuboidJob(); + filterRecommendCuboidJob.execute(args); + } + + @Override + protected String generateInfo() { + return LogJobInfoUtils.filterRecommendCuboidJobInfo(); + } +} diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java index 4e0be04..ead4223 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java @@ -20,6 +20,7 @@ package org.apache.kylin.engine.spark.job; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.execution.DefaultChainedExecutable; public class JobStepFactory { @@ -41,13 +42,20 @@ public class JobStepFactory { case MERGING: step = new NSparkMergingStep(config.getSparkMergeClassName()); break; + case OPTIMIZING: + step = new NSparkOptimizingStep(OptimizeBuildJob.class.getName()); + break; case CLEAN_UP_AFTER_MERGE: step = new NSparkUpdateMetaAndCleanupAfterMergeStep(); break; + case FILTER_RECOMMEND_CUBOID: + step = new NSparkLocalStep(); + step.setSparkSubmitClassName(FilterRecommendCuboidJob.class.getName()); + step.setName(ExecutableConstants.STEP_NAME_FILTER_RECOMMEND_CUBOID_DATA_FOR_OPTIMIZATION); + break; default: throw new IllegalArgumentException(); } - step.setParams(parent.getParams()); step.setProject(parent.getProject()); step.setTargetSubject(parent.getTargetSubject()); diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepType.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepType.java index a81312a..3b4142d 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepType.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepType.java @@ -21,5 +21,7 @@ package org.apache.kylin.engine.spark.job; public enum JobStepType { RESOURCE_DETECT, - CLEAN_UP_AFTER_MERGE, CUBING, MERGING + CLEAN_UP_AFTER_MERGE, CUBING, MERGING, OPTIMIZING, + + FILTER_RECOMMEND_CUBOID } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java index 2aed71e..b366437 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java @@ -18,19 +18,10 @@ package org.apache.kylin.engine.spark.job; -import java.util.Map; -import java.util.Set; - -import org.apache.kylin.common.KylinConfig; import org.apache.kylin.job.constant.ExecutableConstants; -import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; -public class NResourceDetectStep extends NSparkExecutable { - - private final static String[] excludedSparkConf = new String[] {"spark.executor.cores", - "spark.executor.memoryOverhead", "spark.executor.extraJavaOptions", - "spark.executor.instances", "spark.executor.memory", "spark.executor.extraClassPath"}; +public class NResourceDetectStep extends NSparkLocalStep { // called by reflection public NResourceDetectStep() { @@ -44,33 +35,12 @@ public class NResourceDetectStep extends NSparkExecutable { this.setSparkSubmitClassName(ResourceDetectBeforeMergingJob.class.getName()); /*} else if (parent 
instanceof NTableSamplingJob) { this.setSparkSubmitClassName(ResourceDetectBeforeSampling.class.getName()); - */} else { + */ + } else if (parent instanceof NSparkOptimizingJob) { + this.setSparkSubmitClassName(ResourceDetectBeforeOptimizingJob.class.getName()); + } else { throw new IllegalArgumentException("Unsupported resource detect for " + parent.getName() + " job"); } this.setName(ExecutableConstants.STEP_NAME_DETECT_RESOURCE); } - - @Override - protected Set<String> getMetadataDumpList(KylinConfig config) { - AbstractExecutable parent = getParentExecutable(); - if (parent instanceof DefaultChainedExecutable) { - return ((DefaultChainedExecutable) parent).getMetadataDumpList(config); - } - throw new IllegalStateException("Unsupported resource detect for non chained executable!"); - } - - @Override - protected Map<String, String> getSparkConfigOverride(KylinConfig config) { - Map<String, String> sparkConfigOverride = super.getSparkConfigOverride(config); - //run resource detect job on local not cluster - sparkConfigOverride.put("spark.master", "local"); - sparkConfigOverride.put("spark.sql.autoBroadcastJoinThreshold", "-1"); - sparkConfigOverride.put("spark.sql.adaptive.enabled", "false"); - for (String sparkConf : excludedSparkConf) { - if (sparkConfigOverride.containsKey(sparkConf)) { - sparkConfigOverride.remove(sparkConf); - } - } - return sparkConfigOverride; - } } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkBatchOptimizeJobCheckpointBuilder.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkBatchOptimizeJobCheckpointBuilder.java new file mode 100644 index 0000000..8922d2f --- /dev/null +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkBatchOptimizeJobCheckpointBuilder.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.engine.spark.job; + +import com.google.common.base.Preconditions; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.job.constant.ExecutableConstants; +import org.apache.kylin.job.execution.CheckpointExecutable; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.project.ProjectManager; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Locale; + +public class NSparkBatchOptimizeJobCheckpointBuilder { + protected SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss", Locale.ROOT); + + final protected CubeInstance cube; + final protected String submitter; + + public NSparkBatchOptimizeJobCheckpointBuilder(CubeInstance cube, String submitter) { + this.cube = cube; + this.submitter = submitter; + + Preconditions.checkNotNull(cube.getFirstSegment(), "Cube " + cube + " is empty!!!"); + } + + public CheckpointExecutable build() { + KylinConfig kylinConfig = cube.getConfig(); + List<ProjectInstance> projList = ProjectManager.getInstance(kylinConfig).findProjects(cube.getType(), + cube.getName()); + if (projList == null || projList.size() == 0) { + throw new RuntimeException("Cannot find the project containing the cube " + cube.getName() + "!!!"); + } else if (projList.size() >= 2) { + throw new RuntimeException("Find more than one project containing the cube " + cube.getName() + + ". It does't meet the uniqueness requirement!!! "); + } + + CheckpointExecutable checkpointJob = new CheckpointExecutable(); + checkpointJob.setSubmitter(submitter); + CubingExecutableUtil.setCubeName(cube.getName(), checkpointJob.getParams()); + checkpointJob.setName( + cube.getName() + " - OPTIMIZE CHECKPOINT - " + format.format(new Date(System.currentTimeMillis()))); + checkpointJob.setProjectName(projList.get(0).getName()); + + // Phase 1: Update cube information + checkpointJob.addTask(createUpdateCubeInfoAfterCheckpointStep()); + + // Phase 2: Cleanup hdfs storage + checkpointJob.addTask(createCleanupHdfsStorageStep()); + + return checkpointJob; + } + + private NSparkUpdateCubeInfoAfterOptimizeStep createUpdateCubeInfoAfterCheckpointStep() { + NSparkUpdateCubeInfoAfterOptimizeStep result = new NSparkUpdateCubeInfoAfterOptimizeStep(); + result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO); + CubingExecutableUtil.setCubeName(cube.getName(), result.getParams()); + return result; + } + + private NSparkCleanupHdfsStorageStep createCleanupHdfsStorageStep() { + NSparkCleanupHdfsStorageStep result = new NSparkCleanupHdfsStorageStep(); + result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS); + CubingExecutableUtil.setCubeName(cube.getName(), result.getParams()); + return result; + } +} diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCleanupHdfsStorageStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCleanupHdfsStorageStep.java new file mode 100644 index 0000000..950d236 --- /dev/null +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCleanupHdfsStorageStep.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.spark.job; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +public class NSparkCleanupHdfsStorageStep extends NSparkExecutable { + private static final Logger logger = LoggerFactory.getLogger(NSparkCleanupHdfsStorageStep.class); + private FileSystem fs = HadoopUtil.getWorkingFileSystem(); + + public NSparkCleanupHdfsStorageStep() { + super(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + final CubeManager cubeManager = CubeManager.getInstance(context.getConfig()); + final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())); + + List<String> segments = cube.getSegments().stream().map(segment -> { + return segment.getName() + "_" + segment.getStorageLocationIdentifier(); + }).collect(Collectors.toList()); + String project = cube.getProject(); + + //list all segment directory + Path cubePath = new Path(context.getConfig().getHdfsWorkingDirectory(project) + "/parquet/" + cube.getName()); + try { + if (fs.exists(cubePath)) { + FileStatus[] segmentStatus = fs.listStatus(cubePath); + if (segmentStatus != null) { + for (FileStatus status : segmentStatus) { + String segment = status.getPath().getName(); + if (!segments.contains(segment)) { + logger.info("Deleting old segment storage {}", status.getPath()); + fs.delete(status.getPath(), true); + } + } + } + } else { + logger.warn("Cube path doesn't exist! 
The path is " + cubePath); + } + return new ExecuteResult(); + } catch (IOException e) { + logger.error("Failed to clean old segment storage", e); + return ExecuteResult.createError(e); + } + } + + @Override + protected Set<String> getMetadataDumpList(KylinConfig config) { + AbstractExecutable parent = getParentExecutable(); + return ((DefaultChainedExecutable) parent).getMetadataDumpList(config); + } + +} diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java index 6a1f9f1..146a1f2 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java @@ -19,6 +19,7 @@ package org.apache.kylin.engine.spark.job; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.spark.metadata.cube.PathManager; import org.apache.kylin.metadata.model.Segments; import org.apache.spark.sql.Column; import org.spark_project.guava.collect.Sets; @@ -79,10 +80,8 @@ public class NSparkCubingUtil { return withoutDot; } - public static String getStoragePath(CubeSegment nDataSegment, Long layoutId) { - String hdfsWorkingDir = nDataSegment.getConfig().getReadHdfsWorkingDirectory(); - return hdfsWorkingDir + getStoragePathWithoutPrefix(nDataSegment.getProject(), - nDataSegment.getCubeInstance().getId(), nDataSegment.getUuid(), layoutId); + public static String getStoragePath(CubeSegment segment, Long layoutId) { + return PathManager.getParquetStoragePath(segment.getCubeInstance(), segment.getName(), segment.getStorageLocationIdentifier(), layoutId); } static Set<String> toSegmentNames(Segments<CubeSegment> segments) { diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkLocalStep.java similarity index 68% copy from kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java copy to kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkLocalStep.java index 2aed71e..1bd63ba 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkLocalStep.java @@ -18,54 +18,31 @@ package org.apache.kylin.engine.spark.job; -import java.util.Map; -import java.util.Set; - import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; -public class NResourceDetectStep extends NSparkExecutable { +import java.util.Map; +import java.util.Set; +public class NSparkLocalStep extends NSparkExecutable { private final static String[] excludedSparkConf = new String[] {"spark.executor.cores", "spark.executor.memoryOverhead", "spark.executor.extraJavaOptions", "spark.executor.instances", "spark.executor.memory", "spark.executor.extraClassPath"}; - // called by reflection - public NResourceDetectStep() { - - } - - public NResourceDetectStep(DefaultChainedExecutable parent) { - if (parent instanceof NSparkCubingJob) { - 
this.setSparkSubmitClassName(ResourceDetectBeforeCubingJob.class.getName()); - } else if (parent instanceof NSparkMergingJob) { - this.setSparkSubmitClassName(ResourceDetectBeforeMergingJob.class.getName()); - /*} else if (parent instanceof NTableSamplingJob) { - this.setSparkSubmitClassName(ResourceDetectBeforeSampling.class.getName()); - */} else { - throw new IllegalArgumentException("Unsupported resource detect for " + parent.getName() + " job"); - } - this.setName(ExecutableConstants.STEP_NAME_DETECT_RESOURCE); - } - @Override protected Set<String> getMetadataDumpList(KylinConfig config) { AbstractExecutable parent = getParentExecutable(); if (parent instanceof DefaultChainedExecutable) { return ((DefaultChainedExecutable) parent).getMetadataDumpList(config); } - throw new IllegalStateException("Unsupported resource detect for non chained executable!"); + throw new IllegalStateException("Unsupported " + this.getName() + " for non chained executable!"); } @Override protected Map<String, String> getSparkConfigOverride(KylinConfig config) { Map<String, String> sparkConfigOverride = super.getSparkConfigOverride(config); - //run resource detect job on local not cluster - sparkConfigOverride.put("spark.master", "local"); - sparkConfigOverride.put("spark.sql.autoBroadcastJoinThreshold", "-1"); - sparkConfigOverride.put("spark.sql.adaptive.enabled", "false"); + overrideSparkConf(sparkConfigOverride); for (String sparkConf : excludedSparkConf) { if (sparkConfigOverride.containsKey(sparkConf)) { sparkConfigOverride.remove(sparkConf); @@ -73,4 +50,11 @@ public class NResourceDetectStep extends NSparkExecutable { } return sparkConfigOverride; } + + protected void overrideSparkConf(Map<String, String> sparkConfigOverride) { + //run job on local not cluster + sparkConfigOverride.put("spark.master", "local"); + sparkConfigOverride.put("spark.sql.autoBroadcastJoinThreshold", "-1"); + sparkConfigOverride.put("spark.sql.adaptive.enabled", "false"); + } } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkOptimizingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkOptimizingJob.java new file mode 100644 index 0000000..f09ea04 --- /dev/null +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkOptimizingJob.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.engine.spark.job; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.engine.spark.utils.MetaDumpUtil; +import org.apache.kylin.shaded.com.google.common.base.Preconditions; +import org.apache.kylin.metadata.MetadataConstants; +import org.apache.kylin.shaded.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Locale; +import java.util.TimeZone; +import java.util.UUID; +import java.util.Set; + +public class NSparkOptimizingJob extends CubingJob { + private static final Logger logger = LoggerFactory.getLogger(NSparkOptimizingJob.class); + private static final String DEPLOY_ENV_NAME = "envName"; + + public static NSparkOptimizingJob optimize(CubeSegment optimizedSegment, String submitter) { + return NSparkOptimizingJob.optimize(optimizedSegment, submitter, CubingJobTypeEnum.OPTIMIZE, UUID.randomUUID().toString()); + } + + public static NSparkOptimizingJob optimize(CubeSegment optimizedSegment, String submitter, CubingJobTypeEnum jobType, String jobId) { + logger.info("SPARK_V2 new job to OPTIMIZE a segment " + optimizedSegment); + CubeSegment oldSegment = optimizedSegment.getCubeInstance().getOriginalSegmentToOptimize(optimizedSegment); + Preconditions.checkNotNull(oldSegment, "cannot find the original segment to be optimized by " + optimizedSegment); + CubeInstance cube = optimizedSegment.getCubeInstance(); + + NSparkOptimizingJob job = new NSparkOptimizingJob(); + SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss", Locale.ROOT); + format.setTimeZone(TimeZone.getTimeZone(cube.getConfig().getTimeZone())); + + StringBuilder builder = new StringBuilder(); + builder.append(jobType).append(" CUBE - "); + builder.append(optimizedSegment.getCubeInstance().getDisplayName()).append(" - ").append(optimizedSegment.getName()) + .append(" - "); + + builder.append(format.format(new Date(System.currentTimeMillis()))); + job.setName(builder.toString()); + job.setId(jobId); + job.setTargetSubject(optimizedSegment.getModel().getUuid()); + job.setTargetSegments(Lists.newArrayList(String.valueOf(optimizedSegment.getUuid()))); + job.setProject(optimizedSegment.getProject()); + job.setSubmitter(submitter); + + job.setParam(MetadataConstants.P_JOB_ID, jobId); + job.setParam(MetadataConstants.P_PROJECT_NAME, cube.getProject()); + job.setParam(MetadataConstants.P_TARGET_MODEL, job.getTargetSubject()); + job.setParam(MetadataConstants.P_CUBE_ID, cube.getId()); + job.setParam(MetadataConstants.P_CUBE_NAME, cube.getName()); + job.setParam(MetadataConstants.P_SEGMENT_IDS, String.join(",", job.getTargetSegments())); + job.setParam(CubingExecutableUtil.SEGMENT_ID, optimizedSegment.getUuid()); + job.setParam(MetadataConstants.SEGMENT_NAME, optimizedSegment.getName()); + job.setParam(MetadataConstants.P_DATA_RANGE_START, optimizedSegment.getSegRange().start.toString()); + job.setParam(MetadataConstants.P_DATA_RANGE_END, optimizedSegment.getSegRange().end.toString()); + job.setParam(MetadataConstants.P_OUTPUT_META_URL, cube.getConfig().getMetadataUrl().toString()); + job.setParam(MetadataConstants.P_JOB_TYPE, String.valueOf(jobType)); + job.setParam(MetadataConstants.P_CUBOID_NUMBER, 
String.valueOf(cube.getDescriptor().getAllCuboids().size())); + + // Phase 1: Prepare base cuboid data from old segment + JobStepFactory.addStep(job, JobStepType.FILTER_RECOMMEND_CUBOID, cube); + + // Phase 2: Resource detect + JobStepFactory.addStep(job, JobStepType.RESOURCE_DETECT, cube); + + // Phase 3: Calculate cuboid statistics for optimized segment, Build Cube for Missing Cuboid Data, Update metadata + JobStepFactory.addStep(job, JobStepType.OPTIMIZING, cube); + + return job; + } + + @Override + public Set<String> getMetadataDumpList(KylinConfig config) { + String cubeId = getParam(MetadataConstants.P_CUBE_ID); + CubeInstance cubeInstance = CubeManager.getInstance(config).getCubeByUuid(cubeId); + return MetaDumpUtil.collectCubeMetadata(cubeInstance); + } + + public String getDeployEnvName() { + return getParam(DEPLOY_ENV_NAME); + } +} diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkOptimizingStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkOptimizingStep.java new file mode 100644 index 0000000..afb2a85 --- /dev/null +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkOptimizingStep.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.engine.spark.job; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.engine.spark.metadata.cube.PathManager; +import org.apache.kylin.engine.spark.utils.MetaDumpUtil; +import org.apache.kylin.engine.spark.utils.UpdateMetadataUtil; +import org.apache.kylin.job.constant.ExecutableConstants; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.metadata.MetadataConstants; +import org.apache.kylin.shaded.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.Set; + +public class NSparkOptimizingStep extends NSparkExecutable { + private static final Logger logger = LoggerFactory.getLogger(NSparkOptimizingStep.class); + + // called by reflection + public NSparkOptimizingStep() { + } + + public NSparkOptimizingStep(String sparkSubmitClassName) { + this.setSparkSubmitClassName(sparkSubmitClassName); + this.setName(ExecutableConstants.STEP_NAME_BUILD_CUBOID_FROM_PARENT_CUBOID); + } + + @Override + protected Set<String> getMetadataDumpList(KylinConfig config) { + String cubeId = getParam(MetadataConstants.P_CUBE_ID); + CubeInstance cubeInstance = CubeManager.getInstance(config).getCubeByUuid(cubeId); + return MetaDumpUtil.collectCubeMetadata(cubeInstance); + } + + public static class Mockup { + public static void main(String[] args) { + logger.info(NSparkCubingStep.Mockup.class + ".main() invoked, args: " + Arrays.toString(args)); + } + } + + @Override + public boolean needMergeMetadata() { + return true; + } + + @Override + protected Map<String, String> getJobMetricsInfo(KylinConfig config) { + CubeManager cubeManager = CubeManager.getInstance(config); + CubeInstance cube = cubeManager.getCube(getCubeName()); + Map<String, String> joblogInfo = Maps.newHashMap(); + joblogInfo.put(CubingJob.SOURCE_SIZE_BYTES, String.valueOf(cube.getInputRecordSizeBytes())); + joblogInfo.put(CubingJob.CUBE_SIZE_BYTES, String.valueOf(cube.getSizeKB())); + return joblogInfo; + } + + @Override + public void cleanup(ExecuteResult result) throws ExecuteException { + // delete job tmp dir + if (result != null && result.state().ordinal() == ExecuteResult.State.SUCCEED.ordinal()) { + PathManager.deleteJobTempPath(getConfig(), getParam(MetadataConstants.P_PROJECT_NAME), + getParam(MetadataConstants.P_JOB_ID)); + } + } + + @Override + protected void updateMetaAfterOperation(KylinConfig config) throws IOException { + UpdateMetadataUtil.syncLocalMetadataToRemote(config, this); + } +} diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateCubeInfoAfterOptimizeStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateCubeInfoAfterOptimizeStep.java new file mode 100644 index 0000000..6572b0f --- /dev/null +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateCubeInfoAfterOptimizeStep.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.spark.job; + +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.common.CuboidStatsReaderUtil; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterCheckpointStep; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class NSparkUpdateCubeInfoAfterOptimizeStep extends NSparkExecutable { + private static final Logger logger = LoggerFactory.getLogger(UpdateCubeInfoAfterCheckpointStep.class); + + public NSparkUpdateCubeInfoAfterOptimizeStep() { + super(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + final CubeManager cubeManager = CubeManager.getInstance(context.getConfig()); + final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())); + + Set<Long> recommendCuboids = cube.getCuboidsRecommend(); + try { + List<CubeSegment> newSegments = cube.getSegments(SegmentStatusEnum.READY_PENDING); + Map<Long, Long> recommendCuboidsWithStats = CuboidStatsReaderUtil + .readCuboidStatsFromSegments(recommendCuboids, newSegments); + if (recommendCuboidsWithStats == null) { + throw new RuntimeException("Fail to get statistics info for recommended cuboids after optimization!!!"); + } + cubeManager.promoteCheckpointOptimizeSegments(cube, recommendCuboidsWithStats, + newSegments.toArray(new CubeSegment[newSegments.size()])); + return new ExecuteResult(); + } catch (Exception e) { + logger.error("fail to update cube after build", e); + return ExecuteResult.createError(e); + } + } +} diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java similarity index 53% copy from kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java copy to kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java index efce341..a4e2d5d 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java @@ -18,40 +18,20 @@ package org.apache.kylin.engine.spark.job; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import 
java.util.Comparator; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; - -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.engine.mr.JobBuilderSupport; -import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.engine.mr.common.CubeStatsWriter; -import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil; -import org.apache.kylin.measure.hllc.HLLCounter; -import org.apache.kylin.shaded.com.google.common.base.Joiner; -import org.apache.kylin.shaded.com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.util.StringUtils; +import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.CubeUpdate; +import org.apache.kylin.cube.cuboid.CuboidModeEnum; +import org.apache.kylin.engine.mr.common.CubeStatsWriter; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.engine.spark.NSparkCubingEngine; import org.apache.kylin.engine.spark.application.SparkApplication; import org.apache.kylin.engine.spark.builder.NBuildSourceInfo; @@ -61,243 +41,183 @@ import org.apache.kylin.engine.spark.metadata.cube.PathManager; import org.apache.kylin.engine.spark.metadata.cube.model.ForestSpanningTree; import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity; import org.apache.kylin.engine.spark.metadata.cube.model.SpanningTree; -import org.apache.kylin.engine.spark.utils.BuildUtils; import org.apache.kylin.engine.spark.utils.JobMetrics; import org.apache.kylin.engine.spark.utils.JobMetricsUtils; import org.apache.kylin.engine.spark.utils.Metrics; import org.apache.kylin.engine.spark.utils.QueryExecutionCache; -import org.apache.kylin.metadata.MetadataConstants; +import org.apache.kylin.engine.spark.utils.BuildUtils; import org.apache.kylin.metadata.model.IStorageAware; +import org.apache.kylin.shaded.com.google.common.base.Preconditions; +import org.apache.kylin.measure.hllc.HLLCounter; +import org.apache.kylin.metadata.MetadataConstants; +import org.apache.kylin.shaded.com.google.common.base.Joiner; +import org.apache.kylin.shaded.com.google.common.collect.Lists; +import org.apache.kylin.shaded.com.google.common.collect.Maps; import org.apache.kylin.storage.StorageFactory; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.hive.utils.ResourceDetectUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import org.apache.kylin.shaded.com.google.common.base.Preconditions; -import org.apache.kylin.shaded.com.google.common.collect.Lists; -import org.apache.kylin.shaded.com.google.common.collect.Sets; - import scala.Tuple2; import scala.collection.JavaConversions; -public class CubeBuildJob extends SparkApplication { - protected static final Logger logger = 
LoggerFactory.getLogger(CubeBuildJob.class); +import java.io.IOException; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Collection; +import java.util.LinkedList; +import java.util.ArrayList; +import java.util.UUID; + +import java.util.stream.Collectors; + +public class OptimizeBuildJob extends SparkApplication { + private static final Logger logger = LoggerFactory.getLogger(OptimizeBuildJob.class); + + private Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap(); protected static String TEMP_DIR_SUFFIX = "_temp"; - private CubeManager cubeManager; - private CubeInstance cubeInstance; private BuildLayoutWithUpdate buildLayoutWithUpdate; private Map<Long, Short> cuboidShardNum = Maps.newConcurrentMap(); private Map<Long, Long> cuboidsRowCount = Maps.newConcurrentMap(); - private Map<Long, Long> recommendCuboidMap = new HashMap<>(); + + private Configuration conf = HadoopUtil.getCurrentConfiguration(); + private CubeManager cubeManager; + private CubeInstance cubeInstance; + private SegmentInfo optSegInfo; + private SegmentInfo originalSegInfo; + private CubeSegment optSeg; + private CubeSegment originalSeg; + private long baseCuboidId; public static void main(String[] args) { - CubeBuildJob cubeBuildJob = new CubeBuildJob(); - cubeBuildJob.execute(args); + OptimizeBuildJob optimizeBuildJob = new OptimizeBuildJob(); + optimizeBuildJob.execute(args); } @Override protected void doExecute() throws Exception { + String segmentId = getParam(CubingExecutableUtil.SEGMENT_ID); + String cubeId = getParam(MetadataConstants.P_CUBE_ID); + + cubeManager = CubeManager.getInstance(config); + cubeInstance = cubeManager.getCubeByUuid(cubeId); + optSeg = cubeInstance.getSegmentById(segmentId); + originalSeg = cubeInstance.getOriginalSegmentToOptimize(optSeg); + originalSegInfo = ManagerHub.getSegmentInfo(config, cubeId, originalSeg.getUuid()); + calculateCuboidFromBaseCuboid(); + buildCuboidFromParent(cubeId); + } + + private void calculateCuboidFromBaseCuboid() throws IOException { + logger.info("Start to calculate cuboid statistics for optimized segment"); long start = System.currentTimeMillis(); - logger.info("Start building cube job for {} ...", getParam(MetadataConstants.P_SEGMENT_IDS)); - Set<String> segmentIds = Sets.newHashSet(StringUtils.split(getParam(MetadataConstants.P_SEGMENT_IDS))); - // For now, Kylin should only build one segment in one time, cube planner has this restriction (maybe we can remove this limitation later) - Preconditions.checkArgument(segmentIds.size() == 1, "Build one segment in one time."); + baseCuboidId = cubeInstance.getCuboidScheduler().getBaseCuboidId(); + LayoutEntity baseCuboid = originalSegInfo.getAllLayoutJava().stream() + .filter(layoutEntity -> layoutEntity.getId() == baseCuboidId).iterator().next(); + Dataset<Row> baseCuboidDS = StorageFactory + .createEngineAdapter(baseCuboid, NSparkCubingEngine.NSparkCubingStorage.class) + .getFrom(PathManager.getParquetStoragePath(config, cubeInstance.getName(), optSeg.getName(), + optSeg.getStorageLocationIdentifier(), String.valueOf(baseCuboid.getId())), ss); - String firstSegmentId = segmentIds.iterator().next(); - String cubeName = getParam(MetadataConstants.P_CUBE_ID); - SegmentInfo seg = ManagerHub.getSegmentInfo(config, cubeName, firstSegmentId); - cubeManager = CubeManager.getInstance(config); - cubeInstance = cubeManager.getCubeByUuid(cubeName); - CubeSegment newSegment = cubeInstance.getSegmentById(firstSegmentId); - SpanningTree spanningTree; - 
ParentSourceChooser sourceChooser; + Map<Long, HLLCounter> hllMap = new HashMap<>(); - // Cuboid Statistics is served for Cube Planner Phase One at the moment - boolean needStatistics = StatisticsDecisionUtil.isAbleToOptimizeCubingPlan(newSegment) - || config.isSegmentStatisticsEnabled(); - - if (needStatistics) { - // 1.1 Call CuboidStatistics#statistics - long startMills = System.currentTimeMillis(); - spanningTree = new ForestSpanningTree(JavaConversions.asJavaCollection(seg.toBuildLayouts())); - sourceChooser = new ParentSourceChooser(spanningTree, seg, jobId, ss, config, false); - sourceChooser.setNeedStatistics(); - sourceChooser.decideFlatTableSource(null); - Map<Long, HLLCounter> hllMap = new HashMap<>(); - for (Tuple2<Object, AggInfo> cuboidData : sourceChooser.aggInfo()) { - hllMap.put((Long) cuboidData._1, cuboidData._2.cuboid().counter()); - } - logger.info("Cuboid statistics return {} records and cost {} ms.", hllMap.size(), (System.currentTimeMillis() - startMills)); - - // 1.2 Save cuboid statistics - String jobTmpDir = config.getJobTmpDir(project) + "/" + jobId; - Path statisticsDir = new Path(jobTmpDir + "/" + ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + firstSegmentId + "/"); - Optional<HLLCounter> hll = hllMap.values().stream().max(Comparator.comparingLong(HLLCounter::getCountEstimate)); - long rc = hll.map(HLLCounter::getCountEstimate).orElse(1L); - CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), statisticsDir, hllMap, 1, rc); - - FileSystem fs = HadoopUtil.getWorkingFileSystem(); - ResourceStore rs = ResourceStore.getStore(config); - String metaKey = newSegment.getStatisticsResourcePath(); - Path statisticsFile = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); - FSDataInputStream is = fs.open(statisticsFile); - rs.putResource(metaKey, is, System.currentTimeMillis()); // write to Job-Local metastore - logger.info("{}'s stats saved to resource key({}) with path({})", newSegment, metaKey, statisticsFile); - - // 1.3 Trigger cube planner phase one and save optimized cuboid set into CubeInstance - recommendCuboidMap = StatisticsDecisionUtil.optimizeCubingPlan(newSegment); - if (!recommendCuboidMap.isEmpty()) - logger.info("Triggered cube planner phase one ."); + for (Tuple2<Object, AggInfo> cuboidData : CuboidStatisticsJob.statistics(baseCuboidDS, + originalSegInfo, getNewCuboidIds())) { + hllMap.put((Long) cuboidData._1, cuboidData._2.cuboid().counter()); } - buildLayoutWithUpdate = new BuildLayoutWithUpdate(config); - List<String> persistedFlatTable = new ArrayList<>(); - List<String> persistedViewFactTable = new ArrayList<>(); - Path shareDir = config.getJobTmpShareDir(project, jobId); - try { - //TODO: what if a segment is deleted during building? 
- for (String segId : segmentIds) { - seg = ManagerHub.getSegmentInfo(config, cubeName, segId); - spanningTree = new ForestSpanningTree( - JavaConversions.asJavaCollection(seg.toBuildLayouts())); - logger.info("There are {} cuboids to be built in segment {}.", - seg.toBuildLayouts().size(), seg.name()); - for (LayoutEntity cuboid : JavaConversions.asJavaCollection(seg.toBuildLayouts())) { - logger.debug("Cuboid {} has row keys: {}", cuboid.getId(), - Joiner.on(", ").join(cuboid.getOrderedDimensions().keySet())); - } + String jobTmpDir = config.getJobTmpDir(project) + "/" + jobId; + Path statisticsDir = new Path(jobTmpDir + "/" + ResourceStore.CUBE_STATISTICS_ROOT + "/" + + cubeInstance.getUuid() + "/" + optSeg.getUuid() + "/"); - // choose source - sourceChooser = new ParentSourceChooser(spanningTree, seg, jobId, ss, config, true); - sourceChooser.decideSources(); - NBuildSourceInfo buildFromFlatTable = sourceChooser.flatTableSource(); - Map<Long, NBuildSourceInfo> buildFromLayouts = sourceChooser.reuseSources(); + CubeStatsWriter.writeCuboidStatistics(conf, statisticsDir, hllMap, 1, -1); - infos.clearCuboidsNumPerLayer(segId); + logger.info("Calculate cuboid statistics from base cuboid job takes {} ms", + (System.currentTimeMillis() - start)); + } - // build cuboids from flat table - if (buildFromFlatTable != null) { - collectPersistedTablePath(persistedFlatTable, sourceChooser); - build(Collections.singletonList(buildFromFlatTable), seg, spanningTree); - } + private void buildCuboidFromParent(String cubeId) throws IOException { + logger.info("Start to build recommend cuboid for optimized segment"); + long start = System.currentTimeMillis(); + optSegInfo = ManagerHub.getSegmentInfo(config, cubeId, optSeg.getUuid(), CuboidModeEnum.RECOMMEND); + buildLayoutWithUpdate = new BuildLayoutWithUpdate(config); - // build cuboids from reused layouts - if (!buildFromLayouts.isEmpty()) { - build(buildFromLayouts.values(), seg, spanningTree); - } - infos.recordSpanningTree(segId, spanningTree); + infos.clearAddCuboids(); - logger.info("Updating segment info"); - assert buildFromFlatTable != null; - updateSegmentInfo(getParam(MetadataConstants.P_CUBE_ID), seg, buildFromFlatTable.getFlatTableDS().count()); - } - updateCubeAndSegmentMeta(getParam(MetadataConstants.P_CUBE_ID), - ResourceDetectUtils.getSegmentSourceSize(shareDir), recommendCuboidMap); - } finally { - FileSystem fs = HadoopUtil.getWorkingFileSystem(); - for (String viewPath : persistedViewFactTable) { - fs.delete(new Path(viewPath), true); - logger.info("Delete persisted view fact table: {}.", viewPath); - } - for (String path : persistedFlatTable) { - fs.delete(new Path(path), true); - logger.info("Delete persisted flat table: {}.", path); + SpanningTree spanningTree; + ParentSourceChooser sourceChooser; + try { + spanningTree = new ForestSpanningTree(JavaConversions.asJavaCollection(optSegInfo.toBuildLayouts())); + logger.info("There are {} cuboids to be built in segment {}.", optSegInfo.toBuildLayouts().size(), + optSegInfo.name()); + for (LayoutEntity cuboid : JavaConversions.asJavaCollection(optSegInfo.toBuildLayouts())) { + logger.debug("Cuboid {} has row keys: {}", cuboid.getId(), + Joiner.on(", ").join(cuboid.getOrderedDimensions().keySet())); } - logger.info("Building job takes {} ms", (System.currentTimeMillis() - start)); - } - } - private void updateSegmentInfo(String cubeId, SegmentInfo segmentInfo, long sourceRowCount) throws IOException { - CubeInstance cubeInstance = cubeManager.getCubeByUuid(cubeId); - CubeInstance cubeCopy 
= cubeInstance.latestCopyForWrite(); - CubeUpdate update = new CubeUpdate(cubeCopy); + // choose source + optSegInfo.removeLayout(baseCuboidId); + sourceChooser = new ParentSourceChooser(spanningTree, optSegInfo, optSeg, jobId, ss, config, false); + sourceChooser.decideSources(); + Map<Long, NBuildSourceInfo> buildFromLayouts = sourceChooser.reuseSources(); - List<CubeSegment> cubeSegments = Lists.newArrayList(); - CubeSegment segment = cubeCopy.getSegmentById(segmentInfo.id()); - segment.setSizeKB(segmentInfo.getAllLayoutSize() / 1024); - List<String> cuboidStatics = new LinkedList<>(); - - String template = "{\"cuboid\":%d, \"rows\": %d, \"size\": %d \"deviation\": %7f}"; - for (LayoutEntity layoutEntity : segmentInfo.getAllLayoutJava()) { - double deviation = 0.0d; - if (layoutEntity.getRows() > 0 && recommendCuboidMap != null && !recommendCuboidMap.isEmpty()) { - long diff = (layoutEntity.getRows() - recommendCuboidMap.get(layoutEntity.getId())); - deviation = diff / (layoutEntity.getRows() + 0.0d); - } - cuboidStatics.add(String.format(Locale.getDefault(), template, layoutEntity.getId(), - layoutEntity.getRows(), layoutEntity.getByteSize(), deviation)); - } + infos.clearCuboidsNumPerLayer(optSegInfo.id()); - try { - FileSystem fs = HadoopUtil.getWorkingFileSystem(); - JavaSparkContext jsc = JavaSparkContext.fromSparkContext(ss.sparkContext()); - JavaRDD<String> cuboidStatRdd = jsc.parallelize(cuboidStatics, 1); - for (String cuboid : cuboidStatics) { - logger.info("Statistics \t: {}", cuboid); - } - String pathDir = config.getHdfsWorkingDirectory() + segment.getPreciseStatisticsResourcePath(); - logger.info("Saving {} {} .", pathDir, segmentInfo); - Path path = new Path(pathDir); - if (fs.exists(path)) { - fs.delete(path, true); + // build cuboids from reused layouts + if (!buildFromLayouts.isEmpty()) { + build(buildFromLayouts.values(), optSegInfo, spanningTree); } - cuboidStatRdd.saveAsTextFile(pathDir); - } catch (Exception e) { - logger.error("Write metrics failed.", e); - } + infos.recordSpanningTree(optSegInfo.id(), spanningTree); - segment.setLastBuildTime(System.currentTimeMillis()); - segment.setLastBuildJobID(getParam(MetadataConstants.P_JOB_ID)); - segment.setInputRecords(sourceRowCount); - segment.setSnapshots(new ConcurrentHashMap<>(segmentInfo.getSnapShot2JavaMap())); - segment.setCuboidShardNums(cuboidShardNum); - Map<String, String> additionalInfo = segment.getAdditionalInfo(); - additionalInfo.put("storageType", "" + IStorageAware.ID_PARQUET); - segment.setAdditionalInfo(additionalInfo); - cubeSegments.add(segment); - update.setToUpdateSegs(cubeSegments.toArray(new CubeSegment[0])); - cubeManager.updateCube(update); + logger.info("Updating segment info"); + updateOptimizeSegmentInfo(); + } finally { + logger.info("Building job takes {} ms", (System.currentTimeMillis() - start)); + } } - private void collectPersistedTablePath(List<String> persistedFlatTable, ParentSourceChooser sourceChooser) { - String flatTablePath = sourceChooser.persistFlatTableIfNecessary(); - if (!flatTablePath.isEmpty()) { - persistedFlatTable.add(flatTablePath); + private long[] getNewCuboidIds() { + Set<Long> recommendCuboidsSet = cubeInstance.getCuboidsByMode(CuboidModeEnum.RECOMMEND_MISSING); + Preconditions.checkNotNull(recommendCuboidsSet, "The recommend cuboid map could not be null"); + long[] recommendCuboid = new long[recommendCuboidsSet.size()]; + int i = 0; + for (long cuboidId : recommendCuboidsSet) { + recommendCuboid[i++] = cuboidId; } + return recommendCuboid; } - private void 
updateCubeAndSegmentMeta(String cubeId, Map<String, Object> toUpdateSegmentSourceSize, - Map<Long, Long> recommendCuboidMap) throws IOException { - CubeInstance cubeInstance = cubeManager.getCubeByUuid(cubeId); - CubeInstance cubeCopy = cubeInstance.latestCopyForWrite(); + protected void updateOptimizeSegmentInfo() throws IOException { + CubeInstance cubeCopy = optSeg.getCubeInstance().latestCopyForWrite(); + List<CubeSegment> cubeSegments = Lists.newArrayList(); CubeUpdate update = new CubeUpdate(cubeCopy); - if (recommendCuboidMap != null && !recommendCuboidMap.isEmpty()) - update.setCuboids(recommendCuboidMap); - - List<CubeSegment> cubeSegments = Lists.newArrayList(); - for (Map.Entry<String, Object> entry : toUpdateSegmentSourceSize.entrySet()) { - CubeSegment segment = cubeCopy.getSegmentById(entry.getKey()); - if (segment.getInputRecords() > 0L) { - segment.setInputRecordsSize((Long) entry.getValue()); - segment.setLastBuildTime(System.currentTimeMillis()); - cubeSegments.add(segment); - } - } - if (!cubeSegments.isEmpty()) { - update.setToUpdateSegs(cubeSegments.toArray(new CubeSegment[0])); - cubeManager.updateCube(update); + optSeg.setSizeKB(optSegInfo.getAllLayoutSize() / 1024); + optSeg.setLastBuildTime(System.currentTimeMillis()); + optSeg.setLastBuildJobID(jobId); + optSeg.setInputRecords(originalSeg.getInputRecords()); + Map<Long, Short> existingShardNums = originalSeg.getCuboidShardNums(); + for (Long cuboidId : cubeCopy.getCuboidsByMode(CuboidModeEnum.RECOMMEND_EXISTING)) { + cuboidShardNum.putIfAbsent(cuboidId, existingShardNums.get(cuboidId)); } + optSeg.setCuboidShardNums(cuboidShardNum); + optSeg.setInputRecordsSize(originalSeg.getInputRecordsSize()); + Map<String, String> additionalInfo = optSeg.getAdditionalInfo(); + additionalInfo.put("storageType", "" + IStorageAware.ID_PARQUET); + optSeg.setAdditionalInfo(additionalInfo); + cubeSegments.add(optSeg); + update.setToUpdateSegs(cubeSegments.toArray(new CubeSegment[0])); + cubeManager.updateCube(update); } - private void build(Collection<NBuildSourceInfo> buildSourceInfos, SegmentInfo seg, SpanningTree st) throws InterruptedException{ + private void build(Collection<NBuildSourceInfo> buildSourceInfos, SegmentInfo seg, SpanningTree st) { List<NBuildSourceInfo> theFirstLevelBuildInfos = buildLayer(buildSourceInfos, seg, st); LinkedList<List<NBuildSourceInfo>> queue = new LinkedList<>(); @@ -313,12 +233,11 @@ public class CubeBuildJob extends SparkApplication { queue.offer(theNextLayer); } } - } // build current layer and return the next layer to be built. 
private List<NBuildSourceInfo> buildLayer(Collection<NBuildSourceInfo> buildSourceInfos, SegmentInfo seg, - SpanningTree st) throws InterruptedException{ + SpanningTree st) { int cuboidsNumInLayer = 0; // build current layer @@ -330,11 +249,6 @@ public class CubeBuildJob extends SparkApplication { cuboidsNumInLayer += toBuildCuboids.size(); Preconditions.checkState(!toBuildCuboids.isEmpty(), "To be built cuboids is empty."); Dataset<Row> parentDS = info.getParentDS(); - - if (toBuildCuboids.size() > 1) { - buildLayoutWithUpdate.cacheAndRegister(info.getLayoutId(), parentDS); - } - // record the source count of flat table if (info.getLayoutId() == ParentSourceChooser.FLAT_TABLE_FLAG()) { cuboidsRowCount.putIfAbsent(info.getLayoutId(), parentDS.count()); @@ -342,22 +256,31 @@ public class CubeBuildJob extends SparkApplication { for (LayoutEntity index : toBuildCuboids) { Preconditions.checkNotNull(parentDS, "Parent dataset is null when building."); - buildLayoutWithUpdate.submit(new BuildLayoutWithUpdate.JobEntity() { - @Override - public String getName() { - return "build-cuboid-" + index.getId(); - } - - @Override - public LayoutEntity build() throws IOException { - return buildCuboid(seg, index, parentDS, st, info.getLayoutId()); + if (!cubeInstance.getCuboidsByMode(CuboidModeEnum.RECOMMEND_EXISTING).contains(index.getId())) { + infos.recordAddCuboids(index.getId()); + buildLayoutWithUpdate.submit(new BuildLayoutWithUpdate.JobEntity() { + @Override + public String getName() { + return "build-cuboid-" + index.getId(); + } + + @Override + public LayoutEntity build() throws IOException { + return buildCuboid(seg, index, parentDS, st, info.getLayoutId()); + } + + @Override + public NBuildSourceInfo getBuildSourceInfo() { + return info; + } + }, config); + } else { + try { + updateExistingLayout(index, info.getLayoutId()); + } catch (IOException e) { + logger.error("Failed to update existing cuboid info: {}", index.getId()); } - - @Override - public NBuildSourceInfo getBuildSourceInfo() { - return info; - } - }, config); + } allIndexesInCurrentLayer.add(index); } } @@ -490,8 +413,30 @@ public class CubeBuildJob extends SparkApplication { BuildUtils.fillCuboidInfo(layout, path); } + private void updateExistingLayout(LayoutEntity layout, long parentId) throws IOException { + FileSystem fs = HadoopUtil.getWorkingFileSystem(); + long layoutId = layout.getId(); + String path = PathManager.getParquetStoragePath(config, cubeInstance.getName(), optSegInfo.name(), optSegInfo.identifier(), + String.valueOf(layoutId)); + Dataset<Row> dataset = StorageFactory + .createEngineAdapter(layout, NSparkCubingEngine.NSparkCubingStorage.class) + .getFrom(path, ss); + logger.debug("Existing cuboid, use count() to collect cuboid rows."); + long cuboidRowCnt = dataset.count(); + ContentSummary cs = HadoopUtil.getContentSummary(fs, new Path(path)); + layout.setRows(cuboidRowCnt); + layout.setFileCount(cs.getFileCount()); + layout.setByteSize(cs.getLength()); + // record the row count of cuboid + cuboidsRowCount.putIfAbsent(layoutId, cuboidRowCnt); + layout.setSourceRows(cuboidsRowCount.get(parentId)); + int shardNum = originalSeg.getCuboidShardNums().get(layoutId); + layout.setShardNum(shardNum); + optSegInfo.updateLayout(layout); + } + @Override protected String generateInfo() { - return LogJobInfoUtils.dfBuildJobInfo(); + return LogJobInfoUtils.dfOptimizeJobInfo(); } } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/ResourceDetectBeforeOptimizingJob.java 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/ResourceDetectBeforeOptimizingJob.java new file mode 100644 index 0000000..8e77303d --- /dev/null +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/ResourceDetectBeforeOptimizingJob.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.spark.job; + +import org.apache.hadoop.fs.Path; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.engine.spark.application.SparkApplication; +import org.apache.kylin.engine.spark.builder.NBuildSourceInfo; +import org.apache.kylin.engine.spark.metadata.SegmentInfo; +import org.apache.kylin.engine.spark.metadata.cube.ManagerHub; +import org.apache.kylin.engine.spark.metadata.cube.model.ForestSpanningTree; +import org.apache.kylin.engine.spark.metadata.cube.model.SpanningTree; +import org.apache.kylin.engine.spark.utils.SparkUtils; +import org.apache.kylin.metadata.MetadataConstants; +import org.apache.kylin.shaded.com.google.common.collect.Maps; +import org.apache.spark.rdd.RDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.hive.utils.ResourceDetectUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.JavaConversions; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class ResourceDetectBeforeOptimizingJob extends SparkApplication { + protected static final Logger logger = LoggerFactory.getLogger(ResourceDetectBeforeOptimizingJob.class); + protected volatile SpanningTree spanningTree; + protected volatile List<NBuildSourceInfo> sources = new ArrayList<>(); + + public static void main(String[] args) { + ResourceDetectBeforeOptimizingJob resourceDetectJob = new ResourceDetectBeforeOptimizingJob(); + resourceDetectJob.execute(args); + } + + @Override + protected void doExecute() throws Exception { + logger.info("Start detect resource before optimize."); + String segId = getParam(CubingExecutableUtil.SEGMENT_ID); + String cubeId = getParam(MetadataConstants.P_CUBE_ID); + CubeManager cubeManager = CubeManager.getInstance(config); + CubeInstance cubeInstance = cubeManager.getCubeByUuid(cubeId); + SegmentInfo segInfo = ManagerHub.getSegmentInfo(config, cubeId, segId); + CubeSegment segment = cubeInstance.getSegmentById(segId); + infos.recordOptimizingSegment(segInfo); + + spanningTree = new ForestSpanningTree(JavaConversions.asJavaCollection(segInfo.toBuildLayouts())); + 
segInfo.removeLayout(segment.getCuboidScheduler().getBaseCuboidId()); + ResourceDetectUtils.write(new Path(config.getJobTmpShareDir(project, jobId), ResourceDetectUtils.countDistinctSuffix()), false); + ParentSourceChooser datasetChooser = new ParentSourceChooser(spanningTree, segInfo, segment, jobId, ss, config, false); + + datasetChooser.decideSources(); + NBuildSourceInfo buildFromFlatTable = datasetChooser.flatTableSource(); + if (buildFromFlatTable != null) { + sources.add(buildFromFlatTable); + } + Map<Long, NBuildSourceInfo> buildFromLayouts = datasetChooser.reuseSources(); + sources.addAll(buildFromLayouts.values()); + + Map<String, List<String>> resourcePaths = Maps.newHashMap(); + Map<String, Integer> layoutLeafTaskNums = Maps.newHashMap(); + infos.clearSparkPlans(); + for (NBuildSourceInfo source : sources) { + Dataset<Row> dataset = source.getParentDS(); + RDD actionRdd = dataset.queryExecution().toRdd(); + logger.info("leaf nodes is: {} ", SparkUtils.leafNodes(actionRdd)); + infos.recordSparkPlan(dataset.queryExecution().sparkPlan()); + List<Path> paths = JavaConversions + .seqAsJavaList(ResourceDetectUtils.getPaths(dataset.queryExecution().sparkPlan())); + List<String> pathList = paths.stream().map(Path::toString).collect(Collectors.toList()); + resourcePaths.put(String.valueOf(source.getLayoutId()), pathList); + layoutLeafTaskNums.put(String.valueOf(source.getLayoutId()), SparkUtils.leafNodePartitionNums(actionRdd)); + } + ResourceDetectUtils.write( + new Path(config.getJobTmpShareDir(project, jobId), segId + "_" + ResourceDetectUtils.fileName()), + resourcePaths); + ResourceDetectUtils.write( + new Path(config.getJobTmpShareDir(project, jobId), segId + "_" + ResourceDetectUtils.cubingDetectItemFileSuffix()), + layoutLeafTaskNums); + } + + @Override + protected String generateInfo() { + return LogJobInfoUtils.resourceDetectBeforeOptimizeJobInfo(); + } +} diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java index 0987842..a02530e 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java @@ -39,13 +39,18 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.CubeUpdate; +import org.apache.kylin.cube.cuboid.CuboidModeEnum; import org.apache.kylin.cube.model.CubeBuildTypeEnum; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.CubeStatsReader; +import org.apache.kylin.engine.mr.common.CubeStatsWriter; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.engine.spark.job.NSparkExecutable; +import org.apache.kylin.measure.hllc.HLLCounter; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.realization.RealizationStatusEnum; +import org.apache.kylin.shaded.com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,11 +58,11 @@ public class UpdateMetadataUtil { protected static final Logger logger = LoggerFactory.getLogger(UpdateMetadataUtil.class); - public static void syncLocalMetadataToRemote(KylinConfig 
config, - NSparkExecutable nsparkExecutable) throws IOException { + public static void syncLocalMetadataToRemote(KylinConfig config, NSparkExecutable nsparkExecutable) + throws IOException { String cubeId = nsparkExecutable.getParam(MetadataConstants.P_CUBE_ID); - Set<String> segmentIds = Sets.newHashSet(StringUtils.split( - nsparkExecutable.getParam(CubingExecutableUtil.SEGMENT_ID), " ")); + Set<String> segmentIds = Sets + .newHashSet(StringUtils.split(nsparkExecutable.getParam(CubingExecutableUtil.SEGMENT_ID), " ")); String segmentId = segmentIds.iterator().next(); String remoteResourceStore = nsparkExecutable.getDistMetaUrl(); String jobType = nsparkExecutable.getParam(MetadataConstants.P_JOB_TYPE); @@ -77,8 +82,10 @@ public class UpdateMetadataUtil { currentInstanceCopy.toString(), toUpdateSeg.toString(), tobeSegments.toString())); String resKey = toUpdateSeg.getStatisticsResourcePath(); - String jobTmpDir = config.getJobTmpDir(currentInstanceCopy.getProject()) + "/" + nsparkExecutable.getParam(MetadataConstants.P_JOB_ID); - Path statisticsFile = new Path(jobTmpDir + "/" + ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeId + "/" + segmentId + "/" + BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); + String statisticsDir = config.getJobTmpDir(currentInstanceCopy.getProject()) + "/" + + nsparkExecutable.getParam(MetadataConstants.P_JOB_ID) + "/" + ResourceStore.CUBE_STATISTICS_ROOT + "/" + + cubeId + "/" + segmentId + "/"; + Path statisticsFile = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); FileSystem fs = HadoopUtil.getWorkingFileSystem(); if (fs.exists(statisticsFile)) { FSDataInputStream is = fs.open(statisticsFile); @@ -92,10 +99,45 @@ public class UpdateMetadataUtil { if (String.valueOf(CubeBuildTypeEnum.MERGE).equals(jobType)) { toUpdateSeg.getSnapshots().clear(); // update the snapshot table path - for (Map.Entry<String, String> entry : - currentInstanceCopy.getLatestReadySegment().getSnapshots().entrySet()) { + for (Map.Entry<String, String> entry : currentInstanceCopy.getLatestReadySegment().getSnapshots() + .entrySet()) { toUpdateSeg.putSnapshotResPath(entry.getKey(), entry.getValue()); } + } else if (String.valueOf(CubeBuildTypeEnum.OPTIMIZE).equals(jobType)) { + CubeSegment origSeg = currentInstanceCopy.getOriginalSegmentToOptimize(toUpdateSeg); + toUpdateSeg.getDictionaries().putAll(origSeg.getDictionaries()); + toUpdateSeg.getSnapshots().putAll(origSeg.getSnapshots()); + toUpdateSeg.getRowkeyStats().addAll(origSeg.getRowkeyStats()); + + CubeStatsReader optSegStatsReader = new CubeStatsReader(toUpdateSeg, config); + CubeStatsReader origSegStatsReader = new CubeStatsReader(origSeg, config); + Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap(); + if (origSegStatsReader.getCuboidRowHLLCounters() == null) { + throw new IllegalArgumentException( + "Cuboid statistics of original segment do not exist. 
Please check the config of kylin.engine.segment-statistics-enabled."); + } + addFromCubeStatsReader(origSegStatsReader, cuboidHLLMap); + addFromCubeStatsReader(optSegStatsReader, cuboidHLLMap); + + Set<Long> recommendCuboids = currentInstanceCopy.getCuboidsByMode(CuboidModeEnum.RECOMMEND); + Map<Long, HLLCounter> resultCuboidHLLMap = Maps.newHashMapWithExpectedSize(recommendCuboids.size()); + for (long cuboid : recommendCuboids) { + HLLCounter hll = cuboidHLLMap.get(cuboid); + if (hll == null) { + logger.warn("Cannot get the row count stats for cuboid " + cuboid); + } else { + resultCuboidHLLMap.put(cuboid, hll); + } + } + if (fs.exists(statisticsFile)) { + fs.delete(statisticsFile, false); + } + CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), new Path(statisticsDir), + resultCuboidHLLMap, 1, origSegStatsReader.getSourceRowCount()); + FSDataInputStream is = fs.open(statisticsFile); + ResourceStore.getStore(config).putBigResource(resKey, is, System.currentTimeMillis()); + + toUpdateSeg.setStatus(SegmentStatusEnum.READY_PENDING); } else { toUpdateSeg.setStatus(SegmentStatusEnum.READY); for (CubeSegment segment : currentInstanceCopy.getSegments()) { @@ -108,16 +150,16 @@ public class UpdateMetadataUtil { } } - logger.info("Promoting cube {}, new segment {}, to remove segments {}", currentInstanceCopy, toUpdateSeg, toRemoveSegs); + logger.info("Promoting cube {}, new segment {}, to remove segments {}", currentInstanceCopy, toUpdateSeg, + toRemoveSegs); toUpdateSeg.setLastBuildTime(System.currentTimeMillis()); - update.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[0])) - .setToUpdateSegs(toUpdateSeg); + update.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[0])).setToUpdateSegs(toUpdateSeg); cubeManager.updateCube(update); } - public static void updateMetadataAfterMerge(String cubeId, String segmentId, - KylinConfig config) throws IOException { + public static void updateMetadataAfterMerge(String cubeId, String segmentId, KylinConfig config) + throws IOException { CubeManager cubeManager = CubeManager.getInstance(config); CubeInstance currentInstanceCopy = cubeManager.getCubeByUuid(cubeId).latestCopyForWrite(); @@ -142,11 +184,21 @@ public class UpdateMetadataUtil { update.setStatus(RealizationStatusEnum.READY); } - logger.info("Promoting cube {}, new segment {}, to remove segments {}", currentInstanceCopy, toUpdateSegs, toRemoveSegs); + logger.info("Promoting cube {}, new segment {}, to remove segments {}", currentInstanceCopy, toUpdateSegs, + toRemoveSegs); toUpdateSegs.setLastBuildTime(System.currentTimeMillis()); - update.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[0])) - .setToUpdateSegs(toUpdateSegs); + update.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[0])).setToUpdateSegs(toUpdateSegs); cubeManager.updateCube(update); } + + private static void addFromCubeStatsReader(CubeStatsReader cubeStatsReader, Map<Long, HLLCounter> cuboidHLLMap) { + for (Map.Entry<Long, HLLCounter> entry : cubeStatsReader.getCuboidRowHLLCounters().entrySet()) { + if (cuboidHLLMap.get(entry.getKey()) != null) { + cuboidHLLMap.get(entry.getKey()).merge(entry.getValue()); + } else { + cuboidHLLMap.put(entry.getKey(), entry.getValue()); + } + } + } } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/BuildJobInfos.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/BuildJobInfos.scala index a1592bf..2266087 100644 --- 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/BuildJobInfos.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/BuildJobInfos.scala @@ -38,6 +38,13 @@ class BuildJobInfos { private val mergingSegments: java.util.List[SegmentInfo] = new util.LinkedList[SegmentInfo] + // OPTIMIZE + private var optimizingSegment: SegmentInfo = null + + private val addCuboids: java.util.List[Long] = new util.LinkedList[Long] + + private val reusedCuboids: java.util.Set[Long] = new util.HashSet[Long] + // COMMON private val abnormalLayouts: util.Map[Long, util.List[String]] = new util.HashMap[Long, util.List[String]] @@ -82,6 +89,38 @@ class BuildJobInfos { mergingSegments.addAll(segments) } + def recordOptimizingSegment(segment: SegmentInfo): Unit = { + optimizingSegment = segment + } + + def getOptimizingSegment(): SegmentInfo = { + optimizingSegment + } + + def recordReusedCuboids(cuboids: util.Set[Long]): Unit = { + reusedCuboids.addAll(cuboids) + } + + def getReusedCuboid(): util.Set[Long] = { + reusedCuboids + } + + def clearReusedCuboids(): Unit = { + reusedCuboids.clear() + } + + def clearAddCuboids(): Unit = { + addCuboids.clear() + } + + def getAddCuboids: util.List[Long] = { + addCuboids + } + + def recordAddCuboids(cuboidId: Long): Unit = { + addCuboids.add(cuboidId) + } + def clearMergingSegments(): Unit = { mergingSegments.clear() } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java index efce341..dbfe810 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java @@ -36,7 +36,6 @@ import java.util.stream.Collectors; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.engine.mr.JobBuilderSupport; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.CubeStatsWriter; import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil; @@ -111,10 +110,10 @@ public class CubeBuildJob extends SparkApplication { Preconditions.checkArgument(segmentIds.size() == 1, "Build one segment in one time."); String firstSegmentId = segmentIds.iterator().next(); - String cubeName = getParam(MetadataConstants.P_CUBE_ID); - SegmentInfo seg = ManagerHub.getSegmentInfo(config, cubeName, firstSegmentId); + String cubeId = getParam(MetadataConstants.P_CUBE_ID); + SegmentInfo seg = ManagerHub.getSegmentInfo(config, cubeId, firstSegmentId); cubeManager = CubeManager.getInstance(config); - cubeInstance = cubeManager.getCubeByUuid(cubeName); + cubeInstance = cubeManager.getCubeByUuid(cubeId); CubeSegment newSegment = cubeInstance.getSegmentById(firstSegmentId); SpanningTree spanningTree; ParentSourceChooser sourceChooser; @@ -127,7 +126,7 @@ public class CubeBuildJob extends SparkApplication { // 1.1 Call CuboidStatistics#statistics long startMills = System.currentTimeMillis(); spanningTree = new ForestSpanningTree(JavaConversions.asJavaCollection(seg.toBuildLayouts())); - sourceChooser = new ParentSourceChooser(spanningTree, seg, jobId, ss, config, false); + sourceChooser = new ParentSourceChooser(spanningTree, seg, newSegment, jobId, ss, config, false); 
sourceChooser.setNeedStatistics(); sourceChooser.decideFlatTableSource(null); Map<Long, HLLCounter> hllMap = new HashMap<>(); @@ -138,7 +137,7 @@ public class CubeBuildJob extends SparkApplication { // 1.2 Save cuboid statistics String jobTmpDir = config.getJobTmpDir(project) + "/" + jobId; - Path statisticsDir = new Path(jobTmpDir + "/" + ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + firstSegmentId + "/"); + Path statisticsDir = new Path(jobTmpDir + "/" + ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeId + "/" + firstSegmentId + "/"); Optional<HLLCounter> hll = hllMap.values().stream().max(Comparator.comparingLong(HLLCounter::getCountEstimate)); long rc = hll.map(HLLCounter::getCountEstimate).orElse(1L); CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), statisticsDir, hllMap, 1, rc); @@ -164,7 +163,7 @@ public class CubeBuildJob extends SparkApplication { try { //TODO: what if a segment is deleted during building? for (String segId : segmentIds) { - seg = ManagerHub.getSegmentInfo(config, cubeName, segId); + seg = ManagerHub.getSegmentInfo(config, cubeId, segId); spanningTree = new ForestSpanningTree( JavaConversions.asJavaCollection(seg.toBuildLayouts())); logger.info("There are {} cuboids to be built in segment {}.", @@ -175,7 +174,7 @@ public class CubeBuildJob extends SparkApplication { } // choose source - sourceChooser = new ParentSourceChooser(spanningTree, seg, jobId, ss, config, true); + sourceChooser = new ParentSourceChooser(spanningTree, seg, newSegment, jobId, ss, config, true); sourceChooser.decideSources(); NBuildSourceInfo buildFromFlatTable = sourceChooser.flatTableSource(); Map<Long, NBuildSourceInfo> buildFromLayouts = sourceChooser.reuseSources(); @@ -313,7 +312,6 @@ public class CubeBuildJob extends SparkApplication { queue.offer(theNextLayer); } } - } // build current layer and return the next layer to be built. 
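Aside for readers following the optimize flow: the BuildJobInfos accessors added above (recordOptimizingSegment, recordAddCuboids, getAddCuboids) are what dfOptimizeJobInfo and resourceDetectBeforeOptimizeJobInfo print at the end of a job. Below is a minimal, illustrative Java sketch of that bookkeeping, assuming only the kylin-spark-engine classes from this patch are on the classpath; the class name and the cuboid id (taken from NOptimizeJobTest's CUBOID_ADD) are examples, not part of the patch.

    // Illustrative sketch only: exercises the BuildJobInfos bookkeeping added in this patch.
    import org.apache.kylin.engine.spark.job.BuildJobInfos;

    public class OptimizeInfoSketch {
        public static void main(String[] args) {
            BuildJobInfos infos = new BuildJobInfos();
            infos.clearAddCuboids();           // reset before building the optimized segment
            infos.recordAddCuboids(1048575L);  // record a cuboid newly built by the optimize job
            // dfOptimizeJobInfo interpolates this list into the job summary log
            System.out.println("add cuboids: " + infos.getAddCuboids());
        }
    }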
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala index 9c18765..c02d6a7 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala @@ -45,6 +45,15 @@ object CuboidStatisticsJob { // l.foreach(x => println(x._1 + " >>><<< " + x._2.cuboid.counter.getCountEstimate)) l } + + def statistics(inputDs: Dataset[Row], seg: SegmentInfo, layoutIds: Array[Long]): Array[(Long, AggInfo)] = { + val res = inputDs.rdd.repartition(inputDs.sparkSession.sparkContext.defaultParallelism) + .mapPartitions(new CuboidStatisticsJob(layoutIds, seg.allColumns.count(c => c.rowKey)).statisticsWithinPartition) + val l = res.map(a => (a.key, a)).reduceByKey((a, b) => a.merge(b)).collect() + l + } + + } class CuboidStatisticsJob(ids: Array[Long], rkc: Int) extends Serializable { diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/LogJobInfoUtils.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/LogJobInfoUtils.scala index 5a388f4..7c29ee6 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/LogJobInfoUtils.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/LogJobInfoUtils.scala @@ -72,6 +72,15 @@ object LogJobInfoUtils { """.stripMargin } + def resourceDetectBeforeOptimizeJobInfo: String = { + s""" + |==========================[RESOURCE DETECT BEFORE OPTIMIZE]=============================== + |optimizing segment : ${infos.getOptimizingSegment()} + |spark plans : ${infos.getSparkPlans} + |==========================[RESOURCE DETECT BEFORE OPTIMIZE]=============================== + """.stripMargin + } + def dfMergeJobInfo: String = { s""" |==========================[MERGE CUBE]=============================== @@ -86,4 +95,27 @@ object LogJobInfoUtils { |==========================[MERGE CUBE]=============================== """.stripMargin } + + def filterRecommendCuboidJobInfo: String = { + s""" + |==========================[FILTER RECOMMEND CUBOID]=============================== + |copy cuboids : ${infos.getReusedCuboid} + |==========================[FILTER RECOMMEND CUBOID]=============================== + """.stripMargin + } + + def dfOptimizeJobInfo: String = { + s""" + |==========================[OPTIMIZE CUBE]=============================== + |auto spark config :${infos.getAutoSparkConfs} + |wait time: ${infos.waitTime} + |build time: ${infos.buildTime} + |add cuboids: ${infos.getAddCuboids} + |abnormal layouts : ${infos.getAbnormalLayouts} + |retry times : ${infos.getRetryTimes} + |job retry infos : + | ${infos.getJobRetryInfos.asScala.map(_.toString).mkString("\n")} + |==========================[OPTIMIZE CUBE]=============================== + """.stripMargin + } } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala index 216fbf8..bd54d0b 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala +++ 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala @@ -21,9 +21,11 @@ package org.apache.kylin.engine.spark.job import org.apache.kylin.shaded.com.google.common.collect.Maps import org.apache.kylin.engine.spark.builder._ import org.apache.kylin.common.KylinConfig +import org.apache.kylin.cube.CubeSegment import org.apache.kylin.engine.spark.builder.NBuildSourceInfo import org.apache.kylin.engine.spark.metadata.cube.model.{LayoutEntity, SpanningTree} import org.apache.kylin.engine.spark.metadata.SegmentInfo +import org.apache.kylin.engine.spark.metadata.cube.PathManager import org.apache.spark.internal.Logging import org.apache.spark.sql.functions.col import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} @@ -33,7 +35,8 @@ import scala.collection.JavaConverters._ class ParentSourceChooser( toBuildTree: SpanningTree, - var seg: SegmentInfo, + var segInfo: SegmentInfo, + var segment: CubeSegment, jobId: String, ss: SparkSession, config: KylinConfig, @@ -52,8 +55,8 @@ class ParentSourceChooser( //TODO: MetadataConverter don't have getCubeDesc() now /*val flatTableDesc = new CubeJoinedFlatTableDesc( - MetadataConverter.getCubeDesc(seg.getCube), - ParentSourceChooser.needJoinLookupTables(seg.getModel, toBuildTree))*/ + MetadataConverter.getCubeDesc(segInfo.getCube), + ParentSourceChooser.needJoinLookupTables(segInfo.getModel, toBuildTree))*/ def setNeedStatistics(): Unit = needStatistics = true @@ -61,7 +64,7 @@ class ParentSourceChooser( def decideSources(): Unit = { toBuildTree.getRootIndexEntities.asScala.foreach { entity => - val parentLayout = CuboidLayoutChooser.selectLayoutForBuild(seg, entity) + val parentLayout = CuboidLayoutChooser.selectLayoutForBuild(segInfo, entity) if (parentLayout != null) { decideParentLayoutSource(entity, parentLayout) } else { @@ -76,18 +79,18 @@ class ParentSourceChooser( // hacked, for some case, you do not want to trigger buildSnapshot // eg: resource detect // Move this to a more suitable place - val builder = new CubeSnapshotBuilder(seg, ss) + val builder = new CubeSnapshotBuilder(segInfo, ss) builder.checkDupKey() - seg = builder.buildSnapshot + segInfo = builder.buildSnapshot } flatTableSource = getFlatTable - val rowKeyColumns: Seq[String] = seg.allColumns.filter(c => c.rowKey).map(c => c.id.toString) + val rowKeyColumns: Seq[String] = segInfo.allColumns.filter(c => c.rowKey).map(c => c.id.toString) if (aggInfo == null && needStatistics) { val startMs = System.currentTimeMillis() logInfo("Sampling start ...") val coreDs = flatTableSource.getFlatTableDS.select(rowKeyColumns.head, rowKeyColumns.tail: _*) - aggInfo = CuboidStatisticsJob.statistics(coreDs, seg) + aggInfo = CuboidStatisticsJob.statistics(coreDs, segInfo) logInfo("Sampling finished and cost " + (System.currentTimeMillis() - startMs)/1000 + " s .") val statisticsStr = aggInfo.sortBy(x => x._1).map(x => x._1 + ":" + x._2.cuboid.counter.getCountEstimate).mkString(", ") logInfo("Cuboid Statistics results : \t" + statisticsStr) @@ -131,7 +134,7 @@ class ParentSourceChooser( }.toSeq df.select(allUsedCols.map(col): _*) - path = s"${config.getJobTmpFlatTableDir(seg.project, jobId)}" + path = s"${config.getJobTmpFlatTableDir(segInfo.project, jobId)}" ss.sparkContext.setJobDescription("Persist flat table.") df.write.mode(SaveMode.Overwrite).parquet(path) logInfo(s"Persist flat table into:$path. 
Selected cols in table are $allUsedCols.") @@ -146,14 +149,14 @@ class ParentSourceChooser( // private def persistFactViewIfNecessary(): String = { // var path = "" // if (needEncoding) { - // logInfo(s"Check project:${seg.getProject} seg:${seg.getName} persist view fact table.") + // logInfo(s"Check project:${segInfo.getProject} segInfo:${segInfo.getName} persist view fact table.") // val fact = flatTableDesc.getDataModel.getRootFactTable - // val globalDicts = DictionaryBuilderHelper.extractTreeRelatedGlobalDicts(seg, toBuildTree) + // val globalDicts = DictionaryBuilderHelper.extractTreeRelatedGlobalDicts(segInfo, toBuildTree) // val existsFactDictCol = globalDicts.asScala.exists(_.tableName.equals(buildDesc.factTable.tableName)) // // if (fact.getTableDesc.isView && existsFactDictCol) { // val viewDS = ss.table(fact.getTableDesc).alias(fact.getAlias) - // path = s"${config.getJobTmpViewFactTableDir(seg.getProject, jobId)}" + // path = s"${config.getJobTmpViewFactTableDir(segInfo.getProject, jobId)}" // ss.sparkContext.setJobDescription("Persist view fact table.") // viewDS.write.mode(SaveMode.Overwrite).parquet(path) // logInfo(s"Persist view fact table into:$path.") @@ -164,7 +167,7 @@ class ParentSourceChooser( private def getSourceFromLayout(layout: LayoutEntity): NBuildSourceInfo = { val buildSource = new NBuildSourceInfo - buildSource.setParentStoragePath("NSparkCubingUtil.getStoragePath(dataCuboid)") + buildSource.setParentStoragePath(NSparkCubingUtil.getStoragePath(segment, layout.getId)) buildSource.setSparkSession(ss) buildSource.setLayoutId(layout.getId) buildSource.setLayout(layout) @@ -181,8 +184,8 @@ class ParentSourceChooser( sourceInfo.setLayoutId(ParentSourceChooser.FLAT_TABLE_FLAG) // sourceInfo.setViewFactTablePath(viewPath) - // val needJoin = ParentSourceChooser.needJoinLookupTables(seg.getModel, toBuildTree) - val flatTable = new CreateFlatTable(seg, toBuildTree, ss, sourceInfo) + // val needJoin = ParentSourceChooser.needJoinLookupTables(segInfo.getModel, toBuildTree) + val flatTable = new CreateFlatTable(segInfo, toBuildTree, ss, sourceInfo) val afterJoin: Dataset[Row] = flatTable.generateDataset(needEncoding, true) sourceInfo.setFlatTableDS(afterJoin) diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java index 065bd5e..dd52d7a 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java @@ -18,6 +18,9 @@ package org.apache.kylin.engine.spark.job; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.shaded.com.google.common.collect.Maps; import org.apache.kylin.shaded.com.google.common.collect.Sets; import org.apache.kylin.engine.spark.application.SparkApplication; @@ -52,12 +55,15 @@ public class ResourceDetectBeforeCubingJob extends SparkApplication { protected void doExecute() throws Exception { logger.info("Start detect resource before cube."); Set<String> segmentIds = Sets.newHashSet(StringUtils.split(getParam(MetadataConstants.P_SEGMENT_IDS))); + CubeManager cubeManager = CubeManager.getInstance(config); + CubeInstance cubeInstance = 
cubeManager.getCubeByUuid(getParam(MetadataConstants.P_CUBE_ID)); for (String segId : segmentIds) { SegmentInfo seg = ManagerHub.getSegmentInfo(config, getParam(MetadataConstants.P_CUBE_ID), segId); + CubeSegment segment = cubeInstance.getSegmentById(segId); spanningTree = new ForestSpanningTree(JavaConversions.asJavaCollection(seg.toBuildLayouts())); ResourceDetectUtils.write(new Path(config.getJobTmpShareDir(project, jobId), ResourceDetectUtils.countDistinctSuffix()), ResourceDetectUtils.findCountDistinctMeasure(JavaConversions.asJavaCollection(seg.toBuildLayouts()))); - ParentSourceChooser datasetChooser = new ParentSourceChooser(spanningTree, seg, jobId, ss, config, false); + ParentSourceChooser datasetChooser = new ParentSourceChooser(spanningTree, seg, segment, jobId, ss, config, false); datasetChooser.decideSources(); NBuildSourceInfo buildFromFlatTable = datasetChooser.flatTableSource(); if (buildFromFlatTable != null) { diff --git a/kylin-spark-project/kylin-spark-metadata/src/main/java/org/apache/kylin/engine/spark/metadata/cube/ManagerHub.java b/kylin-spark-project/kylin-spark-metadata/src/main/java/org/apache/kylin/engine/spark/metadata/cube/ManagerHub.java index 11ad50d..69d3d11 100644 --- a/kylin-spark-project/kylin-spark-metadata/src/main/java/org/apache/kylin/engine/spark/metadata/cube/ManagerHub.java +++ b/kylin-spark-project/kylin-spark-metadata/src/main/java/org/apache/kylin/engine/spark/metadata/cube/ManagerHub.java @@ -22,6 +22,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.CuboidModeEnum; import org.apache.kylin.engine.spark.metadata.MetadataConverter; import org.apache.kylin.engine.spark.metadata.SegmentInfo; @@ -32,11 +33,15 @@ public class ManagerHub { private ManagerHub() { } - public static SegmentInfo getSegmentInfo(KylinConfig kylinConfig, String cubeName, String segmentId) { - CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCubeByUuid(cubeName); + public static SegmentInfo getSegmentInfo(KylinConfig kylinConfig, String cubeId, String segmentId) { + return getSegmentInfo(kylinConfig, cubeId, segmentId, CuboidModeEnum.CURRENT); + } + + public static SegmentInfo getSegmentInfo(KylinConfig kylinConfig, String cubeId, String segmentId, CuboidModeEnum cuboidMode) { + CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCubeByUuid(cubeId); CubeSegment segment = cubeInstance.getSegmentById(segmentId); - return MetadataConverter.getSegmentInfo(CubeManager.getInstance(kylinConfig).getCubeByUuid(cubeName), - segment.getUuid(), segment.getName(), segment.getStorageLocationIdentifier()); + return MetadataConverter.getSegmentInfo(CubeManager.getInstance(kylinConfig).getCubeByUuid(cubeId), + segment.getUuid(), segment.getName(), segment.getStorageLocationIdentifier(), cuboidMode); } public static CubeInstance updateSegment(KylinConfig kylinConfig, SegmentInfo segmentInfo) throws IOException { diff --git a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala index 030834b..0469775 100644 --- a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala +++ 
b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala @@ -118,6 +118,10 @@ case class SegmentInfo(id: String, toBuildLayouts.remove(layoutEntity) } + def removeLayout(layoutId: Long): Unit = { + toBuildLayouts = toBuildLayouts.filter(layout => !layout.getId.equals(layoutId)) + } + def updateSnapshot(tableInfo: Map[String, String]): Unit = { snapshotInfo = tableInfo } diff --git a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala index a50a835..b0ba58e 100644 --- a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala +++ b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala @@ -23,7 +23,7 @@ import java.util.regex.Pattern import java.{lang, util} import org.apache.commons.lang.StringUtils -import org.apache.kylin.cube.cuboid.Cuboid +import org.apache.kylin.cube.cuboid.{Cuboid, CuboidModeEnum} import org.apache.kylin.cube.{CubeInstance, CubeSegment, CubeUpdate} import org.apache.kylin.engine.spark.metadata.cube.BitUtils import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity @@ -37,8 +37,12 @@ import scala.collection.mutable object MetadataConverter { def getSegmentInfo(cubeInstance: CubeInstance, segmentId: String, segmentName: String, identifier: String): SegmentInfo = { + getSegmentInfo(cubeInstance, segmentId, segmentName, identifier, CuboidModeEnum.CURRENT) + } + + def getSegmentInfo(cubeInstance: CubeInstance, segmentId: String, segmentName: String, identifier: String, cuboidMode: CuboidModeEnum): SegmentInfo = { val (allColumnDesc, allRowKeyCols) = extractAllColumnDesc(cubeInstance) - val (layoutEntities, measure) = extractEntityAndMeasures(cubeInstance) + val (layoutEntities, measure) = extractEntityAndMeasures(cubeInstance, cuboidMode) val dictColumn = measure.values.filter(_.returnType.dataType.equals("bitmap")) .map(_.pra.head).toSet SegmentInfo(segmentId, segmentName, identifier, cubeInstance.getProject, cubeInstance.getConfig, extractFactTable(cubeInstance), @@ -122,9 +126,12 @@ object MetadataConverter { } def extractEntityAndMeasures(cubeInstance: CubeInstance): (List[LayoutEntity], Map[Integer, FunctionDesc]) = { + extractEntityAndMeasures(cubeInstance, CuboidModeEnum.CURRENT) + } + + def extractEntityAndMeasures(cubeInstance: CubeInstance, cuboidMode: CuboidModeEnum): (List[LayoutEntity], Map[Integer, FunctionDesc]) = { val (columnIndexes, shardByColumnsId, idToColumnMap, measureId) = genIDToColumnMap(cubeInstance) - (cubeInstance.getCuboidScheduler - .getAllCuboidIds + (cubeInstance.getCuboidsByMode(cuboidMode) .asScala .map { long => genLayoutEntity(columnIndexes, shardByColumnsId, idToColumnMap, measureId, long) @@ -202,7 +209,11 @@ object MetadataConverter { } def extractEntityList2JavaList(cubeInstance: CubeInstance): java.util.List[LayoutEntity] = { - extractEntityAndMeasures(cubeInstance)._1.asJava + extractEntityList2JavaList(cubeInstance, CuboidModeEnum.CURRENT) + } + + def extractEntityList2JavaList(cubeInstance: CubeInstance, cuboidMode: CuboidModeEnum): java.util.List[LayoutEntity] = { + extractEntityAndMeasures(cubeInstance, cuboidMode)._1.asJava } private def toColumnDesc(ref: TblColRef, index: Int = -1, rowKey: Boolean = false) = { diff --git 
a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NOptimizeJobTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NOptimizeJobTest.java new file mode 100644 index 0000000..5e5789c --- /dev/null +++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NOptimizeJobTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.engine.spark2; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.engine.mr.common.CubeStatsReader; +import org.apache.kylin.engine.spark.LocalWithSparkSessionTest; +import org.apache.kylin.engine.spark.job.NSparkBatchOptimizeJobCheckpointBuilder; +import org.apache.kylin.engine.spark.job.NSparkOptimizingJob; +import org.apache.kylin.engine.spark.metadata.cube.PathManager; +import org.apache.kylin.job.exception.SchedulerException; +import org.apache.kylin.job.execution.CheckpointExecutable; +import org.apache.kylin.job.execution.ExecutableManager; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.realization.RealizationType; +import org.apache.kylin.query.routing.Candidate; +import org.apache.kylin.shaded.com.google.common.collect.Maps; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class NOptimizeJobTest extends LocalWithSparkSessionTest { + protected KylinConfig config; + protected CubeManager cubeMgr; + protected ExecutableManager execMgr; + + private final String CUBE_NAME = "ci_left_join_cube"; + private final long CUBOID_ADD = 1048575L; + private final long CUBOID_DELETE = 14336L; + + @Override + public void setup() throws SchedulerException { + super.setup(); + overwriteSystemProp("kylin.env", "UT"); + overwriteSystemProp("isDeveloperMode", "true"); + overwriteSystemProp("kylin.engine.segment-statistics-enabled", "true"); + Map<RealizationType, Integer> priorities = Maps.newHashMap(); + priorities.put(RealizationType.HYBRID, 0); + priorities.put(RealizationType.CUBE, 0); + Candidate.setPriorities(priorities); + config = KylinConfig.getInstanceFromEnv(); + cubeMgr = CubeManager.getInstance(config); + execMgr = ExecutableManager.getInstance(config); + } + + @Override + public void after() { + super.after(); + } + + @Test + public void verifyOptimizeJob() 
throws Exception { + CubeInstance cube = cubeMgr.reloadCube(CUBE_NAME); + Set<Long> recommendCuboids = new HashSet<>(); + recommendCuboids.addAll(cube.getCuboidScheduler().getAllCuboidIds()); + recommendCuboids.add(CUBOID_ADD); + recommendCuboids.remove(CUBOID_DELETE); + // 1. Build two segments + buildSegments(CUBE_NAME, new SegmentRange.TSRange(dateToLong("2012-01-01"), dateToLong("2012-02-01")), + new SegmentRange.TSRange(dateToLong("2012-02-01"), dateToLong("2012-03-01"))); + + // 2. Optimize Segment + CubeSegment[] optimizeSegments = cubeMgr.optimizeSegments(cube, recommendCuboids); + for (CubeSegment segment : optimizeSegments) { + ExecutableState result = optimizeSegment(segment); + Assert.assertEquals(ExecutableState.SUCCEED, result); + } + + cube = cubeMgr.reloadCube(CUBE_NAME); + + Assert.assertEquals(4, cube.getSegments().size()); + Assert.assertEquals(2, cube.getSegments(SegmentStatusEnum.READY_PENDING).size()); + Assert.assertEquals(2, cube.getSegments(SegmentStatusEnum.READY).size()); + + // 3. CheckPoint Job + executeCheckPoint(cube); + + cube = cubeMgr.reloadCube(CUBE_NAME); + + // 4. Check cube status and cuboid list + Assert.assertEquals(2, cube.getSegments().size()); + Assert.assertEquals(2, cube.getSegments(SegmentStatusEnum.READY).size()); + FileSystem fs = HadoopUtil.getWorkingFileSystem(); + for (CubeSegment segment : cube.getSegments()) { + Assert.assertEquals(SegmentStatusEnum.READY, segment.getStatus()); + CubeStatsReader segStatsReader = new CubeStatsReader(segment, config); + Assert.assertEquals(recommendCuboids, segStatsReader.getCuboidRowHLLCounters().keySet()); + String cuboidPath = PathManager.getSegmentParquetStoragePath(cube, segment.getName(), segment.getStorageLocationIdentifier()); + Assert.assertTrue(fs.exists(new Path(cuboidPath))); + Assert.assertTrue(fs.exists(new Path(cuboidPath + "/" + CUBOID_ADD))); + Assert.assertFalse(fs.exists(new Path(cuboidPath + "/" + CUBOID_DELETE))); + + } + Assert.assertEquals(recommendCuboids, cube.getCuboidScheduler().getAllCuboidIds()); + } + + public void buildSegments(String cubeName, SegmentRange.TSRange... 
toBuildRanges) throws Exception { + Assert.assertTrue(config.getHdfsWorkingDirectory().startsWith("file:")); + + // cleanup all segments first + cleanupSegments(cubeName); + + ExecutableState state; + for (SegmentRange.TSRange toBuildRange : toBuildRanges) { + state = buildCuboid(cubeName, toBuildRange); + Assert.assertEquals(ExecutableState.SUCCEED, state); + } + } + + protected ExecutableState optimizeSegment(CubeSegment segment) throws Exception { + NSparkOptimizingJob optimizeJob = NSparkOptimizingJob.optimize(segment, "ADMIN"); + execMgr.addJob(optimizeJob); + ExecutableState result = wait(optimizeJob); + checkJobTmpPathDeleted(config, optimizeJob); + return result; + } + + protected ExecutableState executeCheckPoint(CubeInstance cubeInstance) throws Exception { + CheckpointExecutable checkPointJob = new NSparkBatchOptimizeJobCheckpointBuilder(cubeInstance, "ADMIN").build(); + execMgr.addJob(checkPointJob); + ExecutableState result = wait(checkPointJob); + return result; + } +} diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java index 60bbf6c..c514bd4 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.HashSet; import javax.servlet.http.HttpServletResponse; diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index 7cb5d3d..3cfcc78 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -44,12 +44,12 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.CubeBuildTypeEnum; import org.apache.kylin.engine.EngineFactory; -import org.apache.kylin.engine.mr.BatchOptimizeJobCheckpointBuilder; import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.mr.LookupSnapshotBuildJob; import org.apache.kylin.engine.mr.common.CubeJobLockUtil; import org.apache.kylin.engine.mr.common.JobInfoConverter; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.engine.spark.job.NSparkBatchOptimizeJobCheckpointBuilder; import org.apache.kylin.engine.spark.job.NSparkCubingJob; import org.apache.kylin.engine.spark.metadata.cube.source.SourceFactory; import org.apache.kylin.job.JobInstance; @@ -331,7 +331,7 @@ public class JobService extends BasicService implements InitializingBean { } /** Add checkpoint job for batch jobs */ - CheckpointExecutable checkpointJob = new BatchOptimizeJobCheckpointBuilder(cube, submitter).build(); + CheckpointExecutable checkpointJob = new NSparkBatchOptimizeJobCheckpointBuilder(cube, submitter).build(); checkpointJob.addTaskListForCheck(optimizeJobList); getExecutableManager().addJob(checkpointJob); @@ -501,11 +501,19 @@ public class JobService extends BasicService implements InitializingBean { public String getJobStepOutput(String jobId, String stepId) { ExecutableManager executableManager = getExecutableManager(); + AbstractExecutable job = executableManager.getJob(jobId); + if (job instanceof CheckpointExecutable) { + return executableManager.getOutput(stepId).getVerboseMsg(); + } 
return executableManager.getOutputFromHDFSByJobId(jobId, stepId).getVerboseMsg(); } public String getAllJobStepOutput(String jobId, String stepId) { ExecutableManager executableManager = getExecutableManager(); + AbstractExecutable job = executableManager.getJob(jobId); + if (job instanceof CheckpointExecutable) { + return executableManager.getOutput(stepId).getVerboseMsg(); + } return executableManager.getOutputFromHDFSByJobId(jobId, stepId, Integer.MAX_VALUE).getVerboseMsg(); } @@ -666,6 +674,7 @@ public class JobService extends BasicService implements InitializingBean { if (null == job.getRelatedCube() || null == getCubeManager().getCube(job.getRelatedCube()) || null == job.getRelatedSegment()) { getExecutableManager().discardJob(job.getId()); + return; } logger.info("Cancel job [" + job.getId() + "] trigger by "