Repository: kylin
Updated Branches:
  refs/heads/master c77183343 -> 76dc04971


KYLIN-3194 use BrokenExecutable to tolerate missing job classes


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f797c840
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f797c840
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f797c840

Branch: refs/heads/master
Commit: f797c84044f52f9c351ad680bc3a8a3482781121
Parents: c771833
Author: Li Yang <liy...@apache.org>
Authored: Wed Jan 24 15:10:07 2018 +0800
Committer: Li Yang <liy...@apache.org>
Committed: Sun Feb 4 08:44:22 2018 +0800

----------------------------------------------------------------------
 .../kylin/job/execution/AbstractExecutable.java |   2 +-
 .../kylin/job/execution/BrokenExecutable.java   |  47 +++++++
 .../kylin/job/execution/ExecutableManager.java  | 124 ++++---------------
 .../apache/kylin/job/ExecutableManagerTest.java |  16 +--
 .../d9a2b721-9916-4607-8047-148ceb2473b1        |  14 +++
 .../apache/kylin/rest/service/JobService.java   |   9 +-
 6 files changed, 102 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/f797c840/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java 
b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 1a84871..91283f0 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -259,7 +259,7 @@ public abstract class AbstractExecutable implements 
Executable, Idempotent {
     }
 
     @Override
-    public final ExecutableState getStatus() {
+    public ExecutableState getStatus() {
         ExecutableManager manager = getManager();
         return manager.getOutput(this.getId()).getState();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f797c840/core-job/src/main/java/org/apache/kylin/job/execution/BrokenExecutable.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/execution/BrokenExecutable.java 
b/core-job/src/main/java/org/apache/kylin/job/execution/BrokenExecutable.java
new file mode 100644
index 0000000..7bb4fe2
--- /dev/null
+++ 
b/core-job/src/main/java/org/apache/kylin/job/execution/BrokenExecutable.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.execution;
+
+import org.apache.kylin.job.exception.ExecuteException;
+
+/**
+ * A special Executable used to indicate any executable whose metadata is 
broken.
+ */
+public class BrokenExecutable extends AbstractExecutable {
+    
+    public BrokenExecutable() {
+        super();
+    }
+
+    @Override
+    public String getName() {
+        return "[BROKEN] " + super.getName();
+    }
+    
+    @Override
+    public ExecutableState getStatus() {
+        return ExecutableState.DISCARDED;
+    }
+    
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws 
ExecuteException {
+        throw new UnsupportedOperationException();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f797c840/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java 
b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 83582d2..0069c03 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -23,7 +23,6 @@ import static 
org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_ID;
 import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_URL;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
 import java.util.HashMap;
 import java.util.IllegalFormatException;
 import java.util.List;
@@ -239,45 +238,6 @@ public class ExecutableManager {
         }
     }
 
-    /**
-     * Since ExecutableManager will instantiate all AbstractExecutable class 
by Class.forName(), but for each version release,
-     * new classes are introduced, old classes are deprecated, renamed or 
removed. The Class.forName() will throw out
-     * ClassNotFoundException. This API is used to retrieve the Executable 
Object list, not for calling the object method,
-     * so we could just instance the parent common class instead of the 
concrete class. It will tolerate the class missing issue.
-     *
-     * @param timeStartInMillis
-     * @param timeEndInMillis
-     * @param expectedClass
-     * @return
-     */
-    public List<AbstractExecutable> getAllAbstractExecutables(long 
timeStartInMillis, long timeEndInMillis,
-            Class<? extends AbstractExecutable> expectedClass) {
-        try {
-            List<AbstractExecutable> ret = Lists.newArrayList();
-            for (ExecutablePO po : executableDao.getJobs(timeStartInMillis, 
timeEndInMillis)) {
-                try {
-                    AbstractExecutable ae = parseToAbstract(po, expectedClass);
-                    ret.add(ae);
-                } catch (IllegalArgumentException e) {
-                    logger.error("error parsing one executabePO: ", e);
-                }
-            }
-            return ret;
-        } catch (PersistentException e) {
-            logger.error("error get All Jobs", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    public AbstractExecutable getAbstractExecutable(String uuid, Class<? 
extends AbstractExecutable> expectedClass) {
-        try {
-            return parseToAbstract(executableDao.getJob(uuid), expectedClass);
-        } catch (PersistentException e) {
-            logger.error("fail to get job:" + uuid, e);
-            throw new RuntimeException(e);
-        }
-    }
-
     public List<String> getAllJobIds() {
         try {
             return executableDao.getJobIds();
@@ -536,70 +496,40 @@ public class ExecutableManager {
             return null;
         }
         String type = executablePO.getType();
-        try {
-            Class<? extends AbstractExecutable> clazz = 
ClassUtil.forName(type, AbstractExecutable.class);
-            Constructor<? extends AbstractExecutable> constructor = 
clazz.getConstructor();
-            AbstractExecutable result = constructor.newInstance();
-            result.initConfig(config);
-            result.setId(executablePO.getUuid());
-            result.setName(executablePO.getName());
-            result.setParams(executablePO.getParams());
-            List<ExecutablePO> tasks = executablePO.getTasks();
-            if (tasks != null && !tasks.isEmpty()) {
-                Preconditions.checkArgument(result instanceof 
ChainedExecutable);
-                for (ExecutablePO subTask : tasks) {
-                    ((ChainedExecutable) result).addTask(parseTo(subTask));
-                }
+        AbstractExecutable result = newExecutable(type);
+        result.initConfig(config);
+        result.setId(executablePO.getUuid());
+        result.setName(executablePO.getName());
+        result.setParams(executablePO.getParams());
+        List<ExecutablePO> tasks = executablePO.getTasks();
+        if (tasks != null && !tasks.isEmpty()) {
+            Preconditions.checkArgument(result instanceof ChainedExecutable);
+            for (ExecutablePO subTask : tasks) {
+                ((ChainedExecutable) result).addTask(parseTo(subTask));
             }
-            List<ExecutablePO> tasksForCheck = executablePO.getTasksForCheck();
-            if (tasksForCheck != null && !tasksForCheck.isEmpty()) {
-                Preconditions.checkArgument(result instanceof 
CheckpointExecutable);
-                for (ExecutablePO subTaskForCheck : tasksForCheck) {
-                    ((CheckpointExecutable) 
result).addTaskForCheck(parseTo(subTaskForCheck));
-                }
+        }
+        List<ExecutablePO> tasksForCheck = executablePO.getTasksForCheck();
+        if (tasksForCheck != null && !tasksForCheck.isEmpty()) {
+            Preconditions.checkArgument(result instanceof 
CheckpointExecutable);
+            for (ExecutablePO subTaskForCheck : tasksForCheck) {
+                ((CheckpointExecutable) 
result).addTaskForCheck(parseTo(subTaskForCheck));
             }
-            return result;
-        } catch (ReflectiveOperationException e) {
-            throw new IllegalStateException("cannot parse this job:" + 
executablePO.getId(), e);
         }
+        return result;
     }
 
-    private AbstractExecutable parseToAbstract(ExecutablePO executablePO,
-            Class<? extends AbstractExecutable> expectedClass) {
-        if (executablePO == null) {
-            logger.warn("executablePO is null");
-            return null;
+    private AbstractExecutable newExecutable(String type) {
+        Class<? extends AbstractExecutable> clazz;
+        try {
+            clazz = ClassUtil.forName(type, AbstractExecutable.class);
+        } catch (ClassNotFoundException ex) {
+            clazz = BrokenExecutable.class;
+            logger.error("Unknown executable type '" + type + "', using 
BrokenExecutable");
         }
-        String type = executablePO.getType();
         try {
-            Class<? extends AbstractExecutable> clazz = null;
-            try {
-                clazz = ClassUtil.forName(type, AbstractExecutable.class);
-            } catch (ClassNotFoundException e) {
-                clazz = ClassUtil.forName(expectedClass.getName(), 
AbstractExecutable.class);
-            }
-            Constructor<? extends AbstractExecutable> constructor = 
clazz.getConstructor();
-            AbstractExecutable result = constructor.newInstance();
-            result.initConfig(config);
-            result.setId(executablePO.getUuid());
-            result.setName(executablePO.getName());
-            result.setParams(executablePO.getParams());
-            List<ExecutablePO> tasks = executablePO.getTasks();
-            if (tasks != null && !tasks.isEmpty()) {
-                Preconditions.checkArgument(result instanceof 
ChainedExecutable);
-                for (ExecutablePO subTask : tasks) {
-                    AbstractExecutable parseToTask = null;
-                    try {
-                        parseToTask = parseTo(subTask);
-                    } catch (IllegalStateException e) {
-                        parseToTask = parseToAbstract(subTask, 
DefaultChainedExecutable.class);
-                    }
-                    ((ChainedExecutable) result).addTask(parseToTask);
-                }
-            }
-            return result;
-        } catch (ReflectiveOperationException e) {
-            throw new IllegalStateException("cannot parse this job:" + 
executablePO.getId(), e);
+            return clazz.getConstructor().newInstance();
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to instantiate " + clazz, e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f797c840/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java
----------------------------------------------------------------------
diff --git 
a/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java 
b/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java
index faea9a4..73f0410 100644
--- a/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java
@@ -47,12 +47,6 @@ public class ExecutableManagerTest extends 
LocalFileMetadataTestCase {
     public void setup() throws Exception {
         createTestMetadata();
         service = 
ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-        for (String jobId : service.getAllJobIds()) {
-            System.out.println("deleting " + jobId);
-            service.deleteJob(jobId);
-        }
-
     }
 
     @After
@@ -63,13 +57,21 @@ public class ExecutableManagerTest extends 
LocalFileMetadataTestCase {
     @Test
     public void test() throws Exception {
         assertNotNull(service);
+        
+        // all existing are broken jobs
+        List<AbstractExecutable> existing = service.getAllExecutables();
+        for (AbstractExecutable exec : existing) {
+            assertEquals("BrokenExecutable", exec.getClass().getSimpleName());
+            assertEquals(ExecutableState.DISCARDED, exec.getStatus());
+        }
+        
         BaseTestExecutable executable = new SucceedTestExecutable();
         executable.setParam("test1", "test1");
         executable.setParam("test2", "test2");
         executable.setParam("test3", "test3");
         service.addJob(executable);
         List<AbstractExecutable> result = service.getAllExecutables();
-        assertEquals(1, result.size());
+        assertEquals(existing.size() + 1, result.size());
         AbstractExecutable another = service.getJob(executable.getId());
         assertJobEqual(executable, another);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/f797c840/examples/test_case_data/localmeta/execute/d9a2b721-9916-4607-8047-148ceb2473b1
----------------------------------------------------------------------
diff --git 
a/examples/test_case_data/localmeta/execute/d9a2b721-9916-4607-8047-148ceb2473b1
 
b/examples/test_case_data/localmeta/execute/d9a2b721-9916-4607-8047-148ceb2473b1
new file mode 100644
index 0000000..fb8ff30
--- /dev/null
+++ 
b/examples/test_case_data/localmeta/execute/d9a2b721-9916-4607-8047-148ceb2473b1
@@ -0,0 +1,14 @@
+{
+  "uuid" : "d9a2b721-9916-4607-8047-148ceb2473b1",
+  "last_modified" : 1516778161249,
+  "version" : "2.3.0",
+  "name" : null,
+  "tasks" : null,
+  "tasks_check" : null,
+  "type" : "org.apache.kylin.job.BadClassName",
+  "params" : {
+    "test2" : "test2",
+    "test3" : "test3",
+    "test1" : "test1"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/f797c840/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index e447031..5f80b84 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -725,9 +725,9 @@ public class JobService extends BasicService implements 
InitializingBean {
     public List<CubingJob> innerSearchCubingJobs(final String cubeName, final 
String jobName,
             final Set<ExecutableState> statusList, long timeStartInMillis, 
long timeEndInMillis,
             final Map<String, Output> allOutputs, final boolean 
nameExactMatch, final String projectName) {
-        List<CubingJob> results = Lists.newArrayList(FluentIterable.from(
-                
getExecutableManager().getAllAbstractExecutables(timeStartInMillis, 
timeEndInMillis, CubingJob.class))
-                .filter(new Predicate<AbstractExecutable>() {
+        List<CubingJob> results = Lists.newArrayList(
+                
FluentIterable.from(getExecutableManager().getAllExecutables(timeStartInMillis, 
timeEndInMillis))
+                        .filter(new Predicate<AbstractExecutable>() {
                     @Override
                     public boolean apply(AbstractExecutable executable) {
                         if (executable instanceof CubingJob) {
@@ -824,8 +824,7 @@ public class JobService extends BasicService implements 
InitializingBean {
         List<CheckpointExecutable> results = Lists
                 .newArrayList(
                         FluentIterable
-                                
.from(getExecutableManager().getAllAbstractExecutables(timeStartInMillis,
-                                        timeEndInMillis, 
CheckpointExecutable.class))
+                                
.from(getExecutableManager().getAllExecutables(timeStartInMillis, 
timeEndInMillis))
                                 .filter(new Predicate<AbstractExecutable>() {
                                     @Override
                                     public boolean apply(AbstractExecutable 
executable) {

Reply via email to