Repository: kylin Updated Branches: refs/heads/master 4a1bc1945 -> 71fccaedb
fix ci Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/71fccaed Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/71fccaed Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/71fccaed Branch: refs/heads/master Commit: 71fccaedb2e9d5e288bef43e09050adb3b8757e7 Parents: 4a1bc19 Author: Hongbin Ma <mahong...@apache.org> Authored: Thu Apr 14 16:37:46 2016 +0800 Committer: Hongbin Ma <mahong...@apache.org> Committed: Thu Apr 14 16:39:44 2016 +0800 ---------------------------------------------------------------------- .../job/impl/threadpool/DefaultScheduler.java | 3 +- .../impl/threadpool/DefaultSchedulerTest.java | 153 ++++++++++++++++++ .../kylin/cube/DictionaryManagerTest.java | 127 --------------- .../kylin/cube/ITDictionaryManagerTest.java | 127 +++++++++++++++ .../impl/threadpool/ITDefaultSchedulerTest.java | 154 ------------------- 5 files changed, 282 insertions(+), 282 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/71fccaed/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index 417e279..5e11041 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@ -182,9 +182,10 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti logger.debug("Closing zk connection"); try { shutdown(); - jobLock.unlock(); } catch (SchedulerException e) { logger.error("error shutdown scheduler", e); + } finally { + jobLock.unlock(); } } }); http://git-wip-us.apache.org/repos/asf/kylin/blob/71fccaed/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java new file mode 100644 index 0000000..df521f9 --- /dev/null +++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java @@ -0,0 +1,153 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * / + */ + +package org.apache.kylin.job.impl.threadpool; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.kylin.job.BaseTestExecutable; +import org.apache.kylin.job.ErrorTestExecutable; +import org.apache.kylin.job.FailedTestExecutable; +import org.apache.kylin.job.SelfStopExecutable; +import org.apache.kylin.job.SucceedTestExecutable; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableState; +import org.junit.Assert; +import org.junit.Test; + +/** + */ +public class DefaultSchedulerTest extends BaseSchedulerTest { + + @Test + public void testSingleTaskJob() throws Exception { + DefaultChainedExecutable job = new DefaultChainedExecutable(); + BaseTestExecutable task1 = new SucceedTestExecutable(); + job.addTask(task1); + jobService.addJob(job); + waitForJobFinish(job.getId()); + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); + } + + @Test + public void testSucceed() throws Exception { + DefaultChainedExecutable job = new DefaultChainedExecutable(); + BaseTestExecutable task1 = new SucceedTestExecutable(); + BaseTestExecutable task2 = new SucceedTestExecutable(); + job.addTask(task1); + job.addTask(task2); + jobService.addJob(job); + waitForJobFinish(job.getId()); + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState()); + } + + @Test + public void testSucceedAndFailed() throws Exception { + DefaultChainedExecutable job = new DefaultChainedExecutable(); + BaseTestExecutable task1 = new SucceedTestExecutable(); + BaseTestExecutable task2 = new FailedTestExecutable(); + job.addTask(task1); + job.addTask(task2); + jobService.addJob(job); + waitForJobFinish(job.getId()); + Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); + Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState()); + } + + @Test + public void testSucceedAndError() throws Exception { + DefaultChainedExecutable job = new DefaultChainedExecutable(); + BaseTestExecutable task1 = new ErrorTestExecutable(); + BaseTestExecutable task2 = new SucceedTestExecutable(); + job.addTask(task1); + job.addTask(task2); + jobService.addJob(job); + waitForJobFinish(job.getId()); + Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task1.getId()).getState()); + Assert.assertEquals(ExecutableState.READY, jobService.getOutput(task2.getId()).getState()); + } + + @Test + public void testDiscard() throws Exception { + DefaultChainedExecutable job = new DefaultChainedExecutable(); + BaseTestExecutable task1 = new SelfStopExecutable(); + job.addTask(task1); + jobService.addJob(job); + waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500); + jobService.discardJob(job.getId()); + waitForJobFinish(job.getId()); + Assert.assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task1.getId()).getState()); + Thread.sleep(5000); + System.out.println(job); + } + + @SuppressWarnings("rawtypes") + @Test + public void testSchedulerPool() throws InterruptedException { + ScheduledExecutorService fetchPool = Executors.newScheduledThreadPool(1); + final CountDownLatch countDownLatch = new CountDownLatch(3); + ScheduledFuture future = fetchPool.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + countDownLatch.countDown(); + } + }, 5, 5, TimeUnit.SECONDS); + assertTrue("countDownLatch should reach zero in 15 secs", countDownLatch.await(20, TimeUnit.SECONDS)); + assertTrue("future should still running", future.cancel(true)); + + final CountDownLatch countDownLatch2 = new CountDownLatch(3); + ScheduledFuture future2 = fetchPool.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + countDownLatch2.countDown(); + throw new RuntimeException(); + } + }, 5, 5, TimeUnit.SECONDS); + assertFalse("countDownLatch2 should NOT reach zero in 15 secs", countDownLatch2.await(20, TimeUnit.SECONDS)); + assertFalse("future2 should has been stopped", future2.cancel(true)); + + final CountDownLatch countDownLatch3 = new CountDownLatch(3); + ScheduledFuture future3 = fetchPool.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + countDownLatch3.countDown(); + throw new RuntimeException(); + } catch (Exception e) { + } + } + }, 5, 5, TimeUnit.SECONDS); + assertTrue("countDownLatch3 should reach zero in 15 secs", countDownLatch3.await(20, TimeUnit.SECONDS)); + assertTrue("future3 should still running", future3.cancel(true)); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/71fccaed/kylin-it/src/test/java/org/apache/kylin/cube/DictionaryManagerTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/DictionaryManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/DictionaryManagerTest.java deleted file mode 100644 index 620c850..0000000 --- a/kylin-it/src/test/java/org/apache/kylin/cube/DictionaryManagerTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.cube; - -import static org.junit.Assert.*; - -import java.io.File; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.HashSet; -import java.util.Set; - -import org.apache.kylin.common.util.JsonUtil; -import org.apache.kylin.common.util.LocalFileMetadataTestCase; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.dict.DictionaryInfo; -import org.apache.kylin.dict.DictionaryManager; -import org.apache.kylin.dict.DistinctColumnValuesProvider; -import org.apache.kylin.dimension.Dictionary; -import org.apache.kylin.engine.mr.DFSFileTable; -import org.apache.kylin.engine.mr.HadoopUtil; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.source.ReadableTable; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import com.google.common.collect.Sets; - -public class DictionaryManagerTest extends LocalFileMetadataTestCase { - - DictionaryManager dictMgr; - - @Before - public void setup() throws Exception { - createTestMetadata(); - } - - @After - public void after() throws Exception { - cleanupTestMetadata(); - } - - @Test - public void basic() throws Exception { - dictMgr = DictionaryManager.getInstance(getTestConfig()); - CubeDesc cubeDesc = CubeDescManager.getInstance(getTestConfig()).getCubeDesc("test_kylin_cube_without_slr_desc"); - TblColRef col = cubeDesc.findColumnRef("DEFAULT.TEST_KYLIN_FACT", "LSTG_FORMAT_NAME"); - - MockDistinctColumnValuesProvider mockupData = new MockDistinctColumnValuesProvider("A", "B", "C"); - - DictionaryInfo info1 = dictMgr.buildDictionary(cubeDesc.getModel(), true, col, mockupData); - System.out.println(JsonUtil.writeValueAsIndentString(info1)); - - DictionaryInfo info2 = dictMgr.buildDictionary(cubeDesc.getModel(), true, col, mockupData); - System.out.println(JsonUtil.writeValueAsIndentString(info2)); - - // test check duplicate - assertTrue(info1.getUuid() == info2.getUuid()); - assertTrue(info1 == dictMgr.getDictionaryInfo(info1.getResourcePath())); - assertTrue(info2 == dictMgr.getDictionaryInfo(info2.getResourcePath())); - assertTrue(info1.getDictionaryObject() == info2.getDictionaryObject()); - - // verify dictionary entries - @SuppressWarnings("unchecked") - Dictionary<String> dict = (Dictionary<String>) info1.getDictionaryObject(); - int id = 0; - for (String v : mockupData.set) { - assertEquals(id, dict.getIdFromValue(v, 0)); - assertEquals(v, dict.getValueFromId(id)); - id++; - } - - // test empty dictionary - MockDistinctColumnValuesProvider mockupEmpty = new MockDistinctColumnValuesProvider(); - DictionaryInfo info3 = dictMgr.buildDictionary(cubeDesc.getModel(), true, col, mockupEmpty); - System.out.println(JsonUtil.writeValueAsIndentString(info3)); - assertEquals(0, info3.getCardinality()); - assertEquals(0, info3.getDictionaryObject().getSize()); - System.out.println(info3.getDictionaryObject().getMaxId()); - System.out.println(info3.getDictionaryObject().getMinId()); - System.out.println(info3.getDictionaryObject().getSizeOfId()); - } - - private static class MockDistinctColumnValuesProvider implements DistinctColumnValuesProvider { - - String tmpFilePath; - Set<String> set; - - public MockDistinctColumnValuesProvider(String... values) throws IOException { - File tmpFile = File.createTempFile("MockDistinctColumnValuesProvider", ".txt"); - PrintWriter out = new PrintWriter(tmpFile); - - set = Sets.newTreeSet(); - for (String value : values) { - out.println(value); - set.add(value); - } - out.close(); - - tmpFilePath = HadoopUtil.fixWindowsPath("file://" + tmpFile.getAbsolutePath()); - tmpFile.deleteOnExit(); - } - - @Override - public ReadableTable getDistinctValuesFor(TblColRef col) { - return new DFSFileTable(tmpFilePath, -1); - } - - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/71fccaed/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java new file mode 100644 index 0000000..c1f2369 --- /dev/null +++ b/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.cube; + +import static org.junit.Assert.*; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.HashSet; +import java.util.Set; + +import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.common.util.LocalFileMetadataTestCase; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.dict.DictionaryInfo; +import org.apache.kylin.dict.DictionaryManager; +import org.apache.kylin.dict.DistinctColumnValuesProvider; +import org.apache.kylin.dimension.Dictionary; +import org.apache.kylin.engine.mr.DFSFileTable; +import org.apache.kylin.engine.mr.HadoopUtil; +import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.source.ReadableTable; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Sets; + +public class ITDictionaryManagerTest extends LocalFileMetadataTestCase { + + DictionaryManager dictMgr; + + @Before + public void setup() throws Exception { + createTestMetadata(); + } + + @After + public void after() throws Exception { + cleanupTestMetadata(); + } + + @Test + public void basic() throws Exception { + dictMgr = DictionaryManager.getInstance(getTestConfig()); + CubeDesc cubeDesc = CubeDescManager.getInstance(getTestConfig()).getCubeDesc("test_kylin_cube_without_slr_desc"); + TblColRef col = cubeDesc.findColumnRef("DEFAULT.TEST_KYLIN_FACT", "LSTG_FORMAT_NAME"); + + MockDistinctColumnValuesProvider mockupData = new MockDistinctColumnValuesProvider("A", "B", "C"); + + DictionaryInfo info1 = dictMgr.buildDictionary(cubeDesc.getModel(), true, col, mockupData); + System.out.println(JsonUtil.writeValueAsIndentString(info1)); + + DictionaryInfo info2 = dictMgr.buildDictionary(cubeDesc.getModel(), true, col, mockupData); + System.out.println(JsonUtil.writeValueAsIndentString(info2)); + + // test check duplicate + assertTrue(info1.getUuid() == info2.getUuid()); + assertTrue(info1 == dictMgr.getDictionaryInfo(info1.getResourcePath())); + assertTrue(info2 == dictMgr.getDictionaryInfo(info2.getResourcePath())); + assertTrue(info1.getDictionaryObject() == info2.getDictionaryObject()); + + // verify dictionary entries + @SuppressWarnings("unchecked") + Dictionary<String> dict = (Dictionary<String>) info1.getDictionaryObject(); + int id = 0; + for (String v : mockupData.set) { + assertEquals(id, dict.getIdFromValue(v, 0)); + assertEquals(v, dict.getValueFromId(id)); + id++; + } + + // test empty dictionary + MockDistinctColumnValuesProvider mockupEmpty = new MockDistinctColumnValuesProvider(); + DictionaryInfo info3 = dictMgr.buildDictionary(cubeDesc.getModel(), true, col, mockupEmpty); + System.out.println(JsonUtil.writeValueAsIndentString(info3)); + assertEquals(0, info3.getCardinality()); + assertEquals(0, info3.getDictionaryObject().getSize()); + System.out.println(info3.getDictionaryObject().getMaxId()); + System.out.println(info3.getDictionaryObject().getMinId()); + System.out.println(info3.getDictionaryObject().getSizeOfId()); + } + + private static class MockDistinctColumnValuesProvider implements DistinctColumnValuesProvider { + + String tmpFilePath; + Set<String> set; + + public MockDistinctColumnValuesProvider(String... values) throws IOException { + File tmpFile = File.createTempFile("MockDistinctColumnValuesProvider", ".txt"); + PrintWriter out = new PrintWriter(tmpFile); + + set = Sets.newTreeSet(); + for (String value : values) { + out.println(value); + set.add(value); + } + out.close(); + + tmpFilePath = HadoopUtil.fixWindowsPath("file://" + tmpFile.getAbsolutePath()); + tmpFile.deleteOnExit(); + } + + @Override + public ReadableTable getDistinctValuesFor(TblColRef col) { + return new DFSFileTable(tmpFilePath, -1); + } + + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/71fccaed/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java b/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java deleted file mode 100644 index ad1ddd3..0000000 --- a/kylin-it/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one - * * or more contributor license agreements. See the NOTICE file - * * distributed with this work for additional information - * * regarding copyright ownership. The ASF licenses this file - * * to you under the Apache License, Version 2.0 (the - * * "License"); you may not use this file except in compliance - * * with the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. - * / - */ - -package org.apache.kylin.job.impl.threadpool; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import org.apache.kylin.job.BaseTestExecutable; -import org.apache.kylin.job.ErrorTestExecutable; -import org.apache.kylin.job.FailedTestExecutable; -import org.apache.kylin.job.SelfStopExecutable; -import org.apache.kylin.job.SucceedTestExecutable; -import org.apache.kylin.job.execution.DefaultChainedExecutable; -import org.apache.kylin.job.execution.ExecutableState; -import org.junit.Assert; -import org.junit.Test; - -/** - */ -public class ITDefaultSchedulerTest extends BaseSchedulerTest { - - @Test - public void testSingleTaskJob() throws Exception { - DefaultChainedExecutable job = new DefaultChainedExecutable(); - BaseTestExecutable task1 = new SucceedTestExecutable(); - job.addTask(task1); - jobService.addJob(job); - waitForJobFinish(job.getId()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); - } - - @Test - public void testSucceed() throws Exception { - DefaultChainedExecutable job = new DefaultChainedExecutable(); - BaseTestExecutable task1 = new SucceedTestExecutable(); - BaseTestExecutable task2 = new SucceedTestExecutable(); - job.addTask(task1); - job.addTask(task2); - jobService.addJob(job); - waitForJobFinish(job.getId()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState()); - } - - @Test - public void testSucceedAndFailed() throws Exception { - DefaultChainedExecutable job = new DefaultChainedExecutable(); - BaseTestExecutable task1 = new SucceedTestExecutable(); - BaseTestExecutable task2 = new FailedTestExecutable(); - job.addTask(task1); - job.addTask(task2); - jobService.addJob(job); - waitForJobFinish(job.getId()); - Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); - Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState()); - } - - @Test - public void testSucceedAndError() throws Exception { - DefaultChainedExecutable job = new DefaultChainedExecutable(); - BaseTestExecutable task1 = new ErrorTestExecutable(); - BaseTestExecutable task2 = new SucceedTestExecutable(); - job.addTask(task1); - job.addTask(task2); - jobService.addJob(job); - waitForJobFinish(job.getId()); - Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState()); - Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task1.getId()).getState()); - Assert.assertEquals(ExecutableState.READY, jobService.getOutput(task2.getId()).getState()); - } - - @Test - public void testDiscard() throws Exception { - DefaultChainedExecutable job = new DefaultChainedExecutable(); - BaseTestExecutable task1 = new SelfStopExecutable(); - job.addTask(task1); - jobService.addJob(job); - waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500); - jobService.discardJob(job.getId()); - waitForJobFinish(job.getId()); - Assert.assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState()); - Assert.assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task1.getId()).getState()); - Thread.sleep(5000); - System.out.println(job); - } - - @SuppressWarnings("rawtypes") - @Test - public void testSchedulerPool() throws InterruptedException { - ScheduledExecutorService fetchPool = Executors.newScheduledThreadPool(1); - final CountDownLatch countDownLatch = new CountDownLatch(3); - ScheduledFuture future = fetchPool.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - countDownLatch.countDown(); - } - }, 5, 5, TimeUnit.SECONDS); - assertTrue("countDownLatch should reach zero in 15 secs", countDownLatch.await(20, TimeUnit.SECONDS)); - assertTrue("future should still running", future.cancel(true)); - - final CountDownLatch countDownLatch2 = new CountDownLatch(3); - ScheduledFuture future2 = fetchPool.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - countDownLatch2.countDown(); - throw new RuntimeException(); - } - }, 5, 5, TimeUnit.SECONDS); - assertFalse("countDownLatch2 should NOT reach zero in 15 secs", countDownLatch2.await(20, TimeUnit.SECONDS)); - assertFalse("future2 should has been stopped", future2.cancel(true)); - - final CountDownLatch countDownLatch3 = new CountDownLatch(3); - ScheduledFuture future3 = fetchPool.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - countDownLatch3.countDown(); - throw new RuntimeException(); - } catch (Exception e) { - } - } - }, 5, 5, TimeUnit.SECONDS); - assertTrue("countDownLatch3 should reach zero in 15 secs", countDownLatch3.await(20, TimeUnit.SECONDS)); - assertTrue("future3 should still running", future3.cancel(true)); - } -}