This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5_beta
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 7f3da6a6aed23e52d12cadbbec18eeb4bf4b60ae
Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com>
AuthorDate: Thu Apr 27 19:24:16 2023 +0800

    KYLIN-5646 Fix the error reported when incrementally building the partition table job in yarn cluster mode

    Co-authored-by: sibing.zhang <sibing.zh...@qq.com>
---
 .../engine/spark/job/RDPartitionBuildExec.scala     |  4 +--
 .../engine/spark/job/RDSegmentBuildExec.scala       |  7 +++--
 .../spark/job/ResourceDetectBeforeCubingJob.java    |  2 +-
 .../spark/job/ResourceDetectBeforeMergingJob.java   | 14 ++++-----
 .../spark/job/ResourceDetectBeforeSampling.java     | 13 ++++----
 .../sql/hive/utils/TestResourceDetectUtils.scala    | 22 ++++++++++----
 .../kylin/query/pushdown/SparkSqlClient.scala       |  5 ++--
 .../spark/sql/hive/utils/ResourceDetectUtils.scala  | 35 +++++++++++-----------
 8 files changed, 58 insertions(+), 44 deletions(-)

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala
index 6279452f39..f57a237508 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala
@@ -69,8 +69,8 @@ class RDPartitionBuildExec(private val jobContext: SegmentJob, //
       ).asJava
       logInfo(s"Detected source: $sourceName $leaves ${paths.asScala.mkString(",")}")
-      sourceSize.put(sourceName, ResourceDetectUtils.getResourceSize(SparderEnv.getHadoopConfiguration(),
-        config.isConcurrencyFetchDataSourceSize, paths.asScala.map(path => new Path(path)): _*))
+      sourceSize.put(sourceName, ResourceDetectUtils.getResourceSize(config, SparderEnv.getHadoopConfiguration(),
+        paths.asScala.map(path => new Path(path)): _*))
       sourceLeaves.put(sourceName, leaves)
     }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala
index 5e9abab57b..8716ee8f9a 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala
@@ -18,16 +18,17 @@
 package org.apache.kylin.engine.spark.job

-import org.apache.kylin.guava30.shaded.common.collect.Maps
+import java.io.IOException
+
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.engine.spark.job.stage.BuildParam
 import org.apache.kylin.engine.spark.job.stage.build.FlatTableAndDictBase
+import org.apache.kylin.guava30.shaded.common.collect.Maps
 import org.apache.kylin.metadata.cube.model.NDataSegment
 import org.apache.spark.sql.SparderEnv
 import org.apache.spark.sql.datasource.storage.StorageStoreUtils
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils
-import java.io.IOException

 import scala.collection.JavaConverters._

 class RDSegmentBuildExec(private val jobContext: SegmentJob, //
@@ -64,7 +65,7 @@ class RDSegmentBuildExec(private val jobContext: SegmentJob, //
     val paths = ResourceDetectUtils.getPaths(execution.sparkPlan, true).map(_.toString).asJava
     logInfo(s"Detected source: $sourceName $leaves ${paths.asScala.mkString(",")}")
     val startTime = System.currentTimeMillis()
-    val resourceSize = ResourceDetectUtils.getResourceSize(SparderEnv.getHadoopConfiguration(), config.isConcurrencyFetchDataSourceSize,
+    val resourceSize = ResourceDetectUtils.getResourceSize(config, SparderEnv.getHadoopConfiguration(),
       paths.asScala.map(path => new Path(path)): _*)
     val endTime = System.currentTimeMillis()
     logInfo(s"Detect source size cost time is ${endTime - startTime} ms.")
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
index 6b23f991df..b40823875d 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
@@ -101,7 +101,7 @@ public class ResourceDetectBeforeCubingJob extends SparkApplication {
                 List<Path> paths = JavaConversions
                         .seqAsJavaList(ResourceDetectUtils.getPaths(dataset.queryExecution().sparkPlan(), true));
                 resourceSize.put(String.valueOf(source.getLayoutId()),
-                        getResourceSize(SparderEnv.getHadoopConfiguration(), config.isConcurrencyFetchDataSourceSize(),
+                        getResourceSize(config, SparderEnv.getHadoopConfiguration(),
                                 asScalaIteratorConverter(paths.iterator()).asScala().toSeq()));
                 layoutLeafTaskNums.put(String.valueOf(source.getLayoutId()), Integer.parseInt(leafNodeNum));
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java
index 7c74757d36..468eb50420 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java
@@ -18,11 +18,14 @@
 package org.apache.kylin.engine.spark.job;

-import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.engine.spark.application.SparkApplication;
 import org.apache.kylin.engine.spark.builder.DFLayoutMergeAssist;
-import org.apache.kylin.engine.spark.job.LogJobInfoUtils;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.metadata.cube.model.NBatchConstants;
 import org.apache.kylin.metadata.cube.model.NDataSegment;
 import org.apache.kylin.metadata.cube.model.NDataflow;
@@ -33,13 +36,10 @@ import org.apache.spark.sql.SparderEnv;
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.collection.JavaConversions;
 import scala.collection.JavaConverters;

-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
 public class ResourceDetectBeforeMergingJob extends SparkApplication implements ResourceDetect {

     protected static final Logger logger = LoggerFactory.getLogger(ResourceDetectBeforeMergingJob.class);
@@ -67,7 +67,7 @@ public class ResourceDetectBeforeMergingJob extends SparkApplication implements
             List<Path> paths = JavaConversions
                     .seqAsJavaList(ResourceDetectUtils.getPaths(afterMerge.queryExecution().sparkPlan(), true));
             resourceSize.put(String.valueOf(entry.getKey()),
-                    ResourceDetectUtils.getResourceSize(SparderEnv.getHadoopConfiguration(), config.isConcurrencyFetchDataSourceSize(),
+                    ResourceDetectUtils.getResourceSize(config, SparderEnv.getHadoopConfiguration(),
                             JavaConverters.asScalaIteratorConverter(paths.iterator()).asScala().toSeq()));
         }
         ResourceDetectUtils.write(new Path(config.getJobTmpShareDir(project, jobId),
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java
index 2cf166b5ff..3b74081556 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeSampling.java
@@ -17,11 +17,13 @@
 package org.apache.kylin.engine.spark.job;

-import org.apache.kylin.guava30.shaded.common.collect.Maps;
-import lombok.extern.slf4j.Slf4j;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.engine.spark.NSparkCubingEngine;
 import org.apache.kylin.engine.spark.application.SparkApplication;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.metadata.cube.model.NBatchConstants;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
@@ -31,12 +33,11 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparderEnv;
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
+
+import lombok.extern.slf4j.Slf4j;
 import scala.collection.JavaConversions;
 import scala.collection.JavaConverters;

-import java.util.List;
-import java.util.Map;
-
 @Slf4j
 public class ResourceDetectBeforeSampling extends SparkApplication implements ResourceDetect {
     public static void main(String[] args) {
@@ -60,7 +61,7 @@ public class ResourceDetectBeforeSampling extends SparkApplication implements Re
         Map<String, Long> resourceSize = Maps.newHashMap();
         resourceSize.put(String.valueOf(tableName),
-                ResourceDetectUtils.getResourceSize(SparderEnv.getHadoopConfiguration(),config.isConcurrencyFetchDataSourceSize(),
+                ResourceDetectUtils.getResourceSize(config, SparderEnv.getHadoopConfiguration(),
                         JavaConverters.asScalaIteratorConverter(paths.iterator()).asScala().toSeq()));

         Map<String, String> tableLeafTaskNums = Maps.newHashMap();
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtils.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtils.scala
index b50155c342..ceada5b655 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtils.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtils.scala
@@ -19,16 +19,16 @@
 package org.apache.spark.sql.hive.utils

 import java.io.FileOutputStream
+import java.nio.charset.Charset
 import java.util.{List => JList, Map => JMap}
-import org.apache.kylin.guava30.shaded.common.collect.{Lists, Maps}
+
 import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase
+import org.apache.kylin.guava30.shaded.common.collect.{Lists, Maps}
 import org.apache.spark.sql.common.SparderBaseFunSuite
 import org.apache.spark.util.Utils
-import java.nio.charset.Charset
-
 class TestResourceDetectUtils extends SparderBaseFunSuite {
   private var config: KylinConfig = _
@@ -71,16 +71,26 @@ class TestResourceDetectUtils extends SparderBaseFunSuite {
     val contents = List("test", "test_test_test")
     val tempDir = Utils.createTempDir()
     val files = List(new Path(tempDir.getPath, "test1"), new Path(tempDir.getPath, "test2"))
+    val files2 = List(new Path(tempDir.getPath, "test3"))
     try {
       for (i <- 0 to 1) {
         val out = new FileOutputStream(files.apply(i).toString)
         out.write(contents.apply(i).getBytes(Charset.defaultCharset()))
         out.close()
       }
-      var l = ResourceDetectUtils.getResourceSize(false, files.head, files.last)
+      var l = ResourceDetectUtils.getResourceSize(config, files.head, files.last)
       assert(l == contents.map(_.getBytes(Charset.defaultCharset()).length).sum)
-      l = ResourceDetectUtils.getResourceSize(true, files.head, files.last)
+
+      // test file not exist
+      l = ResourceDetectUtils.getResourceSize(config, files2.head)
+      assert(l == 0)
+
+      config.setProperty("kylin.job.concurrency-fetch-datasource-size-enabled", "false")
+      l = ResourceDetectUtils.getResourceSize(config, files.head, files.last)
       assert(l == contents.map(_.getBytes(Charset.defaultCharset()).length).sum)
+
+      l = ResourceDetectUtils.getResourceSize(config, files2.head)
+      assert(l == 0)
     } finally {
       Utils.deleteRecursively(tempDir)
     }
@@ -103,7 +113,7 @@ class TestResourceDetectUtils extends SparderBaseFunSuite {
       }
       import scala.collection.JavaConverters._
-      val l = resourcePaths.values().asScala.map(path => ResourceDetectUtils.getResourceSize(false,
+      val l = resourcePaths.values().asScala.map(path => ResourceDetectUtils.getResourceSize(config,
         new Path(path.get(0)))).max
       assert(l == contents.last.getBytes(Charset.defaultCharset()).length)
     } finally {
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index 2b09a05d66..bd72f8b219 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -104,10 +104,11 @@ object SparkSqlClient {
     val paths = ResourceDetectUtils.getPaths(df.queryExecution.sparkPlan)
     var sourceTableSize: String = ""
     if (isConcurrency) {
-      sourceTableSize = ResourceDetectUtils.getResourceSizeWithTimeoutByConcurrency(
+      sourceTableSize = ResourceDetectUtils.getResourceSizeWithTimeoutByConcurrency(config,
         Duration(timeOut, TimeUnit.SECONDS), SparderEnv.getHadoopConfiguration(), paths: _*) + "b"
     } else {
-      sourceTableSize = ResourceDetectUtils.getResourceSizBySerial(SparderEnv.getHadoopConfiguration(), paths: _*) + "b"
+      sourceTableSize = ResourceDetectUtils.getResourceSizBySerial(config, SparderEnv.getHadoopConfiguration(),
+        paths: _*) + "b"
     }
     val partitions = Math.max(1, JavaUtils.byteStringAsMb(sourceTableSize) / basePartitionSize).toString
     df.sparkSession.sessionState.conf.setLocalProperty(SHUFFLE_PARTITION, partitions)
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala
index 4b1e06aeb9..0b0302c151 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala
@@ -18,6 +18,11 @@
 package org.apache.spark.sql.hive.utils

+import java.io.IOException
+import java.nio.charset.Charset
+import java.util.concurrent.Executors
+import java.util.{Map => JMap}
+
 import com.google.gson.Gson
 import com.google.gson.reflect.TypeToken
 import org.apache.hadoop.conf.Configuration
@@ -37,10 +42,6 @@ import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import org.apache.spark.sql.sources.NBaseRelation
 import org.apache.spark.util.ThreadUtils

-import java.io.IOException
-import java.nio.charset.Charset
-import java.util.concurrent.Executors
-import java.util.{Map => JMap}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.duration._
@@ -169,36 +170,36 @@ object ResourceDetectUtils extends Logging {
     false
   }

-  def getResourceSize(configuration: Configuration, isConcurrencyFetchDataSourceSize: Boolean, paths: Path*): Long = {
+  def getResourceSize(kylinConfig: KylinConfig, configuration: Configuration, paths: Path*): Long = {
     val resourceSize = {
-      if (isConcurrencyFetchDataSourceSize) {
-        getResourceSizeWithTimeoutByConcurrency(Duration.Inf, configuration, paths: _*)
+      if (kylinConfig.isConcurrencyFetchDataSourceSize) {
+        getResourceSizeWithTimeoutByConcurrency(kylinConfig, Duration.Inf, configuration, paths: _*)
       } else {
-        getResourceSizBySerial(configuration, paths: _*)
+        getResourceSizBySerial(kylinConfig, configuration, paths: _*)
       }
     }
     resourceSize
   }

-  def getResourceSizBySerial(configuration: Configuration, paths: Path*): Long = {
+  def getResourceSizBySerial(kylinConfig: KylinConfig, configuration: Configuration, paths: Path*): Long = {
     paths.map(path => {
       QueryInterruptChecker.checkThreadInterrupted(errorMsgLog, "Current step: get resource size.")
       val fs = path.getFileSystem(configuration)
       if (fs.exists(path)) {
-        HadoopUtil.getContentSummary(fs, path).getLength
+        HadoopUtil.getContentSummaryFromHdfsKylinConfig(fs, path, kylinConfig).getLength
       } else {
         0L
       }
     }).sum
   }

-  def getResourceSizeWithTimeoutByConcurrency(timeout: Duration, configuration: Configuration, paths: Path*): Long = {
-    val kylinConfig = KylinConfig.getInstanceFromEnv
+  def getResourceSizeWithTimeoutByConcurrency(kylinConfig: KylinConfig, timeout: Duration,
+                                              configuration: Configuration, paths: Path*): Long = {
     val threadNumber = kylinConfig.getConcurrencyFetchDataSourceSizeThreadNumber
     logInfo(s"Get resource size concurrency, thread number is $threadNumber")
     val executor = Executors.newFixedThreadPool(threadNumber)
     implicit val executionContext: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor)
-    val futures: Seq[Future[Long]] = getResourceSize(configuration, executionContext, paths: _*)
+    val futures: Seq[Future[Long]] = getResourceSize(kylinConfig, configuration, executionContext, paths: _*)
     try {
       val combinedFuture = Future.sequence(futures)
       val results: Seq[Long] = ThreadUtils.awaitResult(combinedFuture, timeout)
@@ -208,8 +209,8 @@ object ResourceDetectUtils extends Logging {
     }
   }

-  def getResourceSize(configuration: Configuration, executionContext: ExecutionContextExecutor, paths: Path*): Seq[Future[Long]] = {
-    val kylinConfig = KylinConfig.getInstanceFromEnv
+  def getResourceSize(kylinConfig: KylinConfig, configuration: Configuration,
+                      executionContext: ExecutionContextExecutor, paths: Path*): Seq[Future[Long]] = {
     paths.map { path =>
       Future {
         val fs = path.getFileSystem(configuration)
@@ -222,8 +223,8 @@ object ResourceDetectUtils extends Logging {
     }
   }

-  def getResourceSize(isConcurrencyFetchDataSourceSize: Boolean, paths: Path*): Long = {
-    getResourceSize(HadoopUtil.getCurrentConfiguration, isConcurrencyFetchDataSourceSize, paths: _*)
+  def getResourceSize(kylinConfig: KylinConfig, paths: Path*): Long = {
+    getResourceSize(kylinConfig, HadoopUtil.getCurrentConfiguration, paths: _*)
   }

   def getMaxResourceSize(shareDir: Path): Long = {
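
[Editor's note] The common thread across the eight files is that ResourceDetectUtils no longer resolves KylinConfig.getInstanceFromEnv internally; every caller now passes the job's own KylinConfig down explicitly, presumably because the env-based lookup is not reliably initialized on a driver running in yarn cluster mode. Below is a minimal caller sketch against the new signatures; the ResourceSizeExample object and its detectedSize helper are illustrative only and are not part of this patch.

import org.apache.hadoop.fs.Path
import org.apache.kylin.common.KylinConfig
import org.apache.spark.sql.SparderEnv
import org.apache.spark.sql.hive.utils.ResourceDetectUtils

object ResourceSizeExample {
  // `config` is the KylinConfig the SparkApplication already holds for the job;
  // it is handed to ResourceDetectUtils instead of being re-read from the environment.
  def detectedSize(config: KylinConfig, paths: Seq[Path]): Long = {
    // Whether sizes are fetched serially or concurrently is decided inside
    // getResourceSize from kylin.job.concurrency-fetch-datasource-size-enabled
    // on the passed config (see the TestResourceDetectUtils changes above).
    ResourceDetectUtils.getResourceSize(config, SparderEnv.getHadoopConfiguration(), paths: _*)
  }
}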