APACHE-KYLIN-2866: refine reading of the statistics file by using CubeStatsReader.CubeStatsResult
Signed-off-by: lidongsjtu <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ebafc766 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ebafc766 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ebafc766 Branch: refs/heads/master Commit: ebafc766e613adf755675c574a76b1a4f8658346 Parents: fba9d1d Author: Zhong <[email protected]> Authored: Wed Dec 20 11:13:02 2017 +0800 Committer: lidongsjtu <[email protected]> Committed: Wed Dec 20 23:20:11 2017 +0800 ---------------------------------------------------------------------- .../kylin/engine/mr/common/CubeStatsReader.java | 26 ++++++-- .../engine/mr/steps/SaveStatisticsStep.java | 65 +++++++------------- 2 files changed, 42 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/ebafc766/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java index 3d7d542..d06c22b 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java @@ -93,8 +93,7 @@ public class CubeStatsReader { File tmpSeqFile = writeTmpSeqFile(store.getResource(statsKey).inputStream); Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath())); - CubeStatsResult cubeStatsResult = new CubeStatsResult(); - cubeStatsResult.initialize(path, kylinConfig.getCubeStatsHLLPrecision()); + CubeStatsResult cubeStatsResult = new CubeStatsResult(path, kylinConfig.getCubeStatsHLLPrecision()); tmpSeqFile.delete(); this.seg = cubeSegment; @@ -117,8 +116,7 @@ public class 
CubeStatsReader { */ public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, KylinConfig kylinConfig, Path path) throws IOException { - CubeStatsResult cubeStatsResult = new CubeStatsResult(); - cubeStatsResult.initialize(path, kylinConfig.getCubeStatsHLLPrecision()); + CubeStatsResult cubeStatsResult = new CubeStatsResult(path, kylinConfig.getCubeStatsHLLPrecision()); this.seg = cubeSegment; this.cuboidScheduler = cuboidScheduler; @@ -331,13 +329,13 @@ public class CubeStatsReader { return new DecimalFormat("#.##").format(input); } - private class CubeStatsResult { + public static class CubeStatsResult { private int percentage = 100; private double mapperOverlapRatio = 0; private int mapperNumber = 0; Map<Long, HLLCounter> counterMap = Maps.newHashMap(); - void initialize(Path path, int precision) throws IOException { + public CubeStatsResult(Path path, int precision) throws IOException { Configuration hadoopConf = HadoopUtil.getCurrentConfiguration(); Option seqInput = SequenceFile.Reader.file(path); try (Reader reader = new SequenceFile.Reader(hadoopConf, seqInput)) { @@ -359,6 +357,22 @@ public class CubeStatsReader { } } } + + public int getPercentage() { + return percentage; + } + + public double getMapperOverlapRatio() { + return mapperOverlapRatio; + } + + public int getMapperNumber() { + return mapperNumber; + } + + public Map<Long, HLLCounter> getCounterMap() { + return Collections.unmodifiableMap(counterMap); + } } public static void main(String[] args) throws IOException { http://git-wip-us.apache.org/repos/asf/kylin/blob/ebafc766/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java index f69bf67..99ebbef 100644 --- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java @@ -27,19 +27,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.common.util.ByteArray; -import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.engine.mr.common.CubeStatsReader; import org.apache.kylin.engine.mr.common.CubeStatsWriter; import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil; import org.apache.kylin.job.exception.ExecuteException; @@ -86,46 +81,30 @@ public class SaveStatisticsStep extends AbstractExecutable { int samplingPercentage = -1; int mapperNumber = -1; for (Path item : statisticsFiles) { - int pSamplingPercentage = 0; - double pMapperOverlapRatio = 0; - int pMapperNumber = 0; - long pGrantTotal = 0; - try (SequenceFile.Reader reader = new SequenceFile.Reader(hadoopConf, SequenceFile.Reader.file(item))) { - LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), hadoopConf); - BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), - hadoopConf); - while (reader.next(key, value)) { - if (key.get() == 0L) { - pSamplingPercentage = Bytes.toInt(value.getBytes()); - } else if (key.get() == -1L) { - pMapperOverlapRatio = Bytes.toDouble(value.getBytes()); - 
} else if (key.get() == -2L) { - pMapperNumber = Bytes.toInt(value.getBytes()); - } else { - HLLCounter hll = new HLLCounter(kylinConf.getCubeStatsHLLPrecision()); - ByteArray byteArray = new ByteArray(value.getBytes()); - hll.readRegisters(byteArray.asBuffer()); - cuboidHLLMap.put(key.get(), hll); - pGrantTotal += hll.getCountEstimate(); - } - } - totalRowsBeforeMerge += pGrantTotal * pMapperOverlapRatio; - grantTotal += pGrantTotal; - if (pMapperNumber > 0) { - if (mapperNumber < 0) { - mapperNumber = pMapperNumber; - } else { - throw new RuntimeException( - "Base cuboid has been distributed to multiple reducers at step FactDistinctColumnsReducer!!!"); - } - } - if (samplingPercentage < 0) { - samplingPercentage = pSamplingPercentage; - } else if (samplingPercentage != pSamplingPercentage) { + CubeStatsReader.CubeStatsResult cubeStatsResult = new CubeStatsReader.CubeStatsResult(item, + kylinConf.getCubeStatsHLLPrecision()); + long pGrantTotal = 0L; + for (HLLCounter hll : cubeStatsResult.getCounterMap().values()) { + pGrantTotal += hll.getCountEstimate(); + } + totalRowsBeforeMerge += pGrantTotal * cubeStatsResult.getMapperOverlapRatio(); + grantTotal += pGrantTotal; + int pMapperNumber = cubeStatsResult.getMapperNumber(); + if (pMapperNumber > 0) { + if (mapperNumber < 0) { + mapperNumber = pMapperNumber; + } else { throw new RuntimeException( - "The sampling percentage should be same among all of the reducer of FactDistinctColumnsReducer!!!"); + "Base cuboid has been distributed to multiple reducers at step FactDistinctColumnsReducer!!!"); } } + int pSamplingPercentage = cubeStatsResult.getPercentage(); + if (samplingPercentage < 0) { + samplingPercentage = pSamplingPercentage; + } else if (samplingPercentage != pSamplingPercentage) { + throw new RuntimeException( + "The sampling percentage should be same among all of the reducer of FactDistinctColumnsReducer!!!"); + } } if (samplingPercentage < 0) { logger.warn("The sampling percentage should be set!!!");
