This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/main by this push:
     new 444ae0604e KYLIN-5184, fix cuboid statistic feature (#1877)
444ae0604e is described below

commit 444ae0604e8290cf18b5defddfdd6d79006f7501
Author: Tengting Xu <34978943+muk...@users.noreply.github.com>
AuthorDate: Thu Jun 16 10:58:50 2022 +0800

    KYLIN-5184, fix cuboid statistic feature (#1877)
    
    * KYLIN-5184, fix cuboid statistic feature
    
    * minor, adapt for cuboids without base cuboid
    
    * minor, adapt for cuboids with original logic
    
    * minor, adapt for cuboids with original logic
    
    * minor, adapt for cuboids with original logic
---
 .../java/org/apache/kylin/engine/mr/CubingJob.java |   6 +-
 .../kylin/engine/mr/common/CubeStatsReader.java    | 167 +++++++++++----------
 .../engine/mr/common/CuboidRecommenderUtil.java    |   4 +-
 .../engine/mr/common/CuboidStatsReaderUtil.java    |   6 +-
 .../kylin/engine/mr/common/MapReduceUtil.java      |   8 +-
 .../engine/mr/common/StatisticsDecisionUtil.java   |  16 +-
 .../java/org/apache/kylin/cube/CubeManager.java    |  10 +-
 .../java/org/apache/kylin/cube/CubeSegment.java    |  58 +++++++
 .../kylin/cube/cuboid/DefaultCuboidScheduler.java  |   5 +-
 .../org/apache/kylin/cube/kv/CubeDimEncMap.java    |  10 +-
 .../job/impl/curator/CuratorLeaderSelector.java    |   5 +-
 .../org/apache/kylin/metadata/model/Segments.java  |  14 +-
 .../kylin/engine/spark/job/OptimizeBuildJob.java   |  17 ++-
 .../engine/spark/utils/UpdateMetadataUtil.java     |   4 +-
 .../engine/spark/builder/CubeMergeAssist.java      |   5 +-
 .../kylin/engine/spark/job/CubeBuildJob.java       |  18 ++-
 .../kylin/engine/spark/job/CubeMergeJob.java       |  14 +-
 .../engine/spark/job/ParentSourceChooser.scala     |  11 +-
 .../kylin/engine/spark2/NOptimizeJobTest.java      |   2 +-
 19 files changed, 246 insertions(+), 134 deletions(-)

diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index dc6350fd1b..bd391f29b8 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -364,7 +364,7 @@ public class CubingJob extends DefaultChainedExecutable {
         String cuboidRootPath = getCuboidRootPath(seg, config);
 
         try {
-            estimatedSizeMap = new CubeStatsReader(seg, config).getCuboidSizeMap(true);
+            estimatedSizeMap = new CubeStatsReader(seg, config, true).getCuboidSizeMap(true);
         } catch (IOException e) {
             logger.warn("Cannot get segment {} estimated size map", seg.getName());
 
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 353e7f4b92..14f0cbf0ae 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -86,65 +86,73 @@ public class CubeStatsReader {
     final Map<Long, HLLCounter> cuboidRowEstimatesHLL;
     final CuboidScheduler cuboidScheduler;
     public final long sourceRowCount;
+    private boolean isPrecise = false;
 
-    public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException {
-        this(cubeSegment, cubeSegment.getCuboidScheduler(), kylinConfig);
+    public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig, boolean enableHll) throws IOException {
+        this(cubeSegment, cubeSegment.getCuboidScheduler(), kylinConfig, enableHll);
     }
 
     /**
      * @param cuboidScheduler if it's null, part of it's functions will not be supported
      */
-    public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, KylinConfig kylinConfig)
+    public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, KylinConfig kylinConfig, boolean enableHll)
             throws IOException {
         this.seg = cubeSegment;
         this.cuboidScheduler = cuboidScheduler;
-        ResourceStore store = ResourceStore.getStore(kylinConfig);
-        String statsKey = cubeSegment.getStatisticsResourcePath();
-        RawResource resource = store.getResource(statsKey);
-        if (resource != null) {
-            File tmpSeqFile = writeTmpSeqFile(resource.content());
-            Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath()));
-            logger.info("Reading statistics from {}", path);
-            CubeStatsResult cubeStatsResult = new CubeStatsResult(path, kylinConfig.getCubeStatsHLLPrecision());
-            tmpSeqFile.delete();
-
-            this.samplingPercentage = cubeStatsResult.getPercentage();
-            this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber();
-            this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio();
-            this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap();
-            this.sourceRowCount = cubeStatsResult.getSourceRecordCount();
-        } else {
-            // throw new IllegalStateException("Missing resource at " + statsKey);
-            logger.warn("{} is not exists.", statsKey);
-            this.samplingPercentage = -1;
+
+        if (!enableHll && seg.getCuboidStaticsSize() != null
+                && !seg.getCuboidStaticsSize().isEmpty()
+                && seg.getCuboidStaticsRows() != null
+                && !seg.getCuboidStaticsRows().isEmpty()) {
+            logger.info("Reading precise statistics from segment {}", seg.getUuid());
+            this.isPrecise = true;
+            this.samplingPercentage = 100;
             this.mapperNumberOfFirstBuild = -1;
             this.mapperOverlapRatioOfFirstBuild = -1.0;
             this.cuboidRowEstimatesHLL = null;
             this.sourceRowCount = -1L;
+        } else {
+            ResourceStore store = ResourceStore.getStore(kylinConfig);
+            String statsKey = seg.getStatisticsResourcePath();
+            RawResource resource = store.getResource(statsKey);
+            if (resource != null) {
+                File tmpSeqFile = writeTmpSeqFile(resource.content());
+                Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath()));
+                logger.info("Reading statistics from {}", path);
+                CubeStatsResult cubeStatsResult = new CubeStatsResult(path, kylinConfig.getCubeStatsHLLPrecision());
+                tmpSeqFile.delete();
+
+                this.samplingPercentage = cubeStatsResult.getPercentage();
+                this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber();
+                this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio();
+                this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap();
+                this.sourceRowCount = cubeStatsResult.getSourceRecordCount();
+            } else {
+                // throw new IllegalStateException("Missing resource at " + statsKey);
+                logger.warn("{} does not exist.", statsKey);
+                this.samplingPercentage = -1;
+                this.mapperNumberOfFirstBuild = -1;
+                this.mapperOverlapRatioOfFirstBuild = -1.0;
+                this.cuboidRowEstimatesHLL = null;
+                this.sourceRowCount = -1L;
+            }
         }
     }
 
-    /**
-     * Read statistics from
-     * @param path
-     * rather than
-     * @param cubeSegment
-     *
-     * Since the statistics are from
-     * @param path
-     * cuboid scheduler should be provided by default
-     */
-    public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, KylinConfig kylinConfig, Path path)
-            throws IOException {
-        CubeStatsResult cubeStatsResult = new CubeStatsResult(path, kylinConfig.getCubeStatsHLLPrecision());
+    public Map<Long, Long> getPreciseCuboidsRowsMap() {
+        return this.seg.getCuboidStaticsRows();
+    }
 
-        this.seg = cubeSegment;
-        this.cuboidScheduler = cuboidScheduler;
-        this.samplingPercentage = cubeStatsResult.getPercentage();
-        this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber();
-        this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio();
-        this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap();
-        this.sourceRowCount = cubeStatsResult.getSourceRecordCount();
+    public Map<Long, Double> getPreciseCuboidsSizeMap() {
+        return handlePreciseCuboidsSize(this.seg.getCuboidStaticsSize());
+    }
+
+    private Map<Long, Double> handlePreciseCuboidsSize(Map<Long, Long> cuboidSizeMap) {
+        Map<Long, Double> sizeMap = Maps.newHashMap();
+        for (Map.Entry<Long, Long> entry : cuboidSizeMap.entrySet()) {
+            sizeMap.put(entry.getKey(), 1.0 * entry.getValue() / (1024L * 1024L));
+        }
+        return sizeMap;
     }
 
     private File writeTmpSeqFile(InputStream inputStream) throws IOException {
@@ -320,13 +328,9 @@ public class CubeStatsReader {
                 sizeMap.put(cuboidId, oriValue * rate);
             }
         }
-
         logger.info("cube size is {} after optimize", 
SumHelper.sumDouble(sizeMap.values()));
-
-        return;
     }
 
-
     /**
      * Estimate the cuboid's size
      *
@@ -353,8 +357,9 @@ public class CubeStatsReader {
         double percentileSpace = 0;
         int topNSpace = 0;
         for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) {
-            if (rowCount == 0)
+            if (rowCount == 0) {
                 break;
+            }
             DataType returnType = measureDesc.getFunction().getReturnDataType();
             if (measureDesc.getFunction().getExpression().equals(FunctionDesc.FUNC_COUNT_DISTINCT)) {
                 long estimateDistinctCount = sourceRowCount / rowCount;
@@ -381,19 +386,29 @@ public class CubeStatsReader {
     }
 
     private void print(PrintWriter out) {
-        Map<Long, Long> cuboidRows = getCuboidRowEstimatesHLL();
-        Map<Long, Double> cuboidSizes = getCuboidSizeMap();
+        Map<Long, Long> cuboidRows;
+        Map<Long, Double> cuboidSizes;
+        if (isPrecise) {
+            cuboidRows = getPreciseCuboidsRowsMap();
+            cuboidSizes = getPreciseCuboidsSizeMap();
+        } else {
+            cuboidRows = getCuboidRowEstimatesHLL();
+            cuboidSizes = getCuboidSizeMap();
+        }
         List<Long> cuboids = new ArrayList<Long>(cuboidRows.keySet());
         Collections.sort(cuboids);
 
+        String estimatedOrPrecise = isPrecise ? "precise" : "estimated";
         out.println("============================================================================");
         out.println("Statistics of " + seg);
         out.println();
-        out.println(
-                "Cube statistics hll precision: " + cuboidRowEstimatesHLL.values().iterator().next().getPrecision());
+        if (!isPrecise) {
+            out.println("Cube statistics hll precision: "
+                    + cuboidRowEstimatesHLL.values().iterator().next().getPrecision());
+        }
         out.println("Total cuboids: " + cuboidRows.size());
-        out.println("Total estimated rows: " + SumHelper.sumLong(cuboidRows.values()));
-        out.println("Total estimated size(MB): " + SumHelper.sumDouble(cuboidSizes.values()));
+        out.println("Total " + estimatedOrPrecise + " rows: " + SumHelper.sumLong(cuboidRows.values()));
+        out.println("Total " + estimatedOrPrecise + " size(MB): " + SumHelper.sumDouble(cuboidSizes.values()));
         out.println("Sampling percentage:  " + samplingPercentage);
         out.println("Mapper overlap ratio: " + mapperOverlapRatioOfFirstBuild);
         out.println("Mapper number: " + mapperNumberOfFirstBuild);
@@ -419,21 +434,14 @@ public class CubeStatsReader {
         return ret;
     }
 
-    public List<Long> getCuboidsByLayer(int level) {
-        if (cuboidScheduler == null) {
-            throw new UnsupportedOperationException("cuboid scheduler is null");
-        }
-        List<List<Long>> layeredCuboids = cuboidScheduler.getCuboidsByLayer();
-        return layeredCuboids.get(level);
-    }
-
     private void printCuboidInfoTreeEntry(Map<Long, Long> cuboidRows, Map<Long, Double> cuboidSizes, PrintWriter out) {
         if (cuboidScheduler == null) {
             throw new UnsupportedOperationException("cuboid scheduler is null");
         }
         long baseCuboid = Cuboid.getBaseCuboidId(seg.getCubeDesc());
         int dimensionCount = Long.bitCount(baseCuboid);
-        printCuboidInfoTree(-1L, baseCuboid, cuboidScheduler, cuboidRows, cuboidSizes, dimensionCount, 0, out);
+        printCuboidInfoTree(-1L, baseCuboid, cuboidScheduler, cuboidRows, cuboidSizes, dimensionCount, 0, out,
+                this.isPrecise);
     }
 
     private void printKVInfo(PrintWriter writer) {
@@ -445,19 +453,21 @@ public class CubeStatsReader {
     }
 
     private static void printCuboidInfoTree(long parent, long cuboidID, final CuboidScheduler scheduler,
-            Map<Long, Long> cuboidRows, Map<Long, Double> cuboidSizes, int dimensionCount, int depth, PrintWriter out) {
-        printOneCuboidInfo(parent, cuboidID, cuboidRows, cuboidSizes, dimensionCount, depth, out);
+            Map<Long, Long> cuboidRows, Map<Long, Double> cuboidSizes, int dimensionCount, int depth, PrintWriter out,
+            boolean isPrecise) {
+        printOneCuboidInfo(parent, cuboidID, cuboidRows, cuboidSizes, dimensionCount, depth, out, isPrecise);
 
         List<Long> children = scheduler.getSpanningCuboid(cuboidID);
         Collections.sort(children);
 
         for (Long child : children) {
-            printCuboidInfoTree(cuboidID, child, scheduler, cuboidRows, cuboidSizes, dimensionCount, depth + 1, out);
+            printCuboidInfoTree(cuboidID, child, scheduler, cuboidRows, cuboidSizes, dimensionCount, depth + 1,
+                    out, isPrecise);
         }
     }
 
     private static void printOneCuboidInfo(long parent, long cuboidID, Map<Long, Long> cuboidRows,
-            Map<Long, Double> cuboidSizes, int dimensionCount, int depth, PrintWriter out) {
+            Map<Long, Double> cuboidSizes, int dimensionCount, int depth, PrintWriter out, boolean isPrecise) {
         StringBuffer sb = new StringBuffer();
         for (int i = 0; i < depth; i++) {
             sb.append("    ");
@@ -465,16 +475,21 @@ public class CubeStatsReader {
         String cuboidName = Cuboid.getDisplayName(cuboidID, dimensionCount);
         sb.append("|---- Cuboid ").append(cuboidName);
 
-        long rowCount = cuboidRows.get(cuboidID);
-        double size = cuboidSizes.get(cuboidID);
-        sb.append(", est row: ").append(rowCount).append(", est MB: 
").append(formatDouble(size));
+        long rowCount = cuboidRows.get(cuboidID) == null ? 0: 
cuboidRows.get(cuboidID);
+        double size = cuboidSizes.get(cuboidID)== null ? 0.0: 
cuboidSizes.get(cuboidID);
+        String markPreciseOrEstimate =  isPrecise ? "precise" : "est";
+        sb.append(", ").append(markPreciseOrEstimate).append(" row: 
").append(rowCount)
+                .append(", ").append(markPreciseOrEstimate).append(" MB: ")
+                .append(formatDouble(size));
 
         if (parent != -1) {
-            sb.append(", shrink: ").append(formatDouble(100.0 * 
cuboidRows.get(cuboidID) / cuboidRows.get(parent)))
-                    .append("%");
+            double shrink = -0.0;
+            if (cuboidRows.get(parent) != null) {
+                shrink = 100.0 * cuboidRows.get(cuboidID) / 
cuboidRows.get(parent);
+            }
+            sb.append(", shrink: ").append(formatDouble(shrink)).append("%");
         }
-
-        out.println(sb.toString());
+        out.println(sb);
     }
 
     private static String formatDouble(double input) {
@@ -544,7 +559,7 @@ public class CubeStatsReader {
                 new BufferedWriter(new OutputStreamWriter(System.out, StandardCharsets.UTF_8)));
         for (CubeSegment seg : segments) {
             try {
-                new CubeStatsReader(seg, config).print(out);
+                new CubeStatsReader(seg, config, false).print(out);
             } catch (Exception e) {
                 logger.info("CubeStatsReader for Segment {} failed, skip it.", seg.getName());
             }
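
[The gist of the CubeStatsReader change above: when the new enableHll flag is false and the segment carries persisted per-cuboid statistics, the reader serves those exact numbers and otherwise falls back to the HLL-based estimates from the resource store. A minimal caller-side sketch, illustrative only — "segment" and "config" stand in for the usual CubeManager/KylinConfig lookups:

    // Prefer precise statistics persisted at build time; fall back to HLL estimates.
    CubeStatsReader reader = new CubeStatsReader(segment, config, false);
    Map<Long, Long> rowsPerCuboid = reader.getPreciseCuboidsRowsMap() != null
            ? reader.getPreciseCuboidsRowsMap()   // exact rows stored on the segment
            : reader.getCuboidRowEstimatesHLL();  // HLL estimates otherwise
    // getPreciseCuboidsSizeMap() converts the stored byte sizes to MB:
    // sizeInMB = 1.0 * bytes / (1024 * 1024)
]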
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
index 71d2fd1a6a..f989a68258 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
@@ -43,7 +43,7 @@ public class CuboidRecommenderUtil {
             return null;
         }
 
-        CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, null, segment.getConfig());
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, null, segment.getConfig(), true);
         if (cubeStatsReader.getCuboidRowEstimatesHLL() == null
                 || cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) {
             logger.info("Cuboid Statistics is not enabled.");
@@ -115,7 +115,7 @@ public class CuboidRecommenderUtil {
             return null;
         }
 
-        CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, null, segment.getConfig());
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, null, segment.getConfig(), true);
         if (cubeStatsReader.getCuboidRowEstimatesHLL() == null
                 || cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) {
             logger.info("Cuboid Statistics is not enabled.");
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
index a5fbe2b75b..9cf50f7c1c 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
@@ -74,7 +74,7 @@ public class CuboidStatsReaderUtil {
         List<CubeSegment> segmentList = cube.getSegments(SegmentStatusEnum.READY);
         Map<Long, Double> sizeMerged = Maps.newHashMapWithExpectedSize(statistics.size());
         for (CubeSegment pSegment : segmentList) {
-            CubeStatsReader pReader = new CubeStatsReader(pSegment, null, pSegment.getConfig());
+            CubeStatsReader pReader = new CubeStatsReader(pSegment, null, pSegment.getConfig(), true);
             Map<Long, Double> pSizeMap = CubeStatsReader.getCuboidSizeMapFromRowCount(pSegment, statistics,
                     pReader.sourceRowCount);
             for (Long pCuboid : statistics.keySet()) {
@@ -102,7 +102,7 @@ public class CuboidStatsReaderUtil {
         Map<Long, HLLCounter> cuboidHLLMapMerged = Maps.newHashMapWithExpectedSize(cuboidSet.size());
         Map<Long, Double> sizeMapMerged = Maps.newHashMapWithExpectedSize(cuboidSet.size());
         for (CubeSegment pSegment : segmentList) {
-            CubeStatsReader pReader = new CubeStatsReader(pSegment, null, pSegment.getConfig());
+            CubeStatsReader pReader = new CubeStatsReader(pSegment, null, pSegment.getConfig(), true);
             Map<Long, HLLCounter> pHLLMap = pReader.getCuboidRowHLLCounters();
             if (pHLLMap == null || pHLLMap.isEmpty()) {
                 logger.info("Cuboid Statistics for segment " + pSegment.getName() + " is not enabled.");
@@ -147,7 +147,7 @@ public class CuboidStatsReaderUtil {
             return null;
         }
 
-        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, null, cubeSegment.getConfig());
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, null, cubeSegment.getConfig(), true);
         if (cubeStatsReader.getCuboidRowEstimatesHLL() == null
                 || cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) {
             logger.info("Cuboid Statistics is not enabled.");
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
index ecde4aa7ba..9a6a550682 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *  
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- *  
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -67,7 +67,7 @@ public class MapReduceUtil {
         logger.info("Having per reduce MB " + perReduceInputMB + ", reduce 
count ratio " + reduceCountRatio + ", level "
                 + level);
 
-        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, 
cuboidScheduler, kylinConfig);
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, 
cuboidScheduler, kylinConfig, true);
 
         double parentLayerSizeEst, currentLayerSizeEst, 
adjustedCurrentLayerSizeEst;
 
@@ -111,7 +111,7 @@ public class MapReduceUtil {
             throws IOException {
         KylinConfig kylinConfig = cubeSeg.getConfig();
 
-        Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig).getCuboidSizeMap();
+        Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig, true).getCuboidSizeMap();
         double totalSizeInM = 0;
         for (Double cuboidSize : cubeSizeMap.values()) {
             totalSizeInM += cuboidSize;
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
index dc96c56c4a..3974b652a8 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
@@ -19,11 +19,11 @@
 package org.apache.kylin.engine.mr.common;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import com.google.common.collect.Maps;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -39,7 +39,7 @@ public class StatisticsDecisionUtil {
     protected static final Logger logger = LoggerFactory.getLogger(StatisticsDecisionUtil.class);
 
     public static void decideCubingAlgorithm(CubingJob cubingJob, CubeSegment seg) throws IOException {
-        CubeStatsReader cubeStats = new CubeStatsReader(seg, null, seg.getConfig());
+        CubeStatsReader cubeStats = new CubeStatsReader(seg, null, seg.getConfig(), true);
         decideCubingAlgorithm(cubingJob, seg, cubeStats.getMapperOverlapRatioOfFirstBuild(),
                 cubeStats.getMapperNumberOfFirstBuild());
     }
@@ -95,15 +95,14 @@ public class StatisticsDecisionUtil {
 
     // For triggering cube planner phase one
     public static Map<Long, Long> optimizeCubingPlan(CubeSegment segment) throws IOException {
-        if (isAbleToOptimizeCubingPlan(segment)) {
-            logger.info("It's able to trigger cuboid planner algorithm.");
-        } else {
-            return new HashMap<>();
+        if (!isAbleToOptimizeCubingPlan(segment)) {
+            return Maps.newHashMap();
         }
 
+        logger.info("It's able to trigger cuboid planner algorithm.");
         Map<Long, Long> recommendCuboidsWithStats = CuboidRecommenderUtil.getRecommendCuboidList(segment);
         if (recommendCuboidsWithStats == null || recommendCuboidsWithStats.isEmpty()) {
-            return new HashMap<>();
+            return Maps.newHashMap();
         }
 
         CubeInstance cube = segment.getCubeInstance();
@@ -115,8 +114,9 @@ public class StatisticsDecisionUtil {
 
     public static boolean isAbleToOptimizeCubingPlan(CubeSegment segment) {
         CubeInstance cube = segment.getCubeInstance();
-        if (!cube.getConfig().isCubePlannerEnabled())
+        if (!cube.getConfig().isCubePlannerEnabled()) {
             return false;
+        }
 
         if (cube.getSegments(SegmentStatusEnum.READY_PENDING).size() > 0) {
             logger.info("Has read pending segments and will not enable cube 
planner.");
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 838c8233df..d24ecd480c 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -371,16 +371,18 @@ public class CubeManager implements IRealizationProvider {
     }
 
     private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry, boolean isLocal) throws IOException {
-        if (update == null || update.getCubeInstance() == null)
+        if (update == null || update.getCubeInstance() == null) {
             throw new IllegalStateException();
+        }
 
         CubeInstance cube = update.getCubeInstance();
         logger.info("Updating cube instance '{}'", cube.getName());
 
         Segments<CubeSegment> newSegs = (Segments) cube.getSegments().clone();
 
-        if (update.getToAddSegs() != null)
+        if (update.getToAddSegs() != null) {
             newSegs.addAll(Arrays.asList(update.getToAddSegs()));
+        }
 
         List<String> toRemoveResources = Lists.newArrayList();
         if (update.getToRemoveSegs() != null) {
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 6f54e1eb98..b6d134a079 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -18,7 +18,9 @@
 
 package org.apache.kylin.cube;
 
+import java.io.IOException;
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Collection;
@@ -30,9 +32,12 @@ import java.util.Set;
 import java.util.TimeZone;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.annotation.Clarification;
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.CompressionUtils;
+import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
@@ -139,6 +144,12 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
     private Map<String, DimensionRangeInfo> dimensionRangeInfoMap = Maps.newHashMap();
 
+    @JsonProperty("cuboid_statics_rows_bytes")
+    private byte[] cuboidStaticsRowsBytes;
+
+    @JsonProperty("cuboid_statics_size_bytes")
+    private byte[] cuboidStaticsSizeBytes;
+
     private Map<Long, Short> cuboidBaseShards = Maps.newConcurrentMap(); // cuboid id ==> base(starting) shard for this cuboid
 
     // lazy init
@@ -690,6 +701,53 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
         copy.dimensionRangeInfoMap = other.dimensionRangeInfoMap == null ? null
                 : Maps.newHashMap(other.dimensionRangeInfoMap);
         copy.binarySignature = other.binarySignature;
+        copy.cuboidStaticsRowsBytes = other.cuboidStaticsRowsBytes;
+        copy.cuboidStaticsSizeBytes = other.cuboidStaticsSizeBytes;
         return copy;
     }
+
+    public void setCuboidStaticsRowsBytes(Map<Long, Long> staticsOfRows) {
+        this.cuboidStaticsRowsBytes = compressedCuboids(staticsOfRows);
+    }
+
+    public void setCuboidStaticsSizeBytes(Map<Long, Long> staticsOfSize) {
+        this.cuboidStaticsSizeBytes = compressedCuboids(staticsOfSize);
+    }
+
+    public Map<Long, Long> getCuboidStaticsRows() {
+        return this.decompressCuboids(this.cuboidStaticsRowsBytes);
+    }
+
+    public Map<Long, Long> getCuboidStaticsSize() {
+        return this.decompressCuboids(this.cuboidStaticsSizeBytes);
+    }
+
+    private Map<Long, Long> decompressCuboids(byte[] cuboidStaticsBytes) {
+        if (cuboidStaticsBytes == null) {
+            return null;
+        }
+        byte[] uncompressed;
+        try {
+            uncompressed = CompressionUtils.decompress(cuboidStaticsBytes);
+            String str = new String(uncompressed, StandardCharsets.UTF_8);
+            TypeReference<Map<Long, Long>> typeRef = new TypeReference<Map<Long, Long>>() {};
+            Map<Long, Long> cuboids = JsonUtil.readValue(str, typeRef);
+            return cuboids.isEmpty() ? null : cuboids;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private byte[] compressedCuboids(Map<Long, Long> cuboids) {
+        if (cuboids == null || cuboids.isEmpty()) {
+            return null;
+        }
+
+        try {
+            String str = JsonUtil.writeValueAsString(cuboids);
+            return CompressionUtils.compress(str.getBytes(StandardCharsets.UTF_8));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
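
[The two new byte[] fields above persist the per-cuboid maps as compressed JSON via Kylin's JsonUtil and CompressionUtils. For illustration only, a self-contained sketch of the same round trip using just the JDK — java.util.zip in place of CompressionUtils (whose exact framing may differ) and a hard-coded JSON string in place of JsonUtil:

    import java.io.ByteArrayOutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.zip.Deflater;
    import java.util.zip.Inflater;

    public class StatsBytesRoundTrip {
        // Compress a JSON string to bytes, as compressedCuboids() does.
        static byte[] compress(String json) {
            Deflater deflater = new Deflater();
            deflater.setInput(json.getBytes(StandardCharsets.UTF_8));
            deflater.finish();
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            while (!deflater.finished()) {
                bos.write(buf, 0, deflater.deflate(buf));
            }
            return bos.toByteArray();
        }

        // Decompress back to the JSON string, as decompressCuboids() does.
        static String decompress(byte[] bytes) throws Exception {
            Inflater inflater = new Inflater();
            inflater.setInput(bytes);
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            while (!inflater.finished()) {
                bos.write(buf, 0, inflater.inflate(buf));
            }
            return new String(bos.toByteArray(), StandardCharsets.UTF_8);
        }

        public static void main(String[] args) throws Exception {
            String json = "{\"1023\":42,\"511\":7}"; // cuboid id -> row count
            System.out.println(json.equals(decompress(compress(json)))); // true
        }
    }
]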
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java
index a5585f6d89..07e2030f7f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/DefaultCuboidScheduler.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -289,6 +289,7 @@ public class DefaultCuboidScheduler extends CuboidScheduler {
      * @param agg agg group
      * @return cuboidId list
      */
+    @Override
     public Set<Long> calculateCuboidsForAggGroup(AggregationGroup agg) {
         Set<Long> cuboidHolder = new HashSet<>();
 
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java
index 64624cad89..7d71c34388 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -83,10 +83,10 @@ public class CubeDimEncMap implements IDimensionEncodingMap, java.io.Serializabl
 
     @Override
     public Dictionary<String> getDictionary(TblColRef col) {
-//        if (seg == null)
+        if (dictionaryMap == null) {
+            return null;
+        }
         return dictionaryMap.get(col);
-//        else
-//            return seg.getDictionary(col);
     }
 
 }
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorLeaderSelector.java b/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorLeaderSelector.java
index f67cf454e7..d6e9532e1e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorLeaderSelector.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorLeaderSelector.java
@@ -109,8 +109,9 @@ public class CuratorLeaderSelector extends LeaderSelectorListenerAdapter impleme
             logger.error("Other exception occurred when initialization DefaultScheduler:", th);
         } finally {
             logger.warn(this.name + " relinquishing leadership.");
-            if (defaultScheduler != null)
+            if (defaultScheduler != null) {
                 defaultScheduler.shutdown();
+            }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
index 1a2b7d7e76..a3c17fa71e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java
@@ -109,8 +109,9 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial
         for (int i = this.size() - 1; i >= 0; i--) {
             T seg = this.get(i);
             if (seg.getLastBuildTime() > 0) {
-                if (latest == null || seg.getLastBuildTime() > latest.getLastBuildTime())
+                if (latest == null || seg.getLastBuildTime() > latest.getLastBuildTime()) {
                     latest = seg;
+                }
             }
         }
         return latest;
@@ -277,10 +278,10 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial
             }
         }
 
-        if (result.size() <= 1)
+        if (result.size() <= 1) {
             return null;
-        else
-            return (Pair<T, T>) Pair.newPair(result.getFirst(), result.getLast());
+        }
+        return (Pair<T, T>) Pair.newPair(result.getFirst(), result.getLast());
     }
 
     /**
@@ -295,8 +296,9 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial
         if (newSegment != null && !tobe.contains(newSegment)) {
             tobe.add(newSegment);
         }
-        if (tobe.size() == 0)
+        if (tobe.size() == 0) {
             return tobe;
+        }
 
         // sort by source offset
         Collections.sort(tobe);
@@ -468,7 +470,7 @@ public class Segments<T extends ISegment> extends ArrayList<T> implements Serial
 
     public Pair<Boolean, Boolean> fitInSegments(ISegment newOne) {
         if (this.isEmpty()) {
-          return Pair.newPair(false, false);        
+          return Pair.newPair(false, false);
         }
 
         ISegment first = this.get(0);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
index 8a412d5d53..a0598f818f 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
@@ -74,12 +74,13 @@ import java.util.stream.Collectors;
 public class OptimizeBuildJob extends SparkApplication {
     private static final Logger logger = LoggerFactory.getLogger(OptimizeBuildJob.class);
 
-    private Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
     protected static String TEMP_DIR_SUFFIX = "_temp";
 
     private BuildLayoutWithUpdate buildLayoutWithUpdate;
     private Map<Long, Short> cuboidShardNum = Maps.newConcurrentMap();
     private Map<Long, Long> cuboidsRowCount = Maps.newConcurrentMap();
+    Map<Long, Long> cuboidIdToPreciseRows;
+    Map<Long, Long> cuboidIdToPreciseSize;
 
     private Configuration conf = HadoopUtil.getCurrentConfiguration();
     private CubeManager cubeManager;
@@ -105,6 +106,9 @@ public class OptimizeBuildJob extends SparkApplication {
         optSeg = cubeInstance.getSegmentById(segmentId);
         originalSeg = cubeInstance.getOriginalSegmentToOptimize(optSeg);
         originalSegInfo = ManagerHub.getSegmentInfo(config, cubeId, originalSeg.getUuid());
+        // copy the original statistics
+        cuboidIdToPreciseSize = originalSeg.getCuboidStaticsSize();
+        cuboidIdToPreciseRows = originalSeg.getCuboidStaticsRows();
 
         calculateCuboidFromBaseCuboid();
         buildCuboidFromParent(cubeId);
@@ -150,10 +154,11 @@ public class OptimizeBuildJob extends SparkApplication {
         SpanningTree spanningTree;
         ParentSourceChooser sourceChooser;
         try {
-            spanningTree = new ForestSpanningTree(JavaConversions.asJavaCollection(optSegInfo.toBuildLayouts()));
+            Collection<LayoutEntity> cuboids = JavaConversions.asJavaCollection(optSegInfo.toBuildLayouts());
+            spanningTree = new ForestSpanningTree(cuboids);
             logger.info("There are {} cuboids to be built in segment {}.", optSegInfo.toBuildLayouts().size(),
                     optSegInfo.name());
-            for (LayoutEntity cuboid : JavaConversions.asJavaCollection(optSegInfo.toBuildLayouts())) {
+            for (LayoutEntity cuboid : cuboids) {
                 logger.debug("Cuboid {} has row keys: {}", cuboid.getId(),
                         Joiner.on(", ").join(cuboid.getOrderedDimensions().keySet()));
             }
@@ -205,6 +210,10 @@ public class OptimizeBuildJob extends SparkApplication {
         }
         optSeg.setCuboidShardNums(cuboidShardNum);
         optSeg.setInputRecordsSize(originalSeg.getInputRecordsSize());
+
+        optSeg.setCuboidStaticsSizeBytes(cuboidIdToPreciseSize);
+        optSeg.setCuboidStaticsRowsBytes(cuboidIdToPreciseRows);
+
         Map<String, String> additionalInfo = optSeg.getAdditionalInfo();
         additionalInfo.put("storageType", "" + IStorageAware.ID_PARQUET);
         optSeg.setAdditionalInfo(additionalInfo);
@@ -407,6 +416,8 @@ public class OptimizeBuildJob extends SparkApplication {
         cuboidShardNum.put(layoutId, (short) shardNum);
         JobMetricsUtils.unRegisterQueryExecutionListener(ss, queryExecutionId);
         BuildUtils.fillCuboidInfo(layout, path);
+        cuboidIdToPreciseRows.put(layoutId, layout.getRows());
+        cuboidIdToPreciseSize.put(layoutId, layout.getByteSize());
     }
 
     private void updateExistingLayout(LayoutEntity layout, long parentId) throws IOException {
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
index 86e256f4ac..11291b3698 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
@@ -112,8 +112,8 @@ public class UpdateMetadataUtil {
             toUpdateSeg.getSnapshots().putAll(origSeg.getSnapshots());
             toUpdateSeg.getRowkeyStats().addAll(origSeg.getRowkeyStats());
 
-            CubeStatsReader optSegStatsReader = new CubeStatsReader(toUpdateSeg, config);
-            CubeStatsReader origSegStatsReader = new CubeStatsReader(origSeg, config);
+            CubeStatsReader optSegStatsReader = new CubeStatsReader(toUpdateSeg, config, true);
+            CubeStatsReader origSegStatsReader = new CubeStatsReader(origSeg, config, true);
             Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
             if (origSegStatsReader.getCuboidRowHLLCounters() == null) {
                 logger.warn(
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeMergeAssist.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeMergeAssist.java
index c2b59e7e58..e142f73163 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeMergeAssist.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeMergeAssist.java
@@ -85,11 +85,10 @@ public class CubeMergeAssist implements Serializable {
 
             if (mergeDataset == null) {
                 mergeDataset = layoutDataset;
-            } else
+            } else {
                 mergeDataset = mergeDataset.union(layoutDataset);
+            }
         }
         return mergeDataset;
-
     }
-
 }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index a5f0e11403..f7b528f27c 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -89,6 +89,8 @@ public class CubeBuildJob extends SparkApplication {
     private BuildLayoutWithUpdate buildLayoutWithUpdate;
     private Map<Long, Short> cuboidShardNum = Maps.newConcurrentMap();
     private Map<Long, Long> cuboidsRowCount = Maps.newConcurrentMap();
+    private Map<Long, Long> cuboidIdToPreciseRows = Maps.newConcurrentMap();
+    private Map<Long, Long> cuboidIdToPreciseSize = Maps.newConcurrentMap();
     private Map<Long, Long> recommendCuboidMap = new HashMap<>();
 
     public static void main(String[] args) {
@@ -124,7 +126,7 @@ public class CubeBuildJob extends SparkApplication {
             long startMills = System.currentTimeMillis();
             spanningTree = new ForestSpanningTree(JavaConversions.asJavaCollection(statisticsSeg.toBuildLayouts()));
             sourceChooser = new ParentSourceChooser(spanningTree, statisticsSeg, newSegment, jobId, ss, config, false);
-            sourceChooser.setNeedStatistics();
+            sourceChooser.toStatistics();
             sourceChooser.decideFlatTableSource(null);
             Map<Long, HLLCounter> hllMap = new HashMap<>();
             for (Tuple2<Object, AggInfo> cuboidData : sourceChooser.aggInfo()) {
@@ -231,6 +233,8 @@ public class CubeBuildJob extends SparkApplication {
                 long diff = (layoutEntity.getRows() - recommendCuboidMap.get(layoutEntity.getId()));
                 deviation = diff / (layoutEntity.getRows() + 0.0d);
             }
+
+            collectPreciseStatics(cuboidIdToPreciseRows, cuboidIdToPreciseSize, layoutEntity);
             cuboidStatics.add(String.format(Locale.getDefault(), template, layoutEntity.getId(),
                     layoutEntity.getRows(), layoutEntity.getByteSize(), deviation));
         }
@@ -258,6 +262,8 @@ public class CubeBuildJob extends SparkApplication {
         segment.setInputRecords(sourceRowCount);
         segment.setSnapshots(new ConcurrentHashMap<>(segmentInfo.getSnapShot2JavaMap()));
         segment.setCuboidShardNums(cuboidShardNum);
+        segment.setCuboidStaticsRowsBytes(cuboidIdToPreciseRows);
+        segment.setCuboidStaticsSizeBytes(cuboidIdToPreciseSize);
         Map<String, String> additionalInfo = segment.getAdditionalInfo();
         additionalInfo.put("storageType", "" + IStorageAware.ID_PARQUET);
         segment.setAdditionalInfo(additionalInfo);
@@ -266,6 +272,13 @@ public class CubeBuildJob extends SparkApplication {
         cubeManager.updateCube(update, true);
     }
 
+    private void collectPreciseStatics(Map<Long, Long> cuboidIdToPreciseRows, Map<Long, Long> cuboidIdToPreciseSize,
+                                       LayoutEntity layoutEntity) {
+        // Hold the precise statistics of the cuboid in the segment
+        cuboidIdToPreciseRows.put(layoutEntity.getId(), layoutEntity.getRows());
+        cuboidIdToPreciseSize.put(layoutEntity.getId(), layoutEntity.getByteSize());
+    }
+
     private void collectPersistedTablePath(List<String> persistedFlatTable, ParentSourceChooser sourceChooser) {
         String flatTablePath = sourceChooser.persistFlatTableIfNecessary();
         if (!flatTablePath.isEmpty()) {
@@ -279,8 +292,9 @@ public class CubeBuildJob extends SparkApplication {
         CubeInstance cubeCopy = cubeInstance.latestCopyForWrite();
         CubeUpdate update = new CubeUpdate(cubeCopy);
 
-        if (recommendCuboidMap != null && !recommendCuboidMap.isEmpty())
+        if (recommendCuboidMap != null && !recommendCuboidMap.isEmpty()) {
             update.setCuboids(recommendCuboidMap);
+        }
 
         List<CubeSegment> cubeSegments = Lists.newArrayList();
         for (Map.Entry<String, Object> entry : toUpdateSegmentSourceSize.entrySet()) {
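
[In CubeBuildJob above, rows and byte sizes are gathered into concurrent maps because layout builds may run in parallel, and they are attached to the segment once at metadata-update time. A minimal standalone sketch of that collection pattern — a hypothetical class, not the patch's code:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class PreciseStatsCollector {
        // Written from parallel build tasks, read once when the segment is updated.
        final Map<Long, Long> rowsByCuboid = new ConcurrentHashMap<>();
        final Map<Long, Long> bytesByCuboid = new ConcurrentHashMap<>();

        void collect(long cuboidId, long rowCount, long byteSize) {
            rowsByCuboid.put(cuboidId, rowCount);
            bytesByCuboid.put(cuboidId, byteSize);
        }
    }
]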
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
index 16fecd385c..6d7e10e1df 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
@@ -61,6 +61,8 @@ public class CubeMergeJob extends SparkApplication {
     private List<CubeSegment> mergingSegments = Lists.newArrayList();
     private List<SegmentInfo> mergingSegInfos = Lists.newArrayList();
     private Map<Long, Short> cuboidShardNum = Maps.newConcurrentMap();
+    Map<Long, Long> cuboidIdToPreciseRows = Maps.newConcurrentMap();
+    Map<Long, Long> cuboidIdToPreciseSize = Maps.newConcurrentMap();
 
     @Override
     protected void doExecute() throws Exception {
@@ -134,7 +136,6 @@ public class CubeMergeJob extends SparkApplication {
             for (int i = 0; i < cuboids.size(); i++) {
                 LayoutEntity cuboid = cuboids.apply(i);
                 long layoutId = cuboid.getId();
-
                 CubeMergeAssist assist = mergeCuboidsAssist.get(layoutId);
                 if (assist == null) {
                     assist = new CubeMergeAssist();
@@ -191,7 +192,8 @@ public class CubeMergeJob extends SparkApplication {
         JobMetricsUtils.registerQueryExecutionListener(ss, queryExecutionId);
 
         BuildUtils.fillCuboidInfo(layout, path);
-
+        cuboidIdToPreciseSize.put(layoutId, layout.getByteSize());
+        cuboidIdToPreciseRows.put(layoutId, layout.getRows());
         return layout;
     }
 
@@ -202,9 +204,9 @@ public class CubeMergeJob extends SparkApplication {
 
         List<CubeSegment> cubeSegments = Lists.newArrayList();
         CubeSegment segment = cubeCopy.getSegmentById(segmentId);
-        long totalSourceSize = 0l;
-        long totalInputRecords = 0l;
-        long totalInputRecordsSize = 0l;
+        long totalSourceSize = 0L;
+        long totalInputRecords = 0L;
+        long totalInputRecordsSize = 0L;
         for (CubeMergeAssist assist : mergeCuboidsAssist.values()) {
             totalSourceSize += assist.getLayout().getByteSize();
         }
@@ -221,6 +223,8 @@ public class CubeMergeJob extends SparkApplication {
         Map<String, String> additionalInfo = segment.getAdditionalInfo();
         additionalInfo.put("storageType", "" + IStorageAware.ID_PARQUET);
         segment.setAdditionalInfo(additionalInfo);
+        segment.setCuboidStaticsRowsBytes(cuboidIdToPreciseRows);
+        segment.setCuboidStaticsSizeBytes(cuboidIdToPreciseSize);
         cubeSegments.add(segment);
         update.setToUpdateSegs(cubeSegments.toArray(new CubeSegment[0]));
         cubeManager.updateCube(update, true);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
index 0f77c768a9..a82a82144d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
@@ -50,15 +50,20 @@ class ParentSourceChooser(
   // build from flatTable.
   var flatTableSource: NBuildSourceInfo = _
 
-  private var needStatistics = false
+  private var needStatistics: Boolean = false
 
   //TODO: MetadataConverter don't have getCubeDesc() now
 
   /*val flatTableDesc = new CubeJoinedFlatTableDesc(
     MetadataConverter.getCubeDesc(segInfo.getCube),
     ParentSourceChooser.needJoinLookupTables(segInfo.getModel, toBuildTree))*/
-  def setNeedStatistics(): Unit =
-    needStatistics = true
+  def toStatistics(): Unit = {
+    this.needStatistics = true
+  }
+
+  def cancelStatistics(): Unit = {
+    this.needStatistics = false
+  }
 
   def getAggInfo : Array[(Long, AggInfo)] = aggInfo
 
diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NOptimizeJobTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NOptimizeJobTest.java
index 5e5789c3eb..6fdea85947 100644
--- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NOptimizeJobTest.java
+++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NOptimizeJobTest.java
@@ -110,7 +110,7 @@ public class NOptimizeJobTest extends LocalWithSparkSessionTest {
         FileSystem fs = HadoopUtil.getWorkingFileSystem();
         for (CubeSegment segment : cube.getSegments()) {
             Assert.assertEquals(SegmentStatusEnum.READY, segment.getStatus());
-            CubeStatsReader segStatsReader = new CubeStatsReader(segment, config);
+            CubeStatsReader segStatsReader = new CubeStatsReader(segment, config, true);
             Assert.assertEquals(recommendCuboids, segStatsReader.getCuboidRowHLLCounters().keySet());
             String cuboidPath = PathManager.getSegmentParquetStoragePath(cube, segment.getName(), segment.getStorageLocationIdentifier());
             Assert.assertTrue(fs.exists(new Path(cuboidPath)));
