This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this 
push:
     new d7ca7a5  KYLIN-4918 Support Cube Level configuration in FilePruner
d7ca7a5 is described below

commit d7ca7a5918f28ec7bf8de187014451c5709456e4
Author: zhengshengjun <shengjun_zh...@sina.com>
AuthorDate: Fri Mar 5 11:25:38 2021 +0800

    KYLIN-4918 Support Cube Level configuration in FilePruner
---
 .../spark/sql/execution/datasource/FilePruner.scala       | 15 +++++++--------
 .../sql/execution/datasource/ResetShufflePartition.scala  | 12 +++++-------
 2 files changed, 12 insertions(+), 15 deletions(-)

diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index f3fe76d..b6008c2 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -21,7 +21,6 @@ package org.apache.spark.sql.execution.datasource
 import java.sql.{Date, Timestamp}
 
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.common.util.DateFormat
 import org.apache.kylin.cube.cuboid.Cuboid
 import org.apache.kylin.cube.CubeInstance
@@ -78,6 +77,9 @@ class FilePruner(cubeInstance: CubeInstance,
                  val options: Map[String, String])
   extends FileIndex with ResetShufflePartition with Logging {
 
+  val MAX_SHARDING_SIZE_PER_TASK: Long =
+    cubeInstance.getConfig.getMaxShardingSizeMBPerTask * 1024 * 1024
+
   private lazy val segmentDirs: Seq[SegmentDirectory] = {
     cubeInstance.getSegments.asScala
       .filter(_.getStatus.equals(SegmentStatusEnum.READY)).map(seg => {
@@ -170,7 +172,7 @@ class FilePruner(cubeInstance: CubeInstance,
   }
 
   private def genShardSpec(selected: Seq[SegmentDirectory]): Option[ShardSpec] 
= {
-    if (!KylinConfig.getInstanceFromEnv.isShardingJoinOptEnabled || 
selected.isEmpty) {
+    if (!cubeInstance.getConfig.isShardingJoinOptEnabled || selected.isEmpty) {
       None
     } else {
       val segments = selected.par.map { segDir =>
@@ -190,9 +192,9 @@ class FilePruner(cubeInstance: CubeInstance,
           (FilePruner.getPartitionId(f.getPath), f.getLen)
         ).groupBy(_._1).mapValues(_.map(_._2).sum)
         // if there are some partition ids which the file size exceeds the 
threshold
-        if (partitionSizePerId.exists(_._2 > 
FilePruner.MAX_SHARDING_SIZE_PER_TASK)) {
+        if (partitionSizePerId.exists(_._2 > MAX_SHARDING_SIZE_PER_TASK)) {
           logInfo(s"There are some partition ids which the file size exceeds 
the " +
-            s"threshold size ${FilePruner.MAX_SHARDING_SIZE_PER_TASK}, skip 
shard join.")
+            s"threshold size ${MAX_SHARDING_SIZE_PER_TASK}, skip shard join.")
           None
         } else {
           val sortColumns = if (segments.length == 1) {
@@ -259,7 +261,7 @@ class FilePruner(cubeInstance: CubeInstance,
     //    QueryContextFacade.current().record("shard_pruning")
     val totalFileSize = selected.flatMap(_.files).map(_.getLen).sum
     logInfo(s"After files pruning, total file size is ${totalFileSize}")
-    setShufflePartitions(totalFileSize, session)
+    setShufflePartitions(totalFileSize, session, cubeInstance.getConfig)
     logInfo(s"Files pruning in ${(System.nanoTime() - startTime).toDouble / 
1000000} ms")
     if (selected.isEmpty) {
       val value = Seq.empty[PartitionDirectory]
@@ -447,9 +449,6 @@ class FilePruner(cubeInstance: CubeInstance,
 
 object FilePruner {
 
-  val MAX_SHARDING_SIZE_PER_TASK: Long = KylinConfig.getInstanceFromEnv
-    .getMaxShardingSizeMBPerTask * 1024 * 1024
-
   def getPartitionId(p: Path): Int = {
     // path like: 
part-00001-91f13932-3d5e-4f85-9a56-d1e2b47d0ccb-c000.snappy.parquet
     // we need to get 00001.
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
index 1549c45..6724bce 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
@@ -23,17 +23,15 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.utils.SparderUtils
 
 trait ResetShufflePartition extends Logging {
-  val PARTITION_SPLIT_BYTES: Long =
-    KylinConfig.getInstanceFromEnv.getQueryPartitionSplitSizeMB * 1024 * 1024 
// 64MB
 
-  def setShufflePartitions(bytes: Long, sparkSession: SparkSession): Unit = {
+  def setShufflePartitions(bytes: Long, sparkSession: SparkSession, conf: 
KylinConfig): Unit = {
     QueryContextFacade.current().addAndGetSourceScanBytes(bytes)
     val defaultParallelism = 
SparderUtils.getTotalCore(sparkSession.sparkContext.getConf)
-    val kylinConfig = KylinConfig.getInstanceFromEnv
-    val partitionsNum = if (kylinConfig.getSparkSqlShufflePartitions != -1) {
-      kylinConfig.getSparkSqlShufflePartitions
+    val partitionsNum = if (conf.getSparkSqlShufflePartitions != -1) {
+      conf.getSparkSqlShufflePartitions
     } else {
-      Math.min(QueryContextFacade.current().getSourceScanBytes / 
PARTITION_SPLIT_BYTES + 1,
+      Math.min(QueryContextFacade.current().getSourceScanBytes /
+        (conf.getQueryPartitionSplitSizeMB * 1024 * 1024) + 1,
         defaultParallelism).toInt
     }
     // when hitting cube, this will override the value of 
'spark.sql.shuffle.partitions'

Reply via email to