Repository: kylin Updated Branches: refs/heads/master c77183343 -> 76dc04971
KYLIN-3194 use BrokenExecutable to tolerate missing job classes Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f797c840 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f797c840 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f797c840 Branch: refs/heads/master Commit: f797c84044f52f9c351ad680bc3a8a3482781121 Parents: c771833 Author: Li Yang <liy...@apache.org> Authored: Wed Jan 24 15:10:07 2018 +0800 Committer: Li Yang <liy...@apache.org> Committed: Sun Feb 4 08:44:22 2018 +0800 ---------------------------------------------------------------------- .../kylin/job/execution/AbstractExecutable.java | 2 +- .../kylin/job/execution/BrokenExecutable.java | 47 +++++++ .../kylin/job/execution/ExecutableManager.java | 124 ++++--------------- .../apache/kylin/job/ExecutableManagerTest.java | 16 +-- .../d9a2b721-9916-4607-8047-148ceb2473b1 | 14 +++ .../apache/kylin/rest/service/JobService.java | 9 +- 6 files changed, 102 insertions(+), 110 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/f797c840/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 1a84871..91283f0 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -259,7 +259,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { } @Override - public final ExecutableState getStatus() { + public ExecutableState getStatus() { ExecutableManager manager = getManager(); return manager.getOutput(this.getId()).getState(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/f797c840/core-job/src/main/java/org/apache/kylin/job/execution/BrokenExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/BrokenExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/BrokenExecutable.java new file mode 100644 index 0000000..7bb4fe2 --- /dev/null +++ b/core-job/src/main/java/org/apache/kylin/job/execution/BrokenExecutable.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job.execution; + +import org.apache.kylin.job.exception.ExecuteException; + +/** + * A special Executable used to indicate any executable whose metadata is broken. + */ +public class BrokenExecutable extends AbstractExecutable { + + public BrokenExecutable() { + super(); + } + + @Override + public String getName() { + return "[BROKEN] " + super.getName(); + } + + @Override + public ExecutableState getStatus() { + return ExecutableState.DISCARDED; + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + throw new UnsupportedOperationException(); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/f797c840/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index 83582d2..0069c03 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -23,7 +23,6 @@ import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_ID; import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_URL; import java.io.IOException; -import java.lang.reflect.Constructor; import java.util.HashMap; import java.util.IllegalFormatException; import java.util.List; @@ -239,45 +238,6 @@ public class ExecutableManager { } } - /** - * Since ExecutableManager will instantiate all AbstractExecutable class by Class.forName(), but for each version release, - * new classes are introduced, old classes are deprecated, renamed or removed. The Class.forName() will throw out - * ClassNotFoundException. This API is used to retrieve the Executable Object list, not for calling the object method, - * so we could just instance the parent common class instead of the concrete class. It will tolerate the class missing issue. - * - * @param timeStartInMillis - * @param timeEndInMillis - * @param expectedClass - * @return - */ - public List<AbstractExecutable> getAllAbstractExecutables(long timeStartInMillis, long timeEndInMillis, - Class<? extends AbstractExecutable> expectedClass) { - try { - List<AbstractExecutable> ret = Lists.newArrayList(); - for (ExecutablePO po : executableDao.getJobs(timeStartInMillis, timeEndInMillis)) { - try { - AbstractExecutable ae = parseToAbstract(po, expectedClass); - ret.add(ae); - } catch (IllegalArgumentException e) { - logger.error("error parsing one executabePO: ", e); - } - } - return ret; - } catch (PersistentException e) { - logger.error("error get All Jobs", e); - throw new RuntimeException(e); - } - } - - public AbstractExecutable getAbstractExecutable(String uuid, Class<? extends AbstractExecutable> expectedClass) { - try { - return parseToAbstract(executableDao.getJob(uuid), expectedClass); - } catch (PersistentException e) { - logger.error("fail to get job:" + uuid, e); - throw new RuntimeException(e); - } - } - public List<String> getAllJobIds() { try { return executableDao.getJobIds(); @@ -536,70 +496,40 @@ public class ExecutableManager { return null; } String type = executablePO.getType(); - try { - Class<? extends AbstractExecutable> clazz = ClassUtil.forName(type, AbstractExecutable.class); - Constructor<? extends AbstractExecutable> constructor = clazz.getConstructor(); - AbstractExecutable result = constructor.newInstance(); - result.initConfig(config); - result.setId(executablePO.getUuid()); - result.setName(executablePO.getName()); - result.setParams(executablePO.getParams()); - List<ExecutablePO> tasks = executablePO.getTasks(); - if (tasks != null && !tasks.isEmpty()) { - Preconditions.checkArgument(result instanceof ChainedExecutable); - for (ExecutablePO subTask : tasks) { - ((ChainedExecutable) result).addTask(parseTo(subTask)); - } + AbstractExecutable result = newExecutable(type); + result.initConfig(config); + result.setId(executablePO.getUuid()); + result.setName(executablePO.getName()); + result.setParams(executablePO.getParams()); + List<ExecutablePO> tasks = executablePO.getTasks(); + if (tasks != null && !tasks.isEmpty()) { + Preconditions.checkArgument(result instanceof ChainedExecutable); + for (ExecutablePO subTask : tasks) { + ((ChainedExecutable) result).addTask(parseTo(subTask)); } - List<ExecutablePO> tasksForCheck = executablePO.getTasksForCheck(); - if (tasksForCheck != null && !tasksForCheck.isEmpty()) { - Preconditions.checkArgument(result instanceof CheckpointExecutable); - for (ExecutablePO subTaskForCheck : tasksForCheck) { - ((CheckpointExecutable) result).addTaskForCheck(parseTo(subTaskForCheck)); - } + } + List<ExecutablePO> tasksForCheck = executablePO.getTasksForCheck(); + if (tasksForCheck != null && !tasksForCheck.isEmpty()) { + Preconditions.checkArgument(result instanceof CheckpointExecutable); + for (ExecutablePO subTaskForCheck : tasksForCheck) { + ((CheckpointExecutable) result).addTaskForCheck(parseTo(subTaskForCheck)); } - return result; - } catch (ReflectiveOperationException e) { - throw new IllegalStateException("cannot parse this job:" + executablePO.getId(), e); } + return result; } - private AbstractExecutable parseToAbstract(ExecutablePO executablePO, - Class<? extends AbstractExecutable> expectedClass) { - if (executablePO == null) { - logger.warn("executablePO is null"); - return null; + private AbstractExecutable newExecutable(String type) { + Class<? extends AbstractExecutable> clazz; + try { + clazz = ClassUtil.forName(type, AbstractExecutable.class); + } catch (ClassNotFoundException ex) { + clazz = BrokenExecutable.class; + logger.error("Unknown executable type '" + type + "', using BrokenExecutable"); } - String type = executablePO.getType(); try { - Class<? extends AbstractExecutable> clazz = null; - try { - clazz = ClassUtil.forName(type, AbstractExecutable.class); - } catch (ClassNotFoundException e) { - clazz = ClassUtil.forName(expectedClass.getName(), AbstractExecutable.class); - } - Constructor<? extends AbstractExecutable> constructor = clazz.getConstructor(); - AbstractExecutable result = constructor.newInstance(); - result.initConfig(config); - result.setId(executablePO.getUuid()); - result.setName(executablePO.getName()); - result.setParams(executablePO.getParams()); - List<ExecutablePO> tasks = executablePO.getTasks(); - if (tasks != null && !tasks.isEmpty()) { - Preconditions.checkArgument(result instanceof ChainedExecutable); - for (ExecutablePO subTask : tasks) { - AbstractExecutable parseToTask = null; - try { - parseToTask = parseTo(subTask); - } catch (IllegalStateException e) { - parseToTask = parseToAbstract(subTask, DefaultChainedExecutable.class); - } - ((ChainedExecutable) result).addTask(parseToTask); - } - } - return result; - } catch (ReflectiveOperationException e) { - throw new IllegalStateException("cannot parse this job:" + executablePO.getId(), e); + return clazz.getConstructor().newInstance(); + } catch (Exception e) { + throw new RuntimeException("Failed to instantiate " + clazz, e); } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/f797c840/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java b/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java index faea9a4..73f0410 100644 --- a/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java @@ -47,12 +47,6 @@ public class ExecutableManagerTest extends LocalFileMetadataTestCase { public void setup() throws Exception { createTestMetadata(); service = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()); - - for (String jobId : service.getAllJobIds()) { - System.out.println("deleting " + jobId); - service.deleteJob(jobId); - } - } @After @@ -63,13 +57,21 @@ public class ExecutableManagerTest extends LocalFileMetadataTestCase { @Test public void test() throws Exception { assertNotNull(service); + + // all existing are broken jobs + List<AbstractExecutable> existing = service.getAllExecutables(); + for (AbstractExecutable exec : existing) { + assertEquals("BrokenExecutable", exec.getClass().getSimpleName()); + assertEquals(ExecutableState.DISCARDED, exec.getStatus()); + } + BaseTestExecutable executable = new SucceedTestExecutable(); executable.setParam("test1", "test1"); executable.setParam("test2", "test2"); executable.setParam("test3", "test3"); service.addJob(executable); List<AbstractExecutable> result = service.getAllExecutables(); - assertEquals(1, result.size()); + assertEquals(existing.size() + 1, result.size()); AbstractExecutable another = service.getJob(executable.getId()); assertJobEqual(executable, another); http://git-wip-us.apache.org/repos/asf/kylin/blob/f797c840/examples/test_case_data/localmeta/execute/d9a2b721-9916-4607-8047-148ceb2473b1 ---------------------------------------------------------------------- diff --git a/examples/test_case_data/localmeta/execute/d9a2b721-9916-4607-8047-148ceb2473b1 b/examples/test_case_data/localmeta/execute/d9a2b721-9916-4607-8047-148ceb2473b1 new file mode 100644 index 0000000..fb8ff30 --- /dev/null +++ b/examples/test_case_data/localmeta/execute/d9a2b721-9916-4607-8047-148ceb2473b1 @@ -0,0 +1,14 @@ +{ + "uuid" : "d9a2b721-9916-4607-8047-148ceb2473b1", + "last_modified" : 1516778161249, + "version" : "2.3.0", + "name" : null, + "tasks" : null, + "tasks_check" : null, + "type" : "org.apache.kylin.job.BadClassName", + "params" : { + "test2" : "test2", + "test3" : "test3", + "test1" : "test1" + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/f797c840/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index e447031..5f80b84 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -725,9 +725,9 @@ public class JobService extends BasicService implements InitializingBean { public List<CubingJob> innerSearchCubingJobs(final String cubeName, final String jobName, final Set<ExecutableState> statusList, long timeStartInMillis, long timeEndInMillis, final Map<String, Output> allOutputs, final boolean nameExactMatch, final String projectName) { - List<CubingJob> results = Lists.newArrayList(FluentIterable.from( - getExecutableManager().getAllAbstractExecutables(timeStartInMillis, timeEndInMillis, CubingJob.class)) - .filter(new Predicate<AbstractExecutable>() { + List<CubingJob> results = Lists.newArrayList( + FluentIterable.from(getExecutableManager().getAllExecutables(timeStartInMillis, timeEndInMillis)) + .filter(new Predicate<AbstractExecutable>() { @Override public boolean apply(AbstractExecutable executable) { if (executable instanceof CubingJob) { @@ -824,8 +824,7 @@ public class JobService extends BasicService implements InitializingBean { List<CheckpointExecutable> results = Lists .newArrayList( FluentIterable - .from(getExecutableManager().getAllAbstractExecutables(timeStartInMillis, - timeEndInMillis, CheckpointExecutable.class)) + .from(getExecutableManager().getAllExecutables(timeStartInMillis, timeEndInMillis)) .filter(new Predicate<AbstractExecutable>() { @Override public boolean apply(AbstractExecutable executable) {