KYLIN-2169 Refactor AbstractExecutable to respect KylinConfig
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9615b4ea Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9615b4ea Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9615b4ea Branch: refs/heads/KYLIN-2006 Commit: 9615b4ea6f817606c93df73dcafdcb151f4e8632 Parents: 637581f Author: Li Yang <liy...@apache.org> Authored: Tue Nov 8 15:47:33 2016 +0800 Committer: Li Yang <liy...@apache.org> Committed: Tue Nov 8 17:02:38 2016 +0800 ---------------------------------------------------------------------- .../kylin/job/common/ShellExecutable.java | 2 +- .../kylin/job/execution/AbstractExecutable.java | 37 +- .../job/execution/DefaultChainedExecutable.java | 1 - .../kylin/job/execution/ExecutableManager.java | 372 ++++++++++++++++++ .../job/impl/threadpool/DefaultScheduler.java | 2 +- .../kylin/job/manager/ExecutableManager.java | 377 ------------------- .../apache/kylin/job/ExecutableManagerTest.java | 2 +- .../job/impl/threadpool/BaseSchedulerTest.java | 2 +- .../org/apache/kylin/engine/mr/CubingJob.java | 2 +- .../engine/mr/common/MapReduceExecutable.java | 16 +- .../apache/kylin/engine/mr/steps/CuboidJob.java | 2 +- .../kylin/engine/mr/steps/InMemCuboidJob.java | 2 +- .../engine/mr/steps/SaveStatisticsStep.java | 2 +- .../mr/steps/UpdateCubeInfoAfterBuildStep.java | 2 +- .../mr/steps/UpdateCubeInfoAfterMergeStep.java | 2 +- .../kylin/provision/BuildCubeWithEngine.java | 2 +- .../kylin/provision/BuildCubeWithStream.java | 2 +- .../apache/kylin/rest/service/BasicService.java | 2 +- .../storage/hbase/util/StorageCleanupJob.java | 2 +- .../apache/kylin/tool/JobInstanceExtractor.java | 2 +- .../apache/kylin/tool/StorageCleanupJob.java | 2 +- 21 files changed, 419 insertions(+), 416 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java index 111c1ba..a68f242 100644 --- a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java @@ -52,7 +52,7 @@ public class ShellExecutable extends AbstractExecutable { logger.info("executing:" + getCmd()); final ShellExecutableLogger logger = new ShellExecutableLogger(); final Pair<Integer, String> result = context.getConfig().getCliCommandExecutor().execute(getCmd(), logger); - executableManager.addJobInfo(getId(), logger.getInfo()); + getManager().addJobInfo(getId(), logger.getInfo()); return new ExecuteResult(result.getFirst() == 0 ? ExecuteResult.State.SUCCEED : ExecuteResult.State.FAILED, result.getSecond()); } catch (IOException e) { logger.error("job:" + getId() + " execute finished with exception", e); http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 90e4d3c..f7b8a7c 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -32,7 +32,6 @@ import org.apache.kylin.common.util.MailService; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.exception.PersistentException; import org.apache.kylin.job.impl.threadpool.DefaultContext; -import org.apache.kylin.job.manager.ExecutableManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,43 +52,51 @@ public abstract class AbstractExecutable implements Executable, Idempotent { protected static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class); protected int retry = 0; + private KylinConfig config; private String name; private String id; private Map<String, String> params = Maps.newHashMap(); - protected static ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()); - public AbstractExecutable() { setId(UUID.randomUUID().toString()); } + + void initConfig(KylinConfig config) { + Preconditions.checkState(this.config == null || this.config == config); + this.config = config; + } + + protected ExecutableManager getManager() { + return ExecutableManager.getInstance(config); + } protected void onExecuteStart(ExecutableContext executableContext) { Map<String, String> info = Maps.newHashMap(); info.put(START_TIME, Long.toString(System.currentTimeMillis())); - executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, info, null); + getManager().updateJobOutput(getId(), ExecutableState.RUNNING, info, null); } protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) { setEndTime(System.currentTimeMillis()); if (!isDiscarded()) { if (result.succeed()) { - executableManager.updateJobOutput(getId(), ExecutableState.SUCCEED, null, result.output()); + getManager().updateJobOutput(getId(), ExecutableState.SUCCEED, null, result.output()); } else { - executableManager.updateJobOutput(getId(), ExecutableState.ERROR, null, result.output()); + getManager().updateJobOutput(getId(), ExecutableState.ERROR, null, result.output()); } } } protected void onExecuteError(Throwable exception, ExecutableContext executableContext) { if (!isDiscarded()) { - executableManager.addJobInfo(getId(), END_TIME, Long.toString(System.currentTimeMillis())); + getManager().addJobInfo(getId(), END_TIME, Long.toString(System.currentTimeMillis())); String output = null; if (exception != null) { final StringWriter out = new StringWriter(); exception.printStackTrace(new PrintWriter(out)); output = out.toString(); } - executableManager.updateJobOutput(getId(), ExecutableState.ERROR, null, output); + getManager().updateJobOutput(getId(), ExecutableState.ERROR, null, output); } } @@ -190,7 +197,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { @Override public final ExecutableState getStatus() { - return executableManager.getOutput(this.getId()).getState(); + return getManager().getOutput(this.getId()).getState(); } @Override @@ -211,7 +218,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { } public final long getLastModified() { - return executableManager.getOutput(getId()).getLastModified(); + return getOutput().getLastModified(); } public final void setSubmitter(String submitter) { @@ -298,11 +305,11 @@ public abstract class AbstractExecutable implements Executable, Idempotent { @Override public final Output getOutput() { - return executableManager.getOutput(getId()); + return getManager().getOutput(getId()); } protected long getExtraInfoAsLong(String key, long defaultValue) { - return getExtraInfoAsLong(executableManager.getOutput(getId()), key, defaultValue); + return getExtraInfoAsLong(getOutput(), key, defaultValue); } public static long getStartTime(Output output) { @@ -334,11 +341,11 @@ public abstract class AbstractExecutable implements Executable, Idempotent { } protected final void addExtraInfo(String key, String value) { - executableManager.addJobInfo(getId(), key, value); + getManager().addJobInfo(getId(), key, value); } protected final Map<String, String> getExtraInfo() { - return executableManager.getOutput(getId()).getExtra(); + return getOutput().getExtra(); } public final void setStartTime(long time) { @@ -366,7 +373,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { * * */ protected final boolean isDiscarded() { - final ExecutableState status = executableManager.getOutput(getId()).getState(); + final ExecutableState status = getOutput().getState(); return status == ExecutableState.DISCARDED; } http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java index 39a5f4f..edc8189 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java @@ -23,7 +23,6 @@ import java.util.Map; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.job.exception.ExecuteException; -import org.apache.kylin.job.manager.ExecutableManager; import com.google.common.collect.Lists; import com.google.common.collect.Maps; http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java new file mode 100644 index 0000000..0901443 --- /dev/null +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job.execution; + +import java.lang.reflect.Constructor; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.job.dao.ExecutableDao; +import org.apache.kylin.job.dao.ExecutableOutputPO; +import org.apache.kylin.job.dao.ExecutablePO; +import org.apache.kylin.job.exception.IllegalStateTranferException; +import org.apache.kylin.job.exception.PersistentException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + */ +public class ExecutableManager { + + private static final Logger logger = LoggerFactory.getLogger(ExecutableManager.class); + private static final ConcurrentHashMap<KylinConfig, ExecutableManager> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableManager>(); + + private final KylinConfig config; + private final ExecutableDao executableDao; + + public static ExecutableManager getInstance(KylinConfig config) { + ExecutableManager r = CACHE.get(config); + if (r == null) { + synchronized (ExecutableManager.class) { + r = CACHE.get(config); + if (r == null) { + r = new ExecutableManager(config); + CACHE.put(config, r); + if (CACHE.size() > 1) { + logger.warn("More than one singleton exist"); + } + } + } + } + return r; + } + + private ExecutableManager(KylinConfig config) { + logger.info("Using metadata url: " + config); + this.config = config; + this.executableDao = ExecutableDao.getInstance(config); + } + + public void addJob(AbstractExecutable executable) { + try { + executable.initConfig(config); + executableDao.addJob(parse(executable)); + addJobOutput(executable); + } catch (PersistentException e) { + logger.error("fail to submit job:" + executable.getId(), e); + throw new RuntimeException(e); + } + } + + private void addJobOutput(AbstractExecutable executable) throws PersistentException { + ExecutableOutputPO executableOutputPO = new ExecutableOutputPO(); + executableOutputPO.setUuid(executable.getId()); + executableDao.addJobOutput(executableOutputPO); + if (executable instanceof DefaultChainedExecutable) { + for (AbstractExecutable subTask : ((DefaultChainedExecutable) executable).getTasks()) { + addJobOutput(subTask); + } + } + } + + //for ut + public void deleteJob(String jobId) { + try { + executableDao.deleteJob(jobId); + } catch (PersistentException e) { + logger.error("fail to delete job:" + jobId, e); + throw new RuntimeException(e); + } + } + + public AbstractExecutable getJob(String uuid) { + try { + return parseTo(executableDao.getJob(uuid)); + } catch (PersistentException e) { + logger.error("fail to get job:" + uuid, e); + throw new RuntimeException(e); + } + } + + public Output getOutput(String uuid) { + try { + final ExecutableOutputPO jobOutput = executableDao.getJobOutput(uuid); + Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + uuid); + return parseOutput(jobOutput); + } catch (PersistentException e) { + logger.error("fail to get job output:" + uuid, e); + throw new RuntimeException(e); + } + } + + private DefaultOutput parseOutput(ExecutableOutputPO jobOutput) { + final DefaultOutput result = new DefaultOutput(); + result.setExtra(jobOutput.getInfo()); + result.setState(ExecutableState.valueOf(jobOutput.getStatus())); + result.setVerboseMsg(jobOutput.getContent()); + result.setLastModified(jobOutput.getLastModified()); + return result; + } + + public Map<String, Output> getAllOutputs() { + try { + final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs(); + HashMap<String, Output> result = Maps.newHashMap(); + for (ExecutableOutputPO jobOutput : jobOutputs) { + result.put(jobOutput.getId(), parseOutput(jobOutput)); + } + return result; + } catch (PersistentException e) { + logger.error("fail to get all job output:", e); + throw new RuntimeException(e); + } + } + + public Map<String, Output> getAllOutputs(long timeStartInMillis, long timeEndInMillis) { + try { + final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs(timeStartInMillis, timeEndInMillis); + HashMap<String, Output> result = Maps.newHashMap(); + for (ExecutableOutputPO jobOutput : jobOutputs) { + result.put(jobOutput.getId(), parseOutput(jobOutput)); + } + return result; + } catch (PersistentException e) { + logger.error("fail to get all job output:", e); + throw new RuntimeException(e); + } + } + + public List<AbstractExecutable> getAllExecutables() { + try { + List<AbstractExecutable> ret = Lists.newArrayList(); + for (ExecutablePO po : executableDao.getJobs()) { + try { + AbstractExecutable ae = parseTo(po); + ret.add(ae); + } catch (IllegalArgumentException e) { + logger.error("error parsing one executabePO: ", e); + } + } + return ret; + } catch (PersistentException e) { + logger.error("error get All Jobs", e); + throw new RuntimeException(e); + } + } + + public List<AbstractExecutable> getAllExecutables(long timeStartInMillis, long timeEndInMillis) { + try { + List<AbstractExecutable> ret = Lists.newArrayList(); + for (ExecutablePO po : executableDao.getJobs(timeStartInMillis, timeEndInMillis)) { + try { + AbstractExecutable ae = parseTo(po); + ret.add(ae); + } catch (IllegalArgumentException e) { + logger.error("error parsing one executabePO: ", e); + } + } + return ret; + } catch (PersistentException e) { + logger.error("error get All Jobs", e); + throw new RuntimeException(e); + } + } + + public List<String> getAllJobIds() { + try { + return executableDao.getJobIds(); + } catch (PersistentException e) { + logger.error("error get All Job Ids", e); + throw new RuntimeException(e); + } + } + + public void updateAllRunningJobsToError() { + try { + final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs(); + for (ExecutableOutputPO executableOutputPO : jobOutputs) { + if (executableOutputPO.getStatus().equalsIgnoreCase(ExecutableState.RUNNING.toString())) { + executableOutputPO.setStatus(ExecutableState.ERROR.toString()); + executableDao.updateJobOutput(executableOutputPO); + } + } + } catch (PersistentException e) { + logger.error("error reset job status from RUNNING to ERROR", e); + throw new RuntimeException(e); + } + } + + public void resumeAllRunningJobs() { + try { + final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs(); + for (ExecutableOutputPO executableOutputPO : jobOutputs) { + if (executableOutputPO.getStatus().equalsIgnoreCase(ExecutableState.RUNNING.toString())) { + executableOutputPO.setStatus(ExecutableState.READY.toString()); + executableDao.updateJobOutput(executableOutputPO); + } + } + } catch (PersistentException e) { + logger.error("error reset job status from RUNNING to READY", e); + throw new RuntimeException(e); + } + } + + public void resumeJob(String jobId) { + AbstractExecutable job = getJob(jobId); + if (job == null) { + return; + } + if (job instanceof DefaultChainedExecutable) { + List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks(); + for (AbstractExecutable task : tasks) { + if (task.getStatus() == ExecutableState.ERROR) { + updateJobOutput(task.getId(), ExecutableState.READY, null, null); + break; + } + } + } + updateJobOutput(jobId, ExecutableState.READY, null, null); + } + + public void discardJob(String jobId) { + AbstractExecutable job = getJob(jobId); + if (job instanceof DefaultChainedExecutable) { + List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks(); + for (AbstractExecutable task : tasks) { + if (!task.getStatus().isFinalState()) { + updateJobOutput(task.getId(), ExecutableState.DISCARDED, null, null); + } + } + } + updateJobOutput(jobId, ExecutableState.DISCARDED, null, null); + } + + public void updateJobOutput(String jobId, ExecutableState newStatus, Map<String, String> info, String output) { + try { + final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId); + Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + jobId); + ExecutableState oldStatus = ExecutableState.valueOf(jobOutput.getStatus()); + if (newStatus != null && oldStatus != newStatus) { + if (!ExecutableState.isValidStateTransfer(oldStatus, newStatus)) { + throw new IllegalStateTranferException("there is no valid state transfer from:" + oldStatus + " to:" + newStatus + ", job id: " + jobId); + } + jobOutput.setStatus(newStatus.toString()); + } + if (info != null) { + jobOutput.setInfo(info); + } + if (output != null) { + jobOutput.setContent(output); + } + executableDao.updateJobOutput(jobOutput); + logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus); + } catch (PersistentException e) { + logger.error("error change job:" + jobId + " to " + newStatus.toString()); + throw new RuntimeException(e); + } + } + + //for migration only + //TODO delete when migration finished + public void resetJobOutput(String jobId, ExecutableState state, String output) { + try { + final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId); + jobOutput.setStatus(state.toString()); + if (output != null) { + jobOutput.setContent(output); + } + executableDao.updateJobOutput(jobOutput); + } catch (PersistentException e) { + throw new RuntimeException(e); + } + } + + public void addJobInfo(String id, Map<String, String> info) { + if (info == null) { + return; + } + try { + ExecutableOutputPO output = executableDao.getJobOutput(id); + Preconditions.checkArgument(output != null, "there is no related output for job id:" + id); + output.getInfo().putAll(info); + executableDao.updateJobOutput(output); + } catch (PersistentException e) { + logger.error("error update job info, id:" + id + " info:" + info.toString()); + throw new RuntimeException(e); + } + } + + public void addJobInfo(String id, String key, String value) { + Map<String, String> info = Maps.newHashMap(); + info.put(key, value); + addJobInfo(id, info); + } + + private static ExecutablePO parse(AbstractExecutable executable) { + ExecutablePO result = new ExecutablePO(); + result.setName(executable.getName()); + result.setUuid(executable.getId()); + result.setType(executable.getClass().getName()); + result.setParams(executable.getParams()); + if (executable instanceof ChainedExecutable) { + List<ExecutablePO> tasks = Lists.newArrayList(); + for (AbstractExecutable task : ((ChainedExecutable) executable).getTasks()) { + tasks.add(parse(task)); + } + result.setTasks(tasks); + } + return result; + } + + private AbstractExecutable parseTo(ExecutablePO executablePO) { + if (executablePO == null) { + logger.warn("executablePO is null"); + return null; + } + String type = executablePO.getType(); + try { + Class<? extends AbstractExecutable> clazz = ClassUtil.forName(type, AbstractExecutable.class); + Constructor<? extends AbstractExecutable> constructor = clazz.getConstructor(); + AbstractExecutable result = constructor.newInstance(); + result.initConfig(config); + result.setId(executablePO.getUuid()); + result.setName(executablePO.getName()); + result.setParams(executablePO.getParams()); + List<ExecutablePO> tasks = executablePO.getTasks(); + if (tasks != null && !tasks.isEmpty()) { + Preconditions.checkArgument(result instanceof ChainedExecutable); + for (ExecutablePO subTask : tasks) { + ((ChainedExecutable) result).addTask(parseTo(subTask)); + } + } + return result; + } catch (ReflectiveOperationException e) { + throw new IllegalStateException("cannot parse this job:" + executablePO.getId(), e); + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java index 1ea3be0..9d5f7ba 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java @@ -36,10 +36,10 @@ import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.exception.SchedulerException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.Executable; +import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.Output; import org.apache.kylin.job.lock.JobLock; -import org.apache.kylin.job.manager.ExecutableManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java deleted file mode 100644 index d42b924..0000000 --- a/core-job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java +++ /dev/null @@ -1,377 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.job.manager; - -import java.lang.reflect.Constructor; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.ClassUtil; -import org.apache.kylin.job.dao.ExecutableDao; -import org.apache.kylin.job.dao.ExecutableOutputPO; -import org.apache.kylin.job.dao.ExecutablePO; -import org.apache.kylin.job.exception.IllegalStateTranferException; -import org.apache.kylin.job.exception.PersistentException; -import org.apache.kylin.job.execution.AbstractExecutable; -import org.apache.kylin.job.execution.ChainedExecutable; -import org.apache.kylin.job.execution.DefaultChainedExecutable; -import org.apache.kylin.job.execution.DefaultOutput; -import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.job.execution.Output; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -/** - */ -public class ExecutableManager { - - private static final Logger logger = LoggerFactory.getLogger(ExecutableManager.class); - private static final ConcurrentHashMap<KylinConfig, ExecutableManager> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableManager>(); - @SuppressWarnings("unused") - private final KylinConfig config; - - private ExecutableDao executableDao; - - public static ExecutableManager getInstance(KylinConfig config) { - ExecutableManager r = CACHE.get(config); - if (r == null) { - synchronized (ExecutableManager.class) { - r = CACHE.get(config); - if (r == null) { - r = new ExecutableManager(config); - CACHE.put(config, r); - if (CACHE.size() > 1) { - logger.warn("More than one singleton exist"); - } - } - } - } - return r; - } - - private ExecutableManager(KylinConfig config) { - logger.info("Using metadata url: " + config); - this.config = config; - this.executableDao = ExecutableDao.getInstance(config); - } - - public void addJob(AbstractExecutable executable) { - try { - executableDao.addJob(parse(executable)); - addJobOutput(executable); - } catch (PersistentException e) { - logger.error("fail to submit job:" + executable.getId(), e); - throw new RuntimeException(e); - } - } - - private void addJobOutput(AbstractExecutable executable) throws PersistentException { - ExecutableOutputPO executableOutputPO = new ExecutableOutputPO(); - executableOutputPO.setUuid(executable.getId()); - executableDao.addJobOutput(executableOutputPO); - if (executable instanceof DefaultChainedExecutable) { - for (AbstractExecutable subTask : ((DefaultChainedExecutable) executable).getTasks()) { - addJobOutput(subTask); - } - } - } - - //for ut - public void deleteJob(String jobId) { - try { - executableDao.deleteJob(jobId); - } catch (PersistentException e) { - logger.error("fail to delete job:" + jobId, e); - throw new RuntimeException(e); - } - } - - public AbstractExecutable getJob(String uuid) { - try { - return parseTo(executableDao.getJob(uuid)); - } catch (PersistentException e) { - logger.error("fail to get job:" + uuid, e); - throw new RuntimeException(e); - } - } - - public Output getOutput(String uuid) { - try { - final ExecutableOutputPO jobOutput = executableDao.getJobOutput(uuid); - Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + uuid); - return parseOutput(jobOutput); - } catch (PersistentException e) { - logger.error("fail to get job output:" + uuid, e); - throw new RuntimeException(e); - } - } - - private DefaultOutput parseOutput(ExecutableOutputPO jobOutput) { - final DefaultOutput result = new DefaultOutput(); - result.setExtra(jobOutput.getInfo()); - result.setState(ExecutableState.valueOf(jobOutput.getStatus())); - result.setVerboseMsg(jobOutput.getContent()); - result.setLastModified(jobOutput.getLastModified()); - return result; - } - - public Map<String, Output> getAllOutputs() { - try { - final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs(); - HashMap<String, Output> result = Maps.newHashMap(); - for (ExecutableOutputPO jobOutput : jobOutputs) { - result.put(jobOutput.getId(), parseOutput(jobOutput)); - } - return result; - } catch (PersistentException e) { - logger.error("fail to get all job output:", e); - throw new RuntimeException(e); - } - } - - public Map<String, Output> getAllOutputs(long timeStartInMillis, long timeEndInMillis) { - try { - final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs(timeStartInMillis, timeEndInMillis); - HashMap<String, Output> result = Maps.newHashMap(); - for (ExecutableOutputPO jobOutput : jobOutputs) { - result.put(jobOutput.getId(), parseOutput(jobOutput)); - } - return result; - } catch (PersistentException e) { - logger.error("fail to get all job output:", e); - throw new RuntimeException(e); - } - } - - public List<AbstractExecutable> getAllExecutables() { - try { - List<AbstractExecutable> ret = Lists.newArrayList(); - for (ExecutablePO po : executableDao.getJobs()) { - try { - AbstractExecutable ae = parseTo(po); - ret.add(ae); - } catch (IllegalArgumentException e) { - logger.error("error parsing one executabePO: ", e); - } - } - return ret; - } catch (PersistentException e) { - logger.error("error get All Jobs", e); - throw new RuntimeException(e); - } - } - - public List<AbstractExecutable> getAllExecutables(long timeStartInMillis, long timeEndInMillis) { - try { - List<AbstractExecutable> ret = Lists.newArrayList(); - for (ExecutablePO po : executableDao.getJobs(timeStartInMillis, timeEndInMillis)) { - try { - AbstractExecutable ae = parseTo(po); - ret.add(ae); - } catch (IllegalArgumentException e) { - logger.error("error parsing one executabePO: ", e); - } - } - return ret; - } catch (PersistentException e) { - logger.error("error get All Jobs", e); - throw new RuntimeException(e); - } - } - - public List<String> getAllJobIds() { - try { - return executableDao.getJobIds(); - } catch (PersistentException e) { - logger.error("error get All Job Ids", e); - throw new RuntimeException(e); - } - } - - public void updateAllRunningJobsToError() { - try { - final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs(); - for (ExecutableOutputPO executableOutputPO : jobOutputs) { - if (executableOutputPO.getStatus().equalsIgnoreCase(ExecutableState.RUNNING.toString())) { - executableOutputPO.setStatus(ExecutableState.ERROR.toString()); - executableDao.updateJobOutput(executableOutputPO); - } - } - } catch (PersistentException e) { - logger.error("error reset job status from RUNNING to ERROR", e); - throw new RuntimeException(e); - } - } - - public void resumeAllRunningJobs() { - try { - final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs(); - for (ExecutableOutputPO executableOutputPO : jobOutputs) { - if (executableOutputPO.getStatus().equalsIgnoreCase(ExecutableState.RUNNING.toString())) { - executableOutputPO.setStatus(ExecutableState.READY.toString()); - executableDao.updateJobOutput(executableOutputPO); - } - } - } catch (PersistentException e) { - logger.error("error reset job status from RUNNING to READY", e); - throw new RuntimeException(e); - } - } - - public void resumeJob(String jobId) { - AbstractExecutable job = getJob(jobId); - if (job == null) { - return; - } - if (job instanceof DefaultChainedExecutable) { - List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks(); - for (AbstractExecutable task : tasks) { - if (task.getStatus() == ExecutableState.ERROR) { - updateJobOutput(task.getId(), ExecutableState.READY, null, null); - break; - } - } - } - updateJobOutput(jobId, ExecutableState.READY, null, null); - } - - public void discardJob(String jobId) { - AbstractExecutable job = getJob(jobId); - if (job instanceof DefaultChainedExecutable) { - List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks(); - for (AbstractExecutable task : tasks) { - if (!task.getStatus().isFinalState()) { - updateJobOutput(task.getId(), ExecutableState.DISCARDED, null, null); - } - } - } - updateJobOutput(jobId, ExecutableState.DISCARDED, null, null); - } - - public void updateJobOutput(String jobId, ExecutableState newStatus, Map<String, String> info, String output) { - try { - final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId); - Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + jobId); - ExecutableState oldStatus = ExecutableState.valueOf(jobOutput.getStatus()); - if (newStatus != null && oldStatus != newStatus) { - if (!ExecutableState.isValidStateTransfer(oldStatus, newStatus)) { - throw new IllegalStateTranferException("there is no valid state transfer from:" + oldStatus + " to:" + newStatus + ", job id: " + jobId); - } - jobOutput.setStatus(newStatus.toString()); - } - if (info != null) { - jobOutput.setInfo(info); - } - if (output != null) { - jobOutput.setContent(output); - } - executableDao.updateJobOutput(jobOutput); - logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus); - } catch (PersistentException e) { - logger.error("error change job:" + jobId + " to " + newStatus.toString()); - throw new RuntimeException(e); - } - } - - //for migration only - //TODO delete when migration finished - public void resetJobOutput(String jobId, ExecutableState state, String output) { - try { - final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId); - jobOutput.setStatus(state.toString()); - if (output != null) { - jobOutput.setContent(output); - } - executableDao.updateJobOutput(jobOutput); - } catch (PersistentException e) { - throw new RuntimeException(e); - } - } - - public void addJobInfo(String id, Map<String, String> info) { - if (info == null) { - return; - } - try { - ExecutableOutputPO output = executableDao.getJobOutput(id); - Preconditions.checkArgument(output != null, "there is no related output for job id:" + id); - output.getInfo().putAll(info); - executableDao.updateJobOutput(output); - } catch (PersistentException e) { - logger.error("error update job info, id:" + id + " info:" + info.toString()); - throw new RuntimeException(e); - } - } - - public void addJobInfo(String id, String key, String value) { - Map<String, String> info = Maps.newHashMap(); - info.put(key, value); - addJobInfo(id, info); - } - - private static ExecutablePO parse(AbstractExecutable executable) { - ExecutablePO result = new ExecutablePO(); - result.setName(executable.getName()); - result.setUuid(executable.getId()); - result.setType(executable.getClass().getName()); - result.setParams(executable.getParams()); - if (executable instanceof ChainedExecutable) { - List<ExecutablePO> tasks = Lists.newArrayList(); - for (AbstractExecutable task : ((ChainedExecutable) executable).getTasks()) { - tasks.add(parse(task)); - } - result.setTasks(tasks); - } - return result; - } - - private static AbstractExecutable parseTo(ExecutablePO executablePO) { - if (executablePO == null) { - logger.warn("executablePO is null"); - return null; - } - String type = executablePO.getType(); - try { - Class<? extends AbstractExecutable> clazz = ClassUtil.forName(type, AbstractExecutable.class); - Constructor<? extends AbstractExecutable> constructor = clazz.getConstructor(); - AbstractExecutable result = constructor.newInstance(); - result.setId(executablePO.getUuid()); - result.setName(executablePO.getName()); - result.setParams(executablePO.getParams()); - List<ExecutablePO> tasks = executablePO.getTasks(); - if (tasks != null && !tasks.isEmpty()) { - Preconditions.checkArgument(result instanceof ChainedExecutable); - for (ExecutablePO subTask : tasks) { - ((ChainedExecutable) result).addTask(parseTo(subTask)); - } - } - return result; - } catch (ReflectiveOperationException e) { - throw new IllegalStateException("cannot parse this job:" + executablePO.getId(), e); - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java b/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java index 1eed361..2868f08 100644 --- a/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java @@ -31,8 +31,8 @@ import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ChainedExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.Executable; +import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.job.manager.ExecutableManager; import org.junit.After; import org.junit.Before; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java ---------------------------------------------------------------------- diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java index 97c9f8d..fdf5252 100644 --- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java @@ -26,9 +26,9 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.lock.MockJobLock; -import org.apache.kylin.job.manager.ExecutableManager; import org.junit.After; import org.junit.Before; http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java index 1a0113d..bce0433 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java @@ -277,7 +277,7 @@ public class CubingJob extends DefaultChainedExecutable { } for (AbstractExecutable child : tasks) { - Output output = executableManager.getOutput(child.getId()); + Output output = getManager().getOutput(child.getId()); String value = output.getExtra().get(key); if (value != null) return value; http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java index 7ccd524..a26d4ff 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java @@ -38,6 +38,7 @@ import org.apache.kylin.job.constant.JobStepStatusEnum; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.job.execution.Output; @@ -63,11 +64,11 @@ public class MapReduceExecutable extends AbstractExecutable { @Override protected void onExecuteStart(ExecutableContext executableContext) { - final Output output = executableManager.getOutput(getId()); + final Output output = getOutput(); if (output.getExtra().containsKey(START_TIME)) { final String mrJobId = output.getExtra().get(ExecutableConstants.MR_JOB_ID); if (mrJobId == null) { - executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null); + getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null); return; } try { @@ -77,7 +78,7 @@ public class MapReduceExecutable extends AbstractExecutable { //remove previous mr job info super.onExecuteStart(executableContext); } else { - executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null); + getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null); } } catch (IOException e) { logger.warn("error get hadoop status"); @@ -99,7 +100,8 @@ public class MapReduceExecutable extends AbstractExecutable { Preconditions.checkNotNull(params); try { Job job; - final Map<String, String> extra = executableManager.getOutput(getId()).getExtra(); + ExecutableManager mgr = getManager(); + final Map<String, String> extra = mgr.getOutput(getId()).getExtra(); if (extra.containsKey(ExecutableConstants.MR_JOB_ID)) { Configuration conf = HadoopUtil.getCurrentConfiguration(); job = new Cluster(conf).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID))); @@ -149,19 +151,19 @@ public class MapReduceExecutable extends AbstractExecutable { JobStepStatusEnum newStatus = HadoopJobStatusChecker.checkStatus(job, output); if (status == JobStepStatusEnum.KILLED) { - executableManager.updateJobOutput(getId(), ExecutableState.ERROR, hadoopCmdOutput.getInfo(), "killed by admin"); + mgr.updateJobOutput(getId(), ExecutableState.ERROR, hadoopCmdOutput.getInfo(), "killed by admin"); return new ExecuteResult(ExecuteResult.State.FAILED, "killed by admin"); } if (status == JobStepStatusEnum.WAITING && (newStatus == JobStepStatusEnum.FINISHED || newStatus == JobStepStatusEnum.ERROR || newStatus == JobStepStatusEnum.RUNNING)) { final long waitTime = System.currentTimeMillis() - getStartTime(); setMapReduceWaitTime(waitTime); } - executableManager.addJobInfo(getId(), hadoopCmdOutput.getInfo()); + mgr.addJobInfo(getId(), hadoopCmdOutput.getInfo()); status = newStatus; if (status.isComplete()) { final Map<String, String> info = hadoopCmdOutput.getInfo(); readCounters(hadoopCmdOutput, info); - executableManager.addJobInfo(getId(), info); + mgr.addJobInfo(getId(), info); if (status == JobStepStatusEnum.FINISHED) { return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString()); http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java index 6b0c86e..9edc82e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java @@ -43,7 +43,7 @@ import org.apache.kylin.engine.mr.MRUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.job.exception.JobException; -import org.apache.kylin.job.manager.ExecutableManager; +import org.apache.kylin.job.execution.ExecutableManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java index 013f2c9..a5ea1e9 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java @@ -40,7 +40,7 @@ import org.apache.kylin.engine.mr.MRUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.CubeStatsReader; -import org.apache.kylin.job.manager.ExecutableManager; +import org.apache.kylin.job.execution.ExecutableManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java index 8777af7..23e81bc 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java @@ -122,7 +122,7 @@ public class SaveStatisticsStep extends AbstractExecutable { } logger.info("The cube algorithm for " + seg + " is " + alg); - CubingJob cubingJob = (CubingJob) executableManager.getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); + CubingJob cubingJob = (CubingJob) getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); cubingJob.setAlgorithm(alg); } http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java index 4e1be57..f7af42e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java @@ -59,7 +59,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams())); final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams())); - CubingJob cubingJob = (CubingJob) executableManager.getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); + CubingJob cubingJob = (CubingJob) getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); long sourceCount = cubingJob.findSourceRecordCount(); long sourceSizeBytes = cubingJob.findSourceSizeBytes(); long cubeSizeBytes = cubingJob.findCubeSizeBytes(); http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java index 6e8e5ed..d2fa73e 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java @@ -54,7 +54,7 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable { return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + CubingExecutableUtil.getSegmentId(this.getParams())); } - CubingJob cubingJob = (CubingJob) executableManager.getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); + CubingJob cubingJob = (CubingJob) getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams())); long cubeSizeBytes = cubingJob.findCubeSizeBytes(); // collect source statistics http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java index 156b1f6..c3902a2 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java @@ -52,9 +52,9 @@ import org.apache.kylin.job.DeployUtil; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.impl.threadpool.DefaultScheduler; -import org.apache.kylin.job.manager.ExecutableManager; import org.apache.kylin.source.ISource; import org.apache.kylin.source.SourceFactory; import org.apache.kylin.source.SourcePartition; http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java index 9804292..c2f53e1 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java @@ -49,9 +49,9 @@ import org.apache.kylin.job.DeployUtil; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.impl.threadpool.DefaultScheduler; -import org.apache.kylin.job.manager.ExecutableManager; import org.apache.kylin.job.streaming.Kafka10DataLoader; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.streaming.StreamingConfig; http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java index 170c395..9f14deb 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java @@ -31,9 +31,9 @@ import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.metadata.streaming.StreamingManager; import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.Output; -import org.apache.kylin.job.manager.ExecutableManager; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.badquery.BadQueryHistoryManager; import org.apache.kylin.metadata.project.ProjectInstance; http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java index e66eaec..08471a3 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java @@ -52,8 +52,8 @@ import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.JobBuilderSupport; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.job.manager.ExecutableManager; import org.apache.kylin.metadata.realization.IRealizationConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java b/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java index 1023a8b..068dbda 100644 --- a/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java +++ b/tool/src/main/java/org/apache/kylin/tool/JobInstanceExtractor.java @@ -36,9 +36,9 @@ import org.apache.kylin.job.common.ShellExecutable; import org.apache.kylin.job.constant.JobStatusEnum; import org.apache.kylin.job.constant.JobStepStatusEnum; import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.Output; -import org.apache.kylin.job.manager.ExecutableManager; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.ProjectManager; import org.apache.kylin.metadata.realization.RealizationType; http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java index 4252e74..3c0ce1b 100644 --- a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java +++ b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java @@ -53,8 +53,8 @@ import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.JobBuilderSupport; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; -import org.apache.kylin.job.manager.ExecutableManager; import org.apache.kylin.metadata.realization.IRealizationConstants; import org.apache.kylin.source.hive.HiveClientFactory; import org.apache.kylin.source.hive.HiveCmdBuilder;