This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 1d665a82829a82fc9a51181378aa87de1a98dea2
Author: huangsheng <huangshen...@163.com>
AuthorDate: Fri Jun 30 10:43:50 2023 +0800

    KYLIN-5762 Initialize job scheduler encounters NPE
---
 .../apache/kylin/rest/service/ProjectService.java  |  8 +++++++
 .../apache/kylin/common/constant/Constants.java    |  1 +
 .../org/apache/kylin/common/msg/CnMessage.java     |  6 +++++
 .../java/org/apache/kylin/common/msg/Message.java  |  7 ++++++
 .../job/impl/threadpool/NDefaultScheduler.java     | 21 ++++++++++-------
 .../apache/kylin/job/runners/FetcherRunner.java    |  3 +--
 .../job/impl/threadpool/NDefaultSchedulerTest.java | 21 +++++++++++++++++
 .../kylin/rest/service/ProjectServiceTest.java     | 26 ++++++++++++++++++++++
 8 files changed, 83 insertions(+), 10 deletions(-)

diff --git 
a/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java
 
b/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java
index d32d8b261e..416f577e0e 100644
--- 
a/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java
+++ 
b/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java
@@ -25,6 +25,7 @@ import static 
org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_PASS_
 import static 
org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_SOURCE_ENABLE_KEY;
 import static 
org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_SOURCE_NAME_KEY;
 import static 
org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_USER_KEY;
+import static 
org.apache.kylin.common.constant.Constants.KYLIN_JOB_MAX_CONCURRENT_JOBS;
 import static 
org.apache.kylin.common.constant.NonCustomProjectLevelConfig.DATASOURCE_TYPE;
 import static 
org.apache.kylin.common.exception.ServerErrorCode.DATABASE_NOT_EXIST;
 import static 
org.apache.kylin.common.exception.ServerErrorCode.DUPLICATE_PROJECT_NAME;
@@ -82,6 +83,7 @@ import org.apache.kylin.guava30.shaded.common.base.Strings;
 import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.guava30.shaded.common.collect.Sets;
+import org.apache.kylin.guava30.shaded.common.primitives.Ints;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.NExecutableManager;
@@ -431,6 +433,12 @@ public class ProjectService extends BasicService {
         if (projectInstance == null) {
             throw new KylinException(PROJECT_NOT_EXIST, project);
         }
+        if (overrideKylinProps.containsKey(KYLIN_JOB_MAX_CONCURRENT_JOBS)) {
+            val maxConcurrentJobs = 
Ints.tryParse(overrideKylinProps.get(KYLIN_JOB_MAX_CONCURRENT_JOBS));
+            if (Objects.isNull(maxConcurrentJobs) || maxConcurrentJobs < 0)
+                throw new KylinException(INVALID_PARAMETER,
+                        
MsgPicker.getMsg().getIllegalNegative(KYLIN_JOB_MAX_CONCURRENT_JOBS));
+        }
         encryptJdbcPassInOverrideKylinProps(overrideKylinProps);
         projectManager.updateProject(project, copyForWrite -> 
copyForWrite.getOverrideKylinProps()
                 .putAll(KylinConfig.trimKVFromMap(overrideKylinProps)));
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/constant/Constants.java 
b/src/core-common/src/main/java/org/apache/kylin/common/constant/Constants.java
index a49d012fa2..6f3a9e1f13 100644
--- 
a/src/core-common/src/main/java/org/apache/kylin/common/constant/Constants.java
+++ 
b/src/core-common/src/main/java/org/apache/kylin/common/constant/Constants.java
@@ -48,6 +48,7 @@ public class Constants {
     public static final String KYLIN_SOURCE_JDBC_CONNECTION_URL_KEY = 
"kylin.source.jdbc.connection-url";
     public static final String KYLIN_SOURCE_JDBC_USER_KEY = 
"kylin.source.jdbc.user";
     public static final String KYLIN_SOURCE_JDBC_DRIVER_KEY = 
"kylin.source.jdbc.driver";
+    public static final String KYLIN_JOB_MAX_CONCURRENT_JOBS = 
"kylin.job.max-concurrent-jobs";
 
     public static final String UNLIMITED = "Unlimited";
     public static final String TRACE_ID = "traceId";
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java 
b/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java
index c62f52156d..ed626ec5d1 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java
@@ -32,6 +32,7 @@ public class CnMessage extends Message {
     private static final String TASK_TIMEOUT = "执行超时";
 
     private static final String PARAMETER_EMPTY = "请输入参数 “%s” 的值。";
+    private static final String PARAMETER_MUST_BE_POSITIVE_NUMBER= "参数 %s 
的值必须为非负数.";
 
     protected CnMessage() {
 
@@ -1719,4 +1720,9 @@ public class CnMessage extends Message {
     public String getQueryNotRunningError() {
         return "该查询没有在运行,请检查";
     }
+
+    @Override
+    public String getIllegalNegative(String parameter) {
+        return String.format(PARAMETER_MUST_BE_POSITIVE_NUMBER, parameter);
+    }
 }
diff --git 
a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java 
b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
index 79e267eab1..3dff08d642 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
@@ -47,6 +47,8 @@ public class Message {
     private static final String PROFILING_COLLECT_TIMEOUT = "Async profiler 
timeout";
 
     private static final String PARAMETER_EMPTY = "Please enter the value for 
the parameter '%s'.";
+    private static final String PARAMETER_MUST_BE_POSITIVE_NUMBER = "The value 
of %s must be a positive number.";
+
     private static final String DDL_UNSUPPORTED = "Unsupported DDL syntax, 
only support single `create view`, `drop view`,  `alter view`, `show create 
table`";
     private static final String DDL_VIEW_NAME_ERROR = "View names need to 
start with KYLIN_";
     private static final String DDL_VIEW_NAME_DUPLICATE_ERROR = "Logical View 
names is duplicate";
@@ -1621,4 +1623,9 @@ public class Message {
         return "Query is not running, please check.";
     }
 
+    public String getIllegalNegative(String parameter) {
+        return String.format(PARAMETER_MUST_BE_POSITIVE_NUMBER, parameter);
+
+    }
+
 }
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java
 
b/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java
index 368785b51a..6d3d2e5742 100644
--- 
a/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java
+++ 
b/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java
@@ -29,13 +29,16 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import lombok.Setter;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.common.util.ExecutorServiceUtil;
 import org.apache.kylin.common.util.NamedThreadFactory;
 import org.apache.kylin.common.util.SystemInfoCollector;
+import org.apache.kylin.guava30.shaded.common.base.Preconditions;
+import org.apache.kylin.guava30.shaded.common.base.Strings;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.job.Scheduler;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -43,18 +46,14 @@ import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.NExecutableManager;
 import org.apache.kylin.job.runners.FetcherRunner;
 import org.apache.kylin.job.runners.JobCheckRunner;
+import org.apache.kylin.metadata.epoch.EpochManager;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.kylin.guava30.shaded.common.base.Preconditions;
-import org.apache.kylin.guava30.shaded.common.base.Strings;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
-
-import org.apache.kylin.metadata.epoch.EpochManager;
 import lombok.Getter;
+import lombok.Setter;
 import lombok.SneakyThrows;
 import lombok.val;
 
@@ -153,7 +152,7 @@ public class NDefaultScheduler implements 
Scheduler<AbstractExecutable> {
         //load all executable, set them to a consistent status
         fetcherPool = Executors.newScheduledThreadPool(1,
                 new NamedThreadFactory("FetchJobWorker(project:" + project + 
")"));
-        int corePoolSize = getMaxConcurrentJobLimitByProject(config, 
jobEngineConfig, project);
+        int corePoolSize = 
getMaxConcurrentJobLimitByProjectForInitThread(config, jobEngineConfig, 
project);
         if (config.getAutoSetConcurrentJob()) {
             val availableMemoryRate = config.getMaxLocalConsumptionRatio();
             synchronized (NDefaultScheduler.class) {
@@ -232,6 +231,12 @@ public class NDefaultScheduler implements 
Scheduler<AbstractExecutable> {
         return 1.0 * memoryRemaining.availablePermits();
     }
 
+    public int getMaxConcurrentJobLimitByProjectForInitThread(KylinConfig 
config, JobEngineConfig jobEngineConfig,
+            String project) {
+        int jobLimit = getMaxConcurrentJobLimitByProject(config, 
jobEngineConfig, project);
+        return jobLimit <= 0 ? 1 : jobLimit;
+    }
+
     public int getMaxConcurrentJobLimitByProject(KylinConfig config, 
JobEngineConfig jobEngineConfig, String project) {
         ProjectInstance prjInstance = 
NProjectManager.getInstance(config).getProject(project);
         if (Strings.isNullOrEmpty(project) || prjInstance == null) {
diff --git 
a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java 
b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
index 4494992d8c..31434a4761 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
@@ -80,7 +80,6 @@ public class FetcherRunner extends 
AbstractDefaultSchedulerRunner {
         }
         return false;
     }
-
     private boolean markErrorJob(String jobId) {
         try {
             return EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
@@ -252,7 +251,7 @@ public class FetcherRunner extends 
AbstractDefaultSchedulerRunner {
     private void checkAndUpdateJobPoolNum() {
         final ThreadPoolExecutor pool = (ThreadPoolExecutor) jobPool;
         int maximumPoolSize = pool.getMaximumPoolSize();
-        int maxConcurrentJobLimit = 
nDefaultScheduler.getMaxConcurrentJobLimitByProject(context.getConfig(),
+        int maxConcurrentJobLimit = 
nDefaultScheduler.getMaxConcurrentJobLimitByProjectForInitThread(context.getConfig(),
                 nDefaultScheduler.getJobEngineConfig(), project);
         int activeCount = pool.getActiveCount();
         if (maximumPoolSize == maxConcurrentJobLimit) {
diff --git 
a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java
 
b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java
index 3ab266d35e..f47060e229 100644
--- 
a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java
+++ 
b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java
@@ -1445,6 +1445,27 @@ public class NDefaultSchedulerTest extends 
BaseSchedulerTest {
 
         scheduler.shutdown();
         Assert.assertEquals(memory, 
NDefaultScheduler.getMemoryRemaining().availablePermits());
+
+        config.setProperty("kylin.job.max-concurrent-jobs", "0");
+        scheduler.init(new JobEngineConfig(config));
+        val memory2 = 
NDefaultScheduler.getMemoryRemaining().availablePermits();
+        val df2 = NDataflowManager.getInstance(getTestConfig(), 
project).getDataflow(modelId);
+        val job3 = generateJob(df2, project);
+        val job4 = generatePartial(df2, project);
+        executableManager.addJob(job3);
+        executableManager.addJob(job4);
+        val runningExecutables2 = 
executableManager.getRunningExecutables(project, modelId);
+        
runningExecutables.sort(Comparator.comparing(AbstractExecutable::getCreateTime));
+        Assert.assertEquals(ExecutableState.READY, 
runningExecutables2.get(0).getStatus());
+        Assert.assertEquals(ExecutableState.READY, 
runningExecutables2.get(1).getStatus());
+        config.setProperty("kylin.job.max-concurrent-jobs", "2");
+        waitForJobByStatus(job3.getId(), 60000, null, executableManager);
+        waitForJobByStatus(job4.getId(), 60000, null, executableManager);
+        Assert.assertEquals(ExecutableState.SUCCEED, 
executableManager.getOutput(job1.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, 
executableManager.getOutput(job2.getId()).getState());
+
+        scheduler.shutdown();
+        Assert.assertEquals(memory, 
NDefaultScheduler.getMemoryRemaining().availablePermits());
     }
 
     @Test
diff --git 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
index 029b3c4fda..f47a64020b 100644
--- 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
+++ 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
@@ -529,6 +529,7 @@ public class ProjectServiceTest extends 
NLocalFileMetadataTestCase {
         testOverrideP.put(" testk1 ", " testv1 ");
         testOverrideP.put("tes   tk2", "test    v2");
         testOverrideP.put("      tes    tk3 ", "    t     estv3    ");
+        testOverrideP.put("kylin.job.max-concurrent-jobs", "10");
 
         projectService.updateProjectConfig(PROJECT, testOverrideP);
 
@@ -537,6 +538,31 @@ public class ProjectServiceTest extends 
NLocalFileMetadataTestCase {
         Assert.assertEquals("testv1", kylinConfigExt.get("testk1"));
         Assert.assertEquals("test    v2", kylinConfigExt.get("tes   tk2"));
         Assert.assertEquals("t     estv3", kylinConfigExt.get("tes    tk3"));
+        Assert.assertEquals("10", 
kylinConfigExt.get("kylin.job.max-concurrent-jobs"));
+
+        try {
+            Map<String, String> testOverrideWithIllegalValue = 
Maps.newLinkedHashMap();
+            
testOverrideWithIllegalValue.put("kylin.query.convert-sum-expression-enabled", 
"true");
+            testOverrideWithIllegalValue.put("kylin.job.max-concurrent-jobs", 
"test");
+            projectService.updateProjectConfig(PROJECT, 
testOverrideWithIllegalValue);
+            Assert.fail();
+        } catch (Exception e) {
+            Assert.assertTrue(e instanceof KylinException);
+            Assert.assertTrue(e.getMessage().contains("must be a positive 
number"));
+        }
+
+        try {
+            Map<String, String> testOverrideWithOutofRangeValue = 
Maps.newLinkedHashMap();
+            
testOverrideWithOutofRangeValue.put("kylin.query.convert-sum-expression-enabled",
 "true");
+            
testOverrideWithOutofRangeValue.put("kylin.job.max-concurrent-jobs", "-1");
+            projectService.updateProjectConfig(PROJECT, 
testOverrideWithOutofRangeValue);
+            Assert.fail();
+        } catch (Exception e) {
+            Assert.assertTrue(e instanceof KylinException);
+            Assert.assertTrue(e.getMessage().contains("must be a positive 
number"));
+        }
+
+
     }
 
     @Test

Reply via email to