This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 9e33d93b2dc668c1309a1f4c1afb8f8f5a946c29
Author: XiaoxiangYu <x...@apache.org>
AuthorDate: Thu Dec 3 18:36:18 2020 +0800

    KYLIN-4818 Persist metadata in SparkExecutable
---
 .../kylin/engine/mr/common/CubeStatsReader.java    |  7 +--
 .../engine/mr/common/StatisticsDecisionUtil.java   |  8 +--
 .../org/apache/kylin/common/KylinConfigBase.java   |  7 +++
 .../engine/spark/utils/UpdateMetadataUtil.java     | 41 ++++++++++----
 .../kylin/engine/spark/job/CubeBuildJob.java       | 63 +++++++++++++---------
 .../engine/spark/job/ParentSourceChooser.scala     |  2 +-
 6 files changed, 86 insertions(+), 42 deletions(-)

diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 88b94b7..e63fc1a 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -100,9 +100,10 @@ public class CubeStatsReader {
         ResourceStore store = ResourceStore.getStore(kylinConfig);
         String statsKey = cubeSegment.getStatisticsResourcePath();
         RawResource resource = store.getResource(statsKey);
-        if (resource == null)
-            throw new IllegalStateException("Missing resource at " + statsKey);
-
+        if (resource == null) {
+            // throw new IllegalStateException("Missing resource at " + statsKey);
+            logger.warn("{} does not exist.", statsKey);
+        }
         File tmpSeqFile = writeTmpSeqFile(resource.content());
         Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath()));
         logger.info("Reading statistics from {}", path);

diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
index 3890e38..dc96c56 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.engine.mr.common;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -93,22 +94,23 @@ public class StatisticsDecisionUtil {
     }
 
     // For triggering cube planner phase one
-    public static void optimizeCubingPlan(CubeSegment segment) throws IOException {
+    public static Map<Long, Long> optimizeCubingPlan(CubeSegment segment) throws IOException {
         if (isAbleToOptimizeCubingPlan(segment)) {
             logger.info("It's able to trigger cuboid planner algorithm.");
         } else {
-            return;
+            return new HashMap<>();
         }
 
         Map<Long, Long> recommendCuboidsWithStats = CuboidRecommenderUtil.getRecommendCuboidList(segment);
         if (recommendCuboidsWithStats == null || recommendCuboidsWithStats.isEmpty()) {
-            return;
+            return new HashMap<>();
         }
 
         CubeInstance cube = segment.getCubeInstance();
         CubeUpdate update = new CubeUpdate(cube.latestCopyForWrite());
         update.setCuboids(recommendCuboidsWithStats);
         CubeManager.getInstance(cube.getConfig()).updateCube(update);
+        return recommendCuboidsWithStats;
     }
 
     public static boolean isAbleToOptimizeCubingPlan(CubeSegment segment) {
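A note on the CubeStatsReader hunk above: with the throw commented out, a missing statistics resource still reaches writeTmpSeqFile(resource.content()) a line later and would fail with a NullPointerException. A minimal sketch of a null-safe guard (illustrative only, not part of this commit) could look like:

    // Sketch: guard the dereference that follows instead of only logging.
    RawResource resource = store.getResource(statsKey);
    if (resource == null) {
        logger.warn("{} does not exist, cuboid statistics are unavailable for this segment.", statsKey);
        return; // bail out (or fall back to empty statistics) before resource.content() is dereferenced
    }
    File tmpSeqFile = writeTmpSeqFile(resource.content());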
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f57f899..cb0d863 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2963,6 +2963,13 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(this.getOptional("kylin.canary.sparder-context-period-min", "3"));
     }
 
+    /**
+     * Whether to calculate cuboid statistics for each segment, which is needed for cube planner phase two.
+     */
+    public boolean isSegmentStatisticsEnabled() {
+        return Boolean.parseBoolean(this.getOptional("kylin.engine.segment-statistics-enabled", "false"));
+    }
+
     // ============================================================================
     // Spark with Kerberos
     // ============================================================================
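For context, the new switch defaults to false, so per-segment statistics stay off unless kylin.engine.segment-statistics-enabled=true is set in kylin.properties (or, as with other KylinConfig options, in an override). A small usage sketch of the accessor added above:

    // Sketch: reading the new flag from the environment-level config.
    KylinConfig config = KylinConfig.getInstanceFromEnv();
    if (config.isSegmentStatisticsEnabled()) {
        // collect cuboid statistics for this segment even when cube planner
        // phase one does not require them (e.g. for cube planner phase two)
    }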
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
index ab89f44..5560a1c 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
@@ -29,12 +29,19 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.cube.model.CubeBuildTypeEnum;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.spark.job.NSparkExecutable;
 import org.apache.kylin.metadata.MetadataConstants;
@@ -43,12 +50,14 @@ import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.kylin.engine.mr.common.BatchConstants.CFG_OUTPUT_STATISTICS;
+
 public class UpdateMetadataUtil {
 
     protected static final Logger logger = LoggerFactory.getLogger(UpdateMetadataUtil.class);
 
     public static void syncLocalMetadataToRemote(KylinConfig config,
-            NSparkExecutable nsparkExecutable) throws IOException {
+                                                 NSparkExecutable nsparkExecutable) throws IOException {
         String cubeId = nsparkExecutable.getParam(MetadataConstants.P_CUBE_ID);
         Set<String> segmentIds = Sets.newHashSet(StringUtils.split(
                 nsparkExecutable.getParam(CubingExecutableUtil.SEGMENT_ID), " "));
@@ -62,26 +71,36 @@ public class UpdateMetadataUtil {
         // load the meta from local meta path of this job
         KylinConfig kylinDistConfig = MetaDumpUtil.loadKylinConfigFromHdfs(remoteResourceStore);
         CubeInstance distCube = CubeManager.getInstance(kylinDistConfig).getCubeByUuid(cubeId);
-        CubeSegment toUpdateSegs = distCube.getSegmentById(segmentId);
+        CubeSegment toUpdateSeg = distCube.getSegmentById(segmentId);
 
-        List<CubeSegment> tobeSegments = currentInstanceCopy.calculateToBeSegments(toUpdateSegs);
-        if (!tobeSegments.contains(toUpdateSegs))
+        List<CubeSegment> tobeSegments = currentInstanceCopy.calculateToBeSegments(toUpdateSeg);
+        if (!tobeSegments.contains(toUpdateSeg))
             throw new IllegalStateException(
                     String.format(Locale.ROOT, "For cube %s, segment %s is expected but not in the tobe %s",
-                            currentInstanceCopy.toString(), toUpdateSegs.toString(), tobeSegments.toString()));
+                            currentInstanceCopy.toString(), toUpdateSeg.toString(), tobeSegments.toString()));
+
+        String resKey = toUpdateSeg.getStatisticsResourcePath();
+        String jobWorkingDirPath = JobBuilderSupport.getJobWorkingDir(currentInstanceCopy.getConfig().getHdfsWorkingDirectory(), nsparkExecutable.getParam(MetadataConstants.P_JOB_ID));
+        Path statisticsFile = new Path(jobWorkingDirPath + "/" + segmentId + "/" + CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+        FileSystem fs = HadoopUtil.getWorkingFileSystem();
+        if (fs.exists(statisticsFile)) {
+            FSDataInputStream is = fs.open(statisticsFile);
+            ResourceStore.getStore(config).putResource(resKey, is, System.currentTimeMillis());
+        }
 
         CubeUpdate update = new CubeUpdate(currentInstanceCopy);
+        update.setCuboids(distCube.getCuboids());
         List<CubeSegment> toRemoveSegs = Lists.newArrayList();
 
         if (String.valueOf(CubeBuildTypeEnum.MERGE).equals(jobType)) {
-            toUpdateSegs.getSnapshots().clear();
+            toUpdateSeg.getSnapshots().clear();
             // update the snapshot table path
             for (Map.Entry<String, String> entry : currentInstanceCopy.getLatestReadySegment().getSnapshots().entrySet()) {
-                toUpdateSegs.putSnapshotResPath(entry.getKey(), entry.getValue());
+                toUpdateSeg.putSnapshotResPath(entry.getKey(), entry.getValue());
             }
         } else {
-            toUpdateSegs.setStatus(SegmentStatusEnum.READY);
+            toUpdateSeg.setStatus(SegmentStatusEnum.READY);
             for (CubeSegment segment : currentInstanceCopy.getSegments()) {
                 if (!tobeSegments.contains(segment))
                     toRemoveSegs.add(segment);
@@ -92,11 +111,11 @@ public class UpdateMetadataUtil {
             }
         }
 
-        logger.info("Promoting cube {}, new segment {}, to remove segments {}", currentInstanceCopy, toUpdateSegs, toRemoveSegs);
+        logger.info("Promoting cube {}, new segment {}, to remove segments {}", currentInstanceCopy, toUpdateSeg, toRemoveSegs);
 
-        toUpdateSegs.setLastBuildTime(System.currentTimeMillis());
+        toUpdateSeg.setLastBuildTime(System.currentTimeMillis());
         update.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[0]))
-                .setToUpdateSegs(toUpdateSegs);
+                .setToUpdateSegs(toUpdateSeg);
         cubeManager.updateCube(update);
     }
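One small robustness point on the statistics upload above: the FSDataInputStream opened from HDFS is never closed in syncLocalMetadataToRemote. Unless ResourceStore.putResource is known to close the stream it is handed, a try-with-resources form (same calls as the patch, sketch only) keeps the stream's lifetime explicit:

    // Sketch: identical logic, but the HDFS stream is closed even if putResource throws.
    if (fs.exists(statisticsFile)) {
        try (FSDataInputStream is = fs.open(statisticsFile)) {
            ResourceStore.getStore(config).putResource(resKey, is, System.currentTimeMillis());
        }
    }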
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index 10514df..7df8cf0 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -94,6 +94,7 @@ public class CubeBuildJob extends SparkApplication {
     private BuildLayoutWithUpdate buildLayoutWithUpdate;
     private Map<Long, Short> cuboidShardNum = Maps.newConcurrentMap();
     private Map<Long, Long> cuboidsRowCount = Maps.newConcurrentMap();
+    private Map<Long, Long> recommendCuboidMap = new HashMap<>();
 
     public static void main(String[] args) {
         CubeBuildJob cubeBuildJob = new CubeBuildJob();
@@ -119,7 +120,9 @@ public class CubeBuildJob extends SparkApplication {
         SpanningTree spanningTree ;
         ParentSourceChooser sourceChooser;
 
-        boolean needStatistics = StatisticsDecisionUtil.isAbleToOptimizeCubingPlan(newSegment); // Cuboid Statistics is served for Cube Planner Phase One
+        // Cuboid Statistics is served for Cube Planner Phase One at the moment
+        boolean needStatistics = StatisticsDecisionUtil.isAbleToOptimizeCubingPlan(newSegment)
+                || config.isSegmentStatisticsEnabled();
 
         if (needStatistics) {
             // 1.1 Call CuboidStatistics#statistics
@@ -136,19 +139,21 @@
 
             // 1.2 Save cuboid statistics
             String jobWorkingDirPath = JobBuilderSupport.getJobWorkingDir(cubeInstance.getConfig().getHdfsWorkingDirectory(), jobId);
-            CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), new Path(jobWorkingDirPath + "/" + firstSegmentId + "/" + CFG_OUTPUT_STATISTICS), hllMap, 1);
+            Path statisticsDir = new Path(jobWorkingDirPath + "/" + firstSegmentId + "/" + CFG_OUTPUT_STATISTICS);
+            CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), statisticsDir, hllMap, 1);
             FileSystem fs = HadoopUtil.getWorkingFileSystem();
             ResourceStore rs = ResourceStore.getStore(config);
-            String resPath = newSegment.getStatisticsResourcePath();
-            Path statisticsFile = new Path(jobWorkingDirPath + "/" + firstSegmentId + "/" + CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+            String metaKey = newSegment.getStatisticsResourcePath();
+            Path statisticsFile = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
             FSDataInputStream is = fs.open(statisticsFile);
-            rs.putResource(resPath, is, System.currentTimeMillis());
-            logger.info("{} stats saved to resource {}", newSegment, resPath);
+            rs.putResource(metaKey, is, System.currentTimeMillis()); // write to Job-Local metastore
+            logger.info("{}'s stats saved to resource key({}) with path({})", newSegment, metaKey, statisticsFile);
 
             // 1.3 Trigger cube planner phase one and save optimized cuboid set into CubeInstance
-            logger.info("Trigger cube planner phase one .");
-            StatisticsDecisionUtil.optimizeCubingPlan(newSegment);
+            recommendCuboidMap = StatisticsDecisionUtil.optimizeCubingPlan(newSegment);
+            if (!recommendCuboidMap.isEmpty())
+                logger.info("Triggered cube planner phase one.");
         }
 
         buildLayoutWithUpdate = new BuildLayoutWithUpdate();
@@ -161,7 +166,7 @@
                 seg = ManagerHub.getSegmentInfo(config, cubeName, segId);
                 spanningTree = new ForestSpanningTree(
                         JavaConversions.asJavaCollection(seg.toBuildLayouts()));
-                logger.debug("There are {} cuboids to be built in segment {}.",
+                logger.info("There are {} cuboids to be built in segment {}.",
                         seg.toBuildLayouts().size(), seg.name());
                 for (LayoutEntity cuboid : JavaConversions.asJavaCollection(seg.toBuildLayouts())) {
                     logger.debug("Cuboid {} has row keys: {}", cuboid.getId(),
@@ -192,8 +197,8 @@
                 assert buildFromFlatTable != null;
                 updateSegmentInfo(getParam(MetadataConstants.P_CUBE_ID), seg, buildFromFlatTable.getFlatTableDS().count());
             }
-            updateSegmentSourceBytesSize(getParam(MetadataConstants.P_CUBE_ID),
-                    ResourceDetectUtils.getSegmentSourceSize(shareDir));
+            updateCubeAndSegmentMeta(getParam(MetadataConstants.P_CUBE_ID),
+                    ResourceDetectUtils.getSegmentSourceSize(shareDir), recommendCuboidMap);
         } finally {
             FileSystem fs = HadoopUtil.getWorkingFileSystem();
             for (String viewPath : persistedViewFactTable) {
@@ -218,22 +223,28 @@
         segment.setSizeKB(segmentInfo.getAllLayoutSize() / 1024);
         List<String> cuboidStatics = new LinkedList<>();
-        String template = "{\"cuboid\":%d, \"rows\": %d, \"size\": %d}";
+        String template = "{\"cuboid\":%d, \"rows\": %d, \"size\": %d, \"deviation\": %7f}";
         for (LayoutEntity layoutEntity : segmentInfo.getAllLayoutJava()) {
-            cuboidStatics.add(String.format(Locale.getDefault(), template, layoutEntity.getId(), layoutEntity.getRows(), layoutEntity.getByteSize()));
+            double deviation = 0.0d;
+            if (layoutEntity.getRows() > 0 && recommendCuboidMap != null && !recommendCuboidMap.isEmpty()) {
+                long diff = (layoutEntity.getRows() - recommendCuboidMap.get(layoutEntity.getId()));
+                deviation = diff / (layoutEntity.getRows() + 0.0d);
+            }
+            cuboidStatics.add(String.format(Locale.getDefault(), template, layoutEntity.getId(),
+                    layoutEntity.getRows(), layoutEntity.getByteSize(), deviation));
         }
 
-        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(ss.sparkContext());
-        JavaRDD<String> cuboidStatRdd = jsc.parallelize(cuboidStatics);
-        for (String cuboid : cuboidStatics) {
-            logger.info("Statistics \t: {}", cuboid);
-        }
-        String path = config.getHdfsWorkingDirectory() + segment.getPreciseStatisticsResourcePath();
-        logger.info("Saving {} {}", path, segmentInfo);
         try {
+            JavaSparkContext jsc = JavaSparkContext.fromSparkContext(ss.sparkContext());
+            JavaRDD<String> cuboidStatRdd = jsc.parallelize(cuboidStatics, 1);
+            for (String cuboid : cuboidStatics) {
+                logger.info("Statistics \t: {}", cuboid);
+            }
+            String path = config.getHdfsWorkingDirectory() + segment.getPreciseStatisticsResourcePath();
+            logger.info("Saving {} {}", path, segmentInfo);
             cuboidStatRdd.saveAsTextFile(path);
         } catch (Exception e) {
-            logger.error("Error", e);
+            logger.error("Write metrics failed.", e);
         }
 
         segment.setLastBuildTime(System.currentTimeMillis());
@@ -256,15 +267,19 @@
         }
     }
 
-    private void updateSegmentSourceBytesSize(String cubeId, Map<String, Object> toUpdateSegmentSourceSize)
-            throws IOException {
+    private void updateCubeAndSegmentMeta(String cubeId, Map<String, Object> toUpdateSegmentSourceSize,
+                                          Map<Long, Long> recommendCuboidMap) throws IOException {
         CubeInstance cubeInstance = cubeManager.getCubeByUuid(cubeId);
         CubeInstance cubeCopy = cubeInstance.latestCopyForWrite();
         CubeUpdate update = new CubeUpdate(cubeCopy);
+
+        if (recommendCuboidMap != null && !recommendCuboidMap.isEmpty())
+            update.setCuboids(recommendCuboidMap);
+
         List<CubeSegment> cubeSegments = Lists.newArrayList();
         for (Map.Entry<String, Object> entry : toUpdateSegmentSourceSize.entrySet()) {
             CubeSegment segment = cubeCopy.getSegmentById(entry.getKey());
-            if (segment.getInputRecords() > 0l) {
+            if (segment.getInputRecords() > 0L) {
                 segment.setInputRecordsSize((Long) entry.getValue());
                 segment.setLastBuildTime(System.currentTimeMillis());
                 cubeSegments.add(segment);
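On the new deviation field in updateSegmentInfo: the value is (actual rows - estimated rows) / actual rows, but recommendCuboidMap.get(layoutEntity.getId()) returns null for a cuboid that was built without an entry in the recommended set, and unboxing that null would throw a NullPointerException inside the loop. A hedged variant of the same computation (the helper name computeDeviation is illustrative, not in the patch):

    // Sketch: deviation = (actualRows - estimatedRows) / actualRows, 0.0 when no estimate exists.
    private static double computeDeviation(long actualRows, Long estimatedRows) {
        if (actualRows <= 0 || estimatedRows == null) {
            return 0.0d;
        }
        return (actualRows - estimatedRows) / (double) actualRows;
    }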
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
index ab2089d..9b80447 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
@@ -88,7 +88,7 @@ class ParentSourceChooser(
       logInfo("Sampling start ...")
       val coreDs = flatTableSource.getFlatTableDS.select(rowKeyColumns.head, rowKeyColumns.tail: _*)
       aggInfo = CuboidStatisticsJob.statistics(coreDs, seg)
-      logInfo("Sampling finished and cost " + (System.currentTimeMillis() - startMs) + " s .")
+      logInfo("Sampling finished and cost " + (System.currentTimeMillis() - startMs)/1000 + " s .")
       val statisticsStr = aggInfo.sortBy(x => x._1).map(x => x._1 + ":" + x._2.cuboid.counter.getCountEstimate).mkString(", ")
       logInfo("Cuboid Statistics results : \t" + statisticsStr)
     } else {
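The ParentSourceChooser change divides the elapsed milliseconds by 1000 so the log line actually reports seconds, as its "s" suffix claims. Integer division drops sub-second precision; if that matters, an equivalent sketch in Java style (names illustrative) would be:

    // Sketch: report elapsed time in seconds while keeping millisecond precision.
    long elapsedMs = System.currentTimeMillis() - startMs;
    logger.info("Sampling finished and cost {} s.", elapsedMs / 1000.0);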