This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 6cb5d889e7d3961460ba8374f0fac82a97153012 Author: sibingzhang <74443791+sibingzh...@users.noreply.github.com> AuthorDate: Fri Oct 28 15:33:09 2022 +0800 KYLIN-5363 Fix the problem of job metadata change when the number of segments in parallel build is too large from 7124 Co-authored-by: sibing.zhang <1948879...@qq.com> --- .../org/apache/kylin/job/dao/NExecutableDao.java | 25 ++++++++++++ .../kylin/job/execution/NExecutableManager.java | 17 ++++++-- .../kylin/job/execution/DagExecutableTest.java | 4 ++ .../org/apache/kylin/rest/service/JobService.java | 1 + .../kylin/rest/service/DagJobServiceTest.java | 4 ++ .../apache/kylin/rest/service/JobErrorTest.java | 4 +- .../apache/kylin/rest/service/JobServiceTest.java | 27 ++++++++++++- .../org/apache/kylin/rest/service/StageTest.java | 47 ++++++++++++++++++++++ .../kylin/engine/spark/job/NSparkExecutable.java | 5 ++- 9 files changed, 127 insertions(+), 7 deletions(-) diff --git a/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java b/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java index 5c353d989c..f95779e96f 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/dao/NExecutableDao.java @@ -19,7 +19,9 @@ package org.apache.kylin.job.dao; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -61,6 +63,8 @@ public class NExecutableDao { private CachedCrudAssist<ExecutablePO> crud; + private Map<String, ExecutablePO> updating = new HashMap<>(); + private NExecutableDao(KylinConfig config, String project) { logger.trace("Using metadata url: {}", config); this.project = project; @@ -126,6 +130,27 @@ public class NExecutableDao { } } + public void updateJobWithoutSave(String uuid, Predicate<ExecutablePO> updater) { + ExecutablePO executablePO; + if (updating.containsKey(uuid)) { + executablePO = updating.get(uuid); + } else { + ExecutablePO executablePOFromCache = getJobByUuid(uuid); + Preconditions.checkNotNull(executablePOFromCache); + val copyForWrite = JsonUtil.copyBySerialization(executablePOFromCache, JOB_SERIALIZER, null); + updating.put(uuid, copyForWrite); + executablePO = copyForWrite; + } + updater.test(executablePO); + } + + public void saveUpdatedJob() { + for (ExecutablePO executablePO : updating.values()) { + crud.save(executablePO); + } + updating = new HashMap<>(); + } + private ResourceStore getStore() { return ResourceStore.getKylinMetaStore(config); } diff --git a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java index 00dfcd6a1c..a161751468 100644 --- a/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java +++ b/src/core-job/src/main/java/org/apache/kylin/job/execution/NExecutableManager.java @@ -281,6 +281,10 @@ public class NExecutableManager { } } + public void saveUpdatedJob() { + executableDao.saveUpdatedJob(); + } + public Set<String> getYarnApplicationJobs(String id) { ExecutablePO executablePO = executableDao.getJobByUuid(id); String appIds = executablePO.getOutput().getInfo().getOrDefault(YARN_APP_IDS, ""); @@ -785,6 +789,7 @@ public class NExecutableManager { .forEach(stage -> // updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.READY, null, null)); } + saveUpdatedJob(); } } }); @@ -866,6 +871,7 @@ public class NExecutableManager { .forEach(stage -> // when restart, reset stage updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.READY, null, null, true)); } + saveUpdatedJob(); } } @@ -908,6 +914,7 @@ public class NExecutableManager { .forEach(stage -> // updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.SUICIDAL, null, null)); } + saveUpdatedJob(); } } }); @@ -933,6 +940,7 @@ public class NExecutableManager { .forEach(stage -> // updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.DISCARDED, null, null)); } + saveUpdatedJob(); } } }); @@ -963,6 +971,7 @@ public class NExecutableManager { .forEach(stage -> // updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.ERROR, null, null)); } + saveUpdatedJob(); } } }); @@ -1002,6 +1011,7 @@ public class NExecutableManager { .forEach(stage -> // updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.PAUSED, null, null)); } + saveUpdatedJob(); } } }); @@ -1158,7 +1168,7 @@ public class NExecutableManager { public void updateStageStatus(String taskOrJobId, String segmentId, ExecutableState newStatus, Map<String, String> updateInfo, String failedMsg, Boolean isRestart) { val jobId = extractJobId(taskOrJobId); - executableDao.updateJob(jobId, job -> { + executableDao.updateJobWithoutSave(jobId, job -> { final List<Map<String, List<ExecutablePO>>> collect = job.getTasks().stream()// .map(ExecutablePO::getStagesMap)// .filter(MapUtils::isNotEmpty)// @@ -1253,6 +1263,7 @@ public class NExecutableManager { .forEach(stage -> // updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.SUCCEED, null, null)); } + saveUpdatedJob(); } } }); @@ -1278,6 +1289,7 @@ public class NExecutableManager { .forEach(stage -> // updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.ERROR, null, null)); } + saveUpdatedJob(); } } }); @@ -1371,8 +1383,7 @@ public class NExecutableManager { } val thread = scheduler.getContext().getRunningJobThread(executable); if (thread != null) { - logger.info("Interrupt Job [{}] thread and remove in ExecutableContext", - executable.getDisplayName()); + logger.info("Interrupt Job [{}] thread and remove in ExecutableContext", executable.getDisplayName()); thread.interrupt(); scheduler.getContext().removeRunningJob(executable); } diff --git a/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java b/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java index f39969755d..9357a55159 100644 --- a/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java +++ b/src/core-job/src/test/java/org/apache/kylin/job/execution/DagExecutableTest.java @@ -644,6 +644,7 @@ class DagExecutableTest { manager.updateStageStatus(stage1.getId(), task.getId(), ExecutableState.RUNNING, null, null); manager.updateStageStatus(stage2.getId(), task.getId(), ExecutableState.RUNNING, null, null); manager.updateStageStatus(stage3.getId(), task.getId(), ExecutableState.RUNNING, null, null); + manager.saveUpdatedJob(); await().pollDelay(Duration.ONE_SECOND).until(() -> true); manager.updateJobOutput(job.getId(), ExecutableState.SUCCEED); manager.updateJobOutput(task.getId(), ExecutableState.SUCCEED); @@ -652,6 +653,7 @@ class DagExecutableTest { manager.updateStageStatus(stage2.getId(), task.getId(), ExecutableState.SUCCEED, null, null); await().pollDelay(Duration.ONE_SECOND).until(() -> true); manager.updateStageStatus(stage3.getId(), task.getId(), ExecutableState.SUCCEED, null, null); + manager.saveUpdatedJob(); val taskDuration = task.getTaskDurationToTest(task); val expected = AbstractExecutable.getDuration(stage1.getOutput(task.getId())) @@ -687,6 +689,7 @@ class DagExecutableTest { manager.updateStageStatus(stage1.getId(), task.getId(), ExecutableState.RUNNING, null, null); manager.updateStageStatus(stage2.getId(), task.getId(), ExecutableState.RUNNING, null, null); manager.updateStageStatus(stage3.getId(), task.getId(), ExecutableState.RUNNING, null, null); + manager.saveUpdatedJob(); await().pollDelay(Duration.ONE_SECOND).until(() -> true); manager.updateJobOutput(job.getId(), ExecutableState.SUCCEED); manager.updateJobOutput(task.getId(), ExecutableState.SUCCEED); @@ -695,6 +698,7 @@ class DagExecutableTest { manager.updateStageStatus(stage2.getId(), task.getId(), ExecutableState.SUCCEED, null, null); await().pollDelay(Duration.ONE_SECOND).until(() -> true); manager.updateStageStatus(stage3.getId(), task.getId(), ExecutableState.SUCCEED, null, null); + manager.saveUpdatedJob(); val taskDuration = task.getTaskDurationToTest(task); val expected = task.getDuration(); diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java index 22ea2b774a..08ef663424 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -987,6 +987,7 @@ public class JobService extends BasicService implements JobSupporter { EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { val executableManager = getManager(NExecutableManager.class, project); executableManager.updateStageStatus(taskId, segmentId, newStatus, updateInfo, errMsg); + executableManager.saveUpdatedJob(); return null; }, project, UnitOfWork.DEFAULT_MAX_RETRY, UnitOfWork.DEFAULT_EPOCH_ID, jobId); } diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java index 2acffd1281..bbbed42b0b 100644 --- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java +++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/DagJobServiceTest.java @@ -165,6 +165,7 @@ class DagJobServiceTest { EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { manager.updateJobOutput(task1.getId(), ExecutableState.ERROR); manager.updateStageStatus(stage11.getId(), task1.getId(), ExecutableState.ERROR, null, null); + manager.saveUpdatedJob(); return null; }, DEFAULT_PROJECT, 1, UnitOfWork.DEFAULT_EPOCH_ID); @@ -175,6 +176,7 @@ class DagJobServiceTest { EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { manager.updateJobOutput(task3.getId(), ExecutableState.ERROR); manager.updateStageStatus(stage31.getId(), task3.getId(), ExecutableState.ERROR, null, null); + manager.saveUpdatedJob(); return null; }, DEFAULT_PROJECT, 1, UnitOfWork.DEFAULT_EPOCH_ID); @@ -239,6 +241,7 @@ class DagJobServiceTest { EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { manager.updateJobOutput(task1.getId(), ExecutableState.ERROR); manager.updateStageStatus(stage11.getId(), task1.getId(), ExecutableState.ERROR, null, null); + manager.saveUpdatedJob(); return null; }, DEFAULT_PROJECT, 1, UnitOfWork.DEFAULT_EPOCH_ID); @@ -249,6 +252,7 @@ class DagJobServiceTest { EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { manager.updateJobOutput(task3.getId(), ExecutableState.ERROR); manager.updateStageStatus(stage31.getId(), task3.getId(), ExecutableState.ERROR, null, null); + manager.saveUpdatedJob(); return null; }, DEFAULT_PROJECT, 1, UnitOfWork.DEFAULT_EPOCH_ID); diff --git a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java index d08373b613..c9c52fc563 100644 --- a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java +++ b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobErrorTest.java @@ -387,7 +387,7 @@ public class JobErrorTest extends NLocalFileMetadataTestCase { manager.addJob(executable); var output = manager.getOutput(executable.getId()); - final long[] duration = {AbstractExecutable.getDuration(output)}; + final long[] duration = { AbstractExecutable.getDuration(output) }; Assert.assertEquals(0, duration[0]); ((DefaultOutput) output).setStartTime(System.currentTimeMillis()); @@ -442,6 +442,7 @@ public class JobErrorTest extends NLocalFileMetadataTestCase { manager.updateStageStatus(logicStep1.getId(), null, ExecutableState.RUNNING, null, null); manager.updateStageStatus(logicStep2.getId(), null, ExecutableState.RUNNING, null, null); manager.updateStageStatus(logicStep3.getId(), null, ExecutableState.RUNNING, null, null); + manager.saveUpdatedJob(); val durationWithoutWaiteTime = executable.getDurationFromStepOrStageDurationSum(); @@ -487,6 +488,7 @@ public class JobErrorTest extends NLocalFileMetadataTestCase { manager.updateStageStatus(logicStep1.getId(), null, ExecutableState.RUNNING, null, null); manager.updateStageStatus(logicStep2.getId(), null, ExecutableState.RUNNING, null, null); manager.updateStageStatus(logicStep3.getId(), null, ExecutableState.RUNNING, null, null); + manager.saveUpdatedJob(); val durationWithoutWaiteTime = executable.getDurationFromStepOrStageDurationSum(); diff --git a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java index e29d23a875..ee6db5a9be 100644 --- a/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java +++ b/src/job-service/src/test/java/org/apache/kylin/rest/service/JobServiceTest.java @@ -578,6 +578,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, null, "test output"); manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SKIP, null, "test output"); manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.SUCCEED, null, "test output"); + manager.saveUpdatedJob(); var buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap(); @@ -587,6 +588,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.ERROR, null, "test output", true); manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.ERROR, null, "test output", true); manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.ERROR, null, "test output", true); + manager.saveUpdatedJob(); buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap(); @@ -596,6 +598,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { Map<String, String> info = Maps.newHashMap(); info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "1"); manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output"); + manager.saveUpdatedJob(); buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap(); successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps); @@ -603,6 +606,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "8"); manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output"); + manager.saveUpdatedJob(); buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap(); successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps); @@ -610,6 +614,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "10"); manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output"); + manager.saveUpdatedJob(); buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap(); successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps); @@ -617,18 +622,21 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "12"); manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, info, "test output"); + manager.saveUpdatedJob(); buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap(); successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps); assertTrue(1 == successLogicStep); manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.RUNNING, null, "test output"); + manager.saveUpdatedJob(); buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap(); successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps); assertTrue(1 == successLogicStep); manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SUCCEED, null, "test output"); + manager.saveUpdatedJob(); buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap(); successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps); @@ -665,6 +673,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, null, "test output"); manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SKIP, null, "test output"); manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.SUCCEED, null, "test output"); + manager.saveUpdatedJob(); var buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap(); @@ -674,6 +683,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.ERROR, null, "test output", true); manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.ERROR, null, "test output", true); manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.ERROR, null, "test output", true); + manager.saveUpdatedJob(); buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap(); @@ -683,6 +693,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { Map<String, String> info = Maps.newHashMap(); info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "1"); manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output"); + manager.saveUpdatedJob(); buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap(); successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps); @@ -690,6 +701,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "10"); manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output"); + manager.saveUpdatedJob(); buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap(); successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps); @@ -697,6 +709,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "10"); manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, info, "test output"); + manager.saveUpdatedJob(); buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap(); successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps); @@ -704,18 +717,21 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "12"); manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, info, "test output"); + manager.saveUpdatedJob(); buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap(); successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps); assertTrue(0.5 == successLogicStep); manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.RUNNING, null, "test output"); + manager.saveUpdatedJob(); buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap(); successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps); assertTrue(0.5 == successLogicStep); manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SUCCEED, null, "test output"); + manager.saveUpdatedJob(); buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap(); successLogicStep = ExecutableResponse.calculateSuccessStageInTaskMap(sparkExecutable, buildSteps); @@ -751,6 +767,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, null, "test output"); manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SKIP, null, "test output"); manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.SUCCEED, null, "test output"); + manager.saveUpdatedJob(); var buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap().get(segmentId); @@ -760,6 +777,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.ERROR, null, "test output", true); manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.ERROR, null, "test output", true); manager.updateStageStatus(logicStep3.getId(), segmentId, ExecutableState.ERROR, null, "test output", true); + manager.saveUpdatedJob(); buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap().get(segmentId); @@ -769,6 +787,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { Map<String, String> info = Maps.newHashMap(); info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "1"); manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output"); + manager.saveUpdatedJob(); buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap().get(segmentId); successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true); @@ -778,6 +797,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "8"); manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output"); + manager.saveUpdatedJob(); buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap().get(segmentId); successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true); @@ -787,6 +807,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "10"); manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, info, "test output"); + manager.saveUpdatedJob(); buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap().get(segmentId); successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true); @@ -796,6 +817,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { info.put(NBatchConstants.P_INDEX_SUCCESS_COUNT, "12"); manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, info, "test output"); + manager.saveUpdatedJob(); buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap().get(segmentId); successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true); @@ -804,6 +826,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { assertTrue(1 == successLogicStep); manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.RUNNING, null, "test output"); + manager.saveUpdatedJob(); buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap().get(segmentId); successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true); @@ -812,6 +835,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { assertTrue(1 == successLogicStep); manager.updateStageStatus(logicStep2.getId(), segmentId, ExecutableState.SUCCEED, null, "test output"); + manager.saveUpdatedJob(); buildSteps = ((ChainedStageExecutable) ((ChainedExecutable) manager.getJob(executable.getId())).getTasks() .get(0)).getStagesMap().get(segmentId); successLogicStep = ExecutableResponse.calculateSuccessStage(sparkExecutable, segmentId, buildSteps, true); @@ -863,6 +887,7 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { assertEquals(logicStep.getId(), logicStepBase.getId()); manager.updateStageStatus(logicStep.getId(), segmentId, ExecutableState.RUNNING, null, "test output"); + manager.saveUpdatedJob(); List<ExecutableStepResponse> jobDetail = jobService.getJobDetail(project, executable.getId()); assertEquals(1, jobDetail.size()); @@ -883,8 +908,8 @@ public class JobServiceTest extends NLocalFileMetadataTestCase { assertTrue(logicStepResponse2.getExecStartTime() < System.currentTimeMillis()); manager.updateStageStatus(logicStep.getId(), segmentId2, ExecutableState.RUNNING, null, "test output"); - manager.updateStageStatus(logicStep.getId(), null, ExecutableState.SUCCEED, null, "test output"); + manager.saveUpdatedJob(); jobDetail = jobService.getJobDetail(project, executable.getId()); assertEquals(1, jobDetail.size()); diff --git a/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java b/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java index c030785854..bf63baa1e2 100644 --- a/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java +++ b/src/job-service/src/test/java/org/apache/kylin/rest/service/StageTest.java @@ -18,6 +18,7 @@ package org.apache.kylin.rest.service; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.kylin.engine.spark.job.step.NStageForBuild; @@ -60,7 +61,9 @@ import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.test.util.ReflectionTestUtils; import java.io.IOException; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -204,6 +207,7 @@ public class StageTest extends NLocalFileMetadataTestCase { manager.addJob(executable); manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, null, null, false); + manager.saveUpdatedJob(); manager.updateStagePaused(executable); @@ -256,6 +260,7 @@ public class StageTest extends NLocalFileMetadataTestCase { manager.updateJobOutput(executable.getId(), ExecutableState.RUNNING, null, null, null); manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.RUNNING, null, null, false); + manager.saveUpdatedJob(); manager.updateJobOutput(executable.getId(), ExecutableState.SUCCEED, null, null, null); manager.makeStageSuccess(sparkExecutable.getId()); @@ -295,12 +300,14 @@ public class StageTest extends NLocalFileMetadataTestCase { manager.addJob(executable); manager.updateStageStatus(logicStep1.getId(), segmentId, ExecutableState.SUCCEED, null, errMsg); + manager.saveUpdatedJob(); var output1 = manager.getOutput(logicStep1.getId(), segmentId); Assert.assertEquals(ExecutableState.SUCCEED, output1.getState()); Assert.assertEquals(output1.getShortErrMsg(), errMsg); Assert.assertTrue(MapUtils.isEmpty(output1.getExtra())); manager.updateStageStatus(logicStep1.getId(), null, ExecutableState.ERROR, null, errMsg); + manager.saveUpdatedJob(); output1 = manager.getOutput(logicStep1.getId(), segmentId); Assert.assertEquals(ExecutableState.SUCCEED, output1.getState()); Assert.assertEquals(output1.getShortErrMsg(), errMsg); @@ -316,6 +323,45 @@ public class StageTest extends NLocalFileMetadataTestCase { Assert.assertTrue(MapUtils.isEmpty(outputLogicStep2.getExtra())); } + @Test + public void testUpdateStageStatusNoSaveCache() { + val segmentId = RandomUtil.randomUUIDStr(); + val segmentId2 = RandomUtil.randomUUIDStr(); + + val manager = NExecutableManager.getInstance(jobService.getConfig(), getProject()); + val executable = new SucceedChainedTestExecutable(); + + executable.setId(RandomUtil.randomUUIDStr()); + + val sparkExecutable = new NSparkExecutable(); + sparkExecutable.setParam(NBatchConstants.P_SEGMENT_IDS, segmentId + "," + segmentId2); + sparkExecutable.setId(RandomUtil.randomUUIDStr()); + executable.addTask(sparkExecutable); + + val build1 = new NStageForBuild(); + val build2 = new NStageForBuild(); + val build3 = new NStageForBuild(); + sparkExecutable.addStage(build1); + sparkExecutable.addStage(build2); + sparkExecutable.addStage(build3); + sparkExecutable.setStageMap(); + + manager.addJob(executable); + + List<AbstractExecutable> tasks = executable.getTasks(); + tasks.forEach(task -> { + final Map<String, List<StageBase>> tasksMap = ((ChainedStageExecutable) task).getStagesMap(); + for (Map.Entry<String, List<StageBase>> entry : tasksMap.entrySet()) { + Optional.ofNullable(entry.getValue()).orElse(Lists.newArrayList())// + .forEach(stage -> // + manager.updateStageStatus(stage.getId(), entry.getKey(), ExecutableState.DISCARDED, null, null)); + } + manager.saveUpdatedJob(); + }); + + Assert.assertEquals(1, manager.getAllJobs().get(0).getMvcc()); + } + @Test public void testSetStageOutput() { NExecutableManager manager = NExecutableManager.getInstance(jobService.getConfig(), getProject()); @@ -445,6 +491,7 @@ public class StageTest extends NLocalFileMetadataTestCase { manager.updateStageStatus(stage1.getId(), segmentId, ExecutableState.RUNNING, null, "test output"); manager.updateStageStatus(stage2.getId(), segmentId, ExecutableState.RUNNING, null, "test output"); manager.updateStageStatus(stage3.getId(), segmentId, ExecutableState.RUNNING, null, "test output"); + manager.saveUpdatedJob(); manager.makeStageError(executable.getId()); var job = manager.getJob(executable.getId()); diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java index 5517f3b2c4..a842c926ca 100644 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java @@ -206,8 +206,9 @@ public class NSparkExecutable extends AbstractExecutable implements ChainedStage public void waiteForResourceStart(ExecutableContext context) { // mark waiteForResource stage start EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { - getExecutableManager(getProject()) // - .updateStageStatus(getId() + "_00", null, ExecutableState.RUNNING, null, null); + NExecutableManager manager = getExecutableManager(getProject()); + manager.updateStageStatus(getId() + "_00", null, ExecutableState.RUNNING, null, null); + manager.saveUpdatedJob(); return 0; }, project, UnitOfWork.DEFAULT_MAX_RETRY, context.getEpochId(), getTempLockName()); }