This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new d7d2064  Fix storage clean up tool
d7d2064 is described below

commit d7d20645d797aa43ee41456d61a0a96bfd42d378
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Fri Jan 22 10:27:20 2021 +0800

    Fix storage clean up tool
---
 .../apache/kylin/rest/job/StorageCleanupJob.java   | 27 ++++++++--------------
 .../kylin/rest/job/StorageCleanupJobTest.java      | 27 +++++++++++++++++++---
 2 files changed, 34 insertions(+), 20 deletions(-)

diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
index cd7f9c5..cde1e53 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
@@ -24,6 +24,7 @@ import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.HadoopUtil;
@@ -37,6 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -54,11 +56,13 @@ public class StorageCleanupJob extends AbstractApplication {
 
     protected boolean delete = false;
 
+    protected static final List<String> protectedDir = Arrays.asList("cube_statistics", "resources-jdbc");
+    protected static PathFilter pathFilter = status -> !protectedDir.contains(status.getName());
+
     public StorageCleanupJob() throws IOException {
         this(KylinConfig.getInstanceFromEnv(), HadoopUtil.getWorkingFileSystem(HadoopUtil.getCurrentConfiguration()));
     }
 
-
     public StorageCleanupJob(KylinConfig config, FileSystem fs) {
         this.config = config;
         this.fs = fs;
@@ -83,26 +87,14 @@ public class StorageCleanupJob extends AbstractApplication {
     public void cleanup() throws Exception {
         ProjectManager projectManager = ProjectManager.getInstance(config);
         CubeManager cubeManager = CubeManager.getInstance(config);
-
-        //clean up job temp files
-        List<String> projects = projectManager.listAllProjects().stream().map(ProjectInstance::getName).collect(Collectors.toList());
-        for (String project : projects) {
-            String tmpPath = config.getJobTmpDir(project);
-            if (delete) {
-                logger.info("Deleting HDFS path " + tmpPath);
-                if (fs.exists(new Path(tmpPath))) {
-                    fs.delete(new Path(tmpPath), true);
-                }
-            } else {
-                logger.info("Dry run, pending delete HDFS path " + tmpPath);
-            }
-        }
+        List<String> projects = projectManager.listAllProjects().stream().map(ProjectInstance::getName)
+                .collect(Collectors.toList());
 
         //clean up deleted projects and cubes
         List<CubeInstance> cubes = cubeManager.listAllCubes();
         Path metadataPath = new Path(config.getHdfsWorkingDirectory());
         if (fs.exists(metadataPath)) {
-            FileStatus[] projectStatus = fs.listStatus(metadataPath);
+            FileStatus[] projectStatus = fs.listStatus(metadataPath, pathFilter);
             if (projectStatus != null) {
                 for (FileStatus status : projectStatus) {
                     String projectName = status.getPath().getName();
@@ -114,7 +106,8 @@ public class StorageCleanupJob extends AbstractApplication {
                             logger.info("Dry run, pending delete HDFS path " + 
status.getPath());
                         }
                     } else {
-                        cleanupDeletedCubes(projectName, cubes.stream().map(CubeInstance::getName).collect(Collectors.toList()));
+                        cleanupDeletedCubes(projectName,
+                                cubes.stream().map(CubeInstance::getName).collect(Collectors.toList()));
                     }
                 }
             }
diff --git a/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java b/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
index daa9f4d..7cab98e 100644
--- a/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
+++ b/server-base/src/test/java/org/apache/kylin/rest/job/StorageCleanupJobTest.java
@@ -67,10 +67,10 @@ public class StorageCleanupJobTest {
         job.execute(new String[] { "--delete", "true" });
 
         ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
-        verify(mockFs, times(5)).delete(pathCaptor.capture(), eq(true));
+        verify(mockFs, times(4)).delete(pathCaptor.capture(), eq(true));
         ArrayList<Path> expected = Lists.newArrayList(
                 // Verify clean job temp directory
-                new Path(basePath + "/default/job_tmp"),
+                // new Path(basePath + "/default/job_tmp"),
 
                 //Verify clean dropped cube
                 new Path(basePath + "/default/parquet/dropped_cube"),
@@ -100,6 +100,12 @@ public class StorageCleanupJobTest {
         FileStatus project1 = mock(FileStatus.class);
         FileStatus project2 = mock(FileStatus.class);
 
+        FileStatus[] protectedStatuses = new FileStatus[2];
+        FileStatus cubeStatistics = mock(FileStatus.class);
+        FileStatus resourcesJdbc = mock(FileStatus.class);
+
+        FileStatus[] allStatuses = new FileStatus[4];
+
         // Remove job temp directory
         Path jobTmpPath = new Path(basePath + "/default/job_tmp");
         when(mockFs.exists(jobTmpPath)).thenReturn(true);
@@ -130,6 +136,20 @@ public class StorageCleanupJobTest {
         projectStatuses[0] = project1;
         projectStatuses[1] = project2;
 
+        Path cubeStatisticsPath = new Path(basePath + "/default/parquet");
+        Path resourcesJdbcPath = new Path(basePath + "/deleted_project/parquet");
+        when(cubeStatistics.getPath()).thenReturn(cubeStatisticsPath);
+        when(resourcesJdbc.getPath()).thenReturn(resourcesJdbcPath);
+        protectedStatuses[0] = cubeStatistics;
+        protectedStatuses[1] = resourcesJdbc;
+        when(mockFs.delete(cubeStatisticsPath, true)).thenReturn(true);
+        when(mockFs.delete(resourcesJdbcPath, true)).thenReturn(true);
+
+        allStatuses[0] = project1;
+        allStatuses[1] = project2;
+        allStatuses[2] = cubeStatistics;
+        allStatuses[3] = resourcesJdbc;
+
         Path defaultProjectParquetPath = new Path(basePath + "/default/parquet");
         Path deletedProjectParquetPath = new Path(basePath + "/deleted_project/parquet");
         when(mockFs.exists(defaultProjectParquetPath)).thenReturn(true);
@@ -138,6 +158,7 @@ public class StorageCleanupJobTest {
         when(mockFs.exists(basePath)).thenReturn(true);
         when(mockFs.listStatus(new Path(basePath + "/default/parquet/ci_left_join_cube"))).thenReturn(segmentStatuses);
         when(mockFs.listStatus(defaultProjectParquetPath)).thenReturn(cubeStatuses);
-        when(mockFs.listStatus(basePath)).thenReturn(projectStatuses);
+        when(mockFs.listStatus(basePath)).thenReturn(allStatuses);
+        when(mockFs.listStatus(basePath, StorageCleanupJob.pathFilter)).thenReturn(projectStatuses);
     }
 }
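
The substance of the fix is the filtered overload of FileSystem.listStatus: a Hadoop PathFilter rejects the protected metadata directories ("cube_statistics", "resources-jdbc") by name, so the cleanup loop never sees them. A minimal standalone sketch of that pattern, assuming a hypothetical /kylin working directory (Kylin itself resolves the real one from config.getHdfsWorkingDirectory()):

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    public class ProtectedDirSketch {
        // Directory names the cleanup must never delete, as in the patch above.
        static final List<String> PROTECTED = Arrays.asList("cube_statistics", "resources-jdbc");

        // PathFilter.accept(Path) returns false for protected names, so
        // listStatus() never hands them to the cleanup loop at all.
        static final PathFilter FILTER = path -> !PROTECTED.contains(path.getName());

        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            // Hypothetical working directory, used here only for illustration.
            for (FileStatus status : fs.listStatus(new Path("/kylin"), FILTER)) {
                System.out.println("cleanup candidate: " + status.getPath());
            }
        }
    }

Filtering at listing time, rather than skipping names inside the loop, keeps every consumer of the listing safe by construction.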
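
On the test side, Mockito stubs listStatus(Path) and listStatus(Path, PathFilter) independently, which is why the test registers allStatuses (protected dirs included) for the raw call and projectStatuses for the filtered one. A minimal sketch of that stubbing pattern, with illustrative array sizes and a stand-in lambda in place of StorageCleanupJob.pathFilter:

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    public class FilteredListStubSketch {
        public static void main(String[] args) throws Exception {
            FileSystem mockFs = mock(FileSystem.class);
            Path base = new Path("/kylin");

            FileStatus[] all = new FileStatus[4];      // everything, protected dirs included
            FileStatus[] filtered = new FileStatus[2]; // what the filter lets through

            // Stand-in filter; the real test passes the job's static
            // pathFilter field so the stub matches the exact instance
            // the code under test supplies.
            PathFilter filter = path -> !path.getName().startsWith("protected");

            // Each overload needs its own stub: the unfiltered call still
            // returns everything, only the filtered call hides protected dirs.
            when(mockFs.listStatus(base)).thenReturn(all);
            when(mockFs.listStatus(base, filter)).thenReturn(filtered);
        }
    }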
