KYLIN-1154 Clean up old jobs and job outputs
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/87cf6f32 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/87cf6f32 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/87cf6f32 Branch: refs/heads/master Commit: 87cf6f32e8302cd2ddd18e9758f72277b8233fed Parents: 363d765 Author: shaofengshi <shaofeng...@apache.org> Authored: Wed Nov 18 14:46:54 2015 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Wed Nov 18 14:47:22 2015 +0800 ---------------------------------------------------------------------- bin/metastore.sh | 1 - .../kylin/common/persistence/ResourceStore.java | 3 ++ .../common/util/HBaseMetadataTestCase.java | 7 +++- .../org/apache/kylin/job/dao/ExecutableDao.java | 12 +++---- .../job/hadoop/cube/MetadataCleanupJob.java | 37 +++++++++++++++----- 5 files changed, 43 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/87cf6f32/bin/metastore.sh ---------------------------------------------------------------------- diff --git a/bin/metastore.sh b/bin/metastore.sh index 39593d4..6d2279f 100755 --- a/bin/metastore.sh +++ b/bin/metastore.sh @@ -26,7 +26,6 @@ dir=$(dirname ${0}) -source ${dir}/check-env.sh if [ $1 == "backup" ] then http://git-wip-us.apache.org/repos/asf/kylin/blob/87cf6f32/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java index db70997..5375597 100644 --- a/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java +++ b/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java @@ -52,6 +52,9 @@ abstract public class ResourceStore { public static final String 
TABLE_EXD_RESOURCE_ROOT = "/table_exd"; public static final String TABLE_RESOURCE_ROOT = "/table"; public static final String HYBRID_RESOURCE_ROOT = "/hybrid"; + public static final String EXECUTE_PATH_ROOT = "/execute"; + public static final String EXECUTE_OUTPUT_ROOT = "/execute_output"; + private static ConcurrentHashMap<KylinConfig, ResourceStore> CACHE = new ConcurrentHashMap<KylinConfig, ResourceStore>(); http://git-wip-us.apache.org/repos/asf/kylin/blob/87cf6f32/common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java b/common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java index 0fab2db..d2e3238 100644 --- a/common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java +++ b/common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java @@ -21,6 +21,7 @@ package org.apache.kylin.common.util; import org.apache.commons.lang.StringUtils; import java.io.File; +import java.io.IOException; /** * @author ysong1 @@ -30,7 +31,11 @@ public class HBaseMetadataTestCase extends AbstractKylinTestCase { static { if (useSandbox()) { try { - ClassUtil.addClasspath(new File("../examples/test_case_data/sandbox/").getAbsolutePath()); + File sandboxFolder = new File("../examples/test_case_data/sandbox/"); + if (sandboxFolder.exists() == false) { + throw new IOException("The sandbox folder doesn't exist: " + sandboxFolder.getAbsolutePath()); + } + ClassUtil.addClasspath(sandboxFolder.getAbsolutePath()); } catch (Exception e) { e.printStackTrace(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/87cf6f32/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java 
b/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java index 9b79abf..482f7a0 100644 --- a/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java +++ b/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java @@ -43,8 +43,6 @@ public class ExecutableDao { private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER = new JsonSerializer<ExecutableOutputPO>(ExecutableOutputPO.class); private static final Logger logger = LoggerFactory.getLogger(ExecutableDao.class); private static final ConcurrentHashMap<KylinConfig, ExecutableDao> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableDao>(); - public static final String JOB_PATH_ROOT = "/execute"; - public static final String JOB_OUTPUT_ROOT = "/execute_output"; private ResourceStore store; @@ -71,11 +69,11 @@ public class ExecutableDao { } private String pathOfJob(String uuid) { - return JOB_PATH_ROOT + "/" + uuid; + return ResourceStore.EXECUTE_PATH_ROOT + "/" + uuid; } private String pathOfJobOutput(String uuid) { - return JOB_OUTPUT_ROOT + "/" + uuid; + return ResourceStore.EXECUTE_OUTPUT_ROOT + "/" + uuid; } private ExecutablePO readJobResource(String path) throws IOException { @@ -96,7 +94,7 @@ public class ExecutableDao { public List<ExecutableOutputPO> getJobOutputs() throws PersistentException { try { - ArrayList<String> resources = store.listResources(JOB_OUTPUT_ROOT); + ArrayList<String> resources = store.listResources(ResourceStore.EXECUTE_OUTPUT_ROOT); if (resources == null || resources.isEmpty()) { return Collections.emptyList(); } @@ -112,7 +110,7 @@ public class ExecutableDao { public List<ExecutablePO> getJobs() throws PersistentException { try { - final List<String> jobIds = store.listResources(JOB_PATH_ROOT); + final List<String> jobIds = store.listResources(ResourceStore.EXECUTE_PATH_ROOT); if (jobIds == null || jobIds.isEmpty()) { return Collections.emptyList(); } @@ -128,7 +126,7 @@ public class ExecutableDao { public List<String> getJobIds() throws 
PersistentException { try { - ArrayList<String> resources = store.listResources(JOB_PATH_ROOT); + ArrayList<String> resources = store.listResources(ResourceStore.EXECUTE_PATH_ROOT); if (resources == null) { return Collections.emptyList(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/87cf6f32/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java index 6d06dcc..e46093c 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java @@ -18,11 +18,8 @@ package org.apache.kylin.job.hadoop.cube; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Set; - +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; @@ -30,12 +27,18 @@ import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.job.constant.JobStatusEnum; +import org.apache.kylin.job.dao.ExecutableDao; +import org.apache.kylin.job.dao.ExecutableOutputPO; +import org.apache.kylin.job.dao.ExecutablePO; import org.apache.kylin.job.hadoop.AbstractHadoopJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; public class MetadataCleanupJob extends AbstractHadoopJob { @@ -49,6 +52,7 @@ public class MetadataCleanupJob extends 
AbstractHadoopJob { private KylinConfig config = null; public static final long TIME_THREADSHOLD = 2 * 24 * 3600 * 1000l; // 2 days + public static final long TIME_THREADSHOLD_FOR_JOB = 30 * 24 * 3600 * 1000l; // 30 days /* * (non-Javadoc) @@ -143,8 +147,25 @@ public class MetadataCleanupJob extends AbstractHadoopJob { } } + // delete old and completed jobs + ExecutableDao executableDao = ExecutableDao.getInstance(KylinConfig.getInstanceFromEnv()); + List<ExecutablePO> allExecutable = executableDao.getJobs(); + for (ExecutablePO executable : allExecutable) { + long lastModified = executable.getLastModified(); + ExecutableOutputPO output = executableDao.getJobOutput(executable.getUuid()); + if (System.currentTimeMillis() - lastModified > TIME_THREADSHOLD_FOR_JOB && (output.getStatus().equals(JobStatusEnum.FINISHED.toString()) || output.getStatus().equals(JobStatusEnum.DISCARDED.toString()))) { + toDeleteResource.add(ResourceStore.EXECUTE_PATH_ROOT + "/" + executable.getUuid()); + toDeleteResource.add(ResourceStore.EXECUTE_OUTPUT_ROOT + "/" + executable.getUuid()); + + for (ExecutablePO task : executable.getTasks()) { + toDeleteResource.add(ResourceStore.EXECUTE_PATH_ROOT + "/" + task.getUuid()); + toDeleteResource.add(ResourceStore.EXECUTE_OUTPUT_ROOT + "/" + task.getUuid()); + } + } + } + if (toDeleteResource.size() > 0) { - logger.info("The following resources have no reference, will be cleaned from metadata store: \n"); + logger.info("The following resources have no reference or is too old, will be cleaned from metadata store: \n"); for (String s : toDeleteResource) { logger.info(s);