This is an automated email from the ASF dual-hosted git repository. liyang pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 1d665a82829a82fc9a51181378aa87de1a98dea2 Author: huangsheng <huangshen...@163.com> AuthorDate: Fri Jun 30 10:43:50 2023 +0800 KYLIN-5762 Initialize job scheduler encounters NPE --- .../apache/kylin/rest/service/ProjectService.java | 8 +++++++ .../apache/kylin/common/constant/Constants.java | 1 + .../org/apache/kylin/common/msg/CnMessage.java | 6 +++++ .../java/org/apache/kylin/common/msg/Message.java | 7 ++++++ .../job/impl/threadpool/NDefaultScheduler.java | 21 ++++++++++------- .../apache/kylin/job/runners/FetcherRunner.java | 3 +-- .../job/impl/threadpool/NDefaultSchedulerTest.java | 21 +++++++++++++++++ .../kylin/rest/service/ProjectServiceTest.java | 26 ++++++++++++++++++++++ 8 files changed, 83 insertions(+), 10 deletions(-) diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java index d32d8b261e..416f577e0e 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java @@ -25,6 +25,7 @@ import static org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_PASS_ import static org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_SOURCE_ENABLE_KEY; import static org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_SOURCE_NAME_KEY; import static org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_USER_KEY; +import static org.apache.kylin.common.constant.Constants.KYLIN_JOB_MAX_CONCURRENT_JOBS; import static org.apache.kylin.common.constant.NonCustomProjectLevelConfig.DATASOURCE_TYPE; import static org.apache.kylin.common.exception.ServerErrorCode.DATABASE_NOT_EXIST; import static org.apache.kylin.common.exception.ServerErrorCode.DUPLICATE_PROJECT_NAME; @@ -82,6 +83,7 @@ import org.apache.kylin.guava30.shaded.common.base.Strings; import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.guava30.shaded.common.collect.Maps; import org.apache.kylin.guava30.shaded.common.collect.Sets; +import org.apache.kylin.guava30.shaded.common.primitives.Ints; import org.apache.kylin.job.constant.JobStatusEnum; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.NExecutableManager; @@ -431,6 +433,12 @@ public class ProjectService extends BasicService { if (projectInstance == null) { throw new KylinException(PROJECT_NOT_EXIST, project); } + if (overrideKylinProps.containsKey(KYLIN_JOB_MAX_CONCURRENT_JOBS)) { + val maxConcurrentJobs = Ints.tryParse(overrideKylinProps.get(KYLIN_JOB_MAX_CONCURRENT_JOBS)); + if (Objects.isNull(maxConcurrentJobs) || maxConcurrentJobs < 0) + throw new KylinException(INVALID_PARAMETER, + MsgPicker.getMsg().getIllegalNegative(KYLIN_JOB_MAX_CONCURRENT_JOBS)); + } encryptJdbcPassInOverrideKylinProps(overrideKylinProps); projectManager.updateProject(project, copyForWrite -> copyForWrite.getOverrideKylinProps() .putAll(KylinConfig.trimKVFromMap(overrideKylinProps))); diff --git a/src/core-common/src/main/java/org/apache/kylin/common/constant/Constants.java b/src/core-common/src/main/java/org/apache/kylin/common/constant/Constants.java index a49d012fa2..6f3a9e1f13 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/constant/Constants.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/constant/Constants.java @@ -48,6 +48,7 @@ public class Constants { public static final String KYLIN_SOURCE_JDBC_CONNECTION_URL_KEY = "kylin.source.jdbc.connection-url"; public static final String KYLIN_SOURCE_JDBC_USER_KEY = "kylin.source.jdbc.user"; public static final String KYLIN_SOURCE_JDBC_DRIVER_KEY = "kylin.source.jdbc.driver"; + public static final String KYLIN_JOB_MAX_CONCURRENT_JOBS = "kylin.job.max-concurrent-jobs"; public static final String UNLIMITED = "Unlimited"; public static final String TRACE_ID = "traceId"; diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java index c62f52156d..ed626ec5d1 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java @@ -32,6 +32,7 @@ public class CnMessage extends Message { private static final String TASK_TIMEOUT = "执行超时"; private static final String PARAMETER_EMPTY = "请输入参数 “%s” 的值。"; + private static final String PARAMETER_MUST_BE_POSITIVE_NUMBER= "参数 %s 的值必须为非负数."; protected CnMessage() { @@ -1719,4 +1720,9 @@ public class CnMessage extends Message { public String getQueryNotRunningError() { return "该查询没有在运行,请检查"; } + + @Override + public String getIllegalNegative(String parameter) { + return String.format(PARAMETER_MUST_BE_POSITIVE_NUMBER, parameter); + } } diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java index 79e267eab1..3dff08d642 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java @@ -47,6 +47,8 @@ public class Message { private static final String PROFILING_COLLECT_TIMEOUT = "Async profiler timeout"; private static final String PARAMETER_EMPTY = "Please enter the value for the parameter '%s'."; + private static final String PARAMETER_MUST_BE_POSITIVE_NUMBER = "The value of %s must be a positive number."; + private static final String DDL_UNSUPPORTED = "Unsupported DDL syntax, only support single `create view`, `drop view`, `alter view`, `show create table`"; private static final String DDL_VIEW_NAME_ERROR = "View names need to start with KYLIN_"; private static final String DDL_VIEW_NAME_DUPLICATE_ERROR = "Logical View names is duplicate"; @@ -1621,4 +1623,9 @@ public class Message { return "Query is not running, please check."; } + public String getIllegalNegative(String parameter) { + return String.format(PARAMETER_MUST_BE_POSITIVE_NUMBER, parameter); + + } + } diff --git a/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java b/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java index 368785b51a..6d3d2e5742 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java @@ -29,13 +29,16 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import lombok.Setter; import org.apache.commons.lang3.RandomUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.transaction.UnitOfWork; import org.apache.kylin.common.util.ExecutorServiceUtil; import org.apache.kylin.common.util.NamedThreadFactory; import org.apache.kylin.common.util.SystemInfoCollector; +import org.apache.kylin.guava30.shaded.common.base.Preconditions; +import org.apache.kylin.guava30.shaded.common.base.Strings; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Maps; import org.apache.kylin.job.Scheduler; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.AbstractExecutable; @@ -43,18 +46,14 @@ import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.NExecutableManager; import org.apache.kylin.job.runners.FetcherRunner; import org.apache.kylin.job.runners.JobCheckRunner; +import org.apache.kylin.metadata.epoch.EpochManager; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.project.ProjectInstance; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.kylin.guava30.shaded.common.base.Preconditions; -import org.apache.kylin.guava30.shaded.common.base.Strings; -import org.apache.kylin.guava30.shaded.common.collect.Lists; -import org.apache.kylin.guava30.shaded.common.collect.Maps; - -import org.apache.kylin.metadata.epoch.EpochManager; import lombok.Getter; +import lombok.Setter; import lombok.SneakyThrows; import lombok.val; @@ -153,7 +152,7 @@ public class NDefaultScheduler implements Scheduler<AbstractExecutable> { //load all executable, set them to a consistent status fetcherPool = Executors.newScheduledThreadPool(1, new NamedThreadFactory("FetchJobWorker(project:" + project + ")")); - int corePoolSize = getMaxConcurrentJobLimitByProject(config, jobEngineConfig, project); + int corePoolSize = getMaxConcurrentJobLimitByProjectForInitThread(config, jobEngineConfig, project); if (config.getAutoSetConcurrentJob()) { val availableMemoryRate = config.getMaxLocalConsumptionRatio(); synchronized (NDefaultScheduler.class) { @@ -232,6 +231,12 @@ public class NDefaultScheduler implements Scheduler<AbstractExecutable> { return 1.0 * memoryRemaining.availablePermits(); } + public int getMaxConcurrentJobLimitByProjectForInitThread(KylinConfig config, JobEngineConfig jobEngineConfig, + String project) { + int jobLimit = getMaxConcurrentJobLimitByProject(config, jobEngineConfig, project); + return jobLimit <= 0 ? 1 : jobLimit; + } + public int getMaxConcurrentJobLimitByProject(KylinConfig config, JobEngineConfig jobEngineConfig, String project) { ProjectInstance prjInstance = NProjectManager.getInstance(config).getProject(project); if (Strings.isNullOrEmpty(project) || prjInstance == null) { diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java index 4494992d8c..31434a4761 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java @@ -80,7 +80,6 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner { } return false; } - private boolean markErrorJob(String jobId) { try { return EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { @@ -252,7 +251,7 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner { private void checkAndUpdateJobPoolNum() { final ThreadPoolExecutor pool = (ThreadPoolExecutor) jobPool; int maximumPoolSize = pool.getMaximumPoolSize(); - int maxConcurrentJobLimit = nDefaultScheduler.getMaxConcurrentJobLimitByProject(context.getConfig(), + int maxConcurrentJobLimit = nDefaultScheduler.getMaxConcurrentJobLimitByProjectForInitThread(context.getConfig(), nDefaultScheduler.getJobEngineConfig(), project); int activeCount = pool.getActiveCount(); if (maximumPoolSize == maxConcurrentJobLimit) { diff --git a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java index 3ab266d35e..f47060e229 100644 --- a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java +++ b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java @@ -1445,6 +1445,27 @@ public class NDefaultSchedulerTest extends BaseSchedulerTest { scheduler.shutdown(); Assert.assertEquals(memory, NDefaultScheduler.getMemoryRemaining().availablePermits()); + + config.setProperty("kylin.job.max-concurrent-jobs", "0"); + scheduler.init(new JobEngineConfig(config)); + val memory2 = NDefaultScheduler.getMemoryRemaining().availablePermits(); + val df2 = NDataflowManager.getInstance(getTestConfig(), project).getDataflow(modelId); + val job3 = generateJob(df2, project); + val job4 = generatePartial(df2, project); + executableManager.addJob(job3); + executableManager.addJob(job4); + val runningExecutables2 = executableManager.getRunningExecutables(project, modelId); + runningExecutables.sort(Comparator.comparing(AbstractExecutable::getCreateTime)); + Assert.assertEquals(ExecutableState.READY, runningExecutables2.get(0).getStatus()); + Assert.assertEquals(ExecutableState.READY, runningExecutables2.get(1).getStatus()); + config.setProperty("kylin.job.max-concurrent-jobs", "2"); + waitForJobByStatus(job3.getId(), 60000, null, executableManager); + waitForJobByStatus(job4.getId(), 60000, null, executableManager); + Assert.assertEquals(ExecutableState.SUCCEED, executableManager.getOutput(job1.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, executableManager.getOutput(job2.getId()).getState()); + + scheduler.shutdown(); + Assert.assertEquals(memory, NDefaultScheduler.getMemoryRemaining().availablePermits()); } @Test diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java index 029b3c4fda..f47a64020b 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java @@ -529,6 +529,7 @@ public class ProjectServiceTest extends NLocalFileMetadataTestCase { testOverrideP.put(" testk1 ", " testv1 "); testOverrideP.put("tes tk2", "test v2"); testOverrideP.put(" tes tk3 ", " t estv3 "); + testOverrideP.put("kylin.job.max-concurrent-jobs", "10"); projectService.updateProjectConfig(PROJECT, testOverrideP); @@ -537,6 +538,31 @@ public class ProjectServiceTest extends NLocalFileMetadataTestCase { Assert.assertEquals("testv1", kylinConfigExt.get("testk1")); Assert.assertEquals("test v2", kylinConfigExt.get("tes tk2")); Assert.assertEquals("t estv3", kylinConfigExt.get("tes tk3")); + Assert.assertEquals("10", kylinConfigExt.get("kylin.job.max-concurrent-jobs")); + + try { + Map<String, String> testOverrideWithIllegalValue = Maps.newLinkedHashMap(); + testOverrideWithIllegalValue.put("kylin.query.convert-sum-expression-enabled", "true"); + testOverrideWithIllegalValue.put("kylin.job.max-concurrent-jobs", "test"); + projectService.updateProjectConfig(PROJECT, testOverrideWithIllegalValue); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e instanceof KylinException); + Assert.assertTrue(e.getMessage().contains("must be a positive number")); + } + + try { + Map<String, String> testOverrideWithOutofRangeValue = Maps.newLinkedHashMap(); + testOverrideWithOutofRangeValue.put("kylin.query.convert-sum-expression-enabled", "true"); + testOverrideWithOutofRangeValue.put("kylin.job.max-concurrent-jobs", "-1"); + projectService.updateProjectConfig(PROJECT, testOverrideWithOutofRangeValue); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e instanceof KylinException); + Assert.assertTrue(e.getMessage().contains("must be a positive number")); + } + + } @Test