This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 5bca044690dabb78e18e86890d4283bd67b015ae Author: Hang Jia <754332...@qq.com> AuthorDate: Thu Oct 27 10:57:59 2022 +0800 KYLIN-5349 Support project-level configuration of concurrent task limits --- .../job/impl/threadpool/NDefaultScheduler.java | 19 ++++-- .../apache/kylin/job/runners/FetcherRunner.java | 27 +++++++- .../job/impl/threadpool/NDefaultSchedulerTest.java | 79 +++++++++++++++++++++- .../kylin/rest/service/ModelBuildService.java | 17 ++++- .../kylin/rest/service/ModelServiceBuildTest.java | 46 +++++++++++++ 5 files changed, 177 insertions(+), 11 deletions(-) diff --git a/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java b/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java index d3efe67bb9..8be23b4c19 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java @@ -32,8 +32,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import lombok.Setter; import org.apache.commons.lang3.RandomUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.transaction.UnitOfWork; import org.apache.kylin.common.util.ExecutorServiceUtil; import org.apache.kylin.common.util.NamedThreadFactory; +import org.apache.kylin.common.util.SystemInfoCollector; import org.apache.kylin.job.Scheduler; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.AbstractExecutable; @@ -43,16 +45,17 @@ import org.apache.kylin.job.runners.FetcherRunner; import org.apache.kylin.job.runners.JobCheckRunner; import org.apache.kylin.job.runners.LicenseCapacityCheckRunner; import org.apache.kylin.job.runners.QuotaStorageCheckRunner; -import org.apache.kylin.common.persistence.transaction.UnitOfWork; -import org.apache.kylin.common.util.SystemInfoCollector; -import org.apache.kylin.metadata.epoch.EpochManager; +import org.apache.kylin.metadata.project.NProjectManager; +import org.apache.kylin.metadata.project.ProjectInstance; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.kylin.metadata.epoch.EpochManager; import lombok.Getter; import lombok.SneakyThrows; import lombok.val; @@ -152,7 +155,7 @@ public class NDefaultScheduler implements Scheduler<AbstractExecutable> { //load all executable, set them to a consistent status fetcherPool = Executors.newScheduledThreadPool(1, new NamedThreadFactory("FetchJobWorker(project:" + project + ")")); - int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit(); + int corePoolSize = getMaxConcurrentJobLimitByProject(config, jobEngineConfig, project); if (config.getAutoSetConcurrentJob()) { val availableMemoryRate = config.getMaxLocalConsumptionRatio(); synchronized (NDefaultScheduler.class) { @@ -238,4 +241,12 @@ public class NDefaultScheduler implements Scheduler<AbstractExecutable> { return 1.0 * memoryRemaining.availablePermits(); } + public int getMaxConcurrentJobLimitByProject(KylinConfig config, JobEngineConfig jobEngineConfig, String project) { + ProjectInstance prjInstance = NProjectManager.getInstance(config).getProject(project); + if (Strings.isNullOrEmpty(project) || prjInstance == null) { + return jobEngineConfig.getMaxConcurrentJobLimit(); + } + return prjInstance.getConfig().getMaxConcurrentJobLimit(); + } + } diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java index ab659c48a8..425a228f74 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java @@ -20,6 +20,7 @@ package org.apache.kylin.job.runners; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.kylin.common.KylinConfig; @@ -103,6 +104,8 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner { reSchedule = false; return; } + checkAndUpdateJobPoolNum(); + val executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project); Map<String, Executable> runningJobs = context.getRunningJobs(); @@ -236,7 +239,9 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner { } private boolean isJobPoolFull() { - if (context.getRunningJobs().size() >= nDefaultScheduler.getJobEngineConfig().getMaxConcurrentJobLimit()) { + int corePoolSize = nDefaultScheduler.getMaxConcurrentJobLimitByProject(context.getConfig(), + nDefaultScheduler.getJobEngineConfig(), project); + if (context.getRunningJobs().size() >= corePoolSize) { logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time."); return true; } @@ -246,4 +251,24 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner { void scheduleNext() { fetcherPool.schedule(this, 0, TimeUnit.SECONDS); } + + private void checkAndUpdateJobPoolNum() { + final ThreadPoolExecutor pool = (ThreadPoolExecutor) jobPool; + int maximumPoolSize = pool.getMaximumPoolSize(); + int maxConcurrentJobLimit = nDefaultScheduler.getMaxConcurrentJobLimitByProject(context.getConfig(), + nDefaultScheduler.getJobEngineConfig(), project); + int activeCount = pool.getActiveCount(); + if (maximumPoolSize == maxConcurrentJobLimit) { + return; + } + if (maximumPoolSize < maxConcurrentJobLimit) { + pool.setCorePoolSize(maxConcurrentJobLimit); + pool.setMaximumPoolSize(maxConcurrentJobLimit); + return; + } + if (activeCount <= maxConcurrentJobLimit) { + pool.setCorePoolSize(maxConcurrentJobLimit); + pool.setMaximumPoolSize(maxConcurrentJobLimit); + } + } } diff --git a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java index 77990e29ca..f6602d2cd1 100644 --- a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java +++ b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java @@ -71,10 +71,10 @@ import org.apache.kylin.metadata.cube.model.NDataflow; import org.apache.kylin.metadata.cube.model.NDataflowManager; import org.apache.kylin.metadata.cube.model.NDataflowUpdate; import org.apache.kylin.metadata.cube.model.NIndexPlanManager; -import org.apache.kylin.metadata.epoch.EpochManager; import org.apache.kylin.metadata.model.ManagementType; import org.apache.kylin.metadata.model.NDataModelManager; import org.apache.kylin.metadata.project.EnhancedUnitOfWork; +import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.assertj.core.api.Assertions; import org.awaitility.core.ConditionTimeoutException; @@ -93,6 +93,7 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import org.apache.kylin.metadata.epoch.EpochManager; import lombok.val; import lombok.var; @@ -940,8 +941,7 @@ public class NDefaultSchedulerTest extends BaseSchedulerTest { Assert.assertEquals(RealizationStatusEnum.ONLINE, updateDf.getStatus()); } - private DefaultExecutable testDataflowStatusWhenJobError(ManagementType tableOriented, - JobTypeEnum indexBuild) { + private DefaultExecutable testDataflowStatusWhenJobError(ManagementType tableOriented, JobTypeEnum indexBuild) { val dfMgr = NDataflowManager.getInstance(getTestConfig(), project); val modelMgr = NDataModelManager.getInstance(getTestConfig(), project); modelMgr.updateDataModel("89af4ee2-2cdb-4b07-b39e-4c29856309aa", copyForWrite -> { @@ -2026,4 +2026,77 @@ public class NDefaultSchedulerTest extends BaseSchedulerTest { scheduler.getContext().setReachQuotaLimit(false); } } + + @Test + @Repeat(3) + public void testProjectConcurrentJobLimit() { + String project = "heterogeneous_segment"; + String modelId = "747f864b-9721-4b97-acde-0aa8e8656cba"; + KylinConfig config = KylinConfig.getInstanceFromEnv(); + config.setProperty("kylin.job.max-concurrent-jobs", "1"); + config.setProperty("kylin.engine.driver-memory-base", "512"); + + val scheduler = NDefaultScheduler.getInstance(project); + val originExecutableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project); + val executableManager = Mockito.spy(originExecutableManager); + executableManager.deleteAllJob(); + Mockito.doAnswer(invocation -> { + String jobId = invocation.getArgument(0); + originExecutableManager.destroyProcess(jobId); + return null; + }).when(executableManager).destroyProcess(Mockito.anyString()); + + scheduler.init(new JobEngineConfig(config)); + val projectManager = NProjectManager.getInstance(config); + + if (!scheduler.hasStarted()) { + throw new RuntimeException("scheduler has not been started"); + } + int memory = NDefaultScheduler.getMemoryRemaining().availablePermits(); + val df = NDataflowManager.getInstance(getTestConfig(), project).getDataflow(modelId); + val job1 = generateJob(df, project); + val job2 = generatePartial(df, project); + executableManager.addJob(job1); + executableManager.addJob(job2); + waitForJobByStatus(job1.getId(), 60000, ExecutableState.RUNNING, executableManager); + Assert.assertNotEquals(memory, NDefaultScheduler.getMemoryRemaining().availablePermits()); + var runningExecutables = executableManager.getRunningExecutables(project, modelId); + runningExecutables.sort(Comparator.comparing(AbstractExecutable::getCreateTime)); + Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(0).getStatus()); + Assert.assertEquals(ExecutableState.READY, runningExecutables.get(1).getStatus()); + + projectManager.getProject(project).getConfig().setProperty("kylin.job.max-concurrent-jobs", "2"); + Assert.assertNotEquals(memory, NDefaultScheduler.getMemoryRemaining().availablePermits()); + val job3 = generateJob(df, project); + executableManager.addJob(job3); + waitForJobByStatus(job1.getId(), 60000, ExecutableState.RUNNING, executableManager); + waitForJobByStatus(job2.getId(), 60000, ExecutableState.RUNNING, executableManager); + runningExecutables = executableManager.getRunningExecutables(project, modelId); + runningExecutables.sort(Comparator.comparing(AbstractExecutable::getCreateTime)); + Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(0).getStatus()); + Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(1).getStatus()); + Assert.assertEquals(ExecutableState.READY, runningExecutables.get(2).getStatus()); + + projectManager.getProject(project).getConfig().setProperty("kylin.job.max-concurrent-jobs", "1"); + waitForJobByStatus(job1.getId(), 60000, ExecutableState.RUNNING, executableManager); + waitForJobByStatus(job2.getId(), 60000, ExecutableState.RUNNING, executableManager); + + runningExecutables.sort(Comparator.comparing(AbstractExecutable::getCreateTime)); + Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(0).getStatus()); + Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(1).getStatus()); + Assert.assertEquals(ExecutableState.READY, runningExecutables.get(2).getStatus()); + + waitForJobByStatus(job1.getId(), 60000, null, executableManager); + runningExecutables = executableManager.getRunningExecutables(project, modelId); + Assert.assertEquals(2, runningExecutables.size()); + runningExecutables.sort(Comparator.comparing(AbstractExecutable::getCreateTime)); + Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(0).getStatus()); + Assert.assertEquals(ExecutableState.READY, runningExecutables.get(1).getStatus()); + + scheduler.shutdown(); + Assert.assertEquals(memory, NDefaultScheduler.getMemoryRemaining().availablePermits()); + + Assert.assertEquals(1, + scheduler.getMaxConcurrentJobLimitByProject(config, scheduler.getJobEngineConfig(), "xxxxx")); + } } diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java index 5548fd30d5..93724c8c26 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java @@ -66,6 +66,8 @@ import org.apache.kylin.metadata.model.SegmentStatusEnumToDisplay; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.util.MultiPartitionUtil; import org.apache.kylin.metadata.project.EnhancedUnitOfWork; +import org.apache.kylin.metadata.project.NProjectManager; +import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.sourceusage.SourceUsageManager; import org.apache.kylin.query.util.PushDownUtil; import org.apache.kylin.rest.aspect.Transaction; @@ -85,6 +87,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -471,7 +474,7 @@ public class ModelBuildService extends AbstractModelService implements ModelBuil String segmentId, Set<Long> partitionIds, int priority, String yarnQueue, Object tag) { val jobIds = Lists.<String> newArrayList(); if (parallelBuild) { - checkConcurrentSubmit(partitionIds.size()); + checkConcurrentSubmit(partitionIds.size(), project); partitionIds.forEach(partitionId -> { val jobParam = new JobParam(Sets.newHashSet(segmentId), null, modelId, getUsername(), Sets.newHashSet(partitionId), null).withPriority(priority).withYarnQueue(yarnQueue) @@ -490,14 +493,22 @@ public class ModelBuildService extends AbstractModelService implements ModelBuil return JobInfoResponse.of(jobIds, JobTypeEnum.SUB_PARTITION_BUILD.toString()); } - private void checkConcurrentSubmit(int partitionSize) { - int runningJobLimit = getConfig().getMaxConcurrentJobLimit(); + private void checkConcurrentSubmit(int partitionSize, String project) { + int runningJobLimit = getMaxConcurrentJobLimitByProject(getConfig(), project); int submitJobLimit = runningJobLimit * 5; if (partitionSize > submitJobLimit) { throw new KylinException(JOB_CONCURRENT_SUBMIT_LIMIT, submitJobLimit); } } + public int getMaxConcurrentJobLimitByProject(KylinConfig config, String project) { + ProjectInstance prjInstance = NProjectManager.getInstance(config).getProject(project); + if (Strings.isNullOrEmpty(project) || prjInstance == null) { + return config.getMaxConcurrentJobLimit(); + } + return prjInstance.getConfig().getMaxConcurrentJobLimit(); + } + @Override @Transaction(project = 0) public void refreshSegments(String project, String table, String refreshStart, String refreshEnd, diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java index 3dad56193f..6c9ea32d8b 100644 --- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java +++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java @@ -39,6 +39,7 @@ import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Set; import java.util.TimeZone; import java.util.stream.Collectors; @@ -125,6 +126,7 @@ import org.mockito.Mockito; import org.springframework.test.util.ReflectionTestUtils; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import lombok.val; @@ -1647,4 +1649,48 @@ public class ModelServiceBuildTest extends SourceTestCase { Assert.assertThrows(KylinException.class, () -> DateFormat.proposeDateFormat("not_exits")); } + @Test + public void testGetMaxConcurrentJobLimitByProject() { + String project = getProject(); + val modelId = "b780e4e4-69af-449e-b09f-05c90dfa04b6"; + val segmentId = "ff839b0b-2c23-4420-b332-0df70e36c343"; + val buildPartitions = Lists.<String[]> newArrayList(); + buildPartitions.add(new String[] { "ASIA" }); + buildPartitions.add(new String[] { "EUROPE" }); + buildPartitions.add(new String[] { "MIDDLE EAST" }); + buildPartitions.add(new String[] { "AMERICA" }); + buildPartitions.add(new String[] { "MOROCCO" }); + buildPartitions.add(new String[] { "INDONESIA" }); + + overwriteSystemProp("kylin.job.max-concurrent-jobs", "1"); + Assert.assertEquals(1, + modelBuildService.getMaxConcurrentJobLimitByProject(modelBuildService.getConfig(), project)); + try { + modelBuildService.buildSegmentPartitionByValue(getProject(), modelId, segmentId, buildPartitions, true, + false, 0, null, null); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e instanceof KylinException); + Assert.assertEquals(JOB_CONCURRENT_SUBMIT_LIMIT.getMsg(5), e.getMessage()); + Assert.assertEquals(0, getRunningExecutables(getProject(), modelId).size()); + } + + val segmentId2 = "d2edf0c5-5eb2-4968-9ad5-09efbf659324"; + Map<String, String> testOverrideP = Maps.newLinkedHashMap(); + testOverrideP.put("kylin.job.max-concurrent-jobs", "2"); + projectService.updateProjectConfig(project, testOverrideP); + Assert.assertEquals(2, + modelBuildService.getMaxConcurrentJobLimitByProject(modelBuildService.getConfig(), project)); + try { + modelBuildService.buildSegmentPartitionByValue(getProject(), modelId, segmentId2, buildPartitions, true, + false, 0, null, null); + } catch (Exception e) { + Assert.fail(); + } + Assert.assertEquals(6, getRunningExecutables(getProject(), modelId).size()); + + Assert.assertEquals(1, + modelBuildService.getMaxConcurrentJobLimitByProject(modelBuildService.getConfig(), "xxxxx")); + } + }