This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5_beta
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 7f3da6a6aed23e52d12cadbbec18eeb4bf4b60ae
Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com>
AuthorDate: Thu Apr 27 19:24:16 2023 +0800

    KYLIN-5646 Fix error reported in yarn cluster mode when incrementally building the partition table job
    
    Co-authored-by: sibing.zhang <sibing.zh...@qq.com>
---
 .../engine/spark/job/RDPartitionBuildExec.scala    |  4 +--
 .../engine/spark/job/RDSegmentBuildExec.scala      |  7 +++--
 .../spark/job/ResourceDetectBeforeCubingJob.java   |  2 +-
 .../spark/job/ResourceDetectBeforeMergingJob.java  | 14 ++++-----
 .../spark/job/ResourceDetectBeforeSampling.java    | 13 ++++----
 .../sql/hive/utils/TestResourceDetectUtils.scala   | 22 ++++++++++----
 .../kylin/query/pushdown/SparkSqlClient.scala      |  5 ++--
 .../spark/sql/hive/utils/ResourceDetectUtils.scala | 35 +++++++++++-----------
 8 files changed, 58 insertions(+), 44 deletions(-)
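
In short, the refactor threads the job's KylinConfig through ResourceDetectUtils instead of reading KylinConfig.getInstanceFromEnv inside the utility (note the removed `val kylinConfig = KylinConfig.getInstanceFromEnv` lines below), which is presumably what broke resource detection in yarn cluster mode. A minimal caller sketch in Scala, mirroring the changed call sites and assuming `config` (the job's KylinConfig) and `paths` (the detected source paths as a java.util.List[String]) are in scope:

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.SparderEnv
    import org.apache.spark.sql.hive.utils.ResourceDetectUtils
    import scala.collection.JavaConverters._

    // New signature: the KylinConfig comes first and the boolean flag is gone;
    // the concurrent-vs-serial decision is made inside ResourceDetectUtils.
    val sizeInBytes: Long = ResourceDetectUtils.getResourceSize(
      config,                                  // job-level KylinConfig, now passed explicitly
      SparderEnv.getHadoopConfiguration(),     // Hadoop configuration of the Spark session
      paths.asScala.map(p => new Path(p)): _*) // source paths detected from the Spark plan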

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala
index 6279452f39..f57a237508 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala
@@ -69,8 +69,8 @@ class RDPartitionBuildExec(private val jobContext: SegmentJob, //
       ).asJava
 
       logInfo(s"Detected source: $sourceName $leaves ${paths.asScala.mkString(",")}")
-      sourceSize.put(sourceName, ResourceDetectUtils.getResourceSize(SparderEnv.getHadoopConfiguration(),
-        config.isConcurrencyFetchDataSourceSize, paths.asScala.map(path => new Path(path)): _*))
+      sourceSize.put(sourceName, ResourceDetectUtils.getResourceSize(config, SparderEnv.getHadoopConfiguration(),
+        paths.asScala.map(path => new Path(path)): _*))
       sourceLeaves.put(sourceName, leaves)
     }
 
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala
index 5e9abab57b..8716ee8f9a 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala
@@ -18,16 +18,17 @@
 
 package org.apache.kylin.engine.spark.job
 
-import org.apache.kylin.guava30.shaded.common.collect.Maps
+import java.io.IOException
+
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.engine.spark.job.stage.BuildParam
 import org.apache.kylin.engine.spark.job.stage.build.FlatTableAndDictBase
+import org.apache.kylin.guava30.shaded.common.collect.Maps
 import org.apache.kylin.metadata.cube.model.NDataSegment
 import org.apache.spark.sql.SparderEnv
 import org.apache.spark.sql.datasource.storage.StorageStoreUtils
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils
 
-import java.io.IOException
 import scala.collection.JavaConverters._
 
 class RDSegmentBuildExec(private val jobContext: SegmentJob, //
@@ -64,7 +65,7 @@ class RDSegmentBuildExec(private val jobContext: SegmentJob, //
       val paths = ResourceDetectUtils.getPaths(execution.sparkPlan, true).map(_.toString).asJava
       logInfo(s"Detected source: $sourceName $leaves ${paths.asScala.mkString(",")}")
       val startTime = System.currentTimeMillis()
-      val resourceSize = ResourceDetectUtils.getResourceSize(SparderEnv.getHadoopConfiguration(), config.isConcurrencyFetchDataSourceSize,
+      val resourceSize = ResourceDetectUtils.getResourceSize(config, SparderEnv.getHadoopConfiguration(),
         paths.asScala.map(path => new Path(path)): _*)
       val endTime = System.currentTimeMillis()
       logInfo(s"Detect source size cost time is ${endTime - startTime} ms.")
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
index 6b23f991df..b40823875d 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
@@ -101,7 +101,7 @@ public class ResourceDetectBeforeCubingJob extends SparkApplication {
                 List<Path> paths = JavaConversions
                         .seqAsJavaList(ResourceDetectUtils.getPaths(dataset.queryExecution().sparkPlan(), true));
                 resourceSize.put(String.valueOf(source.getLayoutId()),
-                        getResourceSize(SparderEnv.getHadoopConfiguration(), config.isConcurrencyFetchDataSourceSize(),
+                        getResourceSize(config, SparderEnv.getHadoopConfiguration(),
                                 asScalaIteratorConverter(paths.iterator()).asScala().toSeq()));
 
                 layoutLeafTaskNums.put(String.valueOf(source.getLayoutId()), Integer.parseInt(leafNodeNum));
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java
index 7c74757d36..468eb50420 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java
@@ -18,11 +18,14 @@
 
 package org.apache.kylin.engine.spark.job;
 
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.engine.spark.application.SparkApplication;
 import org.apache.kylin.engine.spark.builder.DFLayoutMergeAssist;
-import org.apache.kylin.engine.spark.job.LogJobInfoUtils;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.metadata.cube.model.NBatchConstants;
 import org.apache.kylin.metadata.cube.model.NDataSegment;
 import org.apache.kylin.metadata.cube.model.NDataflow;
@@ -33,13 +36,10 @@ import org.apache.spark.sql.SparderEnv;
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.collection.JavaConversions;
 import scala.collection.JavaConverters;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
 public class ResourceDetectBeforeMergingJob extends SparkApplication implements ResourceDetect {
     protected static final Logger logger = LoggerFactory.getLogger(ResourceDetectBeforeMergingJob.class);
 
@@ -67,7 +67,7 @@ public class ResourceDetectBeforeMergingJob extends SparkApplication implements
             List<Path> paths = JavaConversions
                     .seqAsJavaList(ResourceDetectUtils.getPaths(afterMerge.queryExecution().sparkPlan(), true));
             resourceSize.put(String.valueOf(entry.getKey()),
-                    ResourceDetectUtils.getResourceSize(SparderEnv.getHadoopConfiguration(), config.isConcurrencyFetchDataSourceSize(),
+                    ResourceDetectUtils.getResourceSize(config, SparderEnv.getHadoopConfiguration(),
                             JavaConverters.asScalaIteratorConverter(paths.iterator()).asScala().toSeq()));
         }
         ResourceDetectUtils.write(new Path(config.getJobTmpShareDir(project, jobId),
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java
index 2cf166b5ff..3b74081556 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java
@@ -17,11 +17,13 @@
 
 package org.apache.kylin.engine.spark.job;
 
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
-import lombok.extern.slf4j.Slf4j;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.engine.spark.NSparkCubingEngine;
 import org.apache.kylin.engine.spark.application.SparkApplication;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.metadata.cube.model.NBatchConstants;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
@@ -31,12 +33,11 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparderEnv;
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
+
+import lombok.extern.slf4j.Slf4j;
 import scala.collection.JavaConversions;
 import scala.collection.JavaConverters;
 
-import java.util.List;
-import java.util.Map;
-
 @Slf4j
 public class ResourceDetectBeforeSampling extends SparkApplication implements ResourceDetect {
     public static void main(String[] args) {
@@ -60,7 +61,7 @@ public class ResourceDetectBeforeSampling extends SparkApplication implements Re
 
         Map<String, Long> resourceSize = Maps.newHashMap();
         resourceSize.put(String.valueOf(tableName),
-                ResourceDetectUtils.getResourceSize(SparderEnv.getHadoopConfiguration(),config.isConcurrencyFetchDataSourceSize(),
+                ResourceDetectUtils.getResourceSize(config, SparderEnv.getHadoopConfiguration(),
                         JavaConverters.asScalaIteratorConverter(paths.iterator()).asScala().toSeq()));
 
         Map<String, String> tableLeafTaskNums = Maps.newHashMap();
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtils.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtils.scala
index b50155c342..ceada5b655 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtils.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtils.scala
@@ -19,16 +19,16 @@
 package org.apache.spark.sql.hive.utils
 
 import java.io.FileOutputStream
+import java.nio.charset.Charset
 import java.util.{List => JList, Map => JMap}
-import org.apache.kylin.guava30.shaded.common.collect.{Lists, Maps}
+
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase
+import org.apache.kylin.guava30.shaded.common.collect.{Lists, Maps}
 import org.apache.spark.sql.common.SparderBaseFunSuite
 import org.apache.spark.util.Utils
 
-import java.nio.charset.Charset
-
 class TestResourceDetectUtils extends SparderBaseFunSuite {
   private var config: KylinConfig = _
 
@@ -71,16 +71,26 @@ class TestResourceDetectUtils extends SparderBaseFunSuite {
     val contents = List("test", "test_test_test")
     val tempDir = Utils.createTempDir()
     val files = List(new Path(tempDir.getPath, "test1"), new Path(tempDir.getPath, "test2"))
+    val files2 = List(new Path(tempDir.getPath, "test3"))
     try {
       for (i <- 0 to 1) {
         val out = new FileOutputStream(files.apply(i).toString)
         out.write(contents.apply(i).getBytes(Charset.defaultCharset()))
         out.close()
       }
-      var l = ResourceDetectUtils.getResourceSize(false, files.head, files.last)
+      var l = ResourceDetectUtils.getResourceSize(config, files.head, files.last)
       assert(l == contents.map(_.getBytes(Charset.defaultCharset()).length).sum)
-      l = ResourceDetectUtils.getResourceSize(true, files.head, files.last)
+
+      // test file not exist
+      l = ResourceDetectUtils.getResourceSize(config, files2.head)
+      assert(l == 0)
+
+      config.setProperty("kylin.job.concurrency-fetch-datasource-size-enabled", "false")
+      l = ResourceDetectUtils.getResourceSize(config, files.head, files.last)
       assert(l == contents.map(_.getBytes(Charset.defaultCharset()).length).sum)
+
+      l = ResourceDetectUtils.getResourceSize(config, files2.head)
+      assert(l == 0)
     } finally {
       Utils.deleteRecursively(tempDir)
     }
@@ -103,7 +113,7 @@ class TestResourceDetectUtils extends SparderBaseFunSuite {
       }
       import scala.collection.JavaConverters._
 
-      val l = resourcePaths.values().asScala.map(path => ResourceDetectUtils.getResourceSize(false,
+      val l = resourcePaths.values().asScala.map(path => ResourceDetectUtils.getResourceSize(config,
         new Path(path.get(0)))).max
       assert(l == contents.last.getBytes(Charset.defaultCharset()).length)
     } finally {
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index 2b09a05d66..bd72f8b219 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -104,10 +104,11 @@ object SparkSqlClient {
         val paths = ResourceDetectUtils.getPaths(df.queryExecution.sparkPlan)
         var sourceTableSize: String = ""
         if (isConcurrency) {
-          sourceTableSize = ResourceDetectUtils.getResourceSizeWithTimeoutByConcurrency(
+          sourceTableSize = ResourceDetectUtils.getResourceSizeWithTimeoutByConcurrency(config,
             Duration(timeOut, TimeUnit.SECONDS), SparderEnv.getHadoopConfiguration(), paths: _*) + "b"
         } else {
-          sourceTableSize = ResourceDetectUtils.getResourceSizBySerial(SparderEnv.getHadoopConfiguration(), paths: _*) + "b"
+          sourceTableSize = ResourceDetectUtils.getResourceSizBySerial(config, SparderEnv.getHadoopConfiguration(),
+            paths: _*) + "b"
         }
         val partitions = Math.max(1, JavaUtils.byteStringAsMb(sourceTableSize) / basePartitionSize).toString
         df.sparkSession.sessionState.conf.setLocalProperty(SHUFFLE_PARTITION, partitions)
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala
index 4b1e06aeb9..0b0302c151 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala
@@ -18,6 +18,11 @@
 
 package org.apache.spark.sql.hive.utils
 
+import java.io.IOException
+import java.nio.charset.Charset
+import java.util.concurrent.Executors
+import java.util.{Map => JMap}
+
 import com.google.gson.Gson
 import com.google.gson.reflect.TypeToken
 import org.apache.hadoop.conf.Configuration
@@ -37,10 +42,6 @@ import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import org.apache.spark.sql.sources.NBaseRelation
 import org.apache.spark.util.ThreadUtils
 
-import java.io.IOException
-import java.nio.charset.Charset
-import java.util.concurrent.Executors
-import java.util.{Map => JMap}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.duration._
@@ -169,36 +170,36 @@ object ResourceDetectUtils extends Logging {
     false
   }
 
-  def getResourceSize(configuration: Configuration, isConcurrencyFetchDataSourceSize: Boolean, paths: Path*): Long = {
+  def getResourceSize(kylinConfig: KylinConfig, configuration: Configuration, paths: Path*): Long = {
     val resourceSize = {
-      if (isConcurrencyFetchDataSourceSize) {
-        getResourceSizeWithTimeoutByConcurrency(Duration.Inf, configuration, paths: _*)
+      if (kylinConfig.isConcurrencyFetchDataSourceSize) {
+        getResourceSizeWithTimeoutByConcurrency(kylinConfig, Duration.Inf, configuration, paths: _*)
       } else {
-        getResourceSizBySerial(configuration, paths: _*)
+        getResourceSizBySerial(kylinConfig, configuration, paths: _*)
       }
     }
     resourceSize
   }
 
-  def getResourceSizBySerial(configuration: Configuration, paths: Path*): Long = {
+  def getResourceSizBySerial(kylinConfig: KylinConfig, configuration: Configuration, paths: Path*): Long = {
     paths.map(path => {
       QueryInterruptChecker.checkThreadInterrupted(errorMsgLog, "Current step: get resource size.")
       val fs = path.getFileSystem(configuration)
       if (fs.exists(path)) {
-        HadoopUtil.getContentSummary(fs, path).getLength
+        HadoopUtil.getContentSummaryFromHdfsKylinConfig(fs, path, kylinConfig).getLength
       } else {
         0L
       }
     }).sum
   }
 
-  def getResourceSizeWithTimeoutByConcurrency(timeout: Duration, configuration: Configuration, paths: Path*): Long = {
-    val kylinConfig = KylinConfig.getInstanceFromEnv
+  def getResourceSizeWithTimeoutByConcurrency(kylinConfig: KylinConfig, timeout: Duration,
+                                              configuration: Configuration, paths: Path*): Long = {
     val threadNumber = kylinConfig.getConcurrencyFetchDataSourceSizeThreadNumber
     logInfo(s"Get resource size concurrency, thread number is $threadNumber")
     val executor = Executors.newFixedThreadPool(threadNumber)
     implicit val executionContext: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor)
-    val futures: Seq[Future[Long]] = getResourceSize(configuration, executionContext, paths: _*)
+    val futures: Seq[Future[Long]] = getResourceSize(kylinConfig, configuration, executionContext, paths: _*)
     try {
       val combinedFuture = Future.sequence(futures)
       val results: Seq[Long] = ThreadUtils.awaitResult(combinedFuture, timeout)
@@ -208,8 +209,8 @@ object ResourceDetectUtils extends Logging {
     }
   }
 
-  def getResourceSize(configuration: Configuration, executionContext: ExecutionContextExecutor, paths: Path*): Seq[Future[Long]] = {
-    val kylinConfig = KylinConfig.getInstanceFromEnv
+  def getResourceSize(kylinConfig: KylinConfig, configuration: Configuration,
+                      executionContext: ExecutionContextExecutor, paths: Path*): Seq[Future[Long]] = {
     paths.map { path =>
       Future {
         val fs = path.getFileSystem(configuration)
@@ -222,8 +223,8 @@ object ResourceDetectUtils extends Logging {
     }
   }
 
-  def getResourceSize(isConcurrencyFetchDataSourceSize: Boolean, paths: Path*): Long = {
-    getResourceSize(HadoopUtil.getCurrentConfiguration, isConcurrencyFetchDataSourceSize, paths: _*)
+  def getResourceSize(kylinConfig: KylinConfig, paths: Path*): Long = {
+    getResourceSize(kylinConfig, HadoopUtil.getCurrentConfiguration, paths: _*)
   }
 
   def getMaxResourceSize(shareDir: Path): Long = {
