This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new d7ca7a5  KYLIN-4918 Support Cube Level configuration in FilePruner
d7ca7a5 is described below

commit d7ca7a5918f28ec7bf8de187014451c5709456e4
Author: zhengshengjun <shengjun_zh...@sina.com>
AuthorDate: Fri Mar 5 11:25:38 2021 +0800

    KYLIN-4918 Support Cube Level configuration in FilePruner
---
 .../spark/sql/execution/datasource/FilePruner.scala      | 15 +++++++--------
 .../sql/execution/datasource/ResetShufflePartition.scala | 12 +++++-------
 2 files changed, 12 insertions(+), 15 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index f3fe76d..b6008c2 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -21,7 +21,6 @@ package org.apache.spark.sql.execution.datasource
 import java.sql.{Date, Timestamp}
 
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.common.util.DateFormat
 import org.apache.kylin.cube.cuboid.Cuboid
 import org.apache.kylin.cube.CubeInstance
@@ -78,6 +77,9 @@ class FilePruner(cubeInstance: CubeInstance,
                  val options: Map[String, String])
   extends FileIndex with ResetShufflePartition with Logging {
 
+  val MAX_SHARDING_SIZE_PER_TASK: Long =
+    cubeInstance.getConfig.getMaxShardingSizeMBPerTask * 1024 * 1024
+
   private lazy val segmentDirs: Seq[SegmentDirectory] = {
     cubeInstance.getSegments.asScala
       .filter(_.getStatus.equals(SegmentStatusEnum.READY)).map(seg => {
@@ -170,7 +172,7 @@ class FilePruner(cubeInstance: CubeInstance,
   }
 
   private def genShardSpec(selected: Seq[SegmentDirectory]): Option[ShardSpec] = {
-    if (!KylinConfig.getInstanceFromEnv.isShardingJoinOptEnabled || selected.isEmpty) {
+    if (!cubeInstance.getConfig.isShardingJoinOptEnabled || selected.isEmpty) {
       None
     } else {
       val segments = selected.par.map { segDir =>
@@ -190,9 +192,9 @@ class FilePruner(cubeInstance: CubeInstance,
           (FilePruner.getPartitionId(f.getPath), f.getLen)
         ).groupBy(_._1).mapValues(_.map(_._2).sum)
       // if there are some partition ids which the file size exceeds the threshold
-      if (partitionSizePerId.exists(_._2 > FilePruner.MAX_SHARDING_SIZE_PER_TASK)) {
+      if (partitionSizePerId.exists(_._2 > MAX_SHARDING_SIZE_PER_TASK)) {
         logInfo(s"There are some partition ids which the file size exceeds the " +
-          s"threshold size ${FilePruner.MAX_SHARDING_SIZE_PER_TASK}, skip shard join.")
+          s"threshold size ${MAX_SHARDING_SIZE_PER_TASK}, skip shard join.")
         None
       } else {
         val sortColumns = if (segments.length == 1) {
@@ -259,7 +261,7 @@ class FilePruner(cubeInstance: CubeInstance,
     // QueryContextFacade.current().record("shard_pruning")
     val totalFileSize = selected.flatMap(_.files).map(_.getLen).sum
     logInfo(s"After files pruning, total file size is ${totalFileSize}")
-    setShufflePartitions(totalFileSize, session)
+    setShufflePartitions(totalFileSize, session, cubeInstance.getConfig)
     logInfo(s"Files pruning in ${(System.nanoTime() - startTime).toDouble / 1000000} ms")
     if (selected.isEmpty) {
       val value = Seq.empty[PartitionDirectory]
@@ -447,9 +449,6 @@ class FilePruner(cubeInstance: CubeInstance,
 
 object FilePruner {
 
-  val MAX_SHARDING_SIZE_PER_TASK: Long = KylinConfig.getInstanceFromEnv
-    .getMaxShardingSizeMBPerTask * 1024 * 1024
-
   def getPartitionId(p: Path): Int = {
     // path like: part-00001-91f13932-3d5e-4f85-9a56-d1e2b47d0ccb-c000.snappy.parquet
     // we need to get 00001.
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
index 1549c45..6724bce 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
@@ -23,17 +23,15 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.utils.SparderUtils
 
 trait ResetShufflePartition extends Logging {
-  val PARTITION_SPLIT_BYTES: Long =
-    KylinConfig.getInstanceFromEnv.getQueryPartitionSplitSizeMB * 1024 * 1024 // 64MB
 
-  def setShufflePartitions(bytes: Long, sparkSession: SparkSession): Unit = {
+  def setShufflePartitions(bytes: Long, sparkSession: SparkSession, conf: KylinConfig): Unit = {
     QueryContextFacade.current().addAndGetSourceScanBytes(bytes)
     val defaultParallelism = SparderUtils.getTotalCore(sparkSession.sparkContext.getConf)
-    val kylinConfig = KylinConfig.getInstanceFromEnv
-    val partitionsNum = if (kylinConfig.getSparkSqlShufflePartitions != -1) {
-      kylinConfig.getSparkSqlShufflePartitions
+    val partitionsNum = if (conf.getSparkSqlShufflePartitions != -1) {
+      conf.getSparkSqlShufflePartitions
     } else {
-      Math.min(QueryContextFacade.current().getSourceScanBytes / PARTITION_SPLIT_BYTES + 1,
+      Math.min(QueryContextFacade.current().getSourceScanBytes /
+        (conf.getQueryPartitionSplitSizeMB * 1024 * 1024) + 1,
         defaultParallelism).toInt
     }
     // when hitting cube, this will override the value of 'spark.sql.shuffle.partitions'
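
The net effect of the patch is that FilePruner and ResetShufflePartition now read their tuning
knobs from the cube's own KylinConfig (cubeInstance.getConfig), which carries cube-level
overrides, instead of the process-global KylinConfig.getInstanceFromEnv. Below is a minimal
sketch of that pattern; CubeLevelConfigSketch and its method names are hypothetical, and only
the config getters that appear in the diff above are assumed to exist.

    import org.apache.kylin.common.KylinConfig
    import org.apache.kylin.cube.CubeInstance

    // Hypothetical helper illustrating the pattern introduced by this commit:
    // resolve thresholds from the config attached to the cube rather than the
    // global singleton, so two cubes in one process can use different values.
    object CubeLevelConfigSketch {

      // Shard-join threshold used by FilePruner, in bytes.
      def maxShardingBytesPerTask(cube: CubeInstance): Long =
        cube.getConfig.getMaxShardingSizeMBPerTask * 1024L * 1024L

      // Shuffle-partition sizing used by ResetShufflePartition, now driven by
      // whatever KylinConfig the caller passes in (cubeInstance.getConfig at
      // the call site in FilePruner).
      def shufflePartitions(scanBytes: Long, defaultParallelism: Int, conf: KylinConfig): Int =
        if (conf.getSparkSqlShufflePartitions != -1) {
          conf.getSparkSqlShufflePartitions
        } else {
          Math.min(scanBytes / (conf.getQueryPartitionSplitSizeMB * 1024L * 1024L) + 1,
            defaultParallelism.toLong).toInt
        }
    }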