This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit ee0b0ec92bdb73d13f306549dbb1b22cdbac6226
Author: huangsheng <huangshen...@163.com>
AuthorDate: Wed Mar 22 10:07:36 2023 +0800

    KYLIN-5576 Don't use the filters with subquery on partition columns to detect resources during build the model
---
 .../kylin/engine/spark/job/RDPartitionBuildExec.scala   |  2 +-
 .../kylin/engine/spark/job/RDSegmentBuildExec.scala     |  2 +-
 .../spark/job/ResourceDetectBeforeCubingJob.java        |  2 +-
 .../spark/job/ResourceDetectBeforeMergingJob.java       |  2 +-
 .../engine/spark/job/ResourceDetectBeforeSampling.java  |  2 +-
 .../spark/sql/hive/utils/ResourceDetectUtils.scala      | 18 ++++++++++++------
 6 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala
index 11bc92ee3a..6279452f39 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala
@@ -65,7 +65,7 @@ class RDPartitionBuildExec(private val jobContext: SegmentJob, //
       Integer.parseInt(ResourceDetectUtils.getPartitions(execution.executedPlan))).sum

     val paths = executions.flatMap(execution => //
-      ResourceDetectUtils.getPaths(execution.sparkPlan).map(_.toString)
+      ResourceDetectUtils.getPaths(execution.sparkPlan, true).map(_.toString)
     ).asJava
     logInfo(s"Detected source: $sourceName $leaves ${paths.asScala.mkString(",")}")
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala
index fb8951a947..5e9abab57b 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala
@@ -61,7 +61,7 @@ class RDSegmentBuildExec(private val jobContext: SegmentJob, //
       val sourceName = String.valueOf(parentId)
       val leaves = Integer.parseInt(ResourceDetectUtils.getPartitions(execution.executedPlan))
       logInfo(s"Leaf nodes: $leaves")
-      val paths = ResourceDetectUtils.getPaths(execution.sparkPlan).map(_.toString).asJava
+      val paths = ResourceDetectUtils.getPaths(execution.sparkPlan, true).map(_.toString).asJava
       logInfo(s"Detected source: $sourceName $leaves ${paths.asScala.mkString(",")}")
       val startTime = System.currentTimeMillis()
       val resourceSize = ResourceDetectUtils.getResourceSize(SparderEnv.getHadoopConfiguration(), config.isConcurrencyFetchDataSourceSize,
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
index 94f655b64d..6b23f991df 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
@@ -99,7 +99,7 @@ public class ResourceDetectBeforeCubingJob extends SparkApplication {
                     logger.info("leaf nodes is: {} ", leafNodeNum);
                     infos.recordSparkPlan(dataset.queryExecution().sparkPlan());
                     List<Path> paths = JavaConversions
-                            .seqAsJavaList(ResourceDetectUtils.getPaths(dataset.queryExecution().sparkPlan()));
+                            .seqAsJavaList(ResourceDetectUtils.getPaths(dataset.queryExecution().sparkPlan(), true));
                     resourceSize.put(String.valueOf(source.getLayoutId()),
                             getResourceSize(SparderEnv.getHadoopConfiguration(), config.isConcurrencyFetchDataSourceSize(),
                                     asScalaIteratorConverter(paths.iterator()).asScala().toSeq()));
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java
index 899321e060..7c74757d36 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java
@@ -65,7 +65,7 @@ public class ResourceDetectBeforeMergingJob extends SparkApplication implements
             Dataset<Row> afterMerge = entry.getValue().merge();
             infos.recordSparkPlan(afterMerge.queryExecution().sparkPlan());
             List<Path> paths = JavaConversions
-                    .seqAsJavaList(ResourceDetectUtils.getPaths(afterMerge.queryExecution().sparkPlan()));
+                    .seqAsJavaList(ResourceDetectUtils.getPaths(afterMerge.queryExecution().sparkPlan(), true));
             resourceSize.put(String.valueOf(entry.getKey()),
                     ResourceDetectUtils.getResourceSize(SparderEnv.getHadoopConfiguration(), config.isConcurrencyFetchDataSourceSize(),
                             JavaConverters.asScalaIteratorConverter(paths.iterator()).asScala().toSeq()));
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java
index 7c317fe7d2..2cf166b5ff 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java
@@ -56,7 +56,7 @@ public class ResourceDetectBeforeSampling extends SparkApplication implements Re
                 .createEngineAdapter(tableDesc, NSparkCubingEngine.NSparkCubingSource.class)
                 .getSourceData(tableDesc, ss, params);
         final List<Path> paths = JavaConversions
-                .seqAsJavaList(ResourceDetectUtils.getPaths(dataset.queryExecution().sparkPlan()));
+                .seqAsJavaList(ResourceDetectUtils.getPaths(dataset.queryExecution().sparkPlan(), true));

         Map<String, Long> resourceSize = Maps.newHashMap();
         resourceSize.put(String.valueOf(tableName),
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala
index e7f97a5655..4b1e06aeb9 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala
@@ -28,7 +28,7 @@ import org.apache.kylin.guava30.shaded.common.collect.Maps
 import org.apache.kylin.metadata.cube.model.{DimensionRangeInfo, LayoutEntity}
 import org.apache.kylin.query.util.QueryInterruptChecker
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.datasources.FileIndex
@@ -51,15 +51,15 @@ object ResourceDetectUtils extends Logging {

   private val errorMsgLog: String = "Interrupted at the stage of get paths in ResourceDetectUtils."

-  def getPaths(plan: SparkPlan): Seq[Path] = {
+  def getPaths(plan: SparkPlan, isResourceDetectJob: Boolean = false): Seq[Path] = {
     var paths = Seq.empty[Path]
     plan.foreach {
       case plan: FileSourceScanExec =>
         val info = "Current step: get Partition file status of FileSourceScanExec."
-        paths ++= getFilePaths(plan.relation.location, plan.partitionFilters, plan.dataFilters, info)
+        paths ++= getFilePaths(plan.relation.location, plan.partitionFilters, plan.dataFilters, info, isResourceDetectJob)
       case plan: LayoutFileSourceScanExec =>
         val info = "Current step: get Partition file status of LayoutFileSourceScanExec."
-        paths ++= getFilePaths(plan.relation.location, plan.partitionFilters, plan.dataFilters, info)
+        paths ++= getFilePaths(plan.relation.location, plan.partitionFilters, plan.dataFilters, info, isResourceDetectJob)
       case plan: InMemoryTableScanExec =>
         val _plan = plan.relation.cachedPlan
         paths ++= getPaths(_plan)
@@ -84,10 +84,16 @@ object ResourceDetectUtils extends Logging {
     paths
   }

-  def getFilePaths(fileIndex: FileIndex, partitionFilters: Seq[Expression], dataFilters: Seq[Expression], info: String): Seq[Path] = {
+  def getFilePaths(fileIndex: FileIndex, partitionFilters: Seq[Expression], dataFilters: Seq[Expression]
+                   , info: String, isResourceDetectJob: Boolean): Seq[Path] = {
     var paths = Seq.empty[Path]
     if (fileIndex.partitionSchema.nonEmpty) {
-      val selectedPartitions = fileIndex.listFiles(partitionFilters, dataFilters)
+      var newPartitionFilters = partitionFilters
+      if (isResourceDetectJob) {
+        logInfo("The job is resource detect job, add filterNot of SubqueryExpression to the job.")
+        newPartitionFilters = partitionFilters.filterNot(SubqueryExpression.hasSubquery)
+      }
+      val selectedPartitions = fileIndex.listFiles(newPartitionFilters, dataFilters)
       selectedPartitions.flatMap(partition => {
         QueryInterruptChecker.checkThreadInterrupted(errorMsgLog, info)
         partition.files
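
For context, here is a minimal standalone sketch of the partition-filter handling this patch introduces in getFilePaths. The object and method names below are illustrative only, and it assumes a FileIndex taken from a partitioned scan (e.g. plan.relation.location of a FileSourceScanExec):

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
    import org.apache.spark.sql.execution.datasources.FileIndex

    object PartitionFilterSketch {
      // When running as a resource-detect job, drop partition filters that contain a
      // subquery: the subquery has not been evaluated at detection time, so it cannot
      // be used to prune partitions for size estimation.
      def listPartitionPaths(fileIndex: FileIndex,
                             partitionFilters: Seq[Expression],
                             dataFilters: Seq[Expression],
                             isResourceDetectJob: Boolean): Seq[Path] = {
        val effectiveFilters =
          if (isResourceDetectJob) partitionFilters.filterNot(SubqueryExpression.hasSubquery)
          else partitionFilters
        // List partitions with the remaining (subquery-free) filters and collect file paths.
        fileIndex.listFiles(effectiveFilters, dataFilters)
          .flatMap(_.files.map(_.getPath))
      }
    }

Dropping a filter can only widen the set of partitions that are listed, so the detect job may consider more input than the actual build will read, but it no longer depends on a subquery result that is unavailable while the plan is only being inspected.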