This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit d8e5db8457bf5bbd1ae05b9360c5ae48038b57fb
Author: XiaoxiangYu <x...@apache.org>
AuthorDate: Mon Dec 7 21:30:43 2020 +0800

    KYLIN-4818 Performance profile for CuboidStatisticsJob
---
 .../kylin/engine/mr/common/CubeStatsReader.java    | 11 +++++---
 .../engine/mr/common/CuboidRecommenderUtil.java    |  6 ++--
 .../org/apache/kylin/common/KylinConfigBase.java   | 20 ++++++++++++--
 .../src/main/resources/kylin-defaults.properties   |  2 +-
 .../cube/cuboid/algorithm/CuboidRecommender.java   |  3 +-
 .../cuboid/algorithm/greedy/GreedyAlgorithm.java   | 16 +++++------
 .../org/apache/kylin/cube/kv/CubeDimEncMap.java    |  2 ++
 .../kylin/engine/spark/job/NSparkExecutable.java   |  7 +++--
 .../kylin/engine/spark/job/CubeBuildJob.java       |  6 +++-
 .../engine/spark/job/CuboidStatisticsJob.scala     | 32 +++++++++++++++++++---
 10 files changed, 80 insertions(+), 25 deletions(-)

diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index e63fc1a..1a1dd11 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -171,7 +171,6 @@ public class CubeStatsReader {
         return getCuboidRowCountMapFromSampling(cuboidRowEstimatesHLL, samplingPercentage);
     }
 
-    // return map of Cuboid ID => MB
     public Map<Long, Double> getCuboidSizeMap() {
         return getCuboidSizeMap(false);
     }
@@ -218,7 +217,12 @@ public class CubeStatsReader {
         final Long baseCuboidRowCount = rowCountMap.get(baseCuboid.getId());
 
         for (int i = 0; i < columnList.size(); i++) {
-            rowkeyColumnSize.add(dimEncMap.get(columnList.get(i)).getLengthOfEncoding());
+            /*
+             * A workaround for the fact that Kylin 4 does not support self-defined dimension encoding;
+             * that work is done by Parquet (https://github.com/apache/parquet-format/blob/master/Encodings.md) in Kylin 4.
+             * It is complex and hard to calculate the real size of a specific literal value, so use 4 bytes as a rough estimate.
+             */
+            rowkeyColumnSize.add(4);
         }
 
         Map<Long, Double> sizeMap = Maps.newHashMap();
@@ -360,11 +364,10 @@ public class CubeStatsReader {
             }
         }
 
-        double cuboidSizeRatio = kylinConf.getJobCuboidSizeRatio();
         double cuboidSizeMemHungryRatio = kylinConf.getJobCuboidSizeCountDistinctRatio();
         double cuboidSizeTopNRatio = kylinConf.getJobCuboidSizeTopNRatio();
 
-        double ret = (1.0 * normalSpace * rowCount * cuboidSizeRatio
+        double ret = (1.0 * normalSpace * rowCount
                 + 1.0 * countDistinctSpace * rowCount * cuboidSizeMemHungryRatio + 1.0 * percentileSpace * rowCount
                 + 1.0 * topNSpace * rowCount * cuboidSizeTopNRatio) / (1024L * 1024L);
         return ret;
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
index 6d9b748..f6ae332 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
@@ -60,8 +60,10 @@ public class CuboidRecommenderUtil {
         Set<Long> mandatoryCuboids = segment.getCubeDesc().getMandatoryCuboids();
 
         String key = cube.getName();
-        CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, cubeStatsReader.getCuboidRowEstimatesHLL(),
-                cubeStatsReader.getCuboidSizeMap()).setMandatoryCuboids(mandatoryCuboids).setBPUSMinBenefitRatio(segment.getConfig().getCubePlannerBPUSMinBenefitRatio()).build();
+        CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, cubeStatsReader.getCuboidRowEstimatesHLL(), cubeStatsReader.getCuboidSizeMap())
+                .setMandatoryCuboids(mandatoryCuboids)
+                .setBPUSMinBenefitRatio(segment.getConfig().getCubePlannerBPUSMinBenefitRatio())
+                .build();
         return CuboidRecommender.getInstance().getRecommendCuboidList(cuboidStats, segment.getConfig(),
                 !mandatoryCuboids.isEmpty());
     }
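
With the CubeStatsReader change above, every row key column now contributes a flat 4 bytes to the size estimate, since the encoded width is decided by Parquet rather than by Kylin's dimension encodings. For illustration, a minimal Java sketch of the resulting arithmetic; RoughSizeEstimate, dimCount, and measureBytes are hypothetical stand-ins for what CubeStatsReader derives from sampling:

    public class RoughSizeEstimate {
        // 4 bytes per row key column, as hard-coded above; measureBytes stands in
        // for the per-row space of the measures.
        static double estimateMB(int dimCount, long rowCount, int measureBytes) {
            long bytesPerRow = 4L * dimCount + measureBytes;
            return (double) bytesPerRow * rowCount / (1024L * 1024L);
        }

        public static void main(String[] args) {
            // 10 dimensions, 1M rows, 8 bytes of measures -> about 45.8 MB
            System.out.println(estimateMB(10, 1_000_000L, 8));
        }
    }
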
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index cb0d863..dc91a7f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -715,11 +715,12 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.cube.size-estimate-enable-optimize", "false"));
     }
 
+    @ConfigTag({ConfigTag.Tag.DEPRECATED, ConfigTag.Tag.NOT_CLEAR})
     public double getJobCuboidSizeRatio() {
         return Double.parseDouble(getOptional("kylin.cube.size-estimate-ratio", "0.25"));
     }
 
-    @Deprecated
+    @ConfigTag({ConfigTag.Tag.DEPRECATED, ConfigTag.Tag.NOT_CLEAR})
     public double getJobCuboidSizeMemHungryRatio() {
         return Double.parseDouble(getOptional("kylin.cube.size-estimate-memhungry-ratio", "0.05"));
     }
@@ -801,7 +802,7 @@ public abstract class KylinConfigBase implements Serializable {
     // ============================================================================
 
     public boolean isCubePlannerEnabled() {
-        return Boolean.parseBoolean(getOptional("kylin.cube.cubeplanner.enabled", TRUE));
+        return Boolean.parseBoolean(getOptional("kylin.cube.cubeplanner.enabled", FALSE));
     }
 
     public boolean isCubePlannerEnabledForExistingCube() {
@@ -833,6 +834,14 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     /**
+     * Columnar storage such as Apache Parquet often uses encoding and compression to make data
+     * smaller, and this affects the CuboidRecommendAlgorithm.
+     */
+    public double getStorageCompressionRatio() {
+        return Double.parseDouble(getOptional("kylin.cube.cubeplanner.storage.compression.ratio", "0.2"));
+    }
+
+    /**
      * get assigned server array, which a empty string array in default
      * @return
      */
@@ -2638,6 +2647,13 @@ public abstract class KylinConfigBase implements Serializable {
         return getFileName(kylinHome + File.separator + "lib", PARQUET_JOB_JAR_NAME_PATTERN);
     }
 
+    /**
+     * Use https://github.com/spektom/spark-flamegraph for Spark profiling
+     */
+    public String getSparkSubmitCmd() {
+        return getOptional("kylin.engine.spark-cmd", null);
+    }
+
     public void overrideKylinParquetJobJarPath(String path) {
         logger.info("override {} to {}", KYLIN_ENGINE_PARQUET_JOB_JAR, path);
         System.setProperty(KYLIN_ENGINE_PARQUET_JOB_JAR, path);
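
For illustration, a minimal sketch of how the two new knobs above resolve, assuming the usual getOptional(key, default) lookup; the java.util.Properties object is a hypothetical stand-in for Kylin's merged kylin.properties:

    import java.util.Properties;

    public class NewKnobs {
        public static void main(String[] args) {
            Properties conf = new Properties(); // stand-in for kylin.properties
            // kylin.cube.cubeplanner.storage.compression.ratio falls back to 0.2
            double ratio = Double.parseDouble(
                    conf.getProperty("kylin.cube.cubeplanner.storage.compression.ratio", "0.2"));
            // kylin.engine.spark-cmd falls back to null; pointing it at the wrapper
            // script from spektom/spark-flamegraph swaps in a profiling spark-submit
            String sparkCmd = conf.getProperty("kylin.engine.spark-cmd", null);
            System.out.println(ratio + ", " + sparkCmd); // 0.2, null
        }
    }
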
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index e33dbe9..c5cd317 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -153,7 +153,7 @@ kylin.cube.algorithm.inmem-auto-optimize=true
 
 kylin.cube.aggrgroup.max-combination=32768
 
-kylin.cube.cubeplanner.enabled=true
+kylin.cube.cubeplanner.enabled=false
 kylin.cube.cubeplanner.enabled-for-existing-cube=false
 kylin.cube.cubeplanner.expansion-threshold=15.0
 kylin.cube.cubeplanner.recommend-cache-max-size=200
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java
index 1f3eaaf..c5f7101 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidRecommender.java
@@ -143,7 +143,8 @@ public class CuboidRecommender {
                     long startTime = System.currentTimeMillis();
                     logger.info("Cube Planner Algorithm started at {}", startTime);
-                    List<Long> recommendCuboidList = algorithm.recommend(kylinConf.getCubePlannerExpansionRateThreshold());
+                    List<Long> recommendCuboidList = algorithm.recommend(
+                            kylinConf.getCubePlannerExpansionRateThreshold() / kylinConf.getStorageCompressionRatio());
                     logger.info("Cube Planner Algorithm ended at {}", System.currentTimeMillis() - startTime);
 
                     if (recommendCuboidList.size() < allCuboidCount) {
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/greedy/GreedyAlgorithm.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/greedy/GreedyAlgorithm.java
index 0a48eea..34b73b9 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/greedy/GreedyAlgorithm.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/greedy/GreedyAlgorithm.java
@@ -92,8 +92,8 @@ public class GreedyAlgorithm extends AbstractRecommendAlgorithm {
                 remaining.remove(best.getCuboidId());
                 benefitPolicy.propagateAggregationCost(best.getCuboidId(), selected);
                 round++;
-                if (logger.isTraceEnabled()) {
-                    logger.trace("Recommend in round {} : {}", round, best);
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Recommend in round {} : {}", round, best);
                 }
             } else {
                 doesRemainSpace = false;
@@ -111,15 +111,15 @@ public class GreedyAlgorithm extends AbstractRecommendAlgorithm {
                 "There should be no intersection between excluded list and selected list.");
 
         logger.info("Greedy Algorithm finished.");
-        if (logger.isTraceEnabled()) {
-            logger.trace("Excluded cuboidId size: {}", excluded.size());
-            logger.trace("Excluded cuboidId detail:");
+        if (logger.isDebugEnabled()) {
+            logger.debug("Excluded cuboidId size: {}", excluded.size());
+            logger.debug("Excluded cuboidId detail:");
             for (Long cuboid : excluded) {
-                logger.trace("cuboidId {} and Cost: {} and Space: {}", cuboid,
+                logger.debug("cuboidId {} and Cost: {} and Space: {}", cuboid,
                         cuboidStats.getCuboidQueryCost(cuboid), cuboidStats.getCuboidSize(cuboid));
             }
-            logger.trace("Total Space: {}", spaceLimit - remainingSpace);
-            logger.trace("Space Expansion Rate: {}", (spaceLimit - remainingSpace) / cuboidStats.getBaseCuboidSize());
+            logger.debug("Total Space: {}", spaceLimit - remainingSpace);
+            logger.debug("Space Expansion Rate: {}", (spaceLimit - remainingSpace) / cuboidStats.getBaseCuboidSize());
         }
         return Lists.newArrayList(selected);
     }
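
For illustration, the arithmetic behind the CuboidRecommender change above: the expansion-rate threshold handed to the greedy algorithm is divided by the storage compression ratio, presumably to budget for Parquet encoding and compression shrinking the stored cuboids. A minimal sketch with the shipped defaults:

    public class EffectiveThreshold {
        public static void main(String[] args) {
            double expansionThreshold = 15.0; // kylin.cube.cubeplanner.expansion-threshold
            double compressionRatio = 0.2;    // kylin.cube.cubeplanner.storage.compression.ratio
            // algorithm.recommend(...) now receives 15.0 / 0.2 = 75.0
            System.out.println(expansionThreshold / compressionRatio);
        }
    }
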
detail:"); + if (logger.isDebugEnabled()) { + logger.debug("Excluded cuboidId size: {}", excluded.size()); + logger.debug("Excluded cuboidId detail:"); for (Long cuboid : excluded) { - logger.trace("cuboidId {} and Cost: {} and Space: {}", cuboid, + logger.debug("cuboidId {} and Cost: {} and Space: {}", cuboid, cuboidStats.getCuboidQueryCost(cuboid), cuboidStats.getCuboidSize(cuboid)); } - logger.trace("Total Space: {}", spaceLimit - remainingSpace); - logger.trace("Space Expansion Rate: {}", (spaceLimit - remainingSpace) / cuboidStats.getBaseCuboidSize()); + logger.debug("Total Space: {}", spaceLimit - remainingSpace); + logger.debug("Space Expansion Rate: {}", (spaceLimit - remainingSpace) / cuboidStats.getBaseCuboidSize()); } return Lists.newArrayList(selected); } diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java index cea9db1..7342a69 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/CubeDimEncMap.java @@ -20,6 +20,7 @@ package org.apache.kylin.cube.kv; import java.util.Map; +import org.apache.kylin.common.annotation.Clarification; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.CubeDesc; @@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory; import org.apache.kylin.shaded.com.google.common.collect.Maps; +@Clarification(deprecated = true, msg = "Useless code in Kylin 4") public class CubeDimEncMap implements IDimensionEncodingMap, java.io.Serializable { private static final Logger logger = LoggerFactory.getLogger(CubeDimEncMap.class); diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java index 393c10d..4558cf0 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java @@ -347,7 +347,10 @@ public class NSparkExecutable extends AbstractExecutable { protected String generateSparkCmd(KylinConfig config, String hadoopConf, String jars, String kylinJobJar, String appArgs) { StringBuilder sb = new StringBuilder(); - sb.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.engine.spark.application.SparkEntry "); + + String sparkSubmitCmd = config.getSparkSubmitCmd() != null ? 
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index 7df8cf0..9504ac8 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -22,11 +22,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -140,7 +142,9 @@ public class CubeBuildJob extends SparkApplication {
             // 1.2 Save cuboid statistics
             String jobWorkingDirPath = JobBuilderSupport.getJobWorkingDir(cubeInstance.getConfig().getHdfsWorkingDirectory(), jobId);
             Path statisticsDir = new Path(jobWorkingDirPath + "/" + firstSegmentId + "/" + CFG_OUTPUT_STATISTICS);
-            CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), statisticsDir, hllMap, 1);
+            Optional<HLLCounter> hll = hllMap.values().stream().max(Comparator.comparingLong(HLLCounter::getCountEstimate));
+            long rc = hll.map(HLLCounter::getCountEstimate).orElse(1L);
+            CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), statisticsDir, hllMap, 1, rc);
 
             FileSystem fs = HadoopUtil.getWorkingFileSystem();
             ResourceStore rs = ResourceStore.getStore(config);
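
The stream pipeline above picks the largest HLL count estimate across all cuboids as the row count handed to CubeStatsWriter. For illustration, a minimal sketch with plain longs standing in for Kylin's HLLCounter:

    import java.util.Comparator;
    import java.util.Map;
    import java.util.Optional;

    public class MaxEstimate {
        public static void main(String[] args) {
            // cuboid id -> estimated row count (stand-in for HLLCounter.getCountEstimate())
            Map<Long, Long> estimates = Map.of(1L, 500L, 3L, 12_000L, 7L, 9_300L);
            Optional<Long> max = estimates.values().stream().max(Comparator.naturalOrder());
            long rc = max.orElse(1L); // fall back to 1 when sampling produced nothing
            System.out.println(rc);   // 12000
        }
    }
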
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala
index c08a6ae..3b963e4 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala
@@ -49,41 +49,56 @@ object CuboidStatisticsJob {
 
 class CuboidStatisticsJob(ids: List[Long], rkc: Int) extends Serializable {
   private val info = mutable.Map[String, AggInfo]()
-  val allCuboidsBitSet: Array[Array[Integer]] = getCuboidBitSet(ids, rkc)
+  private var allCuboidsBitSet: Array[Array[Integer]] = Array()
   private val hf: HashFunction = Hashing.murmur3_128
   private val rowHashCodesLong = new Array[Long](rkc)
   private var idx = 0
+  private var meter1 = 0L
+  private var meter2 = 0L
+  private var startMills = 0L
+  private var endMills = 0L
+
   def statisticsWithinPartition(rows: Iterator[Row]): Iterator[AggInfo] = {
     init()
+    println("CuboidStatisticsJob-statisticsWithinPartition1-" + System.currentTimeMillis())
     rows.foreach(update)
+    printStat()
+    println("CuboidStatisticsJob-statisticsWithinPartition2-" + System.currentTimeMillis())
     info.valuesIterator
   }
 
   def init(): Unit = {
+    println("CuboidStatisticsJob-Init1-" + System.currentTimeMillis())
+    allCuboidsBitSet = getCuboidBitSet(ids, rkc)
     ids.foreach(i => info.put(i.toString, AggInfo(i.toString)))
+    println("CuboidStatisticsJob-Init2-" + System.currentTimeMillis())
   }
 
   def update(r: Row): Unit = {
     idx += 1
-    if (idx <= 5 || idx % 300 == 0)
+    if (idx <= 5)
       println(r)
     updateCuboid(r)
   }
 
   def updateCuboid(r: Row): Unit = {
     // generate hash for each row key column
-
+    startMills = System.currentTimeMillis()
     var idx = 0
     while (idx < rkc) {
       val hc = hf.newHasher
       var colValue = r.get(idx).toString
       if (colValue == null) colValue = "0"
-      //add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+      // add column ordinal to the hash value to distinguish between (a,b) and (b,a)
       rowHashCodesLong(idx) = hc.putUnencodedChars(colValue).hash().padToLong() + idx
       idx += 1
     }
+    endMills = System.currentTimeMillis()
+    meter1 += (endMills - startMills)
+
+    startMills = System.currentTimeMillis()
     // use the row key column hash to get a consolidated hash for each cuboid
     val n = allCuboidsBitSet.length
     idx = 0
@@ -97,6 +112,8 @@ class CuboidStatisticsJob(ids: List[Long], rkc: Int) extends Serializable {
       info(ids(idx).toString).cuboid.counter.addHashDirectly(value)
       idx += 1
     }
+    endMills = System.currentTimeMillis()
+    meter2 += (endMills - startMills)
   }
 
   def getCuboidBitSet(cuboidIds: List[Long], nRowKey: Int): Array[Array[Integer]] = {
@@ -120,6 +137,13 @@ class CuboidStatisticsJob(ids: List[Long], rkc: Int) extends Serializable {
     }
     allCuboidsBitSet
   }
+
+  def printStat(): Unit = {
+    println(" Stats")
+    println(" i :" + idx)
+    println("meter1 :" + meter1)
+    println("meter2 :" + meter2)
+  }
 }
 
 case class AggInfo(key: String,
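
For illustration, a minimal Java sketch (the original is Scala) of the hot loop being profiled above: per-column murmur3 hashes offset by the column ordinal, with meter1-style wall-clock accumulation around the hashing phase. It requires Guava on the classpath; names such as RowHashProfile and hashRow are hypothetical:

    import com.google.common.hash.HashFunction;
    import com.google.common.hash.Hashing;

    public class RowHashProfile {
        private static final HashFunction HF = Hashing.murmur3_128();
        private static long meter1 = 0L;

        static long[] hashRow(Object[] row) {
            long start = System.currentTimeMillis();
            long[] hashes = new long[row.length];
            for (int i = 0; i < row.length; i++) {
                String col = row[i] == null ? "0" : row[i].toString();
                // add the ordinal so (a,b) and (b,a) hash differently
                hashes[i] = HF.newHasher().putUnencodedChars(col).hash().padToLong() + i;
            }
            meter1 += System.currentTimeMillis() - start; // time spent hashing row keys
            return hashes;
        }

        public static void main(String[] args) {
            long[] h = hashRow(new Object[] { "2020-12-07", 42, null });
            System.out.println(h[0] + ", meter1=" + meter1 + "ms");
        }
    }

Note that this sketch checks for null before calling toString; the Scala original calls toString first, so its null guard can never fire.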