This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push:
     new 35e1487e57  KYLIN-5285 the number of filePartitions of FileScanRDD when used shardby
35e1487e57 is described below

commit 35e1487e57cc43bba5e592dfcb4c41a2fb1e8a8b
Author: zhaoliu4 <zhaol...@iflytek.com>
AuthorDate: Mon Oct 31 11:04:08 2022 +0800

    KYLIN-5285 the number of filePartitions of FileScanRDD when used shardby

    KYLIN-5285 the number of filePartitions of FileScanRDD when used shardby
---
 .../apache/spark/sql/execution/KylinFileSourceScanExec.scala   |  9 ++++++---
 .../main/scala/org/apache/spark/application/JobWorkSpace.scala | 10 +---------
 2 files changed, 7 insertions(+), 12 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
index 61fef995fe..534bb5663b 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
@@ -162,9 +162,12 @@ class KylinFileSourceScanExec(
       f => FilePruner.getPartitionId(new Path(f.filePath))
     }
 
-    val filePartitions = Seq.tabulate(shardSpec.numShards) { shardId =>
-      FilePartition(shardId, filesToPartitionId.getOrElse(shardId, Nil).toArray)
-    }
+    var shardId = 0
+    val filePartitions = new ArrayBuffer[FilePartition]()
+    filesToPartitionId.foreach(t => {
+      filePartitions += FilePartition(shardId, t._2.toArray)
+      shardId += 1
+    })
 
     if (SoftAffinityManager.usingSoftAffinity) {
       val start = System.currentTimeMillis()
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
index 8c2c30a53e..03029efef4 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
@@ -35,15 +35,7 @@ object JobWorkSpace extends Logging {
       val worker = new JobWorker(application, appArgs, eventLoop)
       val monitor = new JobMonitor(eventLoop)
       val workspace = new JobWorkSpace(eventLoop, monitor, worker)
-
-      if (System.getProperty("spark.master").equals("yarn") && System.getProperty("spark.submit.deployMode").equals("cluster")) {
-        val res = workspace.run()
-        if (res != 0) {
-          System.exit(res)
-        }
-      } else {
-        System.exit(workspace.run())
-      }
+      workspace.run()
     } catch {
      case throwable: Throwable =>
        logError("Error occurred when init job workspace.", throwable)
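
For context on the first hunk: previously KylinFileSourceScanExec built one FilePartition per declared shard via Seq.tabulate(shardSpec.numShards), so shards with no files still became empty partitions of the FileScanRDD; after the patch, only the non-empty file groups in filesToPartitionId become partitions. Below is a minimal, self-contained sketch of that difference, not code from the commit: FileSlice and Partition are hypothetical stand-ins for Spark's PartitionedFile and FilePartition, and the example map stands in for the grouping produced via FilePruner.getPartitionId.

import scala.collection.mutable.ArrayBuffer

object ShardPartitionSketch {
  // Hypothetical stand-ins for Spark's PartitionedFile and FilePartition.
  final case class FileSlice(filePath: String)
  final case class Partition(index: Int, files: Array[FileSlice])

  // Old behaviour: one partition per declared shard, empty shards included.
  def partitionsPerShard(numShards: Int,
                         filesByShard: Map[Int, Seq[FileSlice]]): Seq[Partition] =
    Seq.tabulate(numShards) { shardId =>
      Partition(shardId, filesByShard.getOrElse(shardId, Nil).toArray)
    }

  // New behaviour, as in the patch: only non-empty file groups become
  // partitions, so the scan gets exactly as many partitions as there are
  // shards that actually hold data.
  def partitionsForNonEmptyShards(filesByShard: Map[Int, Seq[FileSlice]]): Seq[Partition] = {
    var index = 0
    val partitions = new ArrayBuffer[Partition]()
    filesByShard.foreach { case (_, files) =>
      partitions += Partition(index, files.toArray)
      index += 1
    }
    partitions.toSeq
  }

  def main(args: Array[String]): Unit = {
    // Only shards 0 and 3 of five declared shards contain files.
    val filesByShard = Map(
      0 -> Seq(FileSlice("shard0/part-0.parquet")),
      3 -> Seq(FileSlice("shard3/part-0.parquet"))
    )
    println(partitionsPerShard(5, filesByShard).size)        // 5 (3 of them empty)
    println(partitionsForNonEmptyShards(filesByShard).size)  // 2
  }
}

The second hunk is unrelated cleanup: JobWorkSpace now calls workspace.run() unconditionally instead of special-casing the yarn cluster deploy mode and calling System.exit.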