This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/main by this push:
     new 35e1487e57 KYLIN-5285 Fix the number of filePartitions of FileScanRDD when shardby is used
35e1487e57 is described below

commit 35e1487e57cc43bba5e592dfcb4c41a2fb1e8a8b
Author: zhaoliu4 <zhaol...@iflytek.com>
AuthorDate: Mon Oct 31 11:04:08 2022 +0800

    KYLIN-5285 Fix the number of filePartitions of FileScanRDD when shardby is used
---
 .../apache/spark/sql/execution/KylinFileSourceScanExec.scala   |  9 ++++++---
 .../main/scala/org/apache/spark/application/JobWorkSpace.scala | 10 +---------
 2 files changed, 7 insertions(+), 12 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
index 61fef995fe..534bb5663b 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
@@ -162,9 +162,12 @@ class KylinFileSourceScanExec(
         f => FilePruner.getPartitionId(new Path(f.filePath))
       }
 
-    val filePartitions = Seq.tabulate(shardSpec.numShards) { shardId =>
-      FilePartition(shardId, filesToPartitionId.getOrElse(shardId, Nil).toArray)
-    }
+    var shardId = 0
+    val filePartitions = new ArrayBuffer[FilePartition]()
+    filesToPartitionId.foreach(t => {
+      filePartitions += FilePartition(shardId, t._2.toArray)
+      shardId += 1
+    })
 
     if (SoftAffinityManager.usingSoftAffinity) {
       val start = System.currentTimeMillis()
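
For context, the hunk above replaces Seq.tabulate over shardSpec.numShards (which produced one FilePartition per shard index, empty shards included) with a loop over filesToPartitionId (which produces one FilePartition per shard that actually holds files, renumbered consecutively). Below is a minimal, self-contained Scala sketch of that difference; PartitionedFile, FilePartition and the sample shard map are simplified stand-ins for the real Spark types, not Kylin code.

    // Standalone sketch -- simplified stand-ins, not Spark's real classes.
    object ShardPartitionSketch {
      case class PartitionedFile(filePath: String)
      case class FilePartition(index: Int, files: Array[PartitionedFile])

      def main(args: Array[String]): Unit = {
        val numShards = 4
        // Hypothetical files grouped by shard id; shard 2 has no files.
        val filesToPartitionId: Map[Int, Seq[PartitionedFile]] = Map(
          0 -> Seq(PartitionedFile("part-0-a"), PartitionedFile("part-0-b")),
          1 -> Seq(PartitionedFile("part-1-a")),
          3 -> Seq(PartitionedFile("part-3-a")))

        // Old behavior: one partition per shard index, including empty shards.
        val oldPartitions = Seq.tabulate(numShards) { shardId =>
          FilePartition(shardId, filesToPartitionId.getOrElse(shardId, Nil).toArray)
        }

        // New behavior: one partition per populated shard, renumbered 0..n-1.
        var shardId = 0
        val newPartitions = scala.collection.mutable.ArrayBuffer[FilePartition]()
        filesToPartitionId.foreach { case (_, files) =>
          newPartitions += FilePartition(shardId, files.toArray)
          shardId += 1
        }

        println(s"old: ${oldPartitions.size} partitions") // 4, one of them empty
        println(s"new: ${newPartitions.size} partitions") // 3
      }
    }
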
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
index 8c2c30a53e..03029efef4 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
@@ -35,15 +35,7 @@ object JobWorkSpace extends Logging {
       val worker = new JobWorker(application, appArgs, eventLoop)
       val monitor = new JobMonitor(eventLoop)
       val workspace = new JobWorkSpace(eventLoop, monitor, worker)
-
-      if (System.getProperty("spark.master").equals("yarn") && System.getProperty("spark.submit.deployMode").equals("cluster")) {
-        val res = workspace.run()
-        if (res != 0) {
-          System.exit(res)
-        }
-      } else {
-        System.exit(workspace.run())
-      }
+      workspace.run()
     } catch {
       case throwable: Throwable =>
         logError("Error occurred when init job workspace.", throwable)
