This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit af8c8fee7c98d7fb4b3ccd594ed0e8b200ff3e64
Author: XiaoxiangYu <x...@apache.org>
AuthorDate: Thu Dec 10 18:25:11 2020 +0800

    KYLIN-4818 Refine CuboidStatisticsJob to improve performance
---
 .../kylin/engine/mr/common/CubeStatsReader.java    |  4 +--
 .../java/org/apache/kylin/cube/CubeSegment.java    |  2 +-
 .../org/apache/kylin/measure/hllc/HLLCounter.java  |  2 +-
 .../kylin/engine/spark/job/CubeBuildJob.java       | 11 +++++---
 .../engine/spark/job/CuboidStatisticsJob.scala     | 29 +++++++++-------------
 5 files changed, 23 insertions(+), 25 deletions(-)

diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 1a1dd11..3f804dd 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -56,7 +56,6 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.kv.CubeDimEncMap;
 import org.apache.kylin.cube.kv.RowKeyEncoder;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.measure.hllc.HLLCounter;
@@ -213,7 +212,6 @@ public class CubeStatsReader {
         final List<Integer> rowkeyColumnSize = Lists.newArrayList();
         final Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeDesc);
         final List<TblColRef> columnList = baseCuboid.getColumns();
-        final CubeDimEncMap dimEncMap = cubeSegment.getDimensionEncodingMap();
         final Long baseCuboidRowCount = rowCountMap.get(baseCuboid.getId());

         for (int i = 0; i < columnList.size(); i++) {
@@ -231,7 +229,7 @@ public class CubeStatsReader {
                     baseCuboid.getId(), baseCuboidRowCount, rowkeyColumnSize, sourceRowCount));
         }

-        if (origin == false && cubeSegment.getConfig().enableJobCuboidSizeOptimize()) {
+        if (!origin && cubeSegment.getConfig().enableJobCuboidSizeOptimize()) {
             optimizeSizeMap(sizeMap, cubeSegment);
         }

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 715e684..706cd97 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -541,7 +541,7 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
     }

     public String getPreciseStatisticsResourcePath() {
-        return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid(), ".json");
+        return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid(), "json");
     }

     public static String getStatisticsResourcePath(String cubeName, String cubeSegmentId) {
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
index 1c1371f..86e63d1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
@@ -151,7 +151,7 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {

     public void merge(HLLCounter another) {
         assert this.p == another.p;
-        assert this.hashFunc == another.hashFunc;
+        assert this.hashFunc.equals(another.hashFunc);
         switch (register.getRegisterType()) {
         case SINGLE_VALUE:
             switch (another.getRegisterType()) {
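A quick illustration of why the HLLCounter assertion above moves from == to equals: == on two HashFunction references only checks object identity, so counters whose hash functions were created (or deserialized) separately could fail the old assert even when they use the same algorithm and seed. A minimal Scala sketch of the distinction, assuming Guava's murmur3_128 with a hypothetical seed (not taken from the patch):

    import com.google.common.hash.{HashFunction, Hashing}

    // Two hash functions obtained independently.
    val f1: HashFunction = Hashing.murmur3_128(42)
    val f2: HashFunction = Hashing.murmur3_128(42)

    // Identity check (what Java's == does): may be false for distinct instances.
    println(f1 eq f2)
    // Value equality: compares the function itself (algorithm and seed).
    println(f1.equals(f2))
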
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index 4a0226d..89ecad4 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -239,14 +239,19 @@ public class CubeBuildJob extends SparkApplication {
         }

         try {
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
             JavaSparkContext jsc = JavaSparkContext.fromSparkContext(ss.sparkContext());
             JavaRDD<String> cuboidStatRdd = jsc.parallelize(cuboidStatics, 1);
             for (String cuboid : cuboidStatics) {
                 logger.info("Statistics \t: {}", cuboid);
             }
-            String path = config.getHdfsWorkingDirectory() + segment.getPreciseStatisticsResourcePath();
-            logger.info("Saving {} {}", path, segmentInfo);
-            cuboidStatRdd.saveAsTextFile(path);
+            String pathDir = config.getHdfsWorkingDirectory() + segment.getPreciseStatisticsResourcePath();
+            logger.info("Saving {} {} .", pathDir, segmentInfo);
+            Path path = new Path(pathDir);
+            if (fs.exists(path)) {
+                fs.delete(path, true);
+            }
+            cuboidStatRdd.saveAsTextFile(pathDir);
         } catch (Exception e) {
             logger.error("Write metrics failed.", e);
         }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala
index eb2a815..9c18765 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala
@@ -39,21 +39,20 @@ object CuboidStatisticsJob {
     val rkc = seg.allColumns.count(c => c.rowKey)

     // maybe we should use sample operation to reduce cost later
-    val res = inputDs.rdd
-      .mapPartitions(new CuboidStatisticsJob(seg.getAllLayout.map(x => x.getId), rkc).statisticsWithinPartition)
+    val res = inputDs.rdd.repartition(inputDs.sparkSession.sparkContext.defaultParallelism)
+      .mapPartitions(new CuboidStatisticsJob(seg.getAllLayout.map(x => x.getId).toArray, rkc).statisticsWithinPartition)
     val l = res.map(a => (a.key, a)).reduceByKey((a, b) => a.merge(b)).collect()
     // l.foreach(x => println(x._1 + " >>><<< " + x._2.cuboid.counter.getCountEstimate))
     l
   }
 }

-class CuboidStatisticsJob(ids: List[Long], rkc: Int) extends Serializable {
-  private val info = mutable.Map[Long, AggInfo]()
+class CuboidStatisticsJob(ids: Array[Long], rkc: Int) extends Serializable {
+  private val info = mutable.LongMap[AggInfo]()
   private var allCuboidsBitSet: Array[Array[Integer]] = Array()
   private val hf: HashFunction = Hashing.murmur3_128
   private val rowHashCodesLong = new Array[Long](rkc)
   private var idx = 0
-  private var meter1 = 0L
   private var meter2 = 0L
   private var startMills = 0L
   private var endMills = 0L
@@ -84,19 +83,14 @@ class CuboidStatisticsJob(ids: List[Long], rkc: Int) extends Serializable {

   def updateCuboid(r: Row): Unit = {
     // generate hash for each row key column
-    startMills = System.currentTimeMillis()
     var idx = 0
     while (idx < rkc) {
       val hc = hf.newHasher
-      var colValue = r.get(idx).toString
-      if (colValue == null) colValue = "0"
+      val colValue = if (r.get(idx) == null) "0" else r.get(idx).toString
       // add column ordinal to the hash value to distinguish between (a,b) and (b,a)
       rowHashCodesLong(idx) = hc.putUnencodedChars(colValue).hash().padToLong() + idx
       idx += 1
     }
-    endMills = System.currentTimeMillis()
-    meter1 += (endMills - startMills)
-    startMills = System.currentTimeMillis()

     // use the row key column hash to get a consolidated hash for each cuboid
@@ -105,8 +99,10 @@ class CuboidStatisticsJob(ids: List[Long], rkc: Int) extends Serializable {
     while (idx < n) {
       var value: Long = 0
       var position = 0
-      while (position < allCuboidsBitSet(idx).length) {
-        value += rowHashCodesLong(allCuboidsBitSet(idx)(position))
+      val currCuboidBitSet = allCuboidsBitSet(idx)
+      val currCuboidLength = currCuboidBitSet.length
+      while (position < currCuboidLength) {
+        value += rowHashCodesLong(currCuboidBitSet(position))
         position += 1
       }
       info(ids(idx)).cuboid.counter.addHashDirectly(value)
@@ -116,7 +112,7 @@ class CuboidStatisticsJob(ids: List[Long], rkc: Int) extends Serializable {
     meter2 += (endMills - startMills)
   }

-  def getCuboidBitSet(cuboidIds: List[Long], nRowKey: Int): Array[Array[Integer]] = {
+  def getCuboidBitSet(cuboidIds: Array[Long], nRowKey: Int): Array[Array[Integer]] = {
     val allCuboidsBitSet: Array[Array[Integer]] = new Array[Array[Integer]](cuboidIds.length)
     var j: Int = 0
     while (j < cuboidIds.length) {
@@ -140,9 +136,8 @@ class CuboidStatisticsJob(ids: List[Long], rkc: Int) extends Serializable {

   def printStat(): Unit = {
     println(" Stats")
-    println(" i :" + idx)
-    println("meter1 :" + meter1)
-    println("meter2 :" + meter2)
+    println("i :" + idx)
+    println("meter :" + meter2)
   }
 }
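On the Scala side, the refined job keeps the same overall shape: rows are spread across defaultParallelism partitions, each partition builds its own per-cuboid statistics keyed by cuboid id (now held in a mutable.LongMap, which keeps the Long keys unboxed), and the partial results are merged with reduceByKey. A simplified sketch of that pattern, assuming a hypothetical Stats class in place of Kylin's AggInfo and HLL counters:

    import scala.collection.mutable

    import org.apache.spark.rdd.RDD

    // Hypothetical stand-in for AggInfo: anything that can merge with another instance.
    case class Stats(key: Long, count: Long) {
      def merge(other: Stats): Stats = Stats(key, count + other.count)
    }

    def collectStats(rows: RDD[Array[String]], cuboidIds: Array[Long]): Array[(Long, Stats)] = {
      rows
        .repartition(rows.sparkContext.defaultParallelism)  // spread work, as in the change above
        .mapPartitions { rowsInPartition =>
          // One entry per cuboid id, built locally within the partition.
          val partial = mutable.LongMap[Stats]()
          cuboidIds.foreach(id => partial(id) = Stats(id, 0L))
          rowsInPartition.foreach { _ =>
            cuboidIds.foreach(id => partial(id) = partial(id).merge(Stats(id, 1L)))
          }
          partial.values.iterator
        }
        .map(s => (s.key, s))
        .reduceByKey((a, b) => a.merge(b))  // merge per-partition results per cuboid
        .collect()
    }

The CubeBuildJob change follows the usual Hadoop convention: saveAsTextFile refuses to write into an existing directory, so the output path is deleted first when it already exists.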