KYLIN-1726 allow job discard itself Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/aff2df59 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/aff2df59 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/aff2df59
Branch: refs/heads/master Commit: aff2df5987e98ee9fd64d4803a8a2dea90013e40 Parents: aa30880 Author: shaofengshi <shaofeng...@apache.org> Authored: Tue Sep 13 10:28:03 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Wed Sep 14 16:34:36 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/cube/CubeManager.java | 14 ++----- .../kylin/job/execution/AbstractExecutable.java | 2 + .../job/execution/DefaultChainedExecutable.java | 2 + .../kylin/job/execution/ExecuteResult.java | 4 ++ .../kylin/job/DiscardedTestExecutable.java | 41 ++++++++++++++++++++ .../impl/threadpool/DefaultSchedulerTest.java | 16 ++++++++ 6 files changed, 68 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/aff2df59/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 11eabce..d494fcc 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -322,34 +322,26 @@ public class CubeManager implements IRealizationProvider { Iterator<CubeSegment> iterator = newSegs.iterator(); while (iterator.hasNext()) { CubeSegment currentSeg = iterator.next(); - boolean found = false; for (CubeSegment toRemoveSeg : update.getToRemoveSegs()) { if (currentSeg.getUuid().equals(toRemoveSeg.getUuid())) { + logger.info("Remove segment " + currentSeg.toString()); + toRemoveResources.add(currentSeg.getStatisticsResourcePath()); iterator.remove(); - toRemoveResources.add(toRemoveSeg.getStatisticsResourcePath()); - found = true; + break; } } - if (found == false) { - logger.error("Segment '" + currentSeg.getName() + "' doesn't exist for remove."); - } } } if (update.getToUpdateSegs() != null) { for (CubeSegment segment : update.getToUpdateSegs()) { - boolean found = false; for (int i = 0; i < newSegs.size(); i++) { if (newSegs.get(i).getUuid().equals(segment.getUuid())) { newSegs.set(i, segment); - found = true; break; } } - if (found == false) { - logger.error("Segment '" + segment.getName() + "' doesn't exist for update."); - } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/aff2df59/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 90e4d3c..b4ca469 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -74,6 +74,8 @@ public abstract class AbstractExecutable implements Executable, Idempotent { if (!isDiscarded()) { if (result.succeed()) { executableManager.updateJobOutput(getId(), ExecutableState.SUCCEED, null, result.output()); + } else if (result.discarded()) { + executableManager.updateJobOutput(getId(), ExecutableState.DISCARDED, null, result.output()); } else { executableManager.updateJobOutput(getId(), ExecutableState.ERROR, null, result.output()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/aff2df59/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java index 39a5f4f..5a57b05 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java @@ -119,6 +119,8 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai } else { jobService.updateJobOutput(getId(), ExecutableState.READY, null, null); } + } else if (result.discarded()) { + jobService.updateJobOutput(getId(), ExecutableState.DISCARDED, null, result.output()); } else { setEndTime(System.currentTimeMillis()); jobService.updateJobOutput(getId(), ExecutableState.ERROR, null, result.output()); http://git-wip-us.apache.org/repos/asf/kylin/blob/aff2df59/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java index 760a574..2347e7d 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java @@ -49,6 +49,10 @@ public final class ExecuteResult { return state == State.SUCCEED; } + public boolean discarded() { + return state == State.DISCARDED; + } + public String output() { return output; } http://git-wip-us.apache.org/repos/asf/kylin/blob/aff2df59/core-job/src/test/java/org/apache/kylin/job/DiscardedTestExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/DiscardedTestExecutable.java b/core-job/src/test/java/org/apache/kylin/job/DiscardedTestExecutable.java new file mode 100644 index 0000000..9362e18 --- /dev/null +++ b/core-job/src/test/java/org/apache/kylin/job/DiscardedTestExecutable.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job; + +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; + +/** + */ +public class DiscardedTestExecutable extends BaseTestExecutable { + + public DiscardedTestExecutable() { + super(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + return new ExecuteResult(ExecuteResult.State.DISCARDED, "discarded"); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/aff2df59/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java index df521f9..2baf10a 100644 --- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java @@ -29,6 +29,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import org.apache.kylin.job.DiscardedTestExecutable; import org.apache.kylin.job.BaseTestExecutable; import org.apache.kylin.job.ErrorTestExecutable; import org.apache.kylin.job.FailedTestExecutable; @@ -83,6 +84,21 @@ public class DefaultSchedulerTest extends BaseSchedulerTest { } @Test + public void testSucceedAndDiscarded() throws Exception { + DefaultChainedExecutable job = new DefaultChainedExecutable(); + BaseTestExecutable task1 = new SucceedTestExecutable(); + BaseTestExecutable task2 = new DiscardedTestExecutable(); + job.addTask(task1); + job.addTask(task2); + jobService.addJob(job); + waitForJobFinish(job.getId()); + Assert.assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); + Assert.assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task2.getId()).getState()); + } + + + @Test public void testSucceedAndError() throws Exception { DefaultChainedExecutable job = new DefaultChainedExecutable(); BaseTestExecutable task1 = new ErrorTestExecutable();