This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit af8c8fee7c98d7fb4b3ccd594ed0e8b200ff3e64
Author: XiaoxiangYu <x...@apache.org>
AuthorDate: Thu Dec 10 18:25:11 2020 +0800

    KYLIN-4818 Refine CuboidStatisticsJob to improve performance
---
 .../kylin/engine/mr/common/CubeStatsReader.java    |  4 +--
 .../java/org/apache/kylin/cube/CubeSegment.java    |  2 +-
 .../org/apache/kylin/measure/hllc/HLLCounter.java  |  2 +-
 .../kylin/engine/spark/job/CubeBuildJob.java       | 11 +++++---
 .../engine/spark/job/CuboidStatisticsJob.scala     | 29 +++++++++-------------
 5 files changed, 23 insertions(+), 25 deletions(-)

diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 1a1dd11..3f804dd 100644
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -56,7 +56,6 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.kv.CubeDimEncMap;
 import org.apache.kylin.cube.kv.RowKeyEncoder;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.measure.hllc.HLLCounter;
@@ -213,7 +212,6 @@ public class CubeStatsReader {
         final List<Integer> rowkeyColumnSize = Lists.newArrayList();
         final Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeDesc);
         final List<TblColRef> columnList = baseCuboid.getColumns();
-        final CubeDimEncMap dimEncMap = cubeSegment.getDimensionEncodingMap();
         final Long baseCuboidRowCount = rowCountMap.get(baseCuboid.getId());
 
         for (int i = 0; i < columnList.size(); i++) {
@@ -231,7 +229,7 @@ public class CubeStatsReader {
                     baseCuboid.getId(), baseCuboidRowCount, rowkeyColumnSize, sourceRowCount));
         }
 
-        if (origin == false && cubeSegment.getConfig().enableJobCuboidSizeOptimize()) {
+        if (!origin && cubeSegment.getConfig().enableJobCuboidSizeOptimize()) {
             optimizeSizeMap(sizeMap, cubeSegment);
         }
 
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 715e684..706cd97 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -541,7 +541,7 @@ public class CubeSegment implements IBuildable, ISegment, Serializable {
     }
 
     public String getPreciseStatisticsResourcePath() {
-        return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid(), ".json");
+        return getStatisticsResourcePath(this.getCubeInstance().getName(), this.getUuid(), "json");
     }
 
     public static String getStatisticsResourcePath(String cubeName, String cubeSegmentId) {
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
index 1c1371f..86e63d1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
@@ -151,7 +151,7 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
 
     public void merge(HLLCounter another) {
         assert this.p == another.p;
-        assert this.hashFunc == another.hashFunc;
+        assert this.hashFunc.equals(another.hashFunc);
         switch (register.getRegisterType()) {
             case SINGLE_VALUE:
                 switch (another.getRegisterType()) {
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index 4a0226d..89ecad4 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -239,14 +239,19 @@ public class CubeBuildJob extends SparkApplication {
         }
 
         try {
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
             JavaSparkContext jsc = JavaSparkContext.fromSparkContext(ss.sparkContext());
             JavaRDD<String> cuboidStatRdd = jsc.parallelize(cuboidStatics, 1);
             for (String cuboid : cuboidStatics) {
                 logger.info("Statistics \t: {}", cuboid);
             }
-            String path = config.getHdfsWorkingDirectory() + segment.getPreciseStatisticsResourcePath();
-            logger.info("Saving {} {}", path, segmentInfo);
-            cuboidStatRdd.saveAsTextFile(path);
+            String pathDir = config.getHdfsWorkingDirectory() + segment.getPreciseStatisticsResourcePath();
+            logger.info("Saving {} {} .", pathDir, segmentInfo);
+            Path path = new Path(pathDir);
+            if (fs.exists(path)) {
+                fs.delete(path, true);
+            }
+            cuboidStatRdd.saveAsTextFile(pathDir);
         } catch (Exception e) {
             logger.error("Write metrics failed.", e);
         }
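
For context, a minimal standalone sketch (not part of this patch) of the delete-stale-output-then-save pattern used in the hunk above; it uses the standard Hadoop FileSystem API rather than Kylin's HadoopUtil, and the application name and output path are illustrative assumptions:

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.SparkSession

    object SaveStatsSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("stats-save-sketch").getOrCreate()
        // Illustrative output location; a real job derives this from its config.
        val statsDir = "hdfs:///tmp/kylin/stats-sketch"
        val statsLines = Seq("cuboid=255 rows=100000", "cuboid=127 rows=42000")

        val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
        val outPath = new Path(statsDir)
        // saveAsTextFile refuses to write into an existing directory (e.g. after a
        // retried build), so remove any previous output first with a recursive delete.
        if (fs.exists(outPath)) {
          fs.delete(outPath, true)
        }
        spark.sparkContext.parallelize(statsLines, 1).saveAsTextFile(statsDir)
        spark.stop()
      }
    }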
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala
index eb2a815..9c18765 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala
@@ -39,21 +39,20 @@ object CuboidStatisticsJob {
 
     val rkc = seg.allColumns.count(c => c.rowKey)
     // maybe we should use sample operation to reduce cost later
-    val res = inputDs.rdd
-      .mapPartitions(new CuboidStatisticsJob(seg.getAllLayout.map(x => x.getId), rkc).statisticsWithinPartition)
+    val res = inputDs.rdd.repartition(inputDs.sparkSession.sparkContext.defaultParallelism)
+      .mapPartitions(new CuboidStatisticsJob(seg.getAllLayout.map(x => x.getId).toArray, rkc).statisticsWithinPartition)
     val l = res.map(a => (a.key, a)).reduceByKey((a, b) => a.merge(b)).collect()
     //    l.foreach(x => println(x._1 + " >>><<< " + x._2.cuboid.counter.getCountEstimate))
     l
   }
 }
 
-class CuboidStatisticsJob(ids: List[Long], rkc: Int) extends Serializable {
-  private val info = mutable.Map[Long, AggInfo]()
+class CuboidStatisticsJob(ids: Array[Long], rkc: Int) extends Serializable {
+  private val info = mutable.LongMap[AggInfo]()
   private var allCuboidsBitSet: Array[Array[Integer]] = Array()
   private val hf: HashFunction = Hashing.murmur3_128
   private val rowHashCodesLong = new Array[Long](rkc)
   private var idx = 0
-  private var meter1 = 0L
   private var meter2 = 0L
   private var startMills = 0L
   private var endMills = 0L
@@ -84,19 +83,14 @@ class CuboidStatisticsJob(ids: List[Long], rkc: Int) extends Serializable {
 
   def updateCuboid(r: Row): Unit = {
     // generate hash for each row key column
-    startMills = System.currentTimeMillis()
     var idx = 0
     while (idx < rkc) {
       val hc = hf.newHasher
-      var colValue = r.get(idx).toString
-      if (colValue == null) colValue = "0"
+      val colValue = if (r.get(idx) == null) "0" else r.get(idx).toString
       // add column ordinal to the hash value to distinguish between (a,b) and (b,a)
       rowHashCodesLong(idx) = hc.putUnencodedChars(colValue).hash().padToLong() + idx
       idx += 1
     }
-    endMills = System.currentTimeMillis()
-    meter1 += (endMills - startMills)
-
 
     startMills = System.currentTimeMillis()
     // use the row key column hash to get a consolidated hash for each cuboid
@@ -105,8 +99,10 @@ class CuboidStatisticsJob(ids: List[Long], rkc: Int) extends Serializable {
     while (idx < n) {
       var value: Long = 0
       var position = 0
-      while (position < allCuboidsBitSet(idx).length) {
-        value += rowHashCodesLong(allCuboidsBitSet(idx)(position))
+      val currCuboidBitSet = allCuboidsBitSet(idx)
+      val currCuboidLength = currCuboidBitSet.length
+      while (position < currCuboidLength) {
+        value += rowHashCodesLong(currCuboidBitSet(position))
         position += 1
       }
       info(ids(idx)).cuboid.counter.addHashDirectly(value)
@@ -116,7 +112,7 @@ class CuboidStatisticsJob(ids: List[Long], rkc: Int) extends Serializable {
     meter2 += (endMills - startMills)
   }
 
-  def getCuboidBitSet(cuboidIds: List[Long], nRowKey: Int): Array[Array[Integer]] = {
+  def getCuboidBitSet(cuboidIds: Array[Long], nRowKey: Int): Array[Array[Integer]] = {
     val allCuboidsBitSet: Array[Array[Integer]] = new Array[Array[Integer]](cuboidIds.length)
     var j: Int = 0
     while (j < cuboidIds.length) {
@@ -140,9 +136,8 @@ class CuboidStatisticsJob(ids: List[Long], rkc: Int) extends Serializable {
 
   def printStat(): Unit = {
     println("    Stats")
-    println("   i   :" + idx)
-    println("meter1 :" + meter1)
-    println("meter2 :" + meter2)
+    println("i      :" + idx)
+    println("meter  :" + meter2)
   }
 }
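
Below the patch, a self-contained sketch of the per-partition statistics pattern CuboidStatisticsJob follows: each row-key column is hashed with Guava murmur3, and one accumulator per cuboid id is kept in a mutable.LongMap. SimpleCounter is a stand-in assumption for Kylin's HLLCounter, and the per-cuboid hash combination is simplified:

    import com.google.common.hash.{HashFunction, Hashing}
    import scala.collection.mutable

    // Stand-in for Kylin's HLLCounter: a plain distinct-hash set, for illustration only.
    class SimpleCounter extends Serializable {
      private val seen = mutable.Set[Long]()
      def addHashDirectly(hash: Long): Unit = seen += hash
      def estimate: Long = seen.size.toLong
    }

    class PartitionStatsSketch(cuboidIds: Array[Long], rowKeyCount: Int) extends Serializable {
      private val info = mutable.LongMap[SimpleCounter]()   // cuboid id -> counter
      private val hf: HashFunction = Hashing.murmur3_128
      private val rowHashes = new Array[Long](rowKeyCount)

      // Intended for RDD.mapPartitions: consumes one partition's rows and
      // emits one (cuboid id, counter) pair per cuboid.
      def statisticsWithinPartition(rows: Iterator[Array[String]]): Iterator[(Long, SimpleCounter)] = {
        cuboidIds.foreach(id => info(id) = new SimpleCounter)
        rows.foreach(update)
        info.iterator
      }

      private def update(row: Array[String]): Unit = {
        var idx = 0
        while (idx < rowKeyCount) {
          val colValue = if (row(idx) == null) "0" else row(idx)
          // add the column ordinal so (a, b) and (b, a) hash differently
          rowHashes(idx) = hf.newHasher.putUnencodedChars(colValue).hash().padToLong() + idx
          idx += 1
        }
        // A real job combines only the columns in each cuboid's bit set; this
        // sketch simply sums every row-key hash for every cuboid id.
        val combined = rowHashes.sum
        cuboidIds.foreach(id => info(id).addHashDirectly(combined))
      }
    }

mutable.LongMap is specialized for Long keys, so the per-row lookups in the inner loop avoid boxing, which is the motivation for the mutable.Map -> mutable.LongMap change above.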
 
