This is an automated email from the ASF dual-hosted git repository.
jlfsdtc pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new 1ddc347b31 KYLIN-6022 internalTableLoading Job support parallel build
1ddc347b31 is described below
commit 1ddc347b314643bea77bba144052f89c44279594
Author: Zhong.Zhu <[email protected]>
AuthorDate: Mon Nov 11 14:06:52 2024 +0800
KYLIN-6022 internalTableLoading Job support parallel build
---
.../kylin/job/handler/InternalTableJobHandler.java | 17 -----------------
.../kylin/rest/service/InternalTableServiceTest.java | 9 ++++++---
.../engine/spark/builder/InternalTableLoader.scala | 5 +----
3 files changed, 7 insertions(+), 24 deletions(-)
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/handler/InternalTableJobHandler.java b/src/core-job/src/main/java/org/apache/kylin/job/handler/InternalTableJobHandler.java
index 831cae95fb..d127ccb76c 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/handler/InternalTableJobHandler.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/handler/InternalTableJobHandler.java
@@ -18,15 +18,7 @@
package org.apache.kylin.job.handler;
-import java.util.List;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.job.domain.JobInfo;
import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableManager;
-import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.job.factory.JobFactory;
import org.apache.kylin.job.factory.JobFactoryConstant;
import org.apache.kylin.job.manager.JobManager;
@@ -71,15 +63,6 @@ public class InternalTableJobHandler extends AbstractJobHandler {
@Override
protected void checkBeforeHandle(JobParam jobParam) {
String project = jobParam.getProject();
- String table = jobParam.getTable();
- List<JobInfo> existingJobs =
ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
- .fetchNotFinalJobsByTypes(project,
Lists.newArrayList(JobTypeEnum.INTERNAL_TABLE_BUILD.name(),
- JobTypeEnum.INTERNAL_TABLE_REFRESH.name(),
JobTypeEnum.INTERNAL_TABLE_DELETE_PARTITION.name()),
- Lists.newArrayList(table));
- ExecutableManager execMgr =
ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
- if (CollectionUtils.isNotEmpty(existingJobs)) {
- existingJobs.forEach(jobInfo ->
execMgr.discardJob(jobInfo.getJobId()));
- }
JobManager.checkStorageQuota(project);
}
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java
index 331e16bd6b..4d3f5db4da 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/InternalTableServiceTest.java
@@ -418,16 +418,19 @@ public class InternalTableServiceTest extends AbstractTestCase {
Assertions.assertTrue(count > 0);
// refresh all loaded table
- response = internalTableService.loadIntoInternalTable(PROJECT,
table.getName(), table.getDatabase(), false,
- true, "", "", null);
+ endDate = "1325779200000"; // 2012-01-06
+ response = internalTableService.loadIntoInternalTable(PROJECT,
table.getName(), table.getDatabase(), true, true,
+ startDate, endDate, null);
Assert.assertFalse(response.getJobs().isEmpty());
+ jobId = response.getJobs().get(0).getJobId();
+ waitJobToFinished(config, jobId);
// check refresh time out of loaded range
Assertions.assertThrows(Exception.class, () ->
internalTableService.loadIntoInternalTable(PROJECT,
table.getName(), table.getDatabase(), false, true,
"1316556800000", "", null));// 2011-09-21 ~ ~
Assertions.assertThrows(Exception.class, () ->
internalTableService.loadIntoInternalTable(PROJECT,
table.getName(), table.getDatabase(), false, true,
"1326556800000", "", null));// 2012-01-15 ~ ~
Assertions.assertThrows(Exception.class, () ->
internalTableService.loadIntoInternalTable(PROJECT,
- table.getName(), table.getDatabase(), false, true, "",
endDate, null));// ~ ~ 2012-01-07
+ table.getName(), table.getDatabase(), false, true, "",
"1325865600000", null));// ~ ~ 2012-01-07
Assertions.assertThrows(Exception.class, () ->
internalTableService.loadIntoInternalTable(PROJECT,
table.getName(), table.getDatabase(), false, true, startDate,
"1326556800000", null));// 2012-01-01 ~ 2012-01-15
Assertions.assertThrows(Exception.class, () ->
internalTableService.loadIntoInternalTable(PROJECT,
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
index 4849a6ddab..497ee08b56 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/InternalTableLoader.scala
@@ -72,10 +72,7 @@ class InternalTableLoader extends Logging {
val bucketNum = table.getBucketNumber
val primaryKey = table.getTblProperties.get(NBatchConstants.P_PRIMARY_KEY)
val orderByKey = table.getTblProperties.get(NBatchConstants.P_ORDER_BY_KEY)
- val outPutMode = isRefresh match {
- case "true" => OVERWRITE
- case "false" => APPEND
- }
+ val outPutMode = OVERWRITE
var writer = sourceData.write.option(STORAGE_POLICY, storagePolicy)
if (tablePartition != null) {