This is an automated email from the ASF dual-hosted git repository.

jlfsdtc pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new 1ddc347b31 KYLIN-6022 internalTableLoading Job support parallel build
1ddc347b31 is described below

commit 1ddc347b314643bea77bba144052f89c44279594
Author: Zhong.Zhu <[email protected]>
AuthorDate: Mon Nov 11 14:06:52 2024 +0800

    KYLIN-6022 internalTableLoading Job support parallel build
---
 .../kylin/job/handler/InternalTableJobHandler.java      | 17 -----------------
 .../kylin/rest/service/InternalTableServiceTest.java    |  9 ++++++---
 .../engine/spark/builder/InternalTableLoader.scala      |  5 +----
 3 files changed, 7 insertions(+), 24 deletions(-)

diff --git a/src/core-job/src/main/java/org/apache/kylin/job/handler/InternalTableJobHandler.java b/src/core-job/src/main/java/org/apache/kylin/job/handler/InternalTableJobHandler.java
index 831cae95fb..d127ccb76c 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/handler/InternalTableJobHandler.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/handler/InternalTableJobHandler.java
@@ -18,15 +18,7 @@
 
 package org.apache.kylin.job.handler;
 
-import java.util.List;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.job.domain.JobInfo;
 import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableManager;
-import org.apache.kylin.job.execution.JobTypeEnum;
 import org.apache.kylin.job.factory.JobFactory;
 import org.apache.kylin.job.factory.JobFactoryConstant;
 import org.apache.kylin.job.manager.JobManager;
@@ -71,15 +63,6 @@ public class InternalTableJobHandler extends AbstractJobHandler {
     @Override
     protected void checkBeforeHandle(JobParam jobParam) {
         String project = jobParam.getProject();
-        String table = jobParam.getTable();
-        List<JobInfo> existingJobs = 
ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
-                .fetchNotFinalJobsByTypes(project, 
Lists.newArrayList(JobTypeEnum.INTERNAL_TABLE_BUILD.name(),
-                        JobTypeEnum.INTERNAL_TABLE_REFRESH.name(), 
JobTypeEnum.INTERNAL_TABLE_DELETE_PARTITION.name()),
-                        Lists.newArrayList(table));
-        ExecutableManager execMgr = 
ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
-        if (CollectionUtils.isNotEmpty(existingJobs)) {
-            existingJobs.forEach(jobInfo -> 
execMgr.discardJob(jobInfo.getJobId()));
-        }
         JobManager.checkStorageQuota(project);
     }
 
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java
index 331e16bd6b..4d3f5db4da 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java
@@ -418,16 +418,19 @@ public class InternalTableServiceTest extends AbstractTestCase {
         Assertions.assertTrue(count > 0);
 
         // refresh all loaded table
-        response = internalTableService.loadIntoInternalTable(PROJECT, 
table.getName(), table.getDatabase(), false,
-                true, "", "", null);
+        endDate = "1325779200000"; // 2012-01-06
+        response = internalTableService.loadIntoInternalTable(PROJECT, 
table.getName(), table.getDatabase(), true, true,
+                startDate, endDate, null);
         Assert.assertFalse(response.getJobs().isEmpty());
+        jobId = response.getJobs().get(0).getJobId();
+        waitJobToFinished(config, jobId);
         // check refresh time out of loaded range
         Assertions.assertThrows(Exception.class, () -> 
internalTableService.loadIntoInternalTable(PROJECT,
                 table.getName(), table.getDatabase(), false, true, 
"1316556800000", "", null));// 2011-09-21 ~ ~
         Assertions.assertThrows(Exception.class, () -> 
internalTableService.loadIntoInternalTable(PROJECT,
                 table.getName(), table.getDatabase(), false, true, 
"1326556800000", "", null));// 2012-01-15 ~ ~
         Assertions.assertThrows(Exception.class, () -> 
internalTableService.loadIntoInternalTable(PROJECT,
-                table.getName(), table.getDatabase(), false, true, "", 
endDate, null));// ~ ~ 2012-01-07
+                table.getName(), table.getDatabase(), false, true, "", 
"1325865600000", null));// ~ ~ 2012-01-07
         Assertions.assertThrows(Exception.class, () -> 
internalTableService.loadIntoInternalTable(PROJECT,
                 table.getName(), table.getDatabase(), false, true, startDate, 
"1326556800000", null));// 2012-01-01 ~ 2012-01-15
         Assertions.assertThrows(Exception.class, () -> 
internalTableService.loadIntoInternalTable(PROJECT,
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
index 4849a6ddab..497ee08b56 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
@@ -72,10 +72,7 @@ class InternalTableLoader extends Logging {
     val bucketNum = table.getBucketNumber
     val primaryKey = table.getTblProperties.get(NBatchConstants.P_PRIMARY_KEY)
     val orderByKey = table.getTblProperties.get(NBatchConstants.P_ORDER_BY_KEY)
-    val outPutMode = isRefresh match {
-      case "true" => OVERWRITE
-      case "false" => APPEND
-    }
+    val outPutMode = OVERWRITE
     var writer = sourceData.write.option(STORAGE_POLICY, storagePolicy)
 
     if (tablePartition != null) {

Reply via email to