This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new d7d2064  Fix storage clean up tool
d7d2064 is described below

commit d7d20645d797aa43ee41456d61a0a96bfd42d378
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Fri Jan 22 10:27:20 2021 +0800

    Fix storage clean up tool
---
 .../apache/kylin/rest/job/StorageCleanupJob.java   | 27 ++++++++--------------
 .../kylin/rest/job/StorageCleanupJobTest.java      | 27 +++++++++++++++++++---
 2 files changed, 34 insertions(+), 20 deletions(-)

diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
index cd7f9c5..cde1e53 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
@@ -24,6 +24,7 @@ import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.HadoopUtil;
@@ -37,6 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -54,11 +56,13 @@ public class StorageCleanupJob extends AbstractApplication {
 
     protected boolean delete = false;
 
+    protected static final List<String> protectedDir = Arrays.asList("cube_statistics", "resources-jdbc");
+    protected static PathFilter pathFilter = status -> !protectedDir.contains(status.getName());
+
     public StorageCleanupJob() throws IOException {
         this(KylinConfig.getInstanceFromEnv(), HadoopUtil.getWorkingFileSystem(HadoopUtil.getCurrentConfiguration()));
     }
 
-
     public StorageCleanupJob(KylinConfig config, FileSystem fs) {
         this.config = config;
         this.fs = fs;
@@ -83,26 +87,14 @@ public class StorageCleanupJob extends AbstractApplication {
     public void cleanup() throws Exception {
         ProjectManager projectManager = ProjectManager.getInstance(config);
         CubeManager cubeManager = CubeManager.getInstance(config);
-
-        //clean up job temp files
-        List<String> projects = projectManager.listAllProjects().stream().map(ProjectInstance::getName).collect(Collectors.toList());
-        for (String project : projects) {
-            String tmpPath = config.getJobTmpDir(project);
-            if (delete) {
-                logger.info("Deleting HDFS path " + tmpPath);
-                if (fs.exists(new Path(tmpPath))) {
-                    fs.delete(new Path(tmpPath), true);
-                }
-            } else {
-                logger.info("Dry run, pending delete HDFS path " + tmpPath);
-            }
-        }
+        List<String> projects = projectManager.listAllProjects().stream().map(ProjectInstance::getName)
+                .collect(Collectors.toList());
 
         //clean up deleted projects and cubes
         List<CubeInstance> cubes = cubeManager.listAllCubes();
         Path metadataPath = new Path(config.getHdfsWorkingDirectory());
         if (fs.exists(metadataPath)) {
-            FileStatus[] projectStatus = fs.listStatus(metadataPath);
+            FileStatus[] projectStatus = fs.listStatus(metadataPath, pathFilter);
             if (projectStatus != null) {
                 for (FileStatus status : projectStatus) {
                     String projectName = status.getPath().getName();
@@ -114,7 +106,8 @@ public class StorageCleanupJob extends AbstractApplication {
                             logger.info("Dry run, pending delete HDFS path " + status.getPath());
                         }
                     } else {
-                        cleanupDeletedCubes(projectName, cubes.stream().map(CubeInstance::getName).collect(Collectors.toList()));
+                        cleanupDeletedCubes(projectName,
+                                cubes.stream().map(CubeInstance::getName).collect(Collectors.toList()));
                     }
                 }
             }
diff --git a/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java b/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
index daa9f4d..7cab98e 100644
--- a/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
+++ b/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
@@ -67,10 +67,10 @@ public class StorageCleanupJobTest {
         job.execute(new String[] { "--delete", "true" });
 
         ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
-        verify(mockFs, times(5)).delete(pathCaptor.capture(), eq(true));
+        verify(mockFs, times(4)).delete(pathCaptor.capture(), eq(true));
         ArrayList<Path> expected = Lists.newArrayList(
                 // Verify clean job temp directory
-                new Path(basePath + "/default/job_tmp"),
+                // new Path(basePath + "/default/job_tmp"),
 
                 //Verify clean dropped cube
                 new Path(basePath + "/default/parquet/dropped_cube"),
@@ -100,6 +100,12 @@ public class StorageCleanupJobTest {
         FileStatus project1 = mock(FileStatus.class);
         FileStatus project2 = mock(FileStatus.class);
 
+        FileStatus[] protectedStatuses = new FileStatus[2];
+        FileStatus cubeStatistics = mock(FileStatus.class);
+        FileStatus resourcesJdbc = mock(FileStatus.class);
+
+        FileStatus[] allStatuses = new FileStatus[4];
+
         // Remove job temp directory
         Path jobTmpPath = new Path(basePath + "/default/job_tmp");
         when(mockFs.exists(jobTmpPath)).thenReturn(true);
@@ -130,6 +136,20 @@ public class StorageCleanupJobTest {
         projectStatuses[0] = project1;
         projectStatuses[1] = project2;
 
+        Path cubeStatisticsPath = new Path(basePath + "/default/parquet");
+        Path resourcesJdbcPath = new Path(basePath + "/deleted_project/parquet");
+        when(cubeStatistics.getPath()).thenReturn(cubeStatisticsPath);
+        when(resourcesJdbc.getPath()).thenReturn(resourcesJdbcPath);
+        protectedStatuses[0] = cubeStatistics;
+        protectedStatuses[1] = resourcesJdbc;
+        when(mockFs.delete(cubeStatisticsPath, true)).thenReturn(true);
+        when(mockFs.delete(resourcesJdbcPath, true)).thenReturn(true);
+
+        allStatuses[0] = project1;
+        allStatuses[1] = project2;
+        allStatuses[2] = cubeStatistics;
+        allStatuses[3] = resourcesJdbc;
+
         Path defaultProjectParquetPath = new Path(basePath + "/default/parquet");
         Path deletedProjectParquetPath = new Path(basePath + "/deleted_project/parquet");
         when(mockFs.exists(defaultProjectParquetPath)).thenReturn(true);
@@ -138,6 +158,7 @@ public class StorageCleanupJobTest {
         when(mockFs.exists(basePath)).thenReturn(true);
         when(mockFs.listStatus(new Path(basePath + "/default/parquet/ci_left_join_cube"))).thenReturn(segmentStatuses);
         when(mockFs.listStatus(defaultProjectParquetPath)).thenReturn(cubeStatuses);
-        when(mockFs.listStatus(basePath)).thenReturn(projectStatuses);
+        when(mockFs.listStatus(basePath)).thenReturn(allStatuses);
+        when(mockFs.listStatus(basePath, StorageCleanupJob.pathFilter)).thenReturn(projectStatuses);
     }
 }
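
Note on the change above: the fix hinges on passing a PathFilter to fs.listStatus so that the protected directories "cube_statistics" and "resources-jdbc" under the HDFS working directory are never offered to the delete loop. The following minimal sketch illustrates that filtering behaviour in isolation; the class name, working-directory path, and local-filesystem setup are illustrative assumptions, not part of the commit.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class PathFilterSketch {
    // Same idea as StorageCleanupJob.pathFilter: reject entries whose last
    // path component matches a protected directory name.
    static final List<String> PROTECTED = Arrays.asList("cube_statistics", "resources-jdbc");
    static final PathFilter FILTER = path -> !PROTECTED.contains(path.getName());

    public static void main(String[] args) throws Exception {
        // Local filesystem used here only for demonstration; the job itself
        // runs against the cluster filesystem from HadoopUtil.
        FileSystem fs = FileSystem.getLocal(new Configuration());
        // Hypothetical working directory; in Kylin this comes from
        // config.getHdfsWorkingDirectory().
        Path workingDir = new Path("/tmp/kylin-working-dir");

        // listStatus returns only entries accepted by the filter, so the
        // protected directories never become cleanup candidates.
        FileStatus[] candidates = fs.listStatus(workingDir, FILTER);
        for (FileStatus status : candidates) {
            System.out.println("cleanup candidate: " + status.getPath());
        }
    }
}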