This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit ee0b0ec92bdb73d13f306549dbb1b22cdbac6226
Author: huangsheng <huangshen...@163.com>
AuthorDate: Wed Mar 22 10:07:36 2023 +0800

    KYLIN-5576 Don't use filters with subqueries on partition columns to detect resources when building the model
---
 .../kylin/engine/spark/job/RDPartitionBuildExec.scala  |  2 +-
 .../kylin/engine/spark/job/RDSegmentBuildExec.scala    |  2 +-
 .../spark/job/ResourceDetectBeforeCubingJob.java       |  2 +-
 .../spark/job/ResourceDetectBeforeMergingJob.java      |  2 +-
 .../engine/spark/job/ResourceDetectBeforeSampling.java |  2 +-
 .../spark/sql/hive/utils/ResourceDetectUtils.scala     | 18 ++++++++++++------
 6 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala
index 11bc92ee3a..6279452f39 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala
@@ -65,7 +65,7 @@ class RDPartitionBuildExec(private val jobContext: SegmentJob, //
         Integer.parseInt(ResourceDetectUtils.getPartitions(execution.executedPlan))).sum
 
       val paths = executions.flatMap(execution => //
-        ResourceDetectUtils.getPaths(execution.sparkPlan).map(_.toString)
+        ResourceDetectUtils.getPaths(execution.sparkPlan, true).map(_.toString)
       ).asJava
 
       logInfo(s"Detected source: $sourceName $leaves ${paths.asScala.mkString(",")}")
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala
index fb8951a947..5e9abab57b 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala
@@ -61,7 +61,7 @@ class RDSegmentBuildExec(private val jobContext: SegmentJob, //
       val sourceName = String.valueOf(parentId)
       val leaves = Integer.parseInt(ResourceDetectUtils.getPartitions(execution.executedPlan))
       logInfo(s"Leaf nodes: $leaves")
-      val paths = ResourceDetectUtils.getPaths(execution.sparkPlan).map(_.toString).asJava
+      val paths = ResourceDetectUtils.getPaths(execution.sparkPlan, true).map(_.toString).asJava
       logInfo(s"Detected source: $sourceName $leaves ${paths.asScala.mkString(",")}")
       val startTime = System.currentTimeMillis()
       val resourceSize = ResourceDetectUtils.getResourceSize(SparderEnv.getHadoopConfiguration(), config.isConcurrencyFetchDataSourceSize,
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
index 94f655b64d..6b23f991df 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
@@ -99,7 +99,7 @@ public class ResourceDetectBeforeCubingJob extends SparkApplication {
                 logger.info("leaf nodes is: {} ", leafNodeNum);
                 infos.recordSparkPlan(dataset.queryExecution().sparkPlan());
                 List<Path> paths = JavaConversions
-                        .seqAsJavaList(ResourceDetectUtils.getPaths(dataset.queryExecution().sparkPlan()));
+                        .seqAsJavaList(ResourceDetectUtils.getPaths(dataset.queryExecution().sparkPlan(), true));
                 resourceSize.put(String.valueOf(source.getLayoutId()),
                         getResourceSize(SparderEnv.getHadoopConfiguration(), config.isConcurrencyFetchDataSourceSize(),
                                 asScalaIteratorConverter(paths.iterator()).asScala().toSeq()));
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java
index 899321e060..7c74757d36 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java
@@ -65,7 +65,7 @@ public class ResourceDetectBeforeMergingJob extends SparkApplication implements
             Dataset<Row> afterMerge = entry.getValue().merge();
             infos.recordSparkPlan(afterMerge.queryExecution().sparkPlan());
             List<Path> paths = JavaConversions
-                    .seqAsJavaList(ResourceDetectUtils.getPaths(afterMerge.queryExecution().sparkPlan()));
+                    .seqAsJavaList(ResourceDetectUtils.getPaths(afterMerge.queryExecution().sparkPlan(), true));
             resourceSize.put(String.valueOf(entry.getKey()),
                     ResourceDetectUtils.getResourceSize(SparderEnv.getHadoopConfiguration(), config.isConcurrencyFetchDataSourceSize(),
                             JavaConverters.asScalaIteratorConverter(paths.iterator()).asScala().toSeq()));
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java
index 7c317fe7d2..2cf166b5ff 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java
@@ -56,7 +56,7 @@ public class ResourceDetectBeforeSampling extends SparkApplication implements Re
                 .createEngineAdapter(tableDesc, NSparkCubingEngine.NSparkCubingSource.class)
                 .getSourceData(tableDesc, ss, params);
         final List<Path> paths = JavaConversions
-                .seqAsJavaList(ResourceDetectUtils.getPaths(dataset.queryExecution().sparkPlan()));
+                .seqAsJavaList(ResourceDetectUtils.getPaths(dataset.queryExecution().sparkPlan(), true));
 
         Map<String, Long> resourceSize = Maps.newHashMap();
         resourceSize.put(String.valueOf(tableName),
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala
index e7f97a5655..4b1e06aeb9 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala
@@ -28,7 +28,7 @@ import org.apache.kylin.guava30.shaded.common.collect.Maps
 import org.apache.kylin.metadata.cube.model.{DimensionRangeInfo, LayoutEntity}
 import org.apache.kylin.query.util.QueryInterruptChecker
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.datasources.FileIndex
@@ -51,15 +51,15 @@ object ResourceDetectUtils extends Logging {
 
   private val errorMsgLog: String = "Interrupted at the stage of get paths in ResourceDetectUtils."
 
-  def getPaths(plan: SparkPlan): Seq[Path] = {
+  def getPaths(plan: SparkPlan, isResourceDetectJob: Boolean = false): Seq[Path] = {
     var paths = Seq.empty[Path]
     plan.foreach {
       case plan: FileSourceScanExec =>
         val info = "Current step: get Partition file status of FileSourceScanExec."
-        paths ++= getFilePaths(plan.relation.location, plan.partitionFilters, plan.dataFilters, info)
+        paths ++= getFilePaths(plan.relation.location, plan.partitionFilters, plan.dataFilters, info, isResourceDetectJob)
       case plan: LayoutFileSourceScanExec =>
         val info = "Current step: get Partition file status of LayoutFileSourceScanExec."
-        paths ++= getFilePaths(plan.relation.location, plan.partitionFilters, plan.dataFilters, info)
+        paths ++= getFilePaths(plan.relation.location, plan.partitionFilters, plan.dataFilters, info, isResourceDetectJob)
       case plan: InMemoryTableScanExec =>
         val _plan = plan.relation.cachedPlan
         paths ++= getPaths(_plan)
@@ -84,10 +84,16 @@ object ResourceDetectUtils extends Logging {
     paths
   }
 
-  def getFilePaths(fileIndex: FileIndex, partitionFilters: Seq[Expression], dataFilters: Seq[Expression], info: String): Seq[Path] = {
+  def getFilePaths(fileIndex: FileIndex, partitionFilters: Seq[Expression], dataFilters: Seq[Expression]
+                   , info: String, isResourceDetectJob: Boolean): Seq[Path] = {
     var paths = Seq.empty[Path]
     if (fileIndex.partitionSchema.nonEmpty) {
-      val selectedPartitions = fileIndex.listFiles(partitionFilters, dataFilters)
+      var newPartitionFilters = partitionFilters
+      if (isResourceDetectJob) {
+        logInfo("The job is resource detect job, add filterNot of SubqueryExpression to the job.")
+        newPartitionFilters = partitionFilters.filterNot(SubqueryExpression.hasSubquery)
+      }
+      val selectedPartitions = fileIndex.listFiles(newPartitionFilters, dataFilters)
       selectedPartitions.flatMap(partition => {
         QueryInterruptChecker.checkThreadInterrupted(errorMsgLog, info)
         partition.files
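
For readers skimming the change: the new isResourceDetectJob flag makes getFilePaths drop any partition filter that embeds a subquery before calling FileIndex.listFiles, since such a filter (for example, a predicate like dt IN (SELECT MAX(dt) ...), shown here only as an illustration) cannot be evaluated while resources are being detected. Below is a minimal, self-contained sketch of that pruning step; the helper name pruneSubqueryFilters is illustrative and not part of the commit.

    import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}

    // Hypothetical helper mirroring the branch added to ResourceDetectUtils.getFilePaths:
    // for a resource-detect job, partition filters that contain a subquery are skipped,
    // because the subquery result is not available at detection time.
    def pruneSubqueryFilters(partitionFilters: Seq[Expression],
                             isResourceDetectJob: Boolean): Seq[Expression] = {
      if (isResourceDetectJob) {
        // SubqueryExpression.hasSubquery is true when the expression tree holds any
        // subquery (scalar, IN, EXISTS, ...), the same check used in the patch.
        partitionFilters.filterNot(SubqueryExpression.hasSubquery)
      } else {
        partitionFilters
      }
    }

The resource-detect jobs in this commit pass true for the new parameter, while other callers of getPaths rely on the default false and keep the previous behavior.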
