This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 9e33d93b2dc668c1309a1f4c1afb8f8f5a946c29
Author: XiaoxiangYu <x...@apache.org>
AuthorDate: Thu Dec 3 18:36:18 2020 +0800

    KYLIN-4818 Persist metadata in SparkExecutable
---
 .../kylin/engine/mr/common/CubeStatsReader.java    |  7 +--
 .../engine/mr/common/StatisticsDecisionUtil.java   |  8 +--
 .../org/apache/kylin/common/KylinConfigBase.java   |  7 +++
 .../engine/spark/utils/UpdateMetadataUtil.java     | 41 ++++++++++----
 .../kylin/engine/spark/job/CubeBuildJob.java       | 63 +++++++++++++---------
 .../engine/spark/job/ParentSourceChooser.scala     |  2 +-
 6 files changed, 86 insertions(+), 42 deletions(-)

diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 88b94b7..e63fc1a 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -100,9 +100,10 @@ public class CubeStatsReader {
         ResourceStore store = ResourceStore.getStore(kylinConfig);
         String statsKey = cubeSegment.getStatisticsResourcePath();
         RawResource resource = store.getResource(statsKey);
-        if (resource == null)
-            throw new IllegalStateException("Missing resource at " + statsKey);
-
+        if (resource == null) {
+            // throw new IllegalStateException("Missing resource at " + statsKey);
+            logger.warn("{} does not exist.", statsKey);
+        }
         File tmpSeqFile = writeTmpSeqFile(resource.content());
         Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath()));
         logger.info("Reading statistics from {}", path);
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
index 3890e38..dc96c56 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.engine.mr.common;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -93,22 +94,23 @@ public class StatisticsDecisionUtil {
     }
 
     // For triggering cube planner phase one
-    public static void optimizeCubingPlan(CubeSegment segment) throws IOException {
+    public static Map<Long, Long> optimizeCubingPlan(CubeSegment segment) throws IOException {
         if (isAbleToOptimizeCubingPlan(segment)) {
             logger.info("It's able to trigger cuboid planner algorithm.");
         } else {
-            return;
+            return new HashMap<>();
         }
 
         Map<Long, Long> recommendCuboidsWithStats = CuboidRecommenderUtil.getRecommendCuboidList(segment);
         if (recommendCuboidsWithStats == null || recommendCuboidsWithStats.isEmpty()) {
-            return;
+            return new HashMap<>();
         }
 
         CubeInstance cube = segment.getCubeInstance();
         CubeUpdate update = new CubeUpdate(cube.latestCopyForWrite());
         update.setCuboids(recommendCuboidsWithStats);
         CubeManager.getInstance(cube.getConfig()).updateCube(update);
+        return recommendCuboidsWithStats;
     }
 
     public static boolean isAbleToOptimizeCubingPlan(CubeSegment segment) {
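A short usage sketch of the changed signature: an empty map now signals that nothing was optimized, where the method previously returned void. Variable names are illustrative.

    // Illustrative caller of the changed API.
    Map<Long, Long> recommended = StatisticsDecisionUtil.optimizeCubingPlan(segment);
    if (recommended.isEmpty()) {
        logger.info("Cube planner recommended no cuboids for segment {}.", segment);
    } else {
        logger.info("Cube planner recommended {} cuboids for segment {}.", recommended.size(), segment);
    }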
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f57f899..cb0d863 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2963,6 +2963,13 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(this.getOptional("kylin.canary.sparder-context-period-min", "3"));
     }
 
+    /**
+     * Whether to calculate cuboid statistics for each segment, which is needed for cube planner phase two
+     */
+    public boolean isSegmentStatisticsEnabled() {
+        return Boolean.parseBoolean(this.getOptional("kylin.engine.segment-statistics-enabled", "false"));
+    }
+
     // ============================================================================
     // Spark with Kerberos
     // ============================================================================
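A sketch of how the new flag would typically be switched on and read. The property key comes from the hunk above; the kylin.properties override and the calling code are illustrative.

    // kylin.properties (illustrative override; the flag defaults to "false"):
    //     kylin.engine.segment-statistics-enabled=true

    // Reading the flag at build time:
    KylinConfig config = KylinConfig.getInstanceFromEnv();
    if (config.isSegmentStatisticsEnabled()) {
        // collect per-segment cuboid statistics for cube planner phase two
    }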
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
index ab89f44..5560a1c 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
@@ -29,12 +29,19 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
 
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.cube.model.CubeBuildTypeEnum;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.spark.job.NSparkExecutable;
 import org.apache.kylin.metadata.MetadataConstants;
@@ -43,12 +50,14 @@ import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.kylin.engine.mr.common.BatchConstants.CFG_OUTPUT_STATISTICS;
+
 public class UpdateMetadataUtil {
 
     protected static final Logger logger = LoggerFactory.getLogger(UpdateMetadataUtil.class);
 
     public static void syncLocalMetadataToRemote(KylinConfig config,
-                                         NSparkExecutable nsparkExecutable) throws IOException {
+                                                 NSparkExecutable nsparkExecutable) throws IOException {
         String cubeId = nsparkExecutable.getParam(MetadataConstants.P_CUBE_ID);
         Set<String> segmentIds = Sets.newHashSet(StringUtils.split(
                 nsparkExecutable.getParam(CubingExecutableUtil.SEGMENT_ID), " "));
@@ -62,26 +71,36 @@ public class UpdateMetadataUtil {
         // load the meta from local meta path of this job
         KylinConfig kylinDistConfig = MetaDumpUtil.loadKylinConfigFromHdfs(remoteResourceStore);
         CubeInstance distCube = CubeManager.getInstance(kylinDistConfig).getCubeByUuid(cubeId);
-        CubeSegment toUpdateSegs = distCube.getSegmentById(segmentId);
+        CubeSegment toUpdateSeg = distCube.getSegmentById(segmentId);
 
-        List<CubeSegment> tobeSegments = currentInstanceCopy.calculateToBeSegments(toUpdateSegs);
-        if (!tobeSegments.contains(toUpdateSegs))
+        List<CubeSegment> tobeSegments = currentInstanceCopy.calculateToBeSegments(toUpdateSeg);
+        if (!tobeSegments.contains(toUpdateSeg))
             throw new IllegalStateException(
                     String.format(Locale.ROOT, "For cube %s, segment %s is expected but not in the tobe %s",
-                            currentInstanceCopy.toString(), toUpdateSegs.toString(), tobeSegments.toString()));
+                            currentInstanceCopy.toString(), toUpdateSeg.toString(), tobeSegments.toString()));
+
+        String resKey = toUpdateSeg.getStatisticsResourcePath();
+        String jobWorkingDirPath = JobBuilderSupport.getJobWorkingDir(currentInstanceCopy.getConfig().getHdfsWorkingDirectory(), nsparkExecutable.getParam(MetadataConstants.P_JOB_ID));
+        Path statisticsFile = new Path(jobWorkingDirPath + "/" + segmentId + "/" + CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+        FileSystem fs = HadoopUtil.getWorkingFileSystem();
+        if (fs.exists(statisticsFile)) {
+            FSDataInputStream is = fs.open(statisticsFile);
+            ResourceStore.getStore(config).putResource(resKey, is, System.currentTimeMillis());
+        }
 
         CubeUpdate update = new CubeUpdate(currentInstanceCopy);
+        update.setCuboids(distCube.getCuboids());
         List<CubeSegment> toRemoveSegs = Lists.newArrayList();
 
         if (String.valueOf(CubeBuildTypeEnum.MERGE).equals(jobType)) {
-            toUpdateSegs.getSnapshots().clear();
+            toUpdateSeg.getSnapshots().clear();
             // update the snapshot table path
             for (Map.Entry<String, String> entry :
                     currentInstanceCopy.getLatestReadySegment().getSnapshots().entrySet()) {
-                toUpdateSegs.putSnapshotResPath(entry.getKey(), entry.getValue());
+                toUpdateSeg.putSnapshotResPath(entry.getKey(), entry.getValue());
             }
         } else {
-            toUpdateSegs.setStatus(SegmentStatusEnum.READY);
+            toUpdateSeg.setStatus(SegmentStatusEnum.READY);
             for (CubeSegment segment : currentInstanceCopy.getSegments()) {
                 if (!tobeSegments.contains(segment))
                     toRemoveSegs.add(segment);
@@ -92,11 +111,11 @@ public class UpdateMetadataUtil {
             }
         }
 
-        logger.info("Promoting cube {}, new segment {}, to remove segments {}", currentInstanceCopy, toUpdateSegs, toRemoveSegs);
+        logger.info("Promoting cube {}, new segment {}, to remove segments {}", currentInstanceCopy, toUpdateSeg, toRemoveSegs);
 
-        toUpdateSegs.setLastBuildTime(System.currentTimeMillis());
+        toUpdateSeg.setLastBuildTime(System.currentTimeMillis());
         update.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[0]))
-                .setToUpdateSegs(toUpdateSegs);
+                .setToUpdateSegs(toUpdateSeg);
         cubeManager.updateCube(update);
     }
 
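A condensed sketch of the copy pattern this hunk introduces: publish the segment's cuboid statistics file from the HDFS job working directory into the metadata ResourceStore. Path pieces follow the hunk; the try-with-resources cleanup and the surrounding variables are illustrative assumptions.

    // Sketch: upload the cuboid statistics written by the Spark job so that
    // later steps can read them as segment metadata.
    FileSystem fs = HadoopUtil.getWorkingFileSystem();
    Path statisticsFile = new Path(jobWorkingDirPath + "/" + segmentId + "/"
            + BatchConstants.CFG_OUTPUT_STATISTICS + "/"
            + BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
    if (fs.exists(statisticsFile)) {
        try (FSDataInputStream is = fs.open(statisticsFile)) {
            ResourceStore.getStore(config).putResource(resKey, is, System.currentTimeMillis());
        }
    }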
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index 10514df..7df8cf0 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -94,6 +94,7 @@ public class CubeBuildJob extends SparkApplication {
     private BuildLayoutWithUpdate buildLayoutWithUpdate;
     private Map<Long, Short> cuboidShardNum = Maps.newConcurrentMap();
     private Map<Long, Long> cuboidsRowCount = Maps.newConcurrentMap();
+    private Map<Long, Long> recommendCuboidMap = new HashMap<>();
 
     public static void main(String[] args) {
         CubeBuildJob cubeBuildJob = new CubeBuildJob();
@@ -119,7 +120,9 @@ public class CubeBuildJob extends SparkApplication {
         SpanningTree spanningTree ;
         ParentSourceChooser sourceChooser;
 
-        boolean needStatistics = StatisticsDecisionUtil.isAbleToOptimizeCubingPlan(newSegment); // Cuboid Statistics is served for Cube Planner Phase One
+        // Cuboid Statistics is served for Cube Planner Phase One at the moment
+        boolean needStatistics = StatisticsDecisionUtil.isAbleToOptimizeCubingPlan(newSegment)
+                || config.isSegmentStatisticsEnabled();
 
         if (needStatistics) {
             // 1.1 Call CuboidStatistics#statistics
@@ -136,19 +139,21 @@ public class CubeBuildJob extends SparkApplication {
 
             // 1.2 Save cuboid statistics
             String jobWorkingDirPath = JobBuilderSupport.getJobWorkingDir(cubeInstance.getConfig().getHdfsWorkingDirectory(), jobId);
-            CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), new Path(jobWorkingDirPath + "/" + firstSegmentId + "/" + CFG_OUTPUT_STATISTICS), hllMap, 1);
+            Path statisticsDir = new Path(jobWorkingDirPath + "/" + firstSegmentId + "/" + CFG_OUTPUT_STATISTICS);
+            CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), statisticsDir, hllMap, 1);
 
             FileSystem fs = HadoopUtil.getWorkingFileSystem();
             ResourceStore rs = ResourceStore.getStore(config);
-            String resPath = newSegment.getStatisticsResourcePath();
-            Path statisticsFile = new Path(jobWorkingDirPath + "/" + firstSegmentId + "/" + CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+            String metaKey = newSegment.getStatisticsResourcePath();
+            Path statisticsFile = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
             FSDataInputStream is = fs.open(statisticsFile);
-            rs.putResource(resPath, is, System.currentTimeMillis());
-            logger.info("{} stats saved to resource {}", newSegment, resPath);
+            rs.putResource(metaKey, is, System.currentTimeMillis()); // write to Job-Local metastore
+            logger.info("{}'s stats saved to resource key({}) with path({})", newSegment, metaKey, statisticsFile);
 
             // 1.3 Trigger cube planner phase one and save optimized cuboid set into CubeInstance
-            logger.info("Trigger cube planner phase one .");
-            StatisticsDecisionUtil.optimizeCubingPlan(newSegment);
+            recommendCuboidMap = StatisticsDecisionUtil.optimizeCubingPlan(newSegment);
+            if (!recommendCuboidMap.isEmpty())
+                logger.info("Triggered cube planner phase one.");
         }
 
         buildLayoutWithUpdate = new BuildLayoutWithUpdate();
@@ -161,7 +166,7 @@ public class CubeBuildJob extends SparkApplication {
                 seg = ManagerHub.getSegmentInfo(config, cubeName, segId);
                 spanningTree = new ForestSpanningTree(
                         JavaConversions.asJavaCollection(seg.toBuildLayouts()));
-                logger.debug("There are {} cuboids to be built in segment {}.",
+                logger.info("There are {} cuboids to be built in segment {}.",
                         seg.toBuildLayouts().size(), seg.name());
                 for (LayoutEntity cuboid : JavaConversions.asJavaCollection(seg.toBuildLayouts())) {
                     logger.debug("Cuboid {} has row keys: {}", cuboid.getId(),
@@ -192,8 +197,8 @@ public class CubeBuildJob extends SparkApplication {
                 assert buildFromFlatTable != null;
                 updateSegmentInfo(getParam(MetadataConstants.P_CUBE_ID), seg, buildFromFlatTable.getFlatTableDS().count());
             }
-            updateSegmentSourceBytesSize(getParam(MetadataConstants.P_CUBE_ID),
-                    ResourceDetectUtils.getSegmentSourceSize(shareDir));
+            updateCubeAndSegmentMeta(getParam(MetadataConstants.P_CUBE_ID),
+                    ResourceDetectUtils.getSegmentSourceSize(shareDir), recommendCuboidMap);
         } finally {
             FileSystem fs = HadoopUtil.getWorkingFileSystem();
             for (String viewPath : persistedViewFactTable) {
@@ -218,22 +223,28 @@ public class CubeBuildJob extends SparkApplication {
         segment.setSizeKB(segmentInfo.getAllLayoutSize() / 1024);
         List<String> cuboidStatics = new LinkedList<>();
 
-        String template = "{\"cuboid\":%d, \"rows\": %d, \"size\": %d}";
+        String template = "{\"cuboid\":%d, \"rows\": %d, \"size\": %d, \"deviation\": %7f}";
         for (LayoutEntity layoutEntity : segmentInfo.getAllLayoutJava()) {
-            cuboidStatics.add(String.format(Locale.getDefault(), template, layoutEntity.getId(), layoutEntity.getRows(), layoutEntity.getByteSize()));
+            double deviation = 0.0d;
+            if (layoutEntity.getRows() > 0 && recommendCuboidMap != null && !recommendCuboidMap.isEmpty()) {
+                long diff = (layoutEntity.getRows() - recommendCuboidMap.get(layoutEntity.getId()));
+                deviation = diff / (layoutEntity.getRows() + 0.0d);
+            }
+            cuboidStatics.add(String.format(Locale.getDefault(), template, layoutEntity.getId(),
+                    layoutEntity.getRows(), layoutEntity.getByteSize(), deviation));
         }
 
-        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(ss.sparkContext());
-        JavaRDD<String> cuboidStatRdd = jsc.parallelize(cuboidStatics);
-        for (String cuboid : cuboidStatics) {
-            logger.info("Statistics \t: {}", cuboid);
-        }
-        String path = config.getHdfsWorkingDirectory() + segment.getPreciseStatisticsResourcePath();
-        logger.info("Saving {} {}", path, segmentInfo);
         try {
+            JavaSparkContext jsc = JavaSparkContext.fromSparkContext(ss.sparkContext());
+            JavaRDD<String> cuboidStatRdd = jsc.parallelize(cuboidStatics, 1);
+            for (String cuboid : cuboidStatics) {
+                logger.info("Statistics \t: {}", cuboid);
+            }
+            String path = config.getHdfsWorkingDirectory() + segment.getPreciseStatisticsResourcePath();
+            logger.info("Saving {} {}", path, segmentInfo);
+            logger.info("Saving {} {}", path, segmentInfo);
             cuboidStatRdd.saveAsTextFile(path);
         } catch (Exception e) {
-            logger.error("Error", e);
+            logger.error("Write metrics failed.", e);
         }
 
         segment.setLastBuildTime(System.currentTimeMillis());
@@ -256,15 +267,19 @@ public class CubeBuildJob extends SparkApplication {
         }
     }
 
-    private void updateSegmentSourceBytesSize(String cubeId, Map<String, Object> toUpdateSegmentSourceSize)
-            throws IOException {
+    private void updateCubeAndSegmentMeta(String cubeId, Map<String, Object> toUpdateSegmentSourceSize,
+                                          Map<Long, Long> recommendCuboidMap) throws IOException {
         CubeInstance cubeInstance = cubeManager.getCubeByUuid(cubeId);
         CubeInstance cubeCopy = cubeInstance.latestCopyForWrite();
         CubeUpdate update = new CubeUpdate(cubeCopy);
+
+        if (recommendCuboidMap != null && !recommendCuboidMap.isEmpty())
+            update.setCuboids(recommendCuboidMap);
+
         List<CubeSegment> cubeSegments = Lists.newArrayList();
         for (Map.Entry<String, Object> entry : toUpdateSegmentSourceSize.entrySet()) {
             CubeSegment segment = cubeCopy.getSegmentById(entry.getKey());
-            if (segment.getInputRecords() > 0l) {
+            if (segment.getInputRecords() > 0L) {
                 segment.setInputRecordsSize((Long) entry.getValue());
                 segment.setLastBuildTime(System.currentTimeMillis());
                 cubeSegments.add(segment);
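The "deviation" field added above compares the rows actually built for a cuboid against the row count recommended by the planner. A self-contained sketch of that arithmetic with hypothetical numbers:

    import java.util.HashMap;
    import java.util.Locale;
    import java.util.Map;

    public class DeviationSketch {
        public static void main(String[] args) {
            // Hypothetical planner output: cuboid id -> estimated row count.
            Map<Long, Long> recommendCuboidMap = new HashMap<>();
            recommendCuboidMap.put(255L, 1000L);

            long actualRows = 1250L;   // hypothetical rows built for cuboid 255
            long byteSize = 4096L;     // hypothetical layout size in bytes
            double deviation = 0.0d;
            if (actualRows > 0 && !recommendCuboidMap.isEmpty()) {
                long diff = actualRows - recommendCuboidMap.get(255L);
                deviation = diff / (actualRows + 0.0d);   // (actual - estimated) / actual = 0.2
            }
            System.out.println(String.format(Locale.ROOT,
                    "{\"cuboid\":%d, \"rows\": %d, \"size\": %d, \"deviation\": %7f}",
                    255L, actualRows, byteSize, deviation));
            // -> {"cuboid":255, "rows": 1250, "size": 4096, "deviation": 0.200000}
        }
    }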
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
index ab2089d..9b80447 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
@@ -88,7 +88,7 @@ class ParentSourceChooser(
         logInfo("Sampling start ...")
         val coreDs = flatTableSource.getFlatTableDS.select(rowKeyColumns.head, rowKeyColumns.tail: _*)
         aggInfo = CuboidStatisticsJob.statistics(coreDs, seg)
-        logInfo("Sampling finished and cost " + (System.currentTimeMillis() - startMs) + " s .")
+        logInfo("Sampling finished and cost " + (System.currentTimeMillis() - startMs)/1000 + " s .")
         val statisticsStr = aggInfo.sortBy(x => x._1).map(x => x._1 + ":" + x._2.cuboid.counter.getCountEstimate).mkString(", ")
         logInfo("Cuboid Statistics results : \t" + statisticsStr)
       } else {
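For context on the one-character fix above: the elapsed time is measured in milliseconds, so it is divided by 1000 before being logged with an "s" unit. The equivalent arithmetic, shown in Java with illustrative values:

    // Illustrative ms -> s conversion; integer division drops sub-second remainders.
    long startMs = System.currentTimeMillis();
    // ... sampling work ...
    long elapsedSeconds = (System.currentTimeMillis() - startMs) / 1000;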
