This is an automated email from the ASF dual-hosted git repository. shaofengshi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new 1cd0026 KYLIN-3453 Improve cube size estimation for topn,count distinct 1cd0026 is described below commit 1cd00269a63f51a011f93effe6fc982d3e614459 Author: chao long <wayn...@qq.com> AuthorDate: Thu Jul 26 11:46:25 2018 +0800 KYLIN-3453 Improve cube size estimation for topn,count distinct --- .../org/apache/kylin/common/KylinConfigBase.java | 16 +++++---- .../kylin/measure/bitmap/BitmapSerializer.java | 14 ++++++++ .../apache/kylin/measure/hllc/HLLCSerializer.java | 12 +++++++ .../org/apache/kylin/measure/hllc/HLLCounter.java | 4 +-- .../kylin/measure/topn/TopNCounterSerializer.java | 17 ++++++++-- .../kylin/engine/mr/common/CubeStatsReader.java | 39 +++++++++++++++++----- .../kylin/engine/mr/common/CubeStatsWriter.java | 16 +++++---- .../kylin/engine/mr/steps/SaveStatisticsStep.java | 9 ++--- 8 files changed, 99 insertions(+), 28 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index b2331e1..43e7d62 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -426,7 +426,6 @@ abstract public class KylinConfigBase implements Serializable { return Double.parseDouble(getOptional("kylin.snapshot.ext.local.cache.max-size-gb", "200")); } - // ============================================================================ // CUBE // ============================================================================ @@ -449,7 +448,11 @@ abstract public class KylinConfigBase implements Serializable { } public double getJobCuboidSizeCountDistinctRatio() { - return Double.parseDouble(getOptional("kylin.cube.size-estimate-countdistinct-ratio", "0.05")); + return Double.parseDouble(getOptional("kylin.cube.size-estimate-countdistinct-ratio", "0.5")); + } + + public double getJobCuboidSizeTopNRatio() { + return Double.parseDouble(getOptional("kylin.cube.size-estimate-topn-ratio", "0.5")); } public String getCubeAlgorithm() { @@ -876,7 +879,7 @@ abstract public class KylinConfigBase implements Serializable { public Map<String, String> getSqoopConfigOverride() { return getPropertiesByPrefix("kylin.source.jdbc.sqoop-config-override."); } - + public String getJdbcSourceFieldDelimiter() { return getOptional("kylin.source.jdbc.field-delimiter", "|"); } @@ -1227,11 +1230,11 @@ abstract public class KylinConfigBase implements Serializable { public Boolean isEnumerableRulesEnabled() { return Boolean.parseBoolean(getOptional("kylin.query.calcite.enumerable-rules-enabled", "false")); } - + public boolean isReduceExpressionsRulesEnabled() { return Boolean.parseBoolean(getOptional("kylin.query.calcite.reduce-rules-enabled", "true")); } - + public boolean isConvertCreateTableToWith() { return Boolean.valueOf(getOptional("kylin.query.convert-create-table-to-with", "false")); } @@ -1332,12 +1335,13 @@ abstract public class KylinConfigBase implements Serializable { public int getBadQueryDefaultAlertingSeconds() { return Integer.parseInt(getOptional("kylin.query.badquery-alerting-seconds", "90")); } + public double getBadQueryDefaultAlertingCoefficient() { return Double.parseDouble(getOptional("kylin.query.timeout-seconds-coefficient", "0.5")); } public int getBadQueryDefaultDetectIntervalSeconds() { - int time =(int) (getQueryTimeoutSeconds() * getBadQueryDefaultAlertingCoefficient()); // half of query timeout + int time = (int) (getQueryTimeoutSeconds() * getBadQueryDefaultAlertingCoefficient()); // half of query timeout if (time == 0) { time = 60; // 60 sec } diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java index 1c13876..29a25e9 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapSerializer.java @@ -30,6 +30,7 @@ public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> { private static final int IS_RESULT_FLAG = 1; private static final int RESULT_SIZE = 12; + private static final int DEFAULT_MAX_SIZE = 1024; // called by reflection public BitmapSerializer(DataType type) { @@ -86,6 +87,19 @@ public class BitmapSerializer extends DataTypeSerializer<BitmapCounter> { } @Override + protected double getStorageBytesEstimate(double averageNumOfElementsInCounter) { + // MappeableArrayContainer DEFAULT_MAX_SIZE = 4096 + if (averageNumOfElementsInCounter < DEFAULT_MAX_SIZE) { + // 8 = 4 + 4 for SERIAL_COOKIE_NO_RUNCONTAINER + size + // size * 8 = 2 * size + 2 * size + 4 * size as keys + values Cardinality + startOffsets + // size * 8 for values array + return 8 + averageNumOfElementsInCounter * 16; + } else { + return getStorageBytesEstimate(); + } + } + + @Override public boolean supportDirectReturnResult() { return true; } diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java index 98bc5cf..9310864 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java @@ -80,4 +80,16 @@ public class HLLCSerializer extends DataTypeSerializer<HLLCounter> { return current().maxLength(); } + @Override + protected double getStorageBytesEstimate(double averageNumOfElementsInCounter) { + int registerIndexSize = current().getRegisterIndexSize(); + int m = 1 << precision; + if (!current().isDense((int) averageNumOfElementsInCounter) + || averageNumOfElementsInCounter < (m - 5) / (1 + registerIndexSize)) { + // 5 = 1 + 4 for scheme and size + // size * (getRegisterIndexSize + 1) + return 5 + averageNumOfElementsInCounter * (registerIndexSize + 1); + } + return getStorageBytesEstimate(); + } } diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java index b793465..80bbb2a 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java @@ -78,7 +78,7 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> { } } - private boolean isDense(int size) { + public boolean isDense(int size) { double over = OVERFLOW_FACTOR * m; return size > (int) over; } @@ -358,7 +358,7 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> { return 1 + m; } - private int getRegisterIndexSize() { + public int getRegisterIndexSize() { return (p - 1) / 8 + 1; // 2 when p=16, 3 when p=17 } diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java index 77a69cf..eff510f 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java @@ -61,12 +61,12 @@ public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteAr @Override public int maxLength() { - return Math.max(precision * TopNCounter.EXTRA_SPACE_RATE * (scale + 8), 1024 * 1024); // use at least 1M + return Math.max(precision * TopNCounter.EXTRA_SPACE_RATE * storageBytesEstimatePerCounter(), 1024 * 1024); // use at least 1M } @Override public int getStorageBytesEstimate() { - return precision * TopNCounter.EXTRA_SPACE_RATE * (scale + 8); + return precision * TopNCounter.EXTRA_SPACE_RATE * storageBytesEstimatePerCounter(); } @Override @@ -107,4 +107,17 @@ public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteAr return counter; } + @Override + protected double getStorageBytesEstimate(double averageNumOfElementsInCounter) { + if (averageNumOfElementsInCounter < precision * TopNCounter.EXTRA_SPACE_RATE) { + return averageNumOfElementsInCounter * storageBytesEstimatePerCounter() + 12; + } else { + return getStorageBytesEstimate(); + } + } + + private int storageBytesEstimatePerCounter() { + return (scale + 8); + } + } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java index 3c054a3..6b8934a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java @@ -55,6 +55,7 @@ import org.apache.kylin.cube.kv.CubeDimEncMap; import org.apache.kylin.cube.kv.RowKeyEncoder; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.measure.hllc.HLLCounter; +import org.apache.kylin.measure.topn.TopNMeasureType; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; @@ -79,6 +80,7 @@ public class CubeStatsReader { final double mapperOverlapRatioOfFirstBuild; // becomes meaningless after merge final Map<Long, HLLCounter> cuboidRowEstimatesHLL; final CuboidScheduler cuboidScheduler; + final long sourceRowCount; public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException { this(cubeSegment, cubeSegment.getCuboidScheduler(), kylinConfig); @@ -94,7 +96,7 @@ public class CubeStatsReader { RawResource resource = store.getResource(statsKey); if (resource == null) throw new IllegalStateException("Missing resource at " + statsKey); - + File tmpSeqFile = writeTmpSeqFile(resource.inputStream); Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath())); @@ -107,6 +109,7 @@ public class CubeStatsReader { this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber(); this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio(); this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap(); + this.sourceRowCount = cubeStatsResult.getSourceRecordCount(); } /** @@ -129,6 +132,7 @@ public class CubeStatsReader { this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber(); this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio(); this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap(); + this.sourceRowCount = cubeStatsResult.getSourceRecordCount(); } private File writeTmpSeqFile(InputStream inputStream) throws IOException { @@ -158,7 +162,7 @@ public class CubeStatsReader { // return map of Cuboid ID => MB public Map<Long, Double> getCuboidSizeMap() { - return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL()); + return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL(), sourceRowCount); } public double estimateCubeSize() { @@ -184,7 +188,8 @@ public class CubeStatsReader { return cuboidRowCountMap; } - public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap) { + public static Map<Long, Double> getCuboidSizeMapFromRowCount(CubeSegment cubeSegment, Map<Long, Long> rowCountMap, + long sourceRowCount) { final CubeDesc cubeDesc = cubeSegment.getCubeDesc(); final List<Integer> rowkeyColumnSize = Lists.newArrayList(); final Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeDesc); @@ -199,7 +204,7 @@ public class CubeStatsReader { Map<Long, Double> sizeMap = Maps.newHashMap(); for (Map.Entry<Long, Long> entry : rowCountMap.entrySet()) { sizeMap.put(entry.getKey(), estimateCuboidStorageSize(cubeSegment, entry.getKey(), entry.getValue(), - baseCuboid.getId(), baseCuboidRowCount, rowkeyColumnSize)); + baseCuboid.getId(), baseCuboidRowCount, rowkeyColumnSize, sourceRowCount)); } return sizeMap; } @@ -210,7 +215,7 @@ public class CubeStatsReader { * @return the cuboid size in M bytes */ private static double estimateCuboidStorageSize(CubeSegment cubeSegment, long cuboidId, long rowCount, - long baseCuboidId, long baseCuboidCount, List<Integer> rowKeyColumnLength) { + long baseCuboidId, long baseCuboidCount, List<Integer> rowKeyColumnLength, long sourceRowCount) { int rowkeyLength = cubeSegment.getRowKeyPreambleSize(); KylinConfig kylinConf = cubeSegment.getConfig(); @@ -228,12 +233,21 @@ public class CubeStatsReader { int normalSpace = rowkeyLength; int countDistinctSpace = 0; double percentileSpace = 0; + int topNSpace = 0; for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) { + if (rowCount == 0) + break; DataType returnType = measureDesc.getFunction().getReturnDataType(); if (measureDesc.getFunction().getExpression().equals(FunctionDesc.FUNC_COUNT_DISTINCT)) { - countDistinctSpace += returnType.getStorageBytesEstimate(); + long estimateDistinctCount = sourceRowCount / rowCount; + estimateDistinctCount = estimateDistinctCount == 0 ? 1L : estimateDistinctCount; + countDistinctSpace += returnType.getStorageBytesEstimate(estimateDistinctCount); } else if (measureDesc.getFunction().getExpression().equals(FunctionDesc.FUNC_PERCENTILE)) { percentileSpace += returnType.getStorageBytesEstimate(baseCuboidCount * 1.0 / rowCount); + } else if (measureDesc.getFunction().getExpression().equals(TopNMeasureType.FUNC_TOP_N)) { + long estimateTopNCount = sourceRowCount / rowCount; + estimateTopNCount = estimateTopNCount == 0 ? 1L : estimateTopNCount; + topNSpace += returnType.getStorageBytesEstimate(estimateTopNCount); } else { normalSpace += returnType.getStorageBytesEstimate(); } @@ -241,9 +255,11 @@ public class CubeStatsReader { double cuboidSizeRatio = kylinConf.getJobCuboidSizeRatio(); double cuboidSizeMemHungryRatio = kylinConf.getJobCuboidSizeCountDistinctRatio(); + double cuboidSizeTopNRatio = kylinConf.getJobCuboidSizeTopNRatio(); + double ret = (1.0 * normalSpace * rowCount * cuboidSizeRatio - + 1.0 * countDistinctSpace * rowCount * cuboidSizeMemHungryRatio + 1.0 * percentileSpace * rowCount) - / (1024L * 1024L); + + 1.0 * countDistinctSpace * rowCount * cuboidSizeMemHungryRatio + 1.0 * percentileSpace * rowCount + + 1.0 * topNSpace * rowCount * cuboidSizeTopNRatio) / (1024L * 1024L); return ret; } @@ -351,6 +367,7 @@ public class CubeStatsReader { public static class CubeStatsResult { private int percentage = 100; private double mapperOverlapRatio = 0; + private long sourceRecordCount = 0; private int mapperNumber = 0; private Map<Long, HLLCounter> counterMap = Maps.newHashMap(); @@ -367,6 +384,8 @@ public class CubeStatsReader { mapperOverlapRatio = Bytes.toDouble(value.getBytes()); } else if (key.get() == -2) { mapperNumber = Bytes.toInt(value.getBytes()); + } else if (key.get() == -3) { + sourceRecordCount = Bytes.toLong(value.getBytes()); } else if (key.get() > 0) { HLLCounter hll = new HLLCounter(precision); ByteArray byteArray = new ByteArray(value.getBytes()); @@ -392,6 +411,10 @@ public class CubeStatsReader { public Map<Long, HLLCounter> getCounterMap() { return Collections.unmodifiableMap(counterMap); } + + public long getSourceRecordCount() { + return sourceRecordCount; + } } public static void main(String[] args) throws IOException { diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java index f50a4be..c3d6042 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsWriter.java @@ -40,14 +40,15 @@ public class CubeStatsWriter { public static void writeCuboidStatistics(Configuration conf, Path outputPath, // Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage) throws IOException { - writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0); + writeCuboidStatistics(conf, outputPath, cuboidHLLMap, samplingPercentage, 0, 0, 0); } public static void writeCuboidStatistics(Configuration conf, Path outputPath, // - Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio) throws IOException { + Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio, + long sourceRecordCoun) throws IOException { Path seqFilePath = new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, samplingPercentage, mapperNumber, - mapperOverlapRatio); + mapperOverlapRatio, sourceRecordCoun); } //Be care of that the file name for partial cuboid statistics should start with BatchConstants.CFG_OUTPUT_STATISTICS, @@ -57,12 +58,12 @@ public class CubeStatsWriter { int shard) throws IOException { Path seqFilePath = new Path(outputPath, BatchConstants.CFG_OUTPUT_STATISTICS + "_" + shard); writeCuboidStatisticsInner(conf, seqFilePath, cuboidHLLMap, samplingPercentage, mapperNumber, - mapperOverlapRatio); + mapperOverlapRatio, 0); } private static void writeCuboidStatisticsInner(Configuration conf, Path outputFilePath, // - Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio) - throws IOException { + Map<Long, HLLCounter> cuboidHLLMap, int samplingPercentage, int mapperNumber, double mapperOverlapRatio, + long sourceRecordCount) throws IOException { List<Long> allCuboids = Lists.newArrayList(); allCuboids.addAll(cuboidHLLMap.keySet()); Collections.sort(allCuboids); @@ -80,6 +81,9 @@ public class CubeStatsWriter { // sampling percentage at key 0 writer.append(new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage))); + // flat table source_count at key -3 + writer.append(new LongWritable(-3), new BytesWritable(Bytes.toBytes(sourceRecordCount))); + for (long i : allCuboids) { valueBuf.clear(); cuboidHLLMap.get(i).writeRegisters(valueBuf); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java index b532360..1f79539 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java @@ -122,12 +122,15 @@ public class SaveStatisticsStep extends AbstractExecutable { totalRowsBeforeMerge); } double mapperOverlapRatio = grantTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grantTotal; + CubingJob cubingJob = (CubingJob) getManager() + .getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); + long sourceRecordCount = cubingJob.findSourceRecordCount(); CubeStatsWriter.writeCuboidStatistics(hadoopConf, statisticsDir, cuboidHLLMap, samplingPercentage, - mapperNumber, mapperOverlapRatio); + mapperNumber, mapperOverlapRatio, sourceRecordCount); Path statisticsFile = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME); logger.info(newSegment + " stats saved to hdfs " + statisticsFile); - + FSDataInputStream is = fs.open(statisticsFile); try { // put the statistics to metadata store @@ -135,8 +138,6 @@ public class SaveStatisticsStep extends AbstractExecutable { rs.putResource(resPath, is, System.currentTimeMillis()); logger.info(newSegment + " stats saved to resource " + resPath); - CubingJob cubingJob = (CubingJob) getManager() - .getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); StatisticsDecisionUtil.decideCubingAlgorithm(cubingJob, newSegment); StatisticsDecisionUtil.optimizeCubingPlan(newSegment); } finally {