This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new 30c484c KYLIN-3619 Some job won't clean up temp directory after finished 30c484c is described below commit 30c484c33c2ddc728e4bc522dc00c97f7999c4b8 Author: Enwei Jiao <enwei.j...@kyligence.io> AuthorDate: Fri Oct 19 09:35:03 2018 +0800 KYLIN-3619 Some job won't clean up temp directory after finished --- .../org/apache/kylin/common/KylinConfigBase.java | 4 + .../common/persistence/AutoDeleteDirectory.java | 58 +++++++++++++ .../persistence/AutoDeleteDirectoryTest.java | 39 +++++++++ .../kylin/engine/mr/common/AbstractHadoopJob.java | 35 ++++---- .../kylin/engine/mr/common/JobRelatedMetaUtil.java | 38 +++++---- .../kylin/rest/controller/DiagnosisController.java | 24 +++--- .../kylin/rest/service/DiagnosisService.java | 11 +-- .../hive/cardinality/HiveColumnCardinalityJob.java | 97 +++++++++++----------- 8 files changed, 205 insertions(+), 101 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index e54d722..5577307 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -103,6 +103,10 @@ abstract public class KylinConfigBase implements Serializable { return getKylinHome() + File.separator + "spark"; } + public static String getTempDir() { + return System.getProperty("java.io.tmpdir"); + } + // backward compatibility check happens when properties is loaded or updated static BackwardCompatibilityConfig BCC = new BackwardCompatibilityConfig(); diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/AutoDeleteDirectory.java b/core-common/src/main/java/org/apache/kylin/common/persistence/AutoDeleteDirectory.java new file mode 100644 index 0000000..a496ba8 --- /dev/null +++ 
b/core-common/src/main/java/org/apache/kylin/common/persistence/AutoDeleteDirectory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.persistence; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; + +public class AutoDeleteDirectory implements Closeable { + + private final File tempFile; + + public AutoDeleteDirectory(File file) { + tempFile = file; + } + public AutoDeleteDirectory(String prefix, String suffix) { + try { + tempFile = File.createTempFile(prefix, suffix); + org.apache.commons.io.FileUtils.forceDelete(tempFile); // we need a directory, so delete the file first + tempFile.mkdirs(); + } catch (IOException e) { + throw new RuntimeException("create temp file " + prefix + "****" + suffix + " failed", e); + } + } + + public String getAbsolutePath() { + return tempFile.getAbsolutePath(); + } + + public AutoDeleteDirectory child(String child) { + return new AutoDeleteDirectory(new File(tempFile, child)); + } + + public File getFile() { + return tempFile; + } + + @Override + public void close() throws IOException { + org.apache.commons.io.FileUtils.forceDelete(tempFile); + } +} diff --git 
a/core-common/src/test/java/org/apache/kylin/common/persistence/AutoDeleteDirectoryTest.java b/core-common/src/test/java/org/apache/kylin/common/persistence/AutoDeleteDirectoryTest.java new file mode 100644 index 0000000..2bf717c --- /dev/null +++ b/core-common/src/test/java/org/apache/kylin/common/persistence/AutoDeleteDirectoryTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.common.persistence; + +import java.io.File; +import java.io.IOException; + +import org.junit.Assert; +import org.junit.Test; + +public class AutoDeleteDirectoryTest { + + @Test + public void testBasic() throws IOException { + File tempFile = null; + try (AutoDeleteDirectory autoTempFile = new AutoDeleteDirectory("test", "")) { + Assert.assertTrue(autoTempFile.getFile().isDirectory()); + Assert.assertEquals(0, autoTempFile.getFile().listFiles().length); + tempFile = autoTempFile.getFile(); + } + Assert.assertTrue(!tempFile.exists()); + } +} diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index 8873f30..6a9158d 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@ -622,24 +622,29 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { } protected void cleanupTempConfFile(Configuration conf) { - String tempMetaFileString = conf.get("tmpfiles"); - logger.trace("tempMetaFileString is : " + tempMetaFileString); - if (tempMetaFileString != null) { - if (tempMetaFileString.startsWith("file://")) { - tempMetaFileString = tempMetaFileString.substring("file://".length()); - File tempMetaFile = new File(tempMetaFileString); - if (tempMetaFile.exists()) { - try { - FileUtils.forceDelete(tempMetaFile.getParentFile()); - - } catch (IOException e) { - logger.warn("error when deleting " + tempMetaFile, e); + String[] tempfiles = StringUtils.split(conf.get("tmpfiles"), ","); + if (tempfiles == null) { + return; + } + for (String tempMetaFileString : tempfiles) { + logger.trace("tempMetaFileString is : " + tempMetaFileString); + if (tempMetaFileString != null) { + if (tempMetaFileString.startsWith("file://")) { + tempMetaFileString = 
tempMetaFileString.substring("file://".length()); + File tempMetaFile = new File(tempMetaFileString); + if (tempMetaFile.exists()) { + try { + FileUtils.forceDelete(tempMetaFile.getParentFile()); + + } catch (IOException e) { + logger.warn("error when deleting " + tempMetaFile, e); + } + } else { + logger.info("" + tempMetaFileString + " does not exist"); } } else { - logger.info("" + tempMetaFileString + " does not exist"); + logger.info("tempMetaFileString is not starting with file:// :" + tempMetaFileString); } - } else { - logger.info("tempMetaFileString is not starting with file:// :" + tempMetaFileString); } } } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java index 64469a0..d1c88ab 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobRelatedMetaUtil.java @@ -18,9 +18,9 @@ package org.apache.kylin.engine.mr.common; -import org.apache.commons.io.FileUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinConfigExt; +import org.apache.kylin.common.persistence.AutoDeleteDirectory; import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.ResourceTool; @@ -41,6 +41,9 @@ import java.util.Set; public class JobRelatedMetaUtil { private static final Logger logger = LoggerFactory.getLogger(JobRelatedMetaUtil.class); + private JobRelatedMetaUtil() { + } + public static Set<String> collectCubeMetadata(CubeInstance cube) { // cube, model_desc, cube_desc, table Set<String> dumpList = new LinkedHashSet<>(); @@ -77,25 +80,26 @@ public class JobRelatedMetaUtil { public static void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig, String metadataUrl) throws IOException { - File 
tmp = File.createTempFile("kylin_job_meta", ""); - FileUtils.forceDelete(tmp); // we need a directory, so delete the file first - File metaDir = new File(tmp, "meta"); - metaDir.mkdirs(); + try (AutoDeleteDirectory tmpDir = new AutoDeleteDirectory("kylin_job_meta", ""); + AutoDeleteDirectory metaDir = tmpDir.child("meta")) { + // dump metadata + JobRelatedMetaUtil.dumpResources(kylinConfig, metaDir.getFile(), dumpList); - // dump metadata - dumpResources(kylinConfig, metaDir, dumpList); + // dump metadata + dumpResources(kylinConfig, metaDir.getFile(), dumpList); - // write kylin.properties - Properties props = kylinConfig.exportToProperties(); - props.setProperty("kylin.metadata.url", metadataUrl); - File kylinPropsFile = new File(metaDir, "kylin.properties"); - try (FileOutputStream os = new FileOutputStream(kylinPropsFile)) { - props.store(os, kylinPropsFile.getAbsolutePath()); - } + // write kylin.properties + Properties props = kylinConfig.exportToProperties(); + props.setProperty("kylin.metadata.url", metadataUrl); + File kylinPropsFile = new File(metaDir.getFile(), "kylin.properties"); + try (FileOutputStream os = new FileOutputStream(kylinPropsFile)) { + props.store(os, kylinPropsFile.getAbsolutePath()); + } - KylinConfig dstConfig = KylinConfig.createKylinConfig(props); - //upload metadata - ResourceTool.copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), dstConfig); + KylinConfig dstConfig = KylinConfig.createKylinConfig(props); + //upload metadata + ResourceTool.copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), dstConfig); + } } } diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/DiagnosisController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/DiagnosisController.java index 108ec5a..57af711 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/DiagnosisController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/DiagnosisController.java @@ 
-25,6 +25,7 @@ import java.util.List; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.apache.kylin.common.persistence.AutoDeleteDirectory; import org.apache.kylin.metadata.badquery.BadQueryEntry; import org.apache.kylin.metadata.badquery.BadQueryHistory; import org.apache.kylin.rest.exception.InternalErrorException; @@ -72,17 +73,18 @@ public class DiagnosisController extends BasicController { /** * Get diagnosis information for project */ - @RequestMapping(value = "/project/{project}/download", method = { RequestMethod.GET }, produces = { "application/json" }) + @RequestMapping(value = "/project/{project}/download", method = { RequestMethod.GET }, produces = { + "application/json" }) @ResponseBody - public void dumpProjectDiagnosisInfo(@PathVariable String project, final HttpServletRequest request, final HttpServletResponse response) { - String filePath; - try { - filePath = dgService.dumpProjectDiagnosisInfo(project); + public void dumpProjectDiagnosisInfo(@PathVariable String project, final HttpServletRequest request, + final HttpServletResponse response) { + try (AutoDeleteDirectory diagDir = new AutoDeleteDirectory("diag_project", "'")) { + String filePath = dgService.dumpProjectDiagnosisInfo(project, diagDir.getFile()); + setDownloadResponse(filePath, response); } catch (IOException e) { throw new InternalErrorException("Failed to dump project diagnosis info. 
" + e.getMessage(), e); } - setDownloadResponse(filePath, response); } /** @@ -90,15 +92,15 @@ public class DiagnosisController extends BasicController { */ @RequestMapping(value = "/job/{jobId}/download", method = { RequestMethod.GET }, produces = { "application/json" }) @ResponseBody - public void dumpJobDiagnosisInfo(@PathVariable String jobId, final HttpServletRequest request, final HttpServletResponse response) { - String filePath; - try { - filePath = dgService.dumpJobDiagnosisInfo(jobId); + public void dumpJobDiagnosisInfo(@PathVariable String jobId, final HttpServletRequest request, + final HttpServletResponse response) { + try (AutoDeleteDirectory diagDir = new AutoDeleteDirectory("diag_job", "'")) { + String filePath = dgService.dumpJobDiagnosisInfo(jobId, diagDir.getFile()); + setDownloadResponse(filePath, response); } catch (IOException e) { throw new InternalErrorException("Failed to dump job diagnosis info. " + e.getMessage(), e); } - setDownloadResponse(filePath, response); } } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/DiagnosisService.java b/server-base/src/main/java/org/apache/kylin/rest/service/DiagnosisService.java index 57900eb..528858b 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/DiagnosisService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/DiagnosisService.java @@ -42,17 +42,12 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.google.common.collect.Lists; -import com.google.common.io.Files; @Component("diagnosisService") public class DiagnosisService extends BasicService { private static final Logger logger = LoggerFactory.getLogger(DiagnosisService.class); - protected File getDumpDir() { - return Files.createTempDir(); - } - @Autowired private AclEvaluate aclEvaluate; @@ -85,17 +80,15 @@ public class DiagnosisService extends BasicService { return 
getBadQueryHistoryManager().getBadQueriesForProject(project); } - public String dumpProjectDiagnosisInfo(String project) throws IOException { + public String dumpProjectDiagnosisInfo(String project, File exportPath) throws IOException { aclEvaluate.checkProjectOperationPermission(project); - File exportPath = getDumpDir(); String[] args = { project, exportPath.getAbsolutePath() }; runDiagnosisCLI(args); return getDiagnosisPackageName(exportPath); } - public String dumpJobDiagnosisInfo(String jobId) throws IOException { + public String dumpJobDiagnosisInfo(String jobId, File exportPath) throws IOException { aclEvaluate.checkProjectOperationPermission(jobService.getJobInstance(jobId)); - File exportPath = getDumpDir(); String[] args = { jobId, exportPath.getAbsolutePath() }; runDiagnosisCLI(args); return getDiagnosisPackageName(exportPath); diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java index f51fce0..89764c9 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/HiveColumnCardinalityJob.java @@ -6,15 +6,15 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
-*/ + */ package org.apache.kylin.source.hive.cardinality; @@ -53,68 +53,67 @@ public class HiveColumnCardinalityJob extends AbstractHadoopJob { protected static final Option OPTION_TABLE = OptionBuilder.withArgName("table name").hasArg().isRequired(true) .withDescription("The hive table name").create("table"); - public HiveColumnCardinalityJob() { - } - @Override public int run(String[] args) throws Exception { + try { + Options options = new Options(); - Options options = new Options(); - - options.addOption(OPTION_PROJECT); - options.addOption(OPTION_TABLE); - options.addOption(OPTION_OUTPUT_PATH); - - parseOptions(options, args); + options.addOption(OPTION_PROJECT); + options.addOption(OPTION_TABLE); + options.addOption(OPTION_OUTPUT_PATH); - // start job - String jobName = JOB_TITLE + getOptionsAsString(); - logger.info("Starting: " + jobName); - Configuration conf = getConf(); + parseOptions(options, args); - KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig); - conf.addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null))); + // start job + String jobName = JOB_TITLE + getOptionsAsString(); + logger.info("Starting: {}", jobName); + Configuration conf = getConf(); - job = Job.getInstance(conf, jobName); + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + JobEngineConfig jobEngineConfig = new JobEngineConfig(kylinConfig); + conf.addResource(new Path(jobEngineConfig.getHadoopJobConfFilePath(null))); - setJobClasspath(job, kylinConfig); + job = Job.getInstance(conf, jobName); - String project = getOptionValue(OPTION_PROJECT); - String table = getOptionValue(OPTION_TABLE); - job.getConfiguration().set(BatchConstants.CFG_PROJECT_NAME, project); - job.getConfiguration().set(BatchConstants.CFG_TABLE_NAME, table); + setJobClasspath(job, kylinConfig); - Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); - FileOutputFormat.setOutputPath(job, output); - 
job.getConfiguration().set("dfs.blocksize", "67108864"); - job.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "false"); + String project = getOptionValue(OPTION_PROJECT); + String table = getOptionValue(OPTION_TABLE); + job.getConfiguration().set(BatchConstants.CFG_PROJECT_NAME, project); + job.getConfiguration().set(BatchConstants.CFG_TABLE_NAME, table); - // Mapper - IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(table, project, - getOptionValue(OPTION_CUBING_JOB_ID)); - tableInputFormat.configureJob(job); + Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); + FileOutputFormat.setOutputPath(job, output); + job.getConfiguration().set("dfs.blocksize", "67108864"); + job.getConfiguration().set("mapreduce.output.fileoutputformat.compress", "false"); - job.setMapperClass(ColumnCardinalityMapper.class); - job.setMapOutputKeyClass(IntWritable.class); - job.setMapOutputValueClass(BytesWritable.class); + // Mapper + IMRTableInputFormat tableInputFormat = MRUtil.getTableInputFormat(table, project, + getOptionValue(OPTION_CUBING_JOB_ID)); + tableInputFormat.configureJob(job); - // Reducer - only one - job.setReducerClass(ColumnCardinalityReducer.class); - job.setOutputFormatClass(TextOutputFormat.class); - job.setOutputKeyClass(IntWritable.class); - job.setOutputValueClass(LongWritable.class); - job.setNumReduceTasks(1); + job.setMapperClass(ColumnCardinalityMapper.class); + job.setMapOutputKeyClass(IntWritable.class); + job.setMapOutputValueClass(BytesWritable.class); - this.deletePath(job.getConfiguration(), output); + // Reducer - only one + job.setReducerClass(ColumnCardinalityReducer.class); + job.setOutputFormatClass(TextOutputFormat.class); + job.setOutputKeyClass(IntWritable.class); + job.setOutputValueClass(LongWritable.class); + job.setNumReduceTasks(1); - logger.info("Going to submit HiveColumnCardinalityJob for table '" + table + "'"); + this.deletePath(job.getConfiguration(), output); - TableDesc tableDesc = 
TableMetadataManager.getInstance(kylinConfig).getTableDesc(table, project); - attachTableMetadata(tableDesc, job.getConfiguration()); - int result = waitForCompletion(job); + logger.info("Going to submit HiveColumnCardinalityJob for table '{}'", table); - return result; + TableDesc tableDesc = TableMetadataManager.getInstance(kylinConfig).getTableDesc(table, project); + attachTableMetadata(tableDesc, job.getConfiguration()); + return waitForCompletion(job); + } finally { + if (job != null) + cleanupTempConfFile(job.getConfiguration()); + } } }