APACHE-KYLIN-2866: Refine reading of statistics files by using
CubeStatsReader.CubeStatsResult

Signed-off-by: lidongsjtu <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ebafc766
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ebafc766
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ebafc766

Branch: refs/heads/master
Commit: ebafc766e613adf755675c574a76b1a4f8658346
Parents: fba9d1d
Author: Zhong <[email protected]>
Authored: Wed Dec 20 11:13:02 2017 +0800
Committer: lidongsjtu <[email protected]>
Committed: Wed Dec 20 23:20:11 2017 +0800

----------------------------------------------------------------------
 .../kylin/engine/mr/common/CubeStatsReader.java | 26 ++++++--
 .../engine/mr/steps/SaveStatisticsStep.java     | 65 +++++++-------------
 2 files changed, 42 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/ebafc766/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 3d7d542..d06c22b 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -93,8 +93,7 @@ public class CubeStatsReader {
         File tmpSeqFile = 
writeTmpSeqFile(store.getResource(statsKey).inputStream);
         Path path = new Path(HadoopUtil.fixWindowsPath("file://" + 
tmpSeqFile.getAbsolutePath()));
 
-        CubeStatsResult cubeStatsResult = new CubeStatsResult();
-        cubeStatsResult.initialize(path, 
kylinConfig.getCubeStatsHLLPrecision());
+        CubeStatsResult cubeStatsResult = new CubeStatsResult(path, 
kylinConfig.getCubeStatsHLLPrecision());
         tmpSeqFile.delete();
 
         this.seg = cubeSegment;
@@ -117,8 +116,7 @@ public class CubeStatsReader {
      */
     public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler 
cuboidScheduler, KylinConfig kylinConfig, Path path)
             throws IOException {
-        CubeStatsResult cubeStatsResult = new CubeStatsResult();
-        cubeStatsResult.initialize(path, 
kylinConfig.getCubeStatsHLLPrecision());
+        CubeStatsResult cubeStatsResult = new CubeStatsResult(path, 
kylinConfig.getCubeStatsHLLPrecision());
 
         this.seg = cubeSegment;
         this.cuboidScheduler = cuboidScheduler;
@@ -331,13 +329,13 @@ public class CubeStatsReader {
         return new DecimalFormat("#.##").format(input);
     }
 
-    private class CubeStatsResult {
+    public static class CubeStatsResult {
         private int percentage = 100;
         private double mapperOverlapRatio = 0;
         private int mapperNumber = 0;
         Map<Long, HLLCounter> counterMap = Maps.newHashMap();
 
-        void initialize(Path path, int precision) throws IOException {
+        public CubeStatsResult(Path path, int precision) throws IOException {
             Configuration hadoopConf = HadoopUtil.getCurrentConfiguration();
             Option seqInput = SequenceFile.Reader.file(path);
             try (Reader reader = new SequenceFile.Reader(hadoopConf, 
seqInput)) {
@@ -359,6 +357,22 @@ public class CubeStatsReader {
                 }
             }
         }
+
+        public int getPercentage() {
+            return percentage;
+        }
+
+        public double getMapperOverlapRatio() {
+            return mapperOverlapRatio;
+        }
+
+        public int getMapperNumber() {
+            return mapperNumber;
+        }
+
+        public Map<Long, HLLCounter> getCounterMap() {
+            return Collections.unmodifiableMap(counterMap);
+        }
     }
 
     public static void main(String[] args) throws IOException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/ebafc766/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
index f69bf67..99ebbef 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
@@ -27,19 +27,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.mr.common.CubeStatsWriter;
 import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -86,46 +81,30 @@ public class SaveStatisticsStep extends AbstractExecutable {
             int samplingPercentage = -1;
             int mapperNumber = -1;
             for (Path item : statisticsFiles) {
-                int pSamplingPercentage = 0;
-                double pMapperOverlapRatio = 0;
-                int pMapperNumber = 0;
-                long pGrantTotal = 0;
-                try (SequenceFile.Reader reader = new 
SequenceFile.Reader(hadoopConf, SequenceFile.Reader.file(item))) {
-                    LongWritable key = (LongWritable) 
ReflectionUtils.newInstance(reader.getKeyClass(), hadoopConf);
-                    BytesWritable value = (BytesWritable) 
ReflectionUtils.newInstance(reader.getValueClass(),
-                            hadoopConf);
-                    while (reader.next(key, value)) {
-                        if (key.get() == 0L) {
-                            pSamplingPercentage = 
Bytes.toInt(value.getBytes());
-                        } else if (key.get() == -1L) {
-                            pMapperOverlapRatio = 
Bytes.toDouble(value.getBytes());
-                        } else if (key.get() == -2L) {
-                            pMapperNumber = Bytes.toInt(value.getBytes());
-                        } else {
-                            HLLCounter hll = new 
HLLCounter(kylinConf.getCubeStatsHLLPrecision());
-                            ByteArray byteArray = new 
ByteArray(value.getBytes());
-                            hll.readRegisters(byteArray.asBuffer());
-                            cuboidHLLMap.put(key.get(), hll);
-                            pGrantTotal += hll.getCountEstimate();
-                        }
-                    }
-                    totalRowsBeforeMerge += pGrantTotal * pMapperOverlapRatio;
-                    grantTotal += pGrantTotal;
-                    if (pMapperNumber > 0) {
-                        if (mapperNumber < 0) {
-                            mapperNumber = pMapperNumber;
-                        } else {
-                            throw new RuntimeException(
-                                    "Base cuboid has been distributed to 
multiple reducers at step FactDistinctColumnsReducer!!!");
-                        }
-                    }
-                    if (samplingPercentage < 0) {
-                        samplingPercentage = pSamplingPercentage;
-                    } else if (samplingPercentage != pSamplingPercentage) {
+                CubeStatsReader.CubeStatsResult cubeStatsResult = new 
CubeStatsReader.CubeStatsResult(item,
+                        kylinConf.getCubeStatsHLLPrecision());
+                long pGrantTotal = 0L;
+                for (HLLCounter hll : 
cubeStatsResult.getCounterMap().values()) {
+                    pGrantTotal += hll.getCountEstimate();
+                }
+                totalRowsBeforeMerge += pGrantTotal * 
cubeStatsResult.getMapperOverlapRatio();
+                grantTotal += pGrantTotal;
+                int pMapperNumber = cubeStatsResult.getMapperNumber();
+                if (pMapperNumber > 0) {
+                    if (mapperNumber < 0) {
+                        mapperNumber = pMapperNumber;
+                    } else {
                         throw new RuntimeException(
-                                "The sampling percentage should be same among 
all of the reducer of FactDistinctColumnsReducer!!!");
+                                "Base cuboid has been distributed to multiple 
reducers at step FactDistinctColumnsReducer!!!");
                     }
                 }
+                int pSamplingPercentage = cubeStatsResult.getPercentage();
+                if (samplingPercentage < 0) {
+                    samplingPercentage = pSamplingPercentage;
+                } else if (samplingPercentage != pSamplingPercentage) {
+                    throw new RuntimeException(
+                            "The sampling percentage should be same among all 
of the reducer of FactDistinctColumnsReducer!!!");
+                }
             }
             if (samplingPercentage < 0) {
                 logger.warn("The sampling percentage should be set!!!");

Reply via email to