Repository: kylin
Updated Branches:
  refs/heads/master 5ea33f217 -> 0aea876f8


minor refactor


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0aea876f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0aea876f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0aea876f

Branch: refs/heads/master
Commit: 0aea876f800476b5741a72179044376cac178510
Parents: 5ea33f2
Author: Hongbin Ma <[email protected]>
Authored: Mon Jul 18 19:13:27 2016 +0800
Committer: Hongbin Ma <[email protected]>
Committed: Mon Jul 18 19:13:43 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/job/JobInstance.java  | 26 ----------------
 .../kylin/engine/mr/BatchCubingJobBuilder2.java |  2 +-
 .../kylin/engine/mr/BatchMergeJobBuilder2.java  | 30 ++----------------
 .../org/apache/kylin/engine/mr/IMROutput2.java  |  6 ++--
 .../kylin/engine/mr/JobBuilderSupport.java      |  9 +++++-
 .../hbase/steps/HBaseMROutput2Transition.java   | 12 +++++---
 .../kylin/storage/hbase/steps/HBaseMRSteps.java | 32 ++++++++++++++++++--
 .../storage/hbase/util/CubeMigrationCLI.java    |  6 ++--
 .../storage/hbase/util/StorageCleanupJob.java   |  7 +++--
 9 files changed, 60 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/0aea876f/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JobInstance.java b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
index e5a4540..8dcdff6 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
@@ -27,7 +27,6 @@ import org.apache.kylin.cube.model.CubeBuildTypeEnum;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobStepCmdTypeEnum;
 import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.apache.kylin.job.engine.JobEngineConfig;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
@@ -41,33 +40,8 @@ import com.google.common.collect.Lists;
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
 public class JobInstance extends RootPersistentEntity implements Comparable<JobInstance> {
 
-    public static final String JOB_WORKING_DIR_PREFIX = "kylin-";
-
-    public static final String YARN_APP_ID = "yarn_application_id";
     public static final String YARN_APP_URL = "yarn_application_tracking_url";
     public static final String MR_JOB_ID = "mr_job_id";
-    public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
-    public static final String SOURCE_RECORDS_COUNT = "source_records_count";
-    public static final String SOURCE_RECORDS_SIZE = "source_records_size";
-
-    public static String getStepIdentity(JobInstance jobInstance, JobStep jobStep) {
-        return jobInstance.getRelatedCube() + "." + jobInstance.getUuid() + "." + jobStep.getSequenceID();
-    }
-
-    public static String getJobIdentity(JobInstance jobInstance) {
-        return jobInstance.getRelatedCube() + "." + jobInstance.getUuid();
-    }
-
-    public static String getJobWorkingDir(JobInstance jobInstance, JobEngineConfig engineConfig) {
-        return getJobWorkingDir(jobInstance.getUuid(), engineConfig.getHdfsWorkingDirectory());
-    }
-
-    public static String getJobWorkingDir(String jobUuid, String hdfsWorkdingDir) {
-        if (jobUuid == null || jobUuid.equals("")) {
-            throw new IllegalArgumentException("jobUuid can't be null or empty");
-        }
-        return hdfsWorkdingDir + JOB_WORKING_DIR_PREFIX + jobUuid;
-    }
 
     @JsonProperty("name")
     private String name;

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aea876f/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index afa601c..93ae1e4 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -66,7 +66,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         // Phase 3: Build Cube
         addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
         result.addTask(createInMemCubingStep(jobId, cuboidRootPath)); // inmem cubing, only selected algorithm will execute
-        outputSide.addStepPhase3_BuildCube(result, cuboidRootPath);
+        outputSide.addStepPhase3_BuildCube(result);
 
         // Phase 4: Update Metadata & Cleanup
         result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aea876f/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index 06b7528..33081c7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -20,11 +20,8 @@ package org.apache.kylin.engine.mr;
 
 import java.util.List;
 
-import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
 import org.apache.kylin.engine.mr.steps.MergeStatisticsStep;
@@ -48,18 +45,15 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
     public CubingJob build() {
         logger.info("MR_V2 new job to MERGE segment " + seg);
 
-        final CubeSegment cubeSegment = (CubeSegment) seg;
+        final CubeSegment cubeSegment = seg;
         final CubingJob result = CubingJob.createMergeJob(cubeSegment, submitter, config);
         final String jobId = result.getId();
-        final String cuboidRootPath = getCuboidRootPath(jobId);
 
         final List<CubeSegment> mergingSegments = cubeSegment.getCubeInstance().getMergingSegments(cubeSegment);
         Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
         final List<String> mergingSegmentIds = Lists.newArrayList();
-        final List<String> mergingCuboidPaths = Lists.newArrayList();
         for (CubeSegment merging : mergingSegments) {
             mergingSegmentIds.add(merging.getUuid());
-            mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
         }
 
         // Phase 1: Merge Dictionary
@@ -68,9 +62,7 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
         outputSide.addStepPhase1_MergeDictionary(result);
 
         // Phase 2: Merge Cube Files
-        String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
-        result.addTask(createMergeCuboidDataStep(cubeSegment, formattedPath, cuboidRootPath));
-        outputSide.addStepPhase2_BuildCube(result, cuboidRootPath);
+        outputSide.addStepPhase2_BuildCube(seg, mergingSegments, result);
 
         // Phase 3: Update Metadata & Cleanup
         result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
@@ -91,26 +83,8 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
         return result;
     }
 
-    private MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, String inputPath, String outputPath) {
-        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
-        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd);
-        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
-        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
-        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
-        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
-
-        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
-        mergeCuboidDataStep.setMapReduceJobClass(getMergeCuboidJob());
-        return mergeCuboidDataStep;
-    }
-
     protected Class<? extends AbstractHadoopJob> getMergeCuboidJob() {
         return MergeCuboidJob.class;
-
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aea876f/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
index 844eb07..603f207 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -21,6 +21,8 @@ package org.apache.kylin.engine.mr;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
+import java.util.List;
+
 public interface IMROutput2 {
 
     /** Return a helper to participate in batch cubing job flow. */
@@ -47,7 +49,7 @@ public interface IMROutput2 {
          * value is M1+M2+..+Mm. CUBOID is 8 bytes cuboid ID; Dx is dimension value with
          * dictionary encoding; Mx is measure value serialization form.
          */
-        public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
+        public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
 
         /** Add step that does any necessary clean up. */
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
@@ -76,7 +78,7 @@ public interface IMROutput2 {
          * value is M1+M2+..+Mm. CUBOID is 8 bytes cuboid ID; Dx is dimension value with
          * dictionary encoding; Mx is measure value serialization form.
          */
-        public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
+        public void addStepPhase2_BuildCube(CubeSegment set, List<CubeSegment> mergingSegments, DefaultChainedExecutable jobFlow);
 
         /** Add step that does any necessary clean up. */
         public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aea876f/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 010eeeb..3e9aff6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -178,7 +178,14 @@ public class JobBuilderSupport {
     // ----------------------------------------------------------------------------
 
     public static String getJobWorkingDir(JobEngineConfig conf, String jobId) {
-        return conf.getHdfsWorkingDirectory() + "kylin-" + jobId;
+        return getJobWorkingDir(conf.getHdfsWorkingDirectory(), jobId);
+    }
+
+    public static String getJobWorkingDir(String hdfsDir, String jobId) {
+        if (!hdfsDir.endsWith("/")) {
+            hdfsDir = hdfsDir + "/";
+        }
+        return hdfsDir + "kylin-" + jobId;
     }
 
     public static StringBuilder appendExecCmdParameters(StringBuilder buf, String paraName, String paraValue) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aea876f/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index 7bb3647..c4df354 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -18,8 +18,11 @@
 
 package org.apache.kylin.storage.hbase.steps;
 
+import java.util.List;
+
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,8 +53,8 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             }
 
             @Override
-            public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath) {
-                jobFlow.addTask(steps.createConvertCuboidToHfileStep(cuboidRootPath, jobFlow.getId()));
+            public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
                 jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
             }
 
@@ -73,8 +76,9 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             }
 
             @Override
-            public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath) {
-                jobFlow.addTask(steps.createConvertCuboidToHfileStep(cuboidRootPath, jobFlow.getId()));
+            public void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSegments, DefaultChainedExecutable jobFlow) {
+                jobFlow.addTask(steps.createMergeCuboidDataStep(seg, mergingSegments, jobFlow.getId(), MergeCuboidJob.class));
+                jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
                 jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
             }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aea876f/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 8a8a750..0914827 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -21,10 +21,12 @@ package org.apache.kylin.storage.hbase.steps;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
@@ -49,7 +51,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
         // create htable step
         jobFlow.addTask(createCreateHTableStep(jobId));
         // generate hfiles step
-        jobFlow.addTask(createConvertCuboidToHfileStep(cuboidRootPath, jobId));
+        jobFlow.addTask(createConvertCuboidToHfileStep(jobId));
         // bulk load step
         jobFlow.addTask(createBulkLoadStep(jobId));
     }
@@ -95,7 +97,33 @@ public class HBaseMRSteps extends JobBuilderSupport {
         return createHtableStep;
     }
 
-    public MapReduceExecutable createConvertCuboidToHfileStep(String cuboidRootPath, String jobId) {
+    public MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments, String jobID, Class<? extends AbstractHadoopJob> clazz) {
+
+        final List<String> mergingCuboidPaths = Lists.newArrayList();
+        for (CubeSegment merging : mergingSegments) {
+            mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
+        }
+        String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
+        String outputPath = getCuboidRootPath(jobID);
+
+        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
+        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_NAME, seg.getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, formattedPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
+
+        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
+        mergeCuboidDataStep.setMapReduceJobClass(clazz);
+        return mergeCuboidDataStep;
+    }
+
+    public MapReduceExecutable createConvertCuboidToHfileStep(String jobId) {
+        String cuboidRootPath = getCuboidRootPath(jobId);
         String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/") + "*";
 
         MapReduceExecutable createHFilesStep = new MapReduceExecutable();

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aea876f/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
index 32aa4b0..541c173 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
@@ -52,7 +52,7 @@ import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.dict.lookup.SnapshotManager;
 import org.apache.kylin.dict.lookup.SnapshotTable;
-import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TableDesc;
@@ -189,8 +189,8 @@ public class CubeMigrationCLI {
         for (CubeSegment segment : cube.getSegments()) {
 
             String jobUuid = segment.getLastBuildJobID();
-            String src = JobInstance.getJobWorkingDir(jobUuid, srcConfig.getHdfsWorkingDirectory());
-            String tgt = JobInstance.getJobWorkingDir(jobUuid, dstConfig.getHdfsWorkingDirectory());
+            String src = JobBuilderSupport.getJobWorkingDir(srcConfig.getHdfsWorkingDirectory(), jobUuid);
+            String tgt = JobBuilderSupport.getJobWorkingDir(dstConfig.getHdfsWorkingDirectory(), jobUuid);
 
             operations.add(new Opt(OptType.RENAME_FOLDER_IN_HDFS, new Object[] { src, tgt }));
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/0aea876f/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index eb000e9..249f506 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -49,6 +49,7 @@ import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.ExecutableState;
@@ -187,7 +188,7 @@ public class StorageCleanupJob extends AbstractApplication {
         for (FileStatus status : fStatus) {
             String path = status.getPath().getName();
             // System.out.println(path);
-            if (path.startsWith(JobInstance.JOB_WORKING_DIR_PREFIX)) {
+            if (path.startsWith("kylin-")) {
                 String kylinJobPath = engineConfig.getHdfsWorkingDirectory() + path;
                 allHdfsPathsNeedToBeDeleted.add(kylinJobPath);
             }
@@ -198,7 +199,7 @@ public class StorageCleanupJob extends AbstractApplication {
             // only remove FINISHED and DISCARDED job intermediate files
             final ExecutableState state = executableManager.getOutput(jobId).getState();
             if (!state.isFinalState()) {
-                String path = JobInstance.getJobWorkingDir(jobId, engineConfig.getHdfsWorkingDirectory());
+                String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(), jobId);
                 allHdfsPathsNeedToBeDeleted.remove(path);
                 logger.info("Skip " + path + " from deletion list, as the path belongs to job " + jobId + " with status " + state);
             }
@@ -209,7 +210,7 @@ public class StorageCleanupJob extends AbstractApplication {
             for (CubeSegment seg : cube.getSegments()) {
                 String jobUuid = seg.getLastBuildJobID();
                 if (jobUuid != null && jobUuid.equals("") == false) {
-                    String path = JobInstance.getJobWorkingDir(jobUuid, engineConfig.getHdfsWorkingDirectory());
+                    String path = JobBuilderSupport.getJobWorkingDir(engineConfig.getHdfsWorkingDirectory(),jobUuid);
                     allHdfsPathsNeedToBeDeleted.remove(path);
                     logger.info("Skip " + path + " from deletion list, as the path belongs to segment " + seg + " of cube " + cube.getName());
                 }

Reply via email to