This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 0dc4c06641966e4fb2975f1ff738de65ff5d846c
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Tue Mar 23 17:52:42 2021 +0800

    KYLIN-4966 Refresh the existing segment according to the new cuboid list in kylin4
---
 .../apache/kylin/cube/model/CubeBuildTypeEnum.java |   5 +
 .../kylin/job/constant/ExecutableConstants.java    |   1 +
 .../org/apache/kylin/metadata/model/TableDesc.java |   1 -
 .../spark/SparkBatchCubingEngineParquet.java       |   3 +-
 .../engine/spark/job/FilterRecommendCuboidJob.java | 104 ++++++
 .../kylin/engine/spark/job/JobStepFactory.java     |  10 +-
 .../apache/kylin/engine/spark/job/JobStepType.java |   4 +-
 .../engine/spark/job/NResourceDetectStep.java      |  40 +-
 .../NSparkBatchOptimizeJobCheckpointBuilder.java   |  88 +++++
 .../spark/job/NSparkCleanupHdfsStorageStep.java    |  90 +++++
 .../kylin/engine/spark/job/NSparkCubingUtil.java   |   7 +-
 ...esourceDetectStep.java => NSparkLocalStep.java} |  40 +-
 .../engine/spark/job/NSparkOptimizingJob.java      | 108 ++++++
 .../engine/spark/job/NSparkOptimizingStep.java     |  94 +++++
 .../job/NSparkUpdateCubeInfoAfterOptimizeStep.java |  66 ++++
 .../kylin/engine/spark/job/OptimizeBuildJob.java}  | 411 +++++++++------------
 .../job/ResourceDetectBeforeOptimizingJob.java     | 108 ++++++
 .../engine/spark/utils/UpdateMetadataUtil.java     |  84 ++++-
 .../kylin/engine/spark/job/BuildJobInfos.scala     |  39 ++
 .../kylin/engine/spark/job/CubeBuildJob.java       |  16 +-
 .../engine/spark/job/CuboidStatisticsJob.scala     |   9 +
 .../kylin/engine/spark/job/LogJobInfoUtils.scala   |  32 ++
 .../engine/spark/job/ParentSourceChooser.scala     |  33 +-
 .../spark/job/ResourceDetectBeforeCubingJob.java   |   8 +-
 .../engine/spark/metadata/cube/ManagerHub.java     |  13 +-
 .../kylin/engine/spark/metadata/MetaData.scala     |   4 +
 .../engine/spark/metadata/MetadataConverter.scala  |  21 +-
 .../kylin/engine/spark2/NOptimizeJobTest.java      | 151 ++++++++
 .../kylin/rest/controller/CubeController.java      |   1 +
 .../org/apache/kylin/rest/service/JobService.java  |  13 +-
 30 files changed, 1248 insertions(+), 356 deletions(-)

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java
index 6a14025..5b502b1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeBuildTypeEnum.java
@@ -38,6 +38,11 @@ public enum CubeBuildTypeEnum {
     REFRESH,
 
     /**
+     * optimize segments
+     */
+    OPTIMIZE,
+
+    /**
      * checkpoint for set of other jobs
      */
     CHECKPOINT
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 5ef7a69..0d5e482 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -94,5 +94,6 @@ public final class ExecutableConstants {
     public static final String FLINK_SPECIFIC_CONFIG_NAME_MERGE_DICTIONARY = "mergedict";
     //kylin on parquetv2
     public static final String STEP_NAME_DETECT_RESOURCE = "Detect Resource";
+    public static final String STEP_NAME_BUILD_CUBOID_FROM_PARENT_CUBOID = "Build recommend cuboid from parent cuboid";
 
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index 588cf1e..8124875 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -26,7 +26,6 @@ import java.util.Locale;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.StringSplitter;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java
index ad8e68b..b6f4bb8 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.engine.spark;
 
+import org.apache.kylin.engine.spark.job.NSparkOptimizingJob;
 import org.apache.kylin.engine.spark.job.NSparkCubingJob;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -50,7 +51,7 @@ public class SparkBatchCubingEngineParquet implements IBatchCubingEngine {
 
     @Override
     public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter) {
-        return null;
+        return NSparkOptimizingJob.optimize(optimizeSegment, submitter);
     }
 
     @Override
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/FilterRecommendCuboidJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/FilterRecommendCuboidJob.java
new file mode 100644
index 0000000..b9c46d2
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/FilterRecommendCuboidJob.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.job;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.spark.application.SparkApplication;
+import org.apache.kylin.engine.spark.metadata.cube.PathManager;
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+
+public class FilterRecommendCuboidJob extends SparkApplication {
+    protected static final Logger logger = LoggerFactory.getLogger(FilterRecommendCuboidJob.class);
+
+    private long baseCuboid;
+    private Set<Long> recommendCuboids;
+
+    private FileSystem fs = HadoopUtil.getWorkingFileSystem();
+    private Configuration conf = HadoopUtil.getCurrentConfiguration();
+
+    public FilterRecommendCuboidJob() {
+
+    }
+
+    public String getCuboidRootPath(CubeSegment segment) {
+        return PathManager.getSegmentParquetStoragePath(segment.getCubeInstance(), segment.getName(),
+                segment.getStorageLocationIdentifier());
+    }
+
+    @Override
+    protected void doExecute() throws Exception {
+        infos.clearReusedCuboids();
+        final CubeManager mgr = CubeManager.getInstance(config);
+        final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams())).latestCopyForWrite();
+        final CubeSegment optimizeSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+
+        CubeSegment oldSegment = optimizeSegment.getCubeInstance().getOriginalSegmentToOptimize(optimizeSegment);
+        Preconditions.checkNotNull(oldSegment,
+                "cannot find the original segment to be optimized by " + optimizeSegment);
+
+        infos.recordReusedCuboids(Collections.singleton(cube.getCuboidsByMode(CuboidModeEnum.RECOMMEND_EXISTING)));
+
+        baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
+        recommendCuboids = cube.getCuboidsRecommend();
+
+        Preconditions.checkNotNull(recommendCuboids, "The recommend cuboid map could not be null");
+
+        Path originalCuboidPath = new Path(getCuboidRootPath(oldSegment));
+
+        try {
+            for (FileStatus cuboid : fs.listStatus(originalCuboidPath)) {
+                String cuboidId = cuboid.getPath().getName();
+                if (cuboidId.equals(String.valueOf(baseCuboid)) || recommendCuboids.contains(Long.valueOf(cuboidId))) {
+                    Path optimizeCuboidPath = new Path(getCuboidRootPath(optimizeSegment) + "/" + cuboidId);
+                    FileUtil.copy(fs, cuboid.getPath(), fs, optimizeCuboidPath, false, true, conf);
+                    logger.info("Copy cuboid {} storage from original segment to optimized segment", cuboidId);
+                }
+            }
+                }
+            }
+        } catch (IOException e) {
+            logger.error("Failed to filter cuboid", e);
+            throw e;
+        }
+    }
+
+    public static void main(String[] args) {
+        FilterRecommendCuboidJob filterRecommendCuboidJob = new FilterRecommendCuboidJob();
+        filterRecommendCuboidJob.execute(args);
+    }
+
+    @Override
+    protected String generateInfo() {
+        return LogJobInfoUtils.filterRecommendCuboidJobInfo();
+    }
+}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java
index 4e0be04..ead4223 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.spark.job;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
 public class JobStepFactory {
@@ -41,13 +42,20 @@ public class JobStepFactory {
         case MERGING:
             step = new NSparkMergingStep(config.getSparkMergeClassName());
             break;
+        case OPTIMIZING:
+            step = new NSparkOptimizingStep(OptimizeBuildJob.class.getName());
+            break;
         case CLEAN_UP_AFTER_MERGE:
             step = new NSparkUpdateMetaAndCleanupAfterMergeStep();
             break;
+        case FILTER_RECOMMEND_CUBOID:
+            step = new NSparkLocalStep();
+            step.setSparkSubmitClassName(FilterRecommendCuboidJob.class.getName());
+            step.setName(ExecutableConstants.STEP_NAME_FILTER_RECOMMEND_CUBOID_DATA_FOR_OPTIMIZATION);
+            break;
         default:
             throw new IllegalArgumentException();
         }
-
         step.setParams(parent.getParams());
         step.setProject(parent.getProject());
         step.setTargetSubject(parent.getTargetSubject());
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepType.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepType.java
index a81312a..3b4142d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepType.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepType.java
@@ -21,5 +21,7 @@ package org.apache.kylin.engine.spark.job;
 public enum JobStepType {
     RESOURCE_DETECT,
 
-    CLEAN_UP_AFTER_MERGE, CUBING, MERGING
+    CLEAN_UP_AFTER_MERGE, CUBING, MERGING, OPTIMIZING,
+
+    FILTER_RECOMMEND_CUBOID
 }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java
index 2aed71e..b366437 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java
@@ -18,19 +18,10 @@
 
 package org.apache.kylin.engine.spark.job;
 
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
-public class NResourceDetectStep extends NSparkExecutable {
-
-    private final static String[] excludedSparkConf = new String[] {"spark.executor.cores",
-            "spark.executor.memoryOverhead", "spark.executor.extraJavaOptions",
-            "spark.executor.instances", "spark.executor.memory", "spark.executor.extraClassPath"};
+public class NResourceDetectStep extends NSparkLocalStep {
 
     // called by reflection
     public NResourceDetectStep() {
@@ -44,33 +35,12 @@ public class NResourceDetectStep extends NSparkExecutable {
             this.setSparkSubmitClassName(ResourceDetectBeforeMergingJob.class.getName());
         /*} else if (parent instanceof NTableSamplingJob) {
             this.setSparkSubmitClassName(ResourceDetectBeforeSampling.class.getName());
-        */} else {
+        */
+        } else if (parent instanceof NSparkOptimizingJob) {
+            this.setSparkSubmitClassName(ResourceDetectBeforeOptimizingJob.class.getName());
+        } else {
             throw new IllegalArgumentException("Unsupported resource detect for " + parent.getName() + " job");
         }
         this.setName(ExecutableConstants.STEP_NAME_DETECT_RESOURCE);
     }
-
-    @Override
-    protected Set<String> getMetadataDumpList(KylinConfig config) {
-        AbstractExecutable parent = getParentExecutable();
-        if (parent instanceof DefaultChainedExecutable) {
-            return ((DefaultChainedExecutable) parent).getMetadataDumpList(config);
-        }
-        throw new IllegalStateException("Unsupported resource detect for non chained executable!");
-    }
-
-    @Override
-    protected Map<String, String> getSparkConfigOverride(KylinConfig config) {
-        Map<String, String> sparkConfigOverride = super.getSparkConfigOverride(config);
-        //run resource detect job on local not cluster
-        sparkConfigOverride.put("spark.master", "local");
-        sparkConfigOverride.put("spark.sql.autoBroadcastJoinThreshold", "-1");
-        sparkConfigOverride.put("spark.sql.adaptive.enabled", "false");
-        for (String sparkConf : excludedSparkConf) {
-            if (sparkConfigOverride.containsKey(sparkConf)) {
-                sparkConfigOverride.remove(sparkConf);
-            }
-        }
-        return sparkConfigOverride;
-    }
 }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkBatchOptimizeJobCheckpointBuilder.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkBatchOptimizeJobCheckpointBuilder.java
new file mode 100644
index 0000000..8922d2f
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkBatchOptimizeJobCheckpointBuilder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.job;
+
+import com.google.common.base.Preconditions;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.CheckpointExecutable;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Locale;
+
+public class NSparkBatchOptimizeJobCheckpointBuilder {
+    protected SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss", Locale.ROOT);
+
+    final protected CubeInstance cube;
+    final protected String submitter;
+
+    public NSparkBatchOptimizeJobCheckpointBuilder(CubeInstance cube, String submitter) {
+        this.cube = cube;
+        this.submitter = submitter;
+
+        Preconditions.checkNotNull(cube.getFirstSegment(), "Cube " + cube + " is empty!!!");
+    }
+
+    public CheckpointExecutable build() {
+        KylinConfig kylinConfig = cube.getConfig();
+        List<ProjectInstance> projList = ProjectManager.getInstance(kylinConfig).findProjects(cube.getType(),
+                cube.getName());
+        if (projList == null || projList.size() == 0) {
+            throw new RuntimeException("Cannot find the project containing the cube " + cube.getName() + "!!!");
+        } else if (projList.size() >= 2) {
+            throw new RuntimeException("Find more than one project containing the cube " + cube.getName()
+                    + ". It does't meet the uniqueness requirement!!! ");
+        }
+
+        CheckpointExecutable checkpointJob = new CheckpointExecutable();
+        checkpointJob.setSubmitter(submitter);
+        CubingExecutableUtil.setCubeName(cube.getName(), checkpointJob.getParams());
+        checkpointJob.setName(
+                cube.getName() + " - OPTIMIZE CHECKPOINT - " + format.format(new Date(System.currentTimeMillis())));
+        checkpointJob.setProjectName(projList.get(0).getName());
+
+        // Phase 1: Update cube information
+        checkpointJob.addTask(createUpdateCubeInfoAfterCheckpointStep());
+
+        // Phase 2: Cleanup hdfs storage
+        checkpointJob.addTask(createCleanupHdfsStorageStep());
+
+        return checkpointJob;
+    }
+
+    private NSparkUpdateCubeInfoAfterOptimizeStep createUpdateCubeInfoAfterCheckpointStep() {
+        NSparkUpdateCubeInfoAfterOptimizeStep result = new NSparkUpdateCubeInfoAfterOptimizeStep();
+        result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+        CubingExecutableUtil.setCubeName(cube.getName(), result.getParams());
+        return result;
+    }
+
+    private NSparkCleanupHdfsStorageStep createCleanupHdfsStorageStep() {
+        NSparkCleanupHdfsStorageStep result = new NSparkCleanupHdfsStorageStep();
+        result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
+        CubingExecutableUtil.setCubeName(cube.getName(), result.getParams());
+        return result;
+    }
+}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCleanupHdfsStorageStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCleanupHdfsStorageStep.java
new file mode 100644
index 0000000..950d236
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCleanupHdfsStorageStep.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.job;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class NSparkCleanupHdfsStorageStep extends NSparkExecutable {
+    private static final Logger logger = LoggerFactory.getLogger(NSparkCleanupHdfsStorageStep.class);
+    private FileSystem fs = HadoopUtil.getWorkingFileSystem();
+
+    public NSparkCleanupHdfsStorageStep() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+
+        List<String> segments = cube.getSegments().stream().map(segment -> {
+            return segment.getName() + "_" + segment.getStorageLocationIdentifier();
+        }).collect(Collectors.toList());
+        String project = cube.getProject();
+
+        //list all segment directory
+        Path cubePath = new Path(context.getConfig().getHdfsWorkingDirectory(project) + "/parquet/" + cube.getName());
+        try {
+            if (fs.exists(cubePath)) {
+                FileStatus[] segmentStatus = fs.listStatus(cubePath);
+                if (segmentStatus != null) {
+                    for (FileStatus status : segmentStatus) {
+                        String segment = status.getPath().getName();
+                        if (!segments.contains(segment)) {
+                            logger.info("Deleting old segment storage {}", status.getPath());
+                            fs.delete(status.getPath(), true);
+                        }
+                    }
+                }
+            } else {
+                logger.warn("Cube path doesn't exist! The path is " + cubePath);
+            }
+            return new ExecuteResult();
+        } catch (IOException e) {
+            logger.error("Failed to clean old segment storage", e);
+            return ExecuteResult.createError(e);
+        }
+    }
+
+    @Override
+    protected Set<String> getMetadataDumpList(KylinConfig config) {
+        AbstractExecutable parent = getParentExecutable();
+        return ((DefaultChainedExecutable) parent).getMetadataDumpList(config);
+    }
+
+}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java
index 6a1f9f1..146a1f2 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.engine.spark.job;
 
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.metadata.model.Segments;
 import org.apache.spark.sql.Column;
 import org.spark_project.guava.collect.Sets;
@@ -79,10 +80,8 @@ public class NSparkCubingUtil {
         return withoutDot;
     }
 
-    public static String getStoragePath(CubeSegment nDataSegment, Long layoutId) {
-        String hdfsWorkingDir = nDataSegment.getConfig().getReadHdfsWorkingDirectory();
-        return hdfsWorkingDir + getStoragePathWithoutPrefix(nDataSegment.getProject(),
-                nDataSegment.getCubeInstance().getId(), nDataSegment.getUuid(), layoutId);
+    public static String getStoragePath(CubeSegment segment, Long layoutId) {
+        return PathManager.getParquetStoragePath(segment.getCubeInstance(), segment.getName(), segment.getStorageLocationIdentifier(), layoutId);
     }
 
     static Set<String> toSegmentNames(Segments<CubeSegment> segments) {
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkLocalStep.java
similarity index 68%
copy from kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java
copy to kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkLocalStep.java
index 2aed71e..1bd63ba 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NResourceDetectStep.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkLocalStep.java
@@ -18,54 +18,31 @@
 
 package org.apache.kylin.engine.spark.job;
 
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 
-public class NResourceDetectStep extends NSparkExecutable {
+import java.util.Map;
+import java.util.Set;
 
+public class NSparkLocalStep extends NSparkExecutable {
     private final static String[] excludedSparkConf = new String[] {"spark.executor.cores",
             "spark.executor.memoryOverhead", "spark.executor.extraJavaOptions",
             "spark.executor.instances", "spark.executor.memory", "spark.executor.extraClassPath"};
 
-    // called by reflection
-    public NResourceDetectStep() {
-
-    }
-
-    public NResourceDetectStep(DefaultChainedExecutable parent) {
-        if (parent instanceof NSparkCubingJob) {
-            
this.setSparkSubmitClassName(ResourceDetectBeforeCubingJob.class.getName());
-        } else if (parent instanceof NSparkMergingJob) {
-            
this.setSparkSubmitClassName(ResourceDetectBeforeMergingJob.class.getName());
-        /*} else if (parent instanceof NTableSamplingJob) {
-            
this.setSparkSubmitClassName(ResourceDetectBeforeSampling.class.getName());
-        */} else {
-            throw new IllegalArgumentException("Unsupported resource detect 
for " + parent.getName() + " job");
-        }
-        this.setName(ExecutableConstants.STEP_NAME_DETECT_RESOURCE);
-    }
-
     @Override
     protected Set<String> getMetadataDumpList(KylinConfig config) {
         AbstractExecutable parent = getParentExecutable();
         if (parent instanceof DefaultChainedExecutable) {
             return ((DefaultChainedExecutable) parent).getMetadataDumpList(config);
         }
-        throw new IllegalStateException("Unsupported resource detect for non chained executable!");
+        throw new IllegalStateException("Unsupported " + this.getName() + " for non chained executable!");
     }
 
     @Override
     protected Map<String, String> getSparkConfigOverride(KylinConfig config) {
         Map<String, String> sparkConfigOverride = super.getSparkConfigOverride(config);
-        //run resource detect job on local not cluster
-        sparkConfigOverride.put("spark.master", "local");
-        sparkConfigOverride.put("spark.sql.autoBroadcastJoinThreshold", "-1");
-        sparkConfigOverride.put("spark.sql.adaptive.enabled", "false");
+        overrideSparkConf(sparkConfigOverride);
         for (String sparkConf : excludedSparkConf) {
             if (sparkConfigOverride.containsKey(sparkConf)) {
                 sparkConfigOverride.remove(sparkConf);
@@ -73,4 +50,11 @@ public class NResourceDetectStep extends NSparkExecutable {
         }
         return sparkConfigOverride;
     }
+
+    protected void overrideSparkConf(Map<String, String> sparkConfigOverride) {
+        //run job on local not cluster
+        sparkConfigOverride.put("spark.master", "local");
+        sparkConfigOverride.put("spark.sql.autoBroadcastJoinThreshold", "-1");
+        sparkConfigOverride.put("spark.sql.adaptive.enabled", "false");
+    }
 }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkOptimizingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkOptimizingJob.java
new file mode 100644
index 0000000..f09ea04
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkOptimizingJob.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.job;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Locale;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.Set;
+
+public class NSparkOptimizingJob extends CubingJob {
+    private static final Logger logger = LoggerFactory.getLogger(NSparkOptimizingJob.class);
+    private static final String DEPLOY_ENV_NAME = "envName";
+
+    public static NSparkOptimizingJob optimize(CubeSegment optimizedSegment, String submitter) {
+        return NSparkOptimizingJob.optimize(optimizedSegment, submitter, CubingJobTypeEnum.OPTIMIZE, UUID.randomUUID().toString());
+    }
+
+    public static NSparkOptimizingJob optimize(CubeSegment optimizedSegment, String submitter, CubingJobTypeEnum jobType, String jobId) {
+        logger.info("SPARK_V2 new job to OPTIMIZE a segment " + optimizedSegment);
+        CubeSegment oldSegment = optimizedSegment.getCubeInstance().getOriginalSegmentToOptimize(optimizedSegment);
+        Preconditions.checkNotNull(oldSegment, "cannot find the original segment to be optimized by " + optimizedSegment);
+        CubeInstance cube = optimizedSegment.getCubeInstance();
+
+        NSparkOptimizingJob job = new NSparkOptimizingJob();
+        SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss", Locale.ROOT);
+        format.setTimeZone(TimeZone.getTimeZone(cube.getConfig().getTimeZone()));
+
+        StringBuilder builder = new StringBuilder();
+        builder.append(jobType).append(" CUBE - ");
+        builder.append(optimizedSegment.getCubeInstance().getDisplayName()).append(" - ").append(optimizedSegment.getName())
+                .append(" - ");
+
+        builder.append(format.format(new Date(System.currentTimeMillis())));
+        job.setName(builder.toString());
+        job.setId(jobId);
+        job.setTargetSubject(optimizedSegment.getModel().getUuid());
+        job.setTargetSegments(Lists.newArrayList(String.valueOf(optimizedSegment.getUuid())));
+        job.setProject(optimizedSegment.getProject());
+        job.setSubmitter(submitter);
+
+        job.setParam(MetadataConstants.P_JOB_ID, jobId);
+        job.setParam(MetadataConstants.P_PROJECT_NAME, cube.getProject());
+        job.setParam(MetadataConstants.P_TARGET_MODEL, job.getTargetSubject());
+        job.setParam(MetadataConstants.P_CUBE_ID, cube.getId());
+        job.setParam(MetadataConstants.P_CUBE_NAME, cube.getName());
+        job.setParam(MetadataConstants.P_SEGMENT_IDS, String.join(",", job.getTargetSegments()));
+        job.setParam(CubingExecutableUtil.SEGMENT_ID, optimizedSegment.getUuid());
+        job.setParam(MetadataConstants.SEGMENT_NAME, optimizedSegment.getName());
+        job.setParam(MetadataConstants.P_DATA_RANGE_START, optimizedSegment.getSegRange().start.toString());
+        job.setParam(MetadataConstants.P_DATA_RANGE_END, optimizedSegment.getSegRange().end.toString());
+        job.setParam(MetadataConstants.P_OUTPUT_META_URL, cube.getConfig().getMetadataUrl().toString());
+        job.setParam(MetadataConstants.P_JOB_TYPE, String.valueOf(jobType));
+        job.setParam(MetadataConstants.P_CUBOID_NUMBER, String.valueOf(cube.getDescriptor().getAllCuboids().size()));
+
+        // Phase 1: Prepare base cuboid data from old segment
+        JobStepFactory.addStep(job, JobStepType.FILTER_RECOMMEND_CUBOID, cube);
+
+        // Phase 2: Resource detect
+        JobStepFactory.addStep(job, JobStepType.RESOURCE_DETECT, cube);
+
+        // Phase 3: Calculate cuboid statistics for optimized segment, Build Cube for Missing Cuboid Data, Update metadata
+        JobStepFactory.addStep(job, JobStepType.OPTIMIZING, cube);
+
+        return job;
+    }
+
+    @Override
+    public Set<String> getMetadataDumpList(KylinConfig config) {
+        String cubeId = getParam(MetadataConstants.P_CUBE_ID);
+        CubeInstance cubeInstance = CubeManager.getInstance(config).getCubeByUuid(cubeId);
+        return MetaDumpUtil.collectCubeMetadata(cubeInstance);
+    }
+
+    public String getDeployEnvName() {
+        return getParam(DEPLOY_ENV_NAME);
+    }
+}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkOptimizingStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkOptimizingStep.java
new file mode 100644
index 0000000..afb2a85
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkOptimizingStep.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.job;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.spark.metadata.cube.PathManager;
+import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
+import org.apache.kylin.engine.spark.utils.UpdateMetadataUtil;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+
+public class NSparkOptimizingStep extends NSparkExecutable {
+    private static final Logger logger = LoggerFactory.getLogger(NSparkOptimizingStep.class);
+
+    // called by reflection
+    public NSparkOptimizingStep() {
+    }
+
+    public NSparkOptimizingStep(String sparkSubmitClassName) {
+        this.setSparkSubmitClassName(sparkSubmitClassName);
+        this.setName(ExecutableConstants.STEP_NAME_BUILD_CUBOID_FROM_PARENT_CUBOID);
+    }
+
+    @Override
+    protected Set<String> getMetadataDumpList(KylinConfig config) {
+        String cubeId = getParam(MetadataConstants.P_CUBE_ID);
+        CubeInstance cubeInstance = CubeManager.getInstance(config).getCubeByUuid(cubeId);
+        return MetaDumpUtil.collectCubeMetadata(cubeInstance);
+    }
+
+    public static class Mockup {
+        public static void main(String[] args) {
+            logger.info(NSparkCubingStep.Mockup.class + ".main() invoked, args: " + Arrays.toString(args));
+        }
+    }
+
+    @Override
+    public boolean needMergeMetadata() {
+        return true;
+    }
+
+    @Override
+    protected Map<String, String> getJobMetricsInfo(KylinConfig config) {
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance cube = cubeManager.getCube(getCubeName());
+        Map<String, String> joblogInfo = Maps.newHashMap();
+        joblogInfo.put(CubingJob.SOURCE_SIZE_BYTES, String.valueOf(cube.getInputRecordSizeBytes()));
+        joblogInfo.put(CubingJob.CUBE_SIZE_BYTES, String.valueOf(cube.getSizeKB()));
+        return joblogInfo;
+    }
+
+    @Override
+    public void cleanup(ExecuteResult result) throws ExecuteException {
+        // delete job tmp dir
+        if (result != null && result.state().ordinal() == ExecuteResult.State.SUCCEED.ordinal()) {
+            PathManager.deleteJobTempPath(getConfig(), getParam(MetadataConstants.P_PROJECT_NAME),
+                    getParam(MetadataConstants.P_JOB_ID));
+        }
+    }
+
+    @Override
+    protected void updateMetaAfterOperation(KylinConfig config) throws IOException {
+        UpdateMetadataUtil.syncLocalMetadataToRemote(config, this);
+    }
+}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateCubeInfoAfterOptimizeStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateCubeInfoAfterOptimizeStep.java
new file mode 100644
index 0000000..6572b0f
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateCubeInfoAfterOptimizeStep.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.job;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.CuboidStatsReaderUtil;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterCheckpointStep;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class NSparkUpdateCubeInfoAfterOptimizeStep extends NSparkExecutable {
+    private static final Logger logger = LoggerFactory.getLogger(UpdateCubeInfoAfterCheckpointStep.class);
+
+    public NSparkUpdateCubeInfoAfterOptimizeStep() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+        final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+
+        Set<Long> recommendCuboids = cube.getCuboidsRecommend();
+        try {
+            List<CubeSegment> newSegments = cube.getSegments(SegmentStatusEnum.READY_PENDING);
+            Map<Long, Long> recommendCuboidsWithStats = CuboidStatsReaderUtil
+                    .readCuboidStatsFromSegments(recommendCuboids, newSegments);
+            if (recommendCuboidsWithStats == null) {
+                throw new RuntimeException("Fail to get statistics info for recommended cuboids after optimization!!!");
+            }
+            cubeManager.promoteCheckpointOptimizeSegments(cube, recommendCuboidsWithStats,
+                    newSegments.toArray(new CubeSegment[newSegments.size()]));
+            return new ExecuteResult();
+        } catch (Exception e) {
+            logger.error("fail to update cube after build", e);
+            return ExecuteResult.createError(e);
+        }
+    }
+}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
similarity index 53%
copy from kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
copy to kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
index efce341..a4e2d5d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
@@ -18,40 +18,20 @@
 
 package org.apache.kylin.engine.spark.job;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CubeStatsWriter;
-import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
-import org.apache.kylin.measure.hllc.HLLCounter;
-import org.apache.kylin.shaded.com.google.common.base.Joiner;
-import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.spark.NSparkCubingEngine;
 import org.apache.kylin.engine.spark.application.SparkApplication;
 import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
@@ -61,243 +41,183 @@ import org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.engine.spark.metadata.cube.model.ForestSpanningTree;
 import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity;
 import org.apache.kylin.engine.spark.metadata.cube.model.SpanningTree;
-import org.apache.kylin.engine.spark.utils.BuildUtils;
 import org.apache.kylin.engine.spark.utils.JobMetrics;
 import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
 import org.apache.kylin.engine.spark.utils.Metrics;
 import org.apache.kylin.engine.spark.utils.QueryExecutionCache;
-import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.engine.spark.utils.BuildUtils;
 import org.apache.kylin.metadata.model.IStorageAware;
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.shaded.com.google.common.base.Joiner;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
 import org.apache.kylin.storage.StorageFactory;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.shaded.com.google.common.base.Preconditions;
-import org.apache.kylin.shaded.com.google.common.collect.Lists;
-import org.apache.kylin.shaded.com.google.common.collect.Sets;
-
 import scala.Tuple2;
 import scala.collection.JavaConversions;
 
-public class CubeBuildJob extends SparkApplication {
-    protected static final Logger logger = LoggerFactory.getLogger(CubeBuildJob.class);
+import java.io.IOException;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.UUID;
+
+import java.util.stream.Collectors;
+
+public class OptimizeBuildJob extends SparkApplication {
+    private static final Logger logger = LoggerFactory.getLogger(OptimizeBuildJob.class);
+
+    private Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
     protected static String TEMP_DIR_SUFFIX = "_temp";
 
-    private CubeManager cubeManager;
-    private CubeInstance cubeInstance;
     private BuildLayoutWithUpdate buildLayoutWithUpdate;
     private Map<Long, Short> cuboidShardNum = Maps.newConcurrentMap();
     private Map<Long, Long> cuboidsRowCount = Maps.newConcurrentMap();
-    private Map<Long, Long> recommendCuboidMap = new HashMap<>();
+
+    private Configuration conf = HadoopUtil.getCurrentConfiguration();
+    private CubeManager cubeManager;
+    private CubeInstance cubeInstance;
+    private SegmentInfo optSegInfo;
+    private SegmentInfo originalSegInfo;
+    private CubeSegment optSeg;
+    private CubeSegment originalSeg;
+    private long baseCuboidId;
 
     public static void main(String[] args) {
-        CubeBuildJob cubeBuildJob = new CubeBuildJob();
-        cubeBuildJob.execute(args);
+        OptimizeBuildJob optimizeBuildJob = new OptimizeBuildJob();
+        optimizeBuildJob.execute(args);
     }
 
     @Override
     protected void doExecute() throws Exception {
+        String segmentId = getParam(CubingExecutableUtil.SEGMENT_ID);
+        String cubeId = getParam(MetadataConstants.P_CUBE_ID);
+
+        cubeManager = CubeManager.getInstance(config);
+        cubeInstance = cubeManager.getCubeByUuid(cubeId);
+        optSeg = cubeInstance.getSegmentById(segmentId);
+        originalSeg = cubeInstance.getOriginalSegmentToOptimize(optSeg);
+        originalSegInfo = ManagerHub.getSegmentInfo(config, cubeId, 
originalSeg.getUuid());
 
+        calculateCuboidFromBaseCuboid();
+        buildCuboidFromParent(cubeId);
+    }
+
+    private void calculateCuboidFromBaseCuboid() throws IOException {
+        logger.info("Start to calculate cuboid statistics for optimized 
segment");
         long start = System.currentTimeMillis();
-        logger.info("Start building cube job for {} ...", 
getParam(MetadataConstants.P_SEGMENT_IDS));
-        Set<String> segmentIds = 
Sets.newHashSet(StringUtils.split(getParam(MetadataConstants.P_SEGMENT_IDS)));
 
-        // For now, Kylin should only build one segment in one time, cube 
planner has this restriction (maybe we can remove this limitation later)
-        Preconditions.checkArgument(segmentIds.size() == 1, "Build one segment 
in one time.");
+        baseCuboidId = cubeInstance.getCuboidScheduler().getBaseCuboidId();
+        LayoutEntity baseCuboid = originalSegInfo.getAllLayoutJava().stream()
+                .filter(layoutEntity -> layoutEntity.getId() == 
baseCuboidId).iterator().next();
+        Dataset<Row> baseCuboidDS = StorageFactory
+                .createEngineAdapter(baseCuboid, 
NSparkCubingEngine.NSparkCubingStorage.class)
+                .getFrom(PathManager.getParquetStoragePath(config, 
cubeInstance.getName(), optSeg.getName(),
+                        optSeg.getStorageLocationIdentifier(), 
String.valueOf(baseCuboid.getId())), ss);
 
-        String firstSegmentId = segmentIds.iterator().next();
-        String cubeName = getParam(MetadataConstants.P_CUBE_ID);
-        SegmentInfo seg = ManagerHub.getSegmentInfo(config, cubeName, 
firstSegmentId);
-        cubeManager = CubeManager.getInstance(config);
-        cubeInstance = cubeManager.getCubeByUuid(cubeName);
-        CubeSegment newSegment = cubeInstance.getSegmentById(firstSegmentId);
-        SpanningTree spanningTree;
-        ParentSourceChooser sourceChooser;
+        Map<Long, HLLCounter> hllMap = new HashMap<>();
 
-        // Cuboid Statistics is served for Cube Planner Phase One at the moment
-        boolean needStatistics = 
StatisticsDecisionUtil.isAbleToOptimizeCubingPlan(newSegment)
-                || config.isSegmentStatisticsEnabled();
-
-        if (needStatistics) {
-            // 1.1 Call CuboidStatistics#statistics
-            long startMills = System.currentTimeMillis();
-            spanningTree = new 
ForestSpanningTree(JavaConversions.asJavaCollection(seg.toBuildLayouts()));
-            sourceChooser = new ParentSourceChooser(spanningTree, seg, jobId, 
ss, config, false);
-            sourceChooser.setNeedStatistics();
-            sourceChooser.decideFlatTableSource(null);
-            Map<Long, HLLCounter> hllMap = new HashMap<>();
-            for (Tuple2<Object, AggInfo> cuboidData : sourceChooser.aggInfo()) 
{
-                hllMap.put((Long) cuboidData._1, 
cuboidData._2.cuboid().counter());
-            }
-            logger.info("Cuboid statistics return {} records and cost {} ms.", 
hllMap.size(), (System.currentTimeMillis() - startMills));
-
-            // 1.2 Save cuboid statistics
-            String jobTmpDir = config.getJobTmpDir(project) + "/" + jobId;
-            Path statisticsDir = new Path(jobTmpDir + "/" + 
ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + firstSegmentId + 
"/");
-            Optional<HLLCounter> hll = 
hllMap.values().stream().max(Comparator.comparingLong(HLLCounter::getCountEstimate));
-            long rc = hll.map(HLLCounter::getCountEstimate).orElse(1L);
-            
CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), 
statisticsDir, hllMap, 1, rc);
-
-            FileSystem fs = HadoopUtil.getWorkingFileSystem();
-            ResourceStore rs = ResourceStore.getStore(config);
-            String metaKey = newSegment.getStatisticsResourcePath();
-            Path statisticsFile = new Path(statisticsDir, 
BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
-            FSDataInputStream is = fs.open(statisticsFile);
-            rs.putResource(metaKey, is, System.currentTimeMillis()); // write 
to Job-Local metastore
-            logger.info("{}'s stats saved to resource key({}) with path({})", 
newSegment, metaKey, statisticsFile);
-
-            // 1.3 Trigger cube planner phase one and save optimized cuboid 
set into CubeInstance
-            recommendCuboidMap = 
StatisticsDecisionUtil.optimizeCubingPlan(newSegment);
-            if (!recommendCuboidMap.isEmpty())
-                logger.info("Triggered cube planner phase one .");
+        for (Tuple2<Object, AggInfo> cuboidData : 
CuboidStatisticsJob.statistics(baseCuboidDS,
+                originalSegInfo, getNewCuboidIds())) {
+            hllMap.put((Long) cuboidData._1, cuboidData._2.cuboid().counter());
         }
 
-        buildLayoutWithUpdate = new BuildLayoutWithUpdate(config);
-        List<String> persistedFlatTable = new ArrayList<>();
-        List<String> persistedViewFactTable = new ArrayList<>();
-        Path shareDir = config.getJobTmpShareDir(project, jobId);
-        try {
-            //TODO: what if a segment is deleted during building?
-            for (String segId : segmentIds) {
-                seg = ManagerHub.getSegmentInfo(config, cubeName, segId);
-                spanningTree = new ForestSpanningTree(
-                        
JavaConversions.asJavaCollection(seg.toBuildLayouts()));
-                logger.info("There are {} cuboids to be built in segment {}.",
-                        seg.toBuildLayouts().size(), seg.name());
-                for (LayoutEntity cuboid : 
JavaConversions.asJavaCollection(seg.toBuildLayouts())) {
-                    logger.debug("Cuboid {} has row keys: {}", cuboid.getId(),
-                            Joiner.on(", 
").join(cuboid.getOrderedDimensions().keySet()));
-                }
+        String jobTmpDir = config.getJobTmpDir(project) + "/" + jobId;
+        Path statisticsDir = new Path(jobTmpDir + "/" + 
ResourceStore.CUBE_STATISTICS_ROOT + "/"
+                + cubeInstance.getUuid() + "/" + optSeg.getUuid() + "/");
 
-                // choose source
-                sourceChooser = new ParentSourceChooser(spanningTree, seg, 
jobId, ss, config, true);
-                sourceChooser.decideSources();
-                NBuildSourceInfo buildFromFlatTable = 
sourceChooser.flatTableSource();
-                Map<Long, NBuildSourceInfo> buildFromLayouts = 
sourceChooser.reuseSources();
+        CubeStatsWriter.writeCuboidStatistics(conf, statisticsDir, hllMap, 1, 
-1);
 
-                infos.clearCuboidsNumPerLayer(segId);
+        logger.info("Calculate cuboid statistics from base cuboid job takes {} 
ms",
+                (System.currentTimeMillis() - start));
+    }
 
-                // build cuboids from flat table
-                if (buildFromFlatTable != null) {
-                    collectPersistedTablePath(persistedFlatTable, 
sourceChooser);
-                    build(Collections.singletonList(buildFromFlatTable), seg, 
spanningTree);
-                }
+    private void buildCuboidFromParent(String cubeId) throws IOException {
+        logger.info("Start to build recommend cuboid for optimized segment");
+        long start = System.currentTimeMillis();
+        optSegInfo = ManagerHub.getSegmentInfo(config, cubeId, 
optSeg.getUuid(), CuboidModeEnum.RECOMMEND);
+        buildLayoutWithUpdate = new BuildLayoutWithUpdate(config);
 
-                // build cuboids from reused layouts
-                if (!buildFromLayouts.isEmpty()) {
-                    build(buildFromLayouts.values(), seg, spanningTree);
-                }
-                infos.recordSpanningTree(segId, spanningTree);
+        infos.clearAddCuboids();
 
-                logger.info("Updating segment info");
-                assert buildFromFlatTable != null;
-                updateSegmentInfo(getParam(MetadataConstants.P_CUBE_ID), seg, 
buildFromFlatTable.getFlatTableDS().count());
-            }
-            updateCubeAndSegmentMeta(getParam(MetadataConstants.P_CUBE_ID),
-                    ResourceDetectUtils.getSegmentSourceSize(shareDir), 
recommendCuboidMap);
-        } finally {
-            FileSystem fs = HadoopUtil.getWorkingFileSystem();
-            for (String viewPath : persistedViewFactTable) {
-                fs.delete(new Path(viewPath), true);
-                logger.info("Delete persisted view fact table: {}.", viewPath);
-            }
-            for (String path : persistedFlatTable) {
-                fs.delete(new Path(path), true);
-                logger.info("Delete persisted flat table: {}.", path);
+        SpanningTree spanningTree;
+        ParentSourceChooser sourceChooser;
+        try {
+            spanningTree = new 
ForestSpanningTree(JavaConversions.asJavaCollection(optSegInfo.toBuildLayouts()));
+            logger.info("There are {} cuboids to be built in segment {}.", 
optSegInfo.toBuildLayouts().size(),
+                    optSegInfo.name());
+            for (LayoutEntity cuboid : 
JavaConversions.asJavaCollection(optSegInfo.toBuildLayouts())) {
+                logger.debug("Cuboid {} has row keys: {}", cuboid.getId(),
+                        Joiner.on(", 
").join(cuboid.getOrderedDimensions().keySet()));
             }
-            logger.info("Building job takes {} ms", 
(System.currentTimeMillis() - start));
-        }
-    }
 
-    private void updateSegmentInfo(String cubeId, SegmentInfo segmentInfo, 
long sourceRowCount) throws IOException {
-        CubeInstance cubeInstance = cubeManager.getCubeByUuid(cubeId);
-        CubeInstance cubeCopy = cubeInstance.latestCopyForWrite();
-        CubeUpdate update = new CubeUpdate(cubeCopy);
+            // choose source
+            optSegInfo.removeLayout(baseCuboidId);
+            sourceChooser = new ParentSourceChooser(spanningTree, optSegInfo, 
optSeg, jobId, ss, config, false);
+            sourceChooser.decideSources();
+            Map<Long, NBuildSourceInfo> buildFromLayouts = 
sourceChooser.reuseSources();
 
-        List<CubeSegment> cubeSegments = Lists.newArrayList();
-        CubeSegment segment = cubeCopy.getSegmentById(segmentInfo.id());
-        segment.setSizeKB(segmentInfo.getAllLayoutSize() / 1024);
-        List<String> cuboidStatics = new LinkedList<>();
-
-        String template = "{\"cuboid\":%d, \"rows\": %d, \"size\": %d 
\"deviation\": %7f}";
-        for (LayoutEntity layoutEntity : segmentInfo.getAllLayoutJava()) {
-            double deviation = 0.0d;
-            if (layoutEntity.getRows() > 0 && recommendCuboidMap != null && 
!recommendCuboidMap.isEmpty()) {
-                long diff = (layoutEntity.getRows() - 
recommendCuboidMap.get(layoutEntity.getId()));
-                deviation = diff / (layoutEntity.getRows() + 0.0d);
-            }
-            cuboidStatics.add(String.format(Locale.getDefault(), template, 
layoutEntity.getId(),
-                    layoutEntity.getRows(), layoutEntity.getByteSize(), 
deviation));
-        }
+            infos.clearCuboidsNumPerLayer(optSegInfo.id());
 
-        try {
-            FileSystem fs = HadoopUtil.getWorkingFileSystem();
-            JavaSparkContext jsc = 
JavaSparkContext.fromSparkContext(ss.sparkContext());
-            JavaRDD<String> cuboidStatRdd = jsc.parallelize(cuboidStatics, 1);
-            for (String cuboid : cuboidStatics) {
-                logger.info("Statistics \t: {}", cuboid);
-            }
-            String pathDir = config.getHdfsWorkingDirectory() + 
segment.getPreciseStatisticsResourcePath();
-            logger.info("Saving {} {} .", pathDir, segmentInfo);
-            Path path = new Path(pathDir);
-            if (fs.exists(path)) {
-                fs.delete(path, true);
+            // build cuboids from reused layouts
+            if (!buildFromLayouts.isEmpty()) {
+                build(buildFromLayouts.values(), optSegInfo, spanningTree);
             }
-            cuboidStatRdd.saveAsTextFile(pathDir);
-        } catch (Exception e) {
-            logger.error("Write metrics failed.", e);
-        }
+            infos.recordSpanningTree(optSegInfo.id(), spanningTree);
 
-        segment.setLastBuildTime(System.currentTimeMillis());
-        segment.setLastBuildJobID(getParam(MetadataConstants.P_JOB_ID));
-        segment.setInputRecords(sourceRowCount);
-        segment.setSnapshots(new 
ConcurrentHashMap<>(segmentInfo.getSnapShot2JavaMap()));
-        segment.setCuboidShardNums(cuboidShardNum);
-        Map<String, String> additionalInfo = segment.getAdditionalInfo();
-        additionalInfo.put("storageType", "" + IStorageAware.ID_PARQUET);
-        segment.setAdditionalInfo(additionalInfo);
-        cubeSegments.add(segment);
-        update.setToUpdateSegs(cubeSegments.toArray(new CubeSegment[0]));
-        cubeManager.updateCube(update);
+            logger.info("Updating segment info");
+            updateOptimizeSegmentInfo();
+        } finally {
+            logger.info("Building job takes {} ms", 
(System.currentTimeMillis() - start));
+        }
     }
 
-    private void collectPersistedTablePath(List<String> persistedFlatTable, 
ParentSourceChooser sourceChooser) {
-        String flatTablePath = sourceChooser.persistFlatTableIfNecessary();
-        if (!flatTablePath.isEmpty()) {
-            persistedFlatTable.add(flatTablePath);
+    private long[] getNewCuboidIds() {
+        Set<Long> recommendCuboidsSet = 
cubeInstance.getCuboidsByMode(CuboidModeEnum.RECOMMEND_MISSING);
+        Preconditions.checkNotNull(recommendCuboidsSet, "The recommended cuboid 
set must not be null");
+        long[] recommendCuboid = new long[recommendCuboidsSet.size()];
+        int i = 0;
+        for (long cuboidId : recommendCuboidsSet) {
+            recommendCuboid[i++] = cuboidId;
         }
+        return recommendCuboid;
     }
 
-    private void updateCubeAndSegmentMeta(String cubeId, Map<String, Object> 
toUpdateSegmentSourceSize,
-                                          Map<Long, Long> recommendCuboidMap) 
throws IOException {
-        CubeInstance cubeInstance = cubeManager.getCubeByUuid(cubeId);
-        CubeInstance cubeCopy = cubeInstance.latestCopyForWrite();
+    protected void updateOptimizeSegmentInfo() throws IOException {
+        CubeInstance cubeCopy = optSeg.getCubeInstance().latestCopyForWrite();
+        List<CubeSegment> cubeSegments = Lists.newArrayList();
         CubeUpdate update = new CubeUpdate(cubeCopy);
 
-        if (recommendCuboidMap != null && !recommendCuboidMap.isEmpty())
-            update.setCuboids(recommendCuboidMap);
-
-        List<CubeSegment> cubeSegments = Lists.newArrayList();
-        for (Map.Entry<String, Object> entry : 
toUpdateSegmentSourceSize.entrySet()) {
-            CubeSegment segment = cubeCopy.getSegmentById(entry.getKey());
-            if (segment.getInputRecords() > 0L) {
-                segment.setInputRecordsSize((Long) entry.getValue());
-                segment.setLastBuildTime(System.currentTimeMillis());
-                cubeSegments.add(segment);
-            }
-        }
-        if (!cubeSegments.isEmpty()) {
-            update.setToUpdateSegs(cubeSegments.toArray(new CubeSegment[0]));
-            cubeManager.updateCube(update);
+        optSeg.setSizeKB(optSegInfo.getAllLayoutSize() / 1024);
+        optSeg.setLastBuildTime(System.currentTimeMillis());
+        optSeg.setLastBuildJobID(jobId);
+        optSeg.setInputRecords(originalSeg.getInputRecords());
+        Map<Long, Short> existingShardNums = originalSeg.getCuboidShardNums();
+        for (Long cuboidId : 
cubeCopy.getCuboidsByMode(CuboidModeEnum.RECOMMEND_EXISTING)) {
+            cuboidShardNum.putIfAbsent(cuboidId, 
existingShardNums.get(cuboidId));
         }
+        optSeg.setCuboidShardNums(cuboidShardNum);
+        optSeg.setInputRecordsSize(originalSeg.getInputRecordsSize());
+        Map<String, String> additionalInfo = optSeg.getAdditionalInfo();
+        additionalInfo.put("storageType", "" + IStorageAware.ID_PARQUET);
+        optSeg.setAdditionalInfo(additionalInfo);
+        cubeSegments.add(optSeg);
+        update.setToUpdateSegs(cubeSegments.toArray(new CubeSegment[0]));
+        cubeManager.updateCube(update);
     }
 
-    private void build(Collection<NBuildSourceInfo> buildSourceInfos, 
SegmentInfo seg, SpanningTree st) throws InterruptedException{
+    private void build(Collection<NBuildSourceInfo> buildSourceInfos, 
SegmentInfo seg, SpanningTree st) {
 
         List<NBuildSourceInfo> theFirstLevelBuildInfos = 
buildLayer(buildSourceInfos, seg, st);
         LinkedList<List<NBuildSourceInfo>> queue = new LinkedList<>();
@@ -313,12 +233,11 @@ public class CubeBuildJob extends SparkApplication {
                 queue.offer(theNextLayer);
             }
         }
-
     }
 
     // build current layer and return the next layer to be built.
     private List<NBuildSourceInfo> buildLayer(Collection<NBuildSourceInfo> 
buildSourceInfos, SegmentInfo seg,
-                                              SpanningTree st) throws 
InterruptedException{
+                                              SpanningTree st) {
         int cuboidsNumInLayer = 0;
 
         // build current layer
@@ -330,11 +249,6 @@ public class CubeBuildJob extends SparkApplication {
             cuboidsNumInLayer += toBuildCuboids.size();
             Preconditions.checkState(!toBuildCuboids.isEmpty(), "To be built 
cuboids is empty.");
             Dataset<Row> parentDS = info.getParentDS();
-
-            if (toBuildCuboids.size() > 1) {
-                buildLayoutWithUpdate.cacheAndRegister(info.getLayoutId(), 
parentDS);
-            }
-
             // record the source count of flat table
             if (info.getLayoutId() == ParentSourceChooser.FLAT_TABLE_FLAG()) {
                 cuboidsRowCount.putIfAbsent(info.getLayoutId(), 
parentDS.count());
@@ -342,22 +256,31 @@ public class CubeBuildJob extends SparkApplication {
 
             for (LayoutEntity index : toBuildCuboids) {
                 Preconditions.checkNotNull(parentDS, "Parent dataset is null 
when building.");
-                buildLayoutWithUpdate.submit(new 
BuildLayoutWithUpdate.JobEntity() {
-                    @Override
-                    public String getName() {
-                        return "build-cuboid-" + index.getId();
-                    }
-
-                    @Override
-                    public LayoutEntity build() throws IOException {
-                        return buildCuboid(seg, index, parentDS, st, 
info.getLayoutId());
+                if 
(!cubeInstance.getCuboidsByMode(CuboidModeEnum.RECOMMEND_EXISTING).contains(index.getId()))
 {
+                    infos.recordAddCuboids(index.getId());
+                    buildLayoutWithUpdate.submit(new 
BuildLayoutWithUpdate.JobEntity() {
+                        @Override
+                        public String getName() {
+                            return "build-cuboid-" + index.getId();
+                        }
+
+                        @Override
+                        public LayoutEntity build() throws IOException {
+                            return buildCuboid(seg, index, parentDS, st, 
info.getLayoutId());
+                        }
+
+                        @Override
+                        public NBuildSourceInfo getBuildSourceInfo() {
+                            return info;
+                        }
+                    }, config);
+                } else {
+                    try {
+                        updateExistingLayout(index, info.getLayoutId());
+                    } catch (IOException e) {
+                        logger.error("Failed to update existing cuboid info: 
{}", index.getId());
                     }
-
-                    @Override
-                    public NBuildSourceInfo getBuildSourceInfo() {
-                        return info;
-                    }
-                }, config);
+                }
                 allIndexesInCurrentLayer.add(index);
             }
         }
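
The loop above splits every layer in two: cuboids already materialized in the 
original segment (CuboidModeEnum.RECOMMEND_EXISTING) only get their metadata 
refreshed, while the remaining recommended cuboids are submitted as build 
tasks. A minimal sketch of that split using plain java.util collections; the 
class and method names below are illustrative only, not Kylin API:

    import java.util.HashSet;
    import java.util.Set;

    public class CuboidSplitSketch {
        // recommended: cuboid set chosen by cube planner
        // existing:    cuboids already materialized in the original segment
        static Set<Long> toBuild(Set<Long> recommended, Set<Long> existing) {
            Set<Long> missing = new HashSet<>(recommended);
            missing.removeAll(existing);   // must be built from a parent cuboid
            return missing;
        }

        static Set<Long> toReuse(Set<Long> recommended, Set<Long> existing) {
            Set<Long> reused = new HashSet<>(recommended);
            reused.retainAll(existing);    // only metadata is refreshed
            return reused;
        }
    }

In the job itself this split is what CuboidModeEnum.RECOMMEND_MISSING and 
CuboidModeEnum.RECOMMEND_EXISTING encode.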
@@ -490,8 +413,30 @@ public class CubeBuildJob extends SparkApplication {
         BuildUtils.fillCuboidInfo(layout, path);
     }
 
+    private void updateExistingLayout(LayoutEntity layout, long parentId) 
throws IOException {
+        FileSystem fs = HadoopUtil.getWorkingFileSystem();
+        long layoutId = layout.getId();
+        String path = PathManager.getParquetStoragePath(config, 
cubeInstance.getName(), optSegInfo.name(), optSegInfo.identifier(),
+                String.valueOf(layoutId));
+        Dataset<Row> dataset = StorageFactory
+                .createEngineAdapter(layout, 
NSparkCubingEngine.NSparkCubingStorage.class)
+                .getFrom(path, ss);
+        logger.debug("Existing cuboid, use count() to collect cuboid rows.");
+        long cuboidRowCnt = dataset.count();
+        ContentSummary cs = HadoopUtil.getContentSummary(fs, new Path(path));
+        layout.setRows(cuboidRowCnt);
+        layout.setFileCount(cs.getFileCount());
+        layout.setByteSize(cs.getLength());
+        // record the row count of cuboid
+        cuboidsRowCount.putIfAbsent(layoutId, cuboidRowCnt);
+        layout.setSourceRows(cuboidsRowCount.get(parentId));
+        int shardNum = originalSeg.getCuboidShardNums().get(layoutId);
+        layout.setShardNum(shardNum);
+        optSegInfo.updateLayout(layout);
+    }
+
     @Override
     protected String generateInfo() {
-        return LogJobInfoUtils.dfBuildJobInfo();
+        return LogJobInfoUtils.dfOptimizeJobInfo();
     }
 }
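
updateExistingLayout() above re-reads a reused cuboid's parquet output to fill 
in its row count, byte size and file count before the segment metadata is 
updated. A rough, self-contained sketch of the same measurement using only 
public Spark and Hadoop APIs; path handling is simplified here, whereas the 
real code goes through PathManager and StorageFactory:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.ContentSummary;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.spark.sql.SparkSession;

    public class LayoutSizeProbe {
        /** Returns {rows, bytes, fileCount} for a cuboid stored as parquet. */
        public static long[] probe(SparkSession ss, String parquetPath) throws java.io.IOException {
            long rows = ss.read().parquet(parquetPath).count();   // feeds LayoutEntity#setRows
            FileSystem fs = FileSystem.get(new Configuration());
            ContentSummary cs = fs.getContentSummary(new Path(parquetPath));
            return new long[] { rows, cs.getLength(), cs.getFileCount() };
        }
    }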
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/ResourceDetectBeforeOptimizingJob.java
 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/ResourceDetectBeforeOptimizingJob.java
new file mode 100644
index 0000000..8e77303d
--- /dev/null
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/ResourceDetectBeforeOptimizingJob.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.job;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.spark.application.SparkApplication;
+import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
+import org.apache.kylin.engine.spark.metadata.SegmentInfo;
+import org.apache.kylin.engine.spark.metadata.cube.ManagerHub;
+import org.apache.kylin.engine.spark.metadata.cube.model.ForestSpanningTree;
+import org.apache.kylin.engine.spark.metadata.cube.model.SpanningTree;
+import org.apache.kylin.engine.spark.utils.SparkUtils;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConversions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ResourceDetectBeforeOptimizingJob extends SparkApplication {
+    protected static final Logger logger = 
LoggerFactory.getLogger(ResourceDetectBeforeOptimizingJob.class);
+    protected volatile SpanningTree spanningTree;
+    protected volatile List<NBuildSourceInfo> sources = new ArrayList<>();
+
+    public static void main(String[] args) {
+        ResourceDetectBeforeOptimizingJob resourceDetectJob = new 
ResourceDetectBeforeOptimizingJob();
+        resourceDetectJob.execute(args);
+    }
+
+    @Override
+    protected void doExecute() throws Exception {
+        logger.info("Start detect resource before optimize.");
+        String segId = getParam(CubingExecutableUtil.SEGMENT_ID);
+        String cubeId = getParam(MetadataConstants.P_CUBE_ID);
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance cubeInstance = cubeManager.getCubeByUuid(cubeId);
+        SegmentInfo segInfo = ManagerHub.getSegmentInfo(config, cubeId, segId);
+        CubeSegment segment = cubeInstance.getSegmentById(segId);
+        infos.recordOptimizingSegment(segInfo);
+
+        spanningTree = new 
ForestSpanningTree(JavaConversions.asJavaCollection(segInfo.toBuildLayouts()));
+        segInfo.removeLayout(segment.getCuboidScheduler().getBaseCuboidId());
+        ResourceDetectUtils.write(new Path(config.getJobTmpShareDir(project, 
jobId), ResourceDetectUtils.countDistinctSuffix()), false);
+        ParentSourceChooser datasetChooser = new 
ParentSourceChooser(spanningTree, segInfo, segment, jobId, ss, config, false);
+
+        datasetChooser.decideSources();
+        NBuildSourceInfo buildFromFlatTable = datasetChooser.flatTableSource();
+        if (buildFromFlatTable != null) {
+            sources.add(buildFromFlatTable);
+        }
+        Map<Long, NBuildSourceInfo> buildFromLayouts = 
datasetChooser.reuseSources();
+        sources.addAll(buildFromLayouts.values());
+
+        Map<String, List<String>> resourcePaths = Maps.newHashMap();
+        Map<String, Integer> layoutLeafTaskNums = Maps.newHashMap();
+        infos.clearSparkPlans();
+        for (NBuildSourceInfo source : sources) {
+            Dataset<Row> dataset = source.getParentDS();
+            RDD actionRdd = dataset.queryExecution().toRdd();
+            logger.info("leaf nodes is: {} ", SparkUtils.leafNodes(actionRdd));
+            infos.recordSparkPlan(dataset.queryExecution().sparkPlan());
+            List<Path> paths = JavaConversions
+                    
.seqAsJavaList(ResourceDetectUtils.getPaths(dataset.queryExecution().sparkPlan()));
+            List<String> pathList = 
paths.stream().map(Path::toString).collect(Collectors.toList());
+            resourcePaths.put(String.valueOf(source.getLayoutId()), pathList);
+            layoutLeafTaskNums.put(String.valueOf(source.getLayoutId()), 
SparkUtils.leafNodePartitionNums(actionRdd));
+        }
+        ResourceDetectUtils.write(
+                new Path(config.getJobTmpShareDir(project, jobId), segId + "_" 
+ ResourceDetectUtils.fileName()),
+                resourcePaths);
+        ResourceDetectUtils.write(
+                new Path(config.getJobTmpShareDir(project, jobId), segId + "_" 
+ ResourceDetectUtils.cubingDetectItemFileSuffix()),
+                layoutLeafTaskNums);
+    }
+
+    @Override
+    protected String generateInfo() {
+        return LogJobInfoUtils.resourceDetectBeforeOptimizeJobInfo();
+    }
+}
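
The detect step records, per parent source, the files it will scan and the 
leaf-task (partition) count so the driver can size the Spark job. Kylin's 
SparkUtils.leafNodePartitionNums walks the physical plan; the snippet below is 
only a coarse stand-in built on public Spark API, for illustration:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class PartitionCountProbe {
        // Only reports the partition count of the final RDD; Kylin additionally
        // inspects the leaf nodes and the input file paths of the plan.
        public static int partitionsOf(Dataset<Row> ds) {
            return ds.rdd().getNumPartitions();
        }

        public static void main(String[] args) {
            SparkSession ss = SparkSession.builder().master("local[*]").appName("detect-probe").getOrCreate();
            System.out.println(partitionsOf(ss.range(1000).toDF()));
            ss.stop();
        }
    }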
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
index 0987842..a02530e 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
@@ -39,13 +39,18 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
 import org.apache.kylin.cube.model.CubeBuildTypeEnum;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.spark.job.NSparkExecutable;
+import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,11 +58,11 @@ public class UpdateMetadataUtil {
 
     protected static final Logger logger = 
LoggerFactory.getLogger(UpdateMetadataUtil.class);
 
-    public static void syncLocalMetadataToRemote(KylinConfig config,
-                                                 NSparkExecutable 
nsparkExecutable) throws IOException {
+    public static void syncLocalMetadataToRemote(KylinConfig config, 
NSparkExecutable nsparkExecutable)
+            throws IOException {
         String cubeId = nsparkExecutable.getParam(MetadataConstants.P_CUBE_ID);
-        Set<String> segmentIds = Sets.newHashSet(StringUtils.split(
-                nsparkExecutable.getParam(CubingExecutableUtil.SEGMENT_ID), " 
"));
+        Set<String> segmentIds = Sets
+                
.newHashSet(StringUtils.split(nsparkExecutable.getParam(CubingExecutableUtil.SEGMENT_ID),
 " "));
         String segmentId = segmentIds.iterator().next();
         String remoteResourceStore = nsparkExecutable.getDistMetaUrl();
         String jobType = 
nsparkExecutable.getParam(MetadataConstants.P_JOB_TYPE);
@@ -77,8 +82,10 @@ public class UpdateMetadataUtil {
                             currentInstanceCopy.toString(), 
toUpdateSeg.toString(), tobeSegments.toString()));
 
         String resKey = toUpdateSeg.getStatisticsResourcePath();
-        String jobTmpDir = 
config.getJobTmpDir(currentInstanceCopy.getProject()) + "/" + 
nsparkExecutable.getParam(MetadataConstants.P_JOB_ID);
-        Path statisticsFile = new Path(jobTmpDir + "/" + 
ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeId + "/" + segmentId + "/" + 
BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+        String statisticsDir = 
config.getJobTmpDir(currentInstanceCopy.getProject()) + "/"
+                + nsparkExecutable.getParam(MetadataConstants.P_JOB_ID) + "/" 
+ ResourceStore.CUBE_STATISTICS_ROOT + "/"
+                + cubeId + "/" + segmentId + "/";
+        Path statisticsFile = new Path(statisticsDir, 
BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
         FileSystem fs = HadoopUtil.getWorkingFileSystem();
         if (fs.exists(statisticsFile)) {
             FSDataInputStream is = fs.open(statisticsFile);
@@ -92,10 +99,45 @@ public class UpdateMetadataUtil {
         if (String.valueOf(CubeBuildTypeEnum.MERGE).equals(jobType)) {
             toUpdateSeg.getSnapshots().clear();
             // update the snapshot table path
-            for (Map.Entry<String, String> entry :
-                    
currentInstanceCopy.getLatestReadySegment().getSnapshots().entrySet()) {
+            for (Map.Entry<String, String> entry : 
currentInstanceCopy.getLatestReadySegment().getSnapshots()
+                    .entrySet()) {
                 toUpdateSeg.putSnapshotResPath(entry.getKey(), 
entry.getValue());
             }
+        } else if (String.valueOf(CubeBuildTypeEnum.OPTIMIZE).equals(jobType)) 
{
+            CubeSegment origSeg = 
currentInstanceCopy.getOriginalSegmentToOptimize(toUpdateSeg);
+            toUpdateSeg.getDictionaries().putAll(origSeg.getDictionaries());
+            toUpdateSeg.getSnapshots().putAll(origSeg.getSnapshots());
+            toUpdateSeg.getRowkeyStats().addAll(origSeg.getRowkeyStats());
+
+            CubeStatsReader optSegStatsReader = new 
CubeStatsReader(toUpdateSeg, config);
+            CubeStatsReader origSegStatsReader = new CubeStatsReader(origSeg, 
config);
+            Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+            if (origSegStatsReader.getCuboidRowHLLCounters() == null) {
+                throw new IllegalArgumentException(
+                        "Cuboid statistics of original segment do not exist. 
Please check the config of kylin.engine.segment-statistics-enabled.");
+            }
+            addFromCubeStatsReader(origSegStatsReader, cuboidHLLMap);
+            addFromCubeStatsReader(optSegStatsReader, cuboidHLLMap);
+
+            Set<Long> recommendCuboids = 
currentInstanceCopy.getCuboidsByMode(CuboidModeEnum.RECOMMEND);
+            Map<Long, HLLCounter> resultCuboidHLLMap = 
Maps.newHashMapWithExpectedSize(recommendCuboids.size());
+            for (long cuboid : recommendCuboids) {
+                HLLCounter hll = cuboidHLLMap.get(cuboid);
+                if (hll == null) {
+                    logger.warn("Cannot get the row count stats for cuboid " + 
cuboid);
+                } else {
+                    resultCuboidHLLMap.put(cuboid, hll);
+                }
+            }
+            if (fs.exists(statisticsFile)) {
+                fs.delete(statisticsFile, false);
+            }
+            
CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), new 
Path(statisticsDir),
+                    resultCuboidHLLMap, 1, 
origSegStatsReader.getSourceRowCount());
+            FSDataInputStream is = fs.open(statisticsFile);
+            ResourceStore.getStore(config).putBigResource(resKey, is, 
System.currentTimeMillis());
+
+            toUpdateSeg.setStatus(SegmentStatusEnum.READY_PENDING);
         } else {
             toUpdateSeg.setStatus(SegmentStatusEnum.READY);
             for (CubeSegment segment : currentInstanceCopy.getSegments()) {
@@ -108,16 +150,16 @@ public class UpdateMetadataUtil {
             }
         }
 
-        logger.info("Promoting cube {}, new segment {}, to remove segments 
{}", currentInstanceCopy, toUpdateSeg, toRemoveSegs);
+        logger.info("Promoting cube {}, new segment {}, to remove segments 
{}", currentInstanceCopy, toUpdateSeg,
+                toRemoveSegs);
 
         toUpdateSeg.setLastBuildTime(System.currentTimeMillis());
-        update.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[0]))
-                .setToUpdateSegs(toUpdateSeg);
+        update.setToRemoveSegs(toRemoveSegs.toArray(new 
CubeSegment[0])).setToUpdateSegs(toUpdateSeg);
         cubeManager.updateCube(update);
     }
 
-    public static void updateMetadataAfterMerge(String cubeId, String 
segmentId,
-                                                KylinConfig config) throws 
IOException {
+    public static void updateMetadataAfterMerge(String cubeId, String 
segmentId, KylinConfig config)
+            throws IOException {
         CubeManager cubeManager = CubeManager.getInstance(config);
         CubeInstance currentInstanceCopy = 
cubeManager.getCubeByUuid(cubeId).latestCopyForWrite();
 
@@ -142,11 +184,21 @@ public class UpdateMetadataUtil {
             update.setStatus(RealizationStatusEnum.READY);
         }
 
-        logger.info("Promoting cube {}, new segment {}, to remove segments 
{}", currentInstanceCopy, toUpdateSegs, toRemoveSegs);
+        logger.info("Promoting cube {}, new segment {}, to remove segments 
{}", currentInstanceCopy, toUpdateSegs,
+                toRemoveSegs);
 
         toUpdateSegs.setLastBuildTime(System.currentTimeMillis());
-        update.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[0]))
-                .setToUpdateSegs(toUpdateSegs);
+        update.setToRemoveSegs(toRemoveSegs.toArray(new 
CubeSegment[0])).setToUpdateSegs(toUpdateSegs);
         cubeManager.updateCube(update);
     }
+
+    private static void addFromCubeStatsReader(CubeStatsReader 
cubeStatsReader, Map<Long, HLLCounter> cuboidHLLMap) {
+        for (Map.Entry<Long, HLLCounter> entry : 
cubeStatsReader.getCuboidRowHLLCounters().entrySet()) {
+            if (cuboidHLLMap.get(entry.getKey()) != null) {
+                cuboidHLLMap.get(entry.getKey()).merge(entry.getValue());
+            } else {
+                cuboidHLLMap.put(entry.getKey(), entry.getValue());
+            }
+        }
+    }
 }
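
For the OPTIMIZE branch above, statistics of the original and the optimized 
segment are merged and then trimmed to the recommended cuboid set before being 
written back to the resource store. A minimal sketch of that merge-then-filter 
step, with Long row counts standing in for HLLCounter; the real code merges 
sketches via HLLCounter#merge:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    public class StatsMergeSketch {
        static Map<Long, Long> mergeAndFilter(Map<Long, Long> fromOriginal,
                                              Map<Long, Long> fromOptimized,
                                              Set<Long> recommended) {
            Map<Long, Long> merged = new HashMap<>(fromOriginal);
            // Long::max is only a placeholder combine function; HLL sketches are unioned instead.
            fromOptimized.forEach((cuboid, rows) -> merged.merge(cuboid, rows, Long::max));
            merged.keySet().retainAll(recommended);   // cuboids outside the recommendation are dropped
            return merged;
        }
    }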
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/BuildJobInfos.scala
 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/BuildJobInfos.scala
index a1592bf..2266087 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/BuildJobInfos.scala
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/BuildJobInfos.scala
@@ -38,6 +38,13 @@ class BuildJobInfos {
 
   private val mergingSegments: java.util.List[SegmentInfo] = new 
util.LinkedList[SegmentInfo]
 
+  // OPTIMIZE
+  private var optimizingSegment: SegmentInfo = null
+
+  private val addCuboids: java.util.List[Long] = new util.LinkedList[Long]
+
+  private val reusedCuboids: java.util.Set[Long] = new util.HashSet[Long]
+
   // COMMON
   private val abnormalLayouts: util.Map[Long, util.List[String]] = new 
util.HashMap[Long, util.List[String]]
 
@@ -82,6 +89,38 @@ class BuildJobInfos {
     mergingSegments.addAll(segments)
   }
 
+  def recordOptimizingSegment(segment: SegmentInfo): Unit = {
+    optimizingSegment = segment
+  }
+
+  def getOptimizingSegment(): SegmentInfo = {
+    optimizingSegment
+  }
+
+  def recordReusedCuboids(cuboids: util.Set[Long]): Unit = {
+    reusedCuboids.addAll(cuboids)
+  }
+
+  def getReusedCuboid(): util.Set[Long] = {
+    reusedCuboids
+  }
+
+  def clearReusedCuboids(): Unit = {
+    reusedCuboids.clear()
+  }
+
+  def clearAddCuboids(): Unit = {
+    addCuboids.clear()
+  }
+
+  def getAddCuboids: util.List[Long] = {
+    addCuboids
+  }
+
+  def recordAddCuboids(cuboidId: Long): Unit = {
+    addCuboids.add(cuboidId)
+  }
+
   def clearMergingSegments(): Unit = {
     mergingSegments.clear()
   }
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index efce341..dbfe810 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -36,7 +36,6 @@ import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsWriter;
 import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
@@ -111,10 +110,10 @@ public class CubeBuildJob extends SparkApplication {
         Preconditions.checkArgument(segmentIds.size() == 1, "Build one segment 
in one time.");
 
         String firstSegmentId = segmentIds.iterator().next();
-        String cubeName = getParam(MetadataConstants.P_CUBE_ID);
-        SegmentInfo seg = ManagerHub.getSegmentInfo(config, cubeName, 
firstSegmentId);
+        String cubeId = getParam(MetadataConstants.P_CUBE_ID);
+        SegmentInfo seg = ManagerHub.getSegmentInfo(config, cubeId, 
firstSegmentId);
         cubeManager = CubeManager.getInstance(config);
-        cubeInstance = cubeManager.getCubeByUuid(cubeName);
+        cubeInstance = cubeManager.getCubeByUuid(cubeId);
         CubeSegment newSegment = cubeInstance.getSegmentById(firstSegmentId);
         SpanningTree spanningTree;
         ParentSourceChooser sourceChooser;
@@ -127,7 +126,7 @@ public class CubeBuildJob extends SparkApplication {
             // 1.1 Call CuboidStatistics#statistics
             long startMills = System.currentTimeMillis();
             spanningTree = new 
ForestSpanningTree(JavaConversions.asJavaCollection(seg.toBuildLayouts()));
-            sourceChooser = new ParentSourceChooser(spanningTree, seg, jobId, 
ss, config, false);
+            sourceChooser = new ParentSourceChooser(spanningTree, seg, 
newSegment, jobId, ss, config, false);
             sourceChooser.setNeedStatistics();
             sourceChooser.decideFlatTableSource(null);
             Map<Long, HLLCounter> hllMap = new HashMap<>();
@@ -138,7 +137,7 @@ public class CubeBuildJob extends SparkApplication {
 
             // 1.2 Save cuboid statistics
             String jobTmpDir = config.getJobTmpDir(project) + "/" + jobId;
-            Path statisticsDir = new Path(jobTmpDir + "/" + 
ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + firstSegmentId + 
"/");
+            Path statisticsDir = new Path(jobTmpDir + "/" + 
ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeId + "/" + firstSegmentId + "/");
             Optional<HLLCounter> hll = 
hllMap.values().stream().max(Comparator.comparingLong(HLLCounter::getCountEstimate));
             long rc = hll.map(HLLCounter::getCountEstimate).orElse(1L);
             
CubeStatsWriter.writeCuboidStatistics(HadoopUtil.getCurrentConfiguration(), 
statisticsDir, hllMap, 1, rc);
@@ -164,7 +163,7 @@ public class CubeBuildJob extends SparkApplication {
         try {
             //TODO: what if a segment is deleted during building?
             for (String segId : segmentIds) {
-                seg = ManagerHub.getSegmentInfo(config, cubeName, segId);
+                seg = ManagerHub.getSegmentInfo(config, cubeId, segId);
                 spanningTree = new ForestSpanningTree(
                         
JavaConversions.asJavaCollection(seg.toBuildLayouts()));
                 logger.info("There are {} cuboids to be built in segment {}.",
@@ -175,7 +174,7 @@ public class CubeBuildJob extends SparkApplication {
                 }
 
                 // choose source
-                sourceChooser = new ParentSourceChooser(spanningTree, seg, 
jobId, ss, config, true);
+                sourceChooser = new ParentSourceChooser(spanningTree, seg, 
newSegment, jobId, ss, config, true);
                 sourceChooser.decideSources();
                 NBuildSourceInfo buildFromFlatTable = 
sourceChooser.flatTableSource();
                 Map<Long, NBuildSourceInfo> buildFromLayouts = 
sourceChooser.reuseSources();
@@ -313,7 +312,6 @@ public class CubeBuildJob extends SparkApplication {
                 queue.offer(theNextLayer);
             }
         }
-
     }
 
     // build current layer and return the next layer to be built.
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala
 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala
index 9c18765..c02d6a7 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidStatisticsJob.scala
@@ -45,6 +45,15 @@ object CuboidStatisticsJob {
     //    l.foreach(x => println(x._1 + " >>><<< " + 
x._2.cuboid.counter.getCountEstimate))
     l
   }
+
+  def statistics(inputDs: Dataset[Row], seg: SegmentInfo, layoutIds: 
Array[Long]): Array[(Long, AggInfo)] = {
+    val res = 
inputDs.rdd.repartition(inputDs.sparkSession.sparkContext.defaultParallelism)
+      .mapPartitions(new CuboidStatisticsJob(layoutIds, seg.allColumns.count(c 
=> c.rowKey)).statisticsWithinPartition)
+    val l = res.map(a => (a.key, a)).reduceByKey((a, b) => 
a.merge(b)).collect()
+    l
+  }
+
+
 }
 
 class CuboidStatisticsJob(ids: Array[Long], rkc: Int) extends Serializable {
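
The new statistics(inputDs, seg, layoutIds) overload keeps the same shape as 
the existing one: repartition the input, aggregate within each partition, then 
merge partial results by cuboid id. A simplified Java sketch of that shape, 
with per-cuboid row counts standing in for AggInfo/HLL; the class below is 
hypothetical, not the Kylin job:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import scala.Tuple2;

    public class SamplingShapeSketch {
        static JavaPairRDD<Long, Long> rowsPerCuboid(Dataset<Row> baseCuboid, long[] cuboidIds, JavaSparkContext jsc) {
            JavaRDD<Row> rows = baseCuboid.toJavaRDD().repartition(jsc.defaultParallelism());
            return rows.mapPartitionsToPair(it -> {
                long n = 0;
                while (it.hasNext()) { it.next(); n++; }   // real job feeds row keys into one HLLCounter per cuboid
                List<Tuple2<Long, Long>> partial = new ArrayList<>();
                for (long id : cuboidIds) {
                    partial.add(new Tuple2<>(id, n));
                }
                return partial.iterator();
            }).reduceByKey(Long::sum);                     // mirrors reduceByKey((a, b) => a.merge(b))
        }
    }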
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/LogJobInfoUtils.scala
 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/LogJobInfoUtils.scala
index 5a388f4..7c29ee6 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/LogJobInfoUtils.scala
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/LogJobInfoUtils.scala
@@ -72,6 +72,15 @@ object LogJobInfoUtils {
      """.stripMargin
   }
 
+  def resourceDetectBeforeOptimizeJobInfo: String = {
+    s"""
+       |==========================[RESOURCE DETECT BEFORE 
OPTIMIZE]===============================
+       |optimizing segment : ${infos.getOptimizingSegment()}
+       |spark plans : ${infos.getSparkPlans}
+       |==========================[RESOURCE DETECT BEFORE 
OPTIMIZE]===============================
+     """.stripMargin
+  }
+
   def dfMergeJobInfo: String = {
     s"""
        |==========================[MERGE CUBE]===============================
@@ -86,4 +95,27 @@ object LogJobInfoUtils {
        |==========================[MERGE CUBE]===============================
      """.stripMargin
   }
+
+  def filterRecommendCuboidJobInfo: String = {
+    s"""
+       |==========================[FILTER RECOMMEND 
CUBOID]===============================
+       |copy cuboids : ${infos.getReusedCuboid}
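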
+       |==========================[FILTER RECOMMEND 
CUBOID]===============================
+     """.stripMargin
+  }
+
+  def dfOptimizeJobInfo: String = {
+    s"""
+       |==========================[BUILD CUBE]===============================
+       |auto spark config :${infos.getAutoSparkConfs}
+       |wait time: ${infos.waitTime}
+       |build time: ${infos.buildTime}
+       |add cuboids: ${infos.getAddCuboids}
+       |abnormal layouts : ${infos.getAbnormalLayouts}
+       |retry times : ${infos.getRetryTimes}
+       |job retry infos :
+       |  ${infos.getJobRetryInfos.asScala.map(_.toString).mkString("\n")}
+       |==========================[BUILD CUBE]===============================
+     """.stripMargin
+  }
 }
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
index 216fbf8..bd54d0b 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
@@ -21,9 +21,11 @@ package org.apache.kylin.engine.spark.job
 import org.apache.kylin.shaded.com.google.common.collect.Maps
 import org.apache.kylin.engine.spark.builder._
 import org.apache.kylin.common.KylinConfig
+import org.apache.kylin.cube.CubeSegment
 import org.apache.kylin.engine.spark.builder.NBuildSourceInfo
 import org.apache.kylin.engine.spark.metadata.cube.model.{LayoutEntity, 
SpanningTree}
 import org.apache.kylin.engine.spark.metadata.SegmentInfo
+import org.apache.kylin.engine.spark.metadata.cube.PathManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
@@ -33,7 +35,8 @@ import scala.collection.JavaConverters._
 
 class ParentSourceChooser(
   toBuildTree: SpanningTree,
-  var seg: SegmentInfo,
+  var segInfo: SegmentInfo,
+  var segment: CubeSegment,
   jobId: String,
   ss: SparkSession,
   config: KylinConfig,
@@ -52,8 +55,8 @@ class ParentSourceChooser(
   //TODO: MetadataConverter don't have getCubeDesc() now
 
   /*val flatTableDesc = new CubeJoinedFlatTableDesc(
-    MetadataConverter.getCubeDesc(seg.getCube),
-    ParentSourceChooser.needJoinLookupTables(seg.getModel, toBuildTree))*/
+    MetadataConverter.getCubeDesc(segInfo.getCube),
+    ParentSourceChooser.needJoinLookupTables(segInfo.getModel, toBuildTree))*/
   def setNeedStatistics(): Unit =
     needStatistics = true
 
@@ -61,7 +64,7 @@ class ParentSourceChooser(
 
   def decideSources(): Unit = {
     toBuildTree.getRootIndexEntities.asScala.foreach { entity =>
-      val parentLayout = CuboidLayoutChooser.selectLayoutForBuild(seg, entity)
+      val parentLayout = CuboidLayoutChooser.selectLayoutForBuild(segInfo, 
entity)
       if (parentLayout != null) {
         decideParentLayoutSource(entity, parentLayout)
       } else {
@@ -76,18 +79,18 @@ class ParentSourceChooser(
         // hacked, for some case, you do not want to trigger buildSnapshot
         // eg: resource detect
         // Move this to a more suitable place
-        val builder = new CubeSnapshotBuilder(seg, ss)
+        val builder = new CubeSnapshotBuilder(segInfo, ss)
         builder.checkDupKey()
-        seg = builder.buildSnapshot
+        segInfo = builder.buildSnapshot
       }
       flatTableSource = getFlatTable
 
-      val rowKeyColumns: Seq[String] = seg.allColumns.filter(c => 
c.rowKey).map(c => c.id.toString)
+      val rowKeyColumns: Seq[String] = segInfo.allColumns.filter(c => 
c.rowKey).map(c => c.id.toString)
       if (aggInfo == null && needStatistics) {
         val startMs = System.currentTimeMillis()
         logInfo("Sampling start ...")
         val coreDs = flatTableSource.getFlatTableDS.select(rowKeyColumns.head, 
rowKeyColumns.tail: _*)
-        aggInfo = CuboidStatisticsJob.statistics(coreDs, seg)
+        aggInfo = CuboidStatisticsJob.statistics(coreDs, segInfo)
         logInfo("Sampling finished and cost " + (System.currentTimeMillis() - 
startMs)/1000 + " s .")
         val statisticsStr = aggInfo.sortBy(x => x._1).map(x => x._1 + ":" + 
x._2.cuboid.counter.getCountEstimate).mkString(", ")
         logInfo("Cuboid Statistics results : \t" + statisticsStr)
@@ -131,7 +134,7 @@ class ParentSourceChooser(
         }.toSeq
 
         df.select(allUsedCols.map(col): _*)
-        path = s"${config.getJobTmpFlatTableDir(seg.project, jobId)}"
+        path = s"${config.getJobTmpFlatTableDir(segInfo.project, jobId)}"
         ss.sparkContext.setJobDescription("Persist flat table.")
         df.write.mode(SaveMode.Overwrite).parquet(path)
         logInfo(s"Persist flat table into:$path. Selected cols in table are 
$allUsedCols.")
@@ -146,14 +149,14 @@ class ParentSourceChooser(
   //  private def persistFactViewIfNecessary(): String = {
   //    var path = ""
   //    if (needEncoding) {
-  //      logInfo(s"Check project:${seg.getProject} seg:${seg.getName} persist 
view fact table.")
+  //      logInfo(s"Check project:${segInfo.getProject} 
segInfo:${segInfo.getName} persist view fact table.")
   //      val fact = flatTableDesc.getDataModel.getRootFactTable
-  //      val globalDicts = 
DictionaryBuilderHelper.extractTreeRelatedGlobalDicts(seg, toBuildTree)
+  //      val globalDicts = 
DictionaryBuilderHelper.extractTreeRelatedGlobalDicts(segInfo, toBuildTree)
   //      val existsFactDictCol = 
globalDicts.asScala.exists(_.tableName.equals(buildDesc.factTable.tableName))
   //
   //      if (fact.getTableDesc.isView && existsFactDictCol) {
   //        val viewDS = ss.table(fact.getTableDesc).alias(fact.getAlias)
-  //        path = s"${config.getJobTmpViewFactTableDir(seg.getProject, 
jobId)}"
+  //        path = s"${config.getJobTmpViewFactTableDir(segInfo.getProject, 
jobId)}"
   //        ss.sparkContext.setJobDescription("Persist view fact table.")
   //        viewDS.write.mode(SaveMode.Overwrite).parquet(path)
   //        logInfo(s"Persist view fact table into:$path.")
@@ -164,7 +167,7 @@ class ParentSourceChooser(
 
   private def getSourceFromLayout(layout: LayoutEntity): NBuildSourceInfo = {
     val buildSource = new NBuildSourceInfo
-    
buildSource.setParentStoragePath("NSparkCubingUtil.getStoragePath(dataCuboid)")
+    buildSource.setParentStoragePath(NSparkCubingUtil.getStoragePath(segment, 
layout.getId))
     buildSource.setSparkSession(ss)
     buildSource.setLayoutId(layout.getId)
     buildSource.setLayout(layout)
@@ -181,8 +184,8 @@ class ParentSourceChooser(
     sourceInfo.setLayoutId(ParentSourceChooser.FLAT_TABLE_FLAG)
     //    sourceInfo.setViewFactTablePath(viewPath)
 
-    //    val needJoin = 
ParentSourceChooser.needJoinLookupTables(seg.getModel, toBuildTree)
-    val flatTable = new CreateFlatTable(seg, toBuildTree, ss, sourceInfo)
+    //    val needJoin = 
ParentSourceChooser.needJoinLookupTables(segInfo.getModel, toBuildTree)
+    val flatTable = new CreateFlatTable(segInfo, toBuildTree, ss, sourceInfo)
     val afterJoin: Dataset[Row] = flatTable.generateDataset(needEncoding, true)
     sourceInfo.setFlatTableDS(afterJoin)
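
ParentSourceChooser now resolves the storage path of a reused parent layout 
through NSparkCubingUtil.getStoragePath(segment, layout.getId) instead of the 
old placeholder string. The decision itself is unchanged: prefer an 
already-stored parent cuboid, and fall back to the flat table only when none 
exists. A compact sketch of that decision with plain collections; the names 
are illustrative, and the real selection is done by 
CuboidLayoutChooser.selectLayoutForBuild:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SourceDecisionSketch {
        /** Maps every cuboid to the storage path it will be built from. */
        static Map<Long, String> decide(List<Long> toBuild, Map<Long, String> storedLayouts, String flatTablePath) {
            Map<Long, String> sourceOf = new HashMap<>();
            for (Long cuboid : toBuild) {
                String parent = pickParent(cuboid, storedLayouts);
                sourceOf.put(cuboid, parent != null ? parent : flatTablePath);
            }
            return sourceOf;
        }

        // A parent is usable when its dimension bits cover the child's: (parent & child) == child.
        static String pickParent(long cuboid, Map<Long, String> storedLayouts) {
            for (Map.Entry<Long, String> e : storedLayouts.entrySet()) {
                if ((e.getKey() & cuboid) == cuboid && e.getKey() != cuboid) {
                    return e.getValue();
                }
            }
            return null;
        }
    }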
 
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
index 065bd5e..dd52d7a 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeCubingJob.java
@@ -18,6 +18,9 @@
 
 package org.apache.kylin.engine.spark.job;
 
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.shaded.com.google.common.collect.Maps;
 import org.apache.kylin.shaded.com.google.common.collect.Sets;
 import org.apache.kylin.engine.spark.application.SparkApplication;
@@ -52,12 +55,15 @@ public class ResourceDetectBeforeCubingJob extends 
SparkApplication {
     protected void doExecute() throws Exception {
         logger.info("Start detect resource before cube.");
         Set<String> segmentIds = 
Sets.newHashSet(StringUtils.split(getParam(MetadataConstants.P_SEGMENT_IDS)));
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance cubeInstance = 
cubeManager.getCubeByUuid(getParam(MetadataConstants.P_CUBE_ID));
         for (String segId : segmentIds) {
             SegmentInfo seg = ManagerHub.getSegmentInfo(config, 
getParam(MetadataConstants.P_CUBE_ID), segId);
+            CubeSegment segment = cubeInstance.getSegmentById(segId);
             spanningTree = new 
ForestSpanningTree(JavaConversions.asJavaCollection(seg.toBuildLayouts()));
             ResourceDetectUtils.write(new 
Path(config.getJobTmpShareDir(project, jobId), 
ResourceDetectUtils.countDistinctSuffix()),
                     
ResourceDetectUtils.findCountDistinctMeasure(JavaConversions.asJavaCollection(seg.toBuildLayouts())));
-            ParentSourceChooser datasetChooser = new 
ParentSourceChooser(spanningTree, seg, jobId, ss, config, false);
+            ParentSourceChooser datasetChooser = new 
ParentSourceChooser(spanningTree, seg, segment, jobId, ss, config, false);
             datasetChooser.decideSources();
             NBuildSourceInfo buildFromFlatTable = 
datasetChooser.flatTableSource();
             if (buildFromFlatTable != null) {
diff --git 
a/kylin-spark-project/kylin-spark-metadata/src/main/java/org/apache/kylin/engine/spark/metadata/cube/ManagerHub.java
 
b/kylin-spark-project/kylin-spark-metadata/src/main/java/org/apache/kylin/engine/spark/metadata/cube/ManagerHub.java
index 11ad50d..69d3d11 100644
--- 
a/kylin-spark-project/kylin-spark-metadata/src/main/java/org/apache/kylin/engine/spark/metadata/cube/ManagerHub.java
+++ 
b/kylin-spark-project/kylin-spark-metadata/src/main/java/org/apache/kylin/engine/spark/metadata/cube/ManagerHub.java
@@ -22,6 +22,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
 import org.apache.kylin.engine.spark.metadata.MetadataConverter;
 import org.apache.kylin.engine.spark.metadata.SegmentInfo;
 
@@ -32,11 +33,15 @@ public class ManagerHub {
     private ManagerHub() {
     }
 
-    public static SegmentInfo getSegmentInfo(KylinConfig kylinConfig, String 
cubeName, String segmentId) {
-        CubeInstance cubeInstance = 
CubeManager.getInstance(kylinConfig).getCubeByUuid(cubeName);
+    public static SegmentInfo getSegmentInfo(KylinConfig kylinConfig, String 
cubeId, String segmentId) {
+        return getSegmentInfo(kylinConfig, cubeId, segmentId, 
CuboidModeEnum.CURRENT);
+    }
+
+    public static SegmentInfo getSegmentInfo(KylinConfig kylinConfig, String 
cubeId, String segmentId, CuboidModeEnum cuboidMode) {
+        CubeInstance cubeInstance = 
CubeManager.getInstance(kylinConfig).getCubeByUuid(cubeId);
         CubeSegment segment = cubeInstance.getSegmentById(segmentId);
-        return 
MetadataConverter.getSegmentInfo(CubeManager.getInstance(kylinConfig).getCubeByUuid(cubeName),
-                segment.getUuid(), segment.getName(), 
segment.getStorageLocationIdentifier());
+        return 
MetadataConverter.getSegmentInfo(CubeManager.getInstance(kylinConfig).getCubeByUuid(cubeId),
+                segment.getUuid(), segment.getName(), 
segment.getStorageLocationIdentifier(), cuboidMode);
     }
 
     public static CubeInstance updateSegment(KylinConfig kylinConfig, 
SegmentInfo segmentInfo) throws IOException {
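
The extra CuboidModeEnum parameter lets callers request a segment view 
restricted to a cuboid mode; OptimizeBuildJob, for example, loads the segment 
with CuboidModeEnum.RECOMMEND. A short usage sketch (the wrapper class is 
illustrative; the getSegmentInfo calls are the ones added above):

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.cube.cuboid.CuboidModeEnum;
    import org.apache.kylin.engine.spark.metadata.SegmentInfo;
    import org.apache.kylin.engine.spark.metadata.cube.ManagerHub;

    class SegmentInfoLookup {
        static SegmentInfo currentView(KylinConfig config, String cubeId, String segmentId) {
            // default overload, unchanged behaviour (CuboidModeEnum.CURRENT)
            return ManagerHub.getSegmentInfo(config, cubeId, segmentId);
        }

        static SegmentInfo recommendedView(KylinConfig config, String cubeId, String segmentId) {
            // optimize path: only the recommended cuboid layouts are materialized
            return ManagerHub.getSegmentInfo(config, cubeId, segmentId, CuboidModeEnum.RECOMMEND);
        }
    }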
diff --git 
a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala
 
b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala
index 030834b..0469775 100644
--- 
a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala
+++ 
b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetaData.scala
@@ -118,6 +118,10 @@ case class SegmentInfo(id: String,
     toBuildLayouts.remove(layoutEntity)
   }
 
+  def removeLayout(layoutId: Long): Unit = {
+    toBuildLayouts = toBuildLayouts.filter(layout => 
!layout.getId.equals(layoutId))
+  }
+
   def updateSnapshot(tableInfo: Map[String, String]): Unit = {
     snapshotInfo = tableInfo
   }
diff --git 
a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala
 
b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala
index a50a835..b0ba58e 100644
--- 
a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala
+++ 
b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/kylin/engine/spark/metadata/MetadataConverter.scala
@@ -23,7 +23,7 @@ import java.util.regex.Pattern
 import java.{lang, util}
 
 import org.apache.commons.lang.StringUtils
-import org.apache.kylin.cube.cuboid.Cuboid
+import org.apache.kylin.cube.cuboid.{Cuboid, CuboidModeEnum}
 import org.apache.kylin.cube.{CubeInstance, CubeSegment, CubeUpdate}
 import org.apache.kylin.engine.spark.metadata.cube.BitUtils
 import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity
@@ -37,8 +37,12 @@ import scala.collection.mutable
 
 object MetadataConverter {
   def getSegmentInfo(cubeInstance: CubeInstance, segmentId: String, 
segmentName: String, identifier: String): SegmentInfo = {
+    getSegmentInfo(cubeInstance, segmentId, segmentName, identifier, 
CuboidModeEnum.CURRENT)
+  }
+
+  def getSegmentInfo(cubeInstance: CubeInstance, segmentId: String, 
segmentName: String, identifier: String, cuboidMode: CuboidModeEnum): 
SegmentInfo = {
     val (allColumnDesc, allRowKeyCols) = extractAllColumnDesc(cubeInstance)
-    val (layoutEntities, measure) = extractEntityAndMeasures(cubeInstance)
+    val (layoutEntities, measure) = extractEntityAndMeasures(cubeInstance, 
cuboidMode)
     val dictColumn = 
measure.values.filter(_.returnType.dataType.equals("bitmap"))
       .map(_.pra.head).toSet
     SegmentInfo(segmentId, segmentName, identifier, cubeInstance.getProject, 
cubeInstance.getConfig, extractFactTable(cubeInstance),
@@ -122,9 +126,12 @@ object MetadataConverter {
   }
 
   def extractEntityAndMeasures(cubeInstance: CubeInstance): 
(List[LayoutEntity], Map[Integer, FunctionDesc]) = {
+    extractEntityAndMeasures(cubeInstance, CuboidModeEnum.CURRENT)
+  }
+
+  def extractEntityAndMeasures(cubeInstance: CubeInstance, cuboidMode: 
CuboidModeEnum): (List[LayoutEntity], Map[Integer, FunctionDesc]) = {
     val (columnIndexes, shardByColumnsId, idToColumnMap, measureId) = 
genIDToColumnMap(cubeInstance)
-    (cubeInstance.getCuboidScheduler
-      .getAllCuboidIds
+    (cubeInstance.getCuboidsByMode(cuboidMode)
       .asScala
       .map { long =>
         genLayoutEntity(columnIndexes, shardByColumnsId, idToColumnMap, 
measureId, long)
@@ -202,7 +209,11 @@ object MetadataConverter {
   }
 
   def extractEntityList2JavaList(cubeInstance: CubeInstance): 
java.util.List[LayoutEntity] = {
-    extractEntityAndMeasures(cubeInstance)._1.asJava
+    extractEntityList2JavaList(cubeInstance, CuboidModeEnum.CURRENT)
+  }
+
+  def extractEntityList2JavaList(cubeInstance: CubeInstance, cuboidMode: 
CuboidModeEnum): java.util.List[LayoutEntity] = {
+    extractEntityAndMeasures(cubeInstance, cuboidMode)._1.asJava
   }
 
   private def toColumnDesc(ref: TblColRef, index: Int = -1, rowKey: Boolean = 
false) = {
diff --git 
a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NOptimizeJobTest.java
 
b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NOptimizeJobTest.java
new file mode 100644
index 0000000..5e5789c
--- /dev/null
+++ 
b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NOptimizeJobTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark2;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.engine.spark.LocalWithSparkSessionTest;
+import org.apache.kylin.engine.spark.job.NSparkBatchOptimizeJobCheckpointBuilder;
+import org.apache.kylin.engine.spark.job.NSparkOptimizingJob;
+import org.apache.kylin.engine.spark.metadata.cube.PathManager;
+import org.apache.kylin.job.exception.SchedulerException;
+import org.apache.kylin.job.execution.CheckpointExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.query.routing.Candidate;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class NOptimizeJobTest extends LocalWithSparkSessionTest {
+    protected KylinConfig config;
+    protected CubeManager cubeMgr;
+    protected ExecutableManager execMgr;
+
+    private final String CUBE_NAME = "ci_left_join_cube";
+    private final long CUBOID_ADD = 1048575L;
+    private final long CUBOID_DELETE = 14336L;
+
+    @Override
+    public void setup() throws SchedulerException {
+        super.setup();
+        overwriteSystemProp("kylin.env", "UT");
+        overwriteSystemProp("isDeveloperMode", "true");
+        overwriteSystemProp("kylin.engine.segment-statistics-enabled", "true");
+        Map<RealizationType, Integer> priorities = Maps.newHashMap();
+        priorities.put(RealizationType.HYBRID, 0);
+        priorities.put(RealizationType.CUBE, 0);
+        Candidate.setPriorities(priorities);
+        config = KylinConfig.getInstanceFromEnv();
+        cubeMgr = CubeManager.getInstance(config);
+        execMgr = ExecutableManager.getInstance(config);
+    }
+
+    @Override
+    public void after() {
+        super.after();
+    }
+
+    @Test
+    public void verifyOptimizeJob() throws Exception {
+        CubeInstance cube = cubeMgr.reloadCube(CUBE_NAME);
+        Set<Long> recommendCuboids = new HashSet<>();
+        recommendCuboids.addAll(cube.getCuboidScheduler().getAllCuboidIds());
+        recommendCuboids.add(CUBOID_ADD);
+        recommendCuboids.remove(CUBOID_DELETE);
+        // 1. Build two segments
+        buildSegments(CUBE_NAME, new SegmentRange.TSRange(dateToLong("2012-01-01"), dateToLong("2012-02-01")),
+                new SegmentRange.TSRange(dateToLong("2012-02-01"), dateToLong("2012-03-01")));
+
+        // 2. Optimize Segment
+        CubeSegment[] optimizeSegments = cubeMgr.optimizeSegments(cube, recommendCuboids);
+        for (CubeSegment segment : optimizeSegments) {
+            ExecutableState result = optimizeSegment(segment);
+            Assert.assertEquals(ExecutableState.SUCCEED, result);
+        }
+
+        cube = cubeMgr.reloadCube(CUBE_NAME);
+
+        Assert.assertEquals(4, cube.getSegments().size());
+        Assert.assertEquals(2, cube.getSegments(SegmentStatusEnum.READY_PENDING).size());
+        Assert.assertEquals(2, cube.getSegments(SegmentStatusEnum.READY).size());
+
+        // 3. CheckPoint Job
+        executeCheckPoint(cube);
+
+        cube = cubeMgr.reloadCube(CUBE_NAME);
+
+        // 4. Check cube status and cuboid list
+        Assert.assertEquals(2, cube.getSegments().size());
+        Assert.assertEquals(2, cube.getSegments(SegmentStatusEnum.READY).size());
+        FileSystem fs = HadoopUtil.getWorkingFileSystem();
+        for (CubeSegment segment : cube.getSegments()) {
+            Assert.assertEquals(SegmentStatusEnum.READY, segment.getStatus());
+            CubeStatsReader segStatsReader = new CubeStatsReader(segment, config);
+            Assert.assertEquals(recommendCuboids, segStatsReader.getCuboidRowHLLCounters().keySet());
+            String cuboidPath = PathManager.getSegmentParquetStoragePath(cube, segment.getName(), segment.getStorageLocationIdentifier());
+            Assert.assertTrue(fs.exists(new Path(cuboidPath)));
+            Assert.assertTrue(fs.exists(new Path(cuboidPath + "/" + CUBOID_ADD)));
+            Assert.assertFalse(fs.exists(new Path(cuboidPath + "/" + CUBOID_DELETE)));
+
+        }
+        Assert.assertEquals(recommendCuboids, cube.getCuboidScheduler().getAllCuboidIds());
+    }
+
+    public void buildSegments(String cubeName, SegmentRange.TSRange... toBuildRanges) throws Exception {
+        Assert.assertTrue(config.getHdfsWorkingDirectory().startsWith("file:"));
+
+        // cleanup all segments first
+        cleanupSegments(cubeName);
+
+        ExecutableState state;
+        for (SegmentRange.TSRange toBuildRange : toBuildRanges) {
+            state = buildCuboid(cubeName, toBuildRange);
+            Assert.assertEquals(ExecutableState.SUCCEED, state);
+        }
+    }
+
+    protected ExecutableState optimizeSegment(CubeSegment segment) throws Exception {
+        NSparkOptimizingJob optimizeJob = NSparkOptimizingJob.optimize(segment, "ADMIN");
+        execMgr.addJob(optimizeJob);
+        ExecutableState result = wait(optimizeJob);
+        checkJobTmpPathDeleted(config, optimizeJob);
+        return result;
+    }
+
+    protected ExecutableState executeCheckPoint(CubeInstance cubeInstance) throws Exception {
+        CheckpointExecutable checkPointJob = new NSparkBatchOptimizeJobCheckpointBuilder(cubeInstance, "ADMIN").build();
+        execMgr.addJob(checkPointJob);
+        ExecutableState result = wait(checkPointJob);
+        return result;
+    }
+}
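
For reference outside the test harness, the same pieces the test exercises can be composed into a submission helper. The following is a minimal sketch under assumptions: OptimizeSubmitSketch, its submit method, and the broad throws Exception clause are hypothetical, and it omits the addTaskListForCheck registration that JobService performs in the hunk further down; only the manager and builder calls themselves come from this commit.

    import java.util.Set;

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.cube.CubeInstance;
    import org.apache.kylin.cube.CubeManager;
    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.engine.spark.job.NSparkBatchOptimizeJobCheckpointBuilder;
    import org.apache.kylin.engine.spark.job.NSparkOptimizingJob;
    import org.apache.kylin.job.execution.ExecutableManager;

    public class OptimizeSubmitSketch {
        // Submits one Spark optimizing job per affected segment, then a checkpoint job
        // that finalizes the cube once all segment-level jobs have succeeded.
        public static void submit(String cubeName, Set<Long> recommendCuboids, String submitter) throws Exception {
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            CubeManager cubeMgr = CubeManager.getInstance(config);
            ExecutableManager execMgr = ExecutableManager.getInstance(config);

            CubeInstance cube = cubeMgr.getCube(cubeName);
            for (CubeSegment segment : cubeMgr.optimizeSegments(cube, recommendCuboids)) {
                execMgr.addJob(NSparkOptimizingJob.optimize(segment, submitter));
            }
            execMgr.addJob(new NSparkBatchOptimizeJobCheckpointBuilder(cube, submitter).build());
        }
    }
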
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 60bbf6c..c514bd4 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.HashSet;
 
 import javax.servlet.http.HttpServletResponse;
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 7cb5d3d..3cfcc78 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -44,12 +44,12 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeBuildTypeEnum;
 import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.engine.mr.BatchOptimizeJobCheckpointBuilder;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.LookupSnapshotBuildJob;
 import org.apache.kylin.engine.mr.common.CubeJobLockUtil;
 import org.apache.kylin.engine.mr.common.JobInfoConverter;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.spark.job.NSparkBatchOptimizeJobCheckpointBuilder;
 import org.apache.kylin.engine.spark.job.NSparkCubingJob;
 import org.apache.kylin.engine.spark.metadata.cube.source.SourceFactory;
 import org.apache.kylin.job.JobInstance;
@@ -331,7 +331,7 @@ public class JobService extends BasicService implements InitializingBean {
             }
 
             /** Add checkpoint job for batch jobs */
-            CheckpointExecutable checkpointJob = new BatchOptimizeJobCheckpointBuilder(cube, submitter).build();
+            CheckpointExecutable checkpointJob = new NSparkBatchOptimizeJobCheckpointBuilder(cube, submitter).build();
             checkpointJob.addTaskListForCheck(optimizeJobList);
 
             getExecutableManager().addJob(checkpointJob);
@@ -501,11 +501,19 @@ public class JobService extends BasicService implements InitializingBean {
 
     public String getJobStepOutput(String jobId, String stepId) {
         ExecutableManager executableManager = getExecutableManager();
+        AbstractExecutable job = executableManager.getJob(jobId);
+        if (job instanceof CheckpointExecutable) {
+            return executableManager.getOutput(stepId).getVerboseMsg();
+        }
         return executableManager.getOutputFromHDFSByJobId(jobId, stepId).getVerboseMsg();
     }
 
     public String getAllJobStepOutput(String jobId, String stepId) {
         ExecutableManager executableManager = getExecutableManager();
+        AbstractExecutable job = executableManager.getJob(jobId);
+        if (job instanceof CheckpointExecutable) {
+            return executableManager.getOutput(stepId).getVerboseMsg();
+        }
         return executableManager.getOutputFromHDFSByJobId(jobId, stepId, Integer.MAX_VALUE).getVerboseMsg();
     }
 
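
A note on the two getters changed above: a CheckpointExecutable built by NSparkBatchOptimizeJobCheckpointBuilder runs inside the job server rather than as a Spark application, so there is presumably no HDFS-side step log to fetch, and its step output is read straight from the metadata store instead. A condensed sketch of that dispatch follows; StepOutputSketch is a hypothetical stand-alone helper, while the ExecutableManager calls themselves all appear in this diff.

    import org.apache.kylin.job.execution.AbstractExecutable;
    import org.apache.kylin.job.execution.CheckpointExecutable;
    import org.apache.kylin.job.execution.ExecutableManager;

    public class StepOutputSketch {
        // Checkpoint jobs keep their step output in the metadata store; Spark build and
        // optimize jobs continue to be read through the HDFS-backed output as before.
        public static String getJobStepOutput(ExecutableManager mgr, String jobId, String stepId) {
            AbstractExecutable job = mgr.getJob(jobId);
            if (job instanceof CheckpointExecutable) {
                return mgr.getOutput(stepId).getVerboseMsg();
            }
            return mgr.getOutputFromHDFSByJobId(jobId, stepId).getVerboseMsg();
        }
    }
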
@@ -666,6 +674,7 @@ public class JobService extends BasicService implements InitializingBean {
         if (null == job.getRelatedCube() || null == getCubeManager().getCube(job.getRelatedCube())
                 || null == job.getRelatedSegment()) {
             getExecutableManager().discardJob(job.getId());
+            return;
         }
 
         logger.info("Cancel job [" + job.getId() + "] trigger by "
