This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit e39b34cb4667430134d6054123acf34eb9992daf
Author: jlf <longfei.ji...@kyligence.io>
AuthorDate: Fri Feb 24 22:08:06 2023 +0800

    KYLIN-5531 remove redundant code
---
 .../kylin/rest/service/ModelServiceBuildTest.java  |   9 +-
 .../engine/spark/builder/PartitionFlatTable.scala  |  69 ---
 .../engine/spark/builder/SegmentFlatTable.scala    | 683 ---------------------
 .../engine/spark/job/RDPartitionBuildExec.scala    |  35 +-
 .../engine/spark/job/RDSegmentBuildExec.scala      |  32 +-
 .../kylin/engine/spark/job/RDSegmentBuildJob.java  |  13 +-
 .../engine/spark/job/stage/build/BuildStage.scala  |   5 +
 .../job/stage/build/FlatTableAndDictBase.scala     |  99 ++-
 .../stage/build/MaterializedFactTableView.scala    |  24 +-
 .../partition/PartitionFlatTableAndDictBase.scala  |  22 +
 .../PartitionMaterializedFactTableView.scala       |  30 +-
 .../spark/smarter/IndexDependencyParser.scala      |  16 +-
 .../spark/builder/TestDimensionTableStat.scala     |  13 +-
 .../engine/spark/builder/TestFlatTable.scala}      |  41 +-
 .../engine/spark/builder/TestInferFilters.scala    |  18 +-
 .../spark/builder/TestSegmentFlatTable.scala       |  35 +-
 .../engine/spark/job/TestRDSegmentBuildExec.scala  | 111 ++++
 .../PartitionFlatTableAndDictBaseTest.scala        |  88 +++
 18 files changed, 394 insertions(+), 949 deletions(-)

diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
index cb2896da8f..78ba0133e9 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.rest.service;
 
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CONCURRENT_SUBMIT_LIMIT;
-import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_FAIL;
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_MULTI_PARTITION_ABANDON;
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_MULTI_PARTITION_DUPLICATE;
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_LOCKED;
@@ -505,7 +504,6 @@ public class ModelServiceBuildTest extends SourceTestCase {
     }
 
     @Test
-    @Ignore("Sorry, I don't know")
     public void testBuildSegmentsManually_NoPartition_FullSegExisted() throws Exception {
         val modelId = "89af4ee2-2cdb-4b07-b39e-4c29856309aa";
         val project = "default";
@@ -527,13 +525,14 @@ public class ModelServiceBuildTest extends SourceTestCase {
         modelService.updateDataModelSemantic(project, request);
         try {
             modelBuildService.buildSegmentsManually("default", "89af4ee2-2cdb-4b07-b39e-4c29856309aa", "", "");
+            Assert.fail();
         } catch (TransactionException exception) {
             Assert.assertTrue(exception.getCause() instanceof KylinException);
-            Assert.assertEquals(JOB_CREATE_CHECK_FAIL.getErrorMsg().getLocalizedString(),
-                    exception.getCause().getMessage());
+            Assert.assertEquals(SEGMENT_STATUS.getErrorMsg().getCodeString(),
+                    ((KylinException) exception.getCause()).getErrorCode().getCodeString());
         }
         val executables = getRunningExecutables(project, modelId);
-        Assert.assertEquals(2, executables.size());
+        Assert.assertEquals(1, executables.size());
         Assert.assertTrue(((NSparkCubingJob) executables.get(0)).getHandler() instanceof ExecutableAddCuboidHandler);
     }
 
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/PartitionFlatTable.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/PartitionFlatTable.scala
deleted file mode 100644
index 76c386d8c2..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/PartitionFlatTable.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.builder
-
-import org.apache.kylin.engine.spark.model.PartitionFlatTableDesc
-import org.apache.commons.lang3.StringUtils
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
-
-import java.util.Objects
-import java.{lang, util}
-import scala.collection.JavaConverters._
-
-class PartitionFlatTable(private val sparkSession: SparkSession, //
-                         private val tableDesc: PartitionFlatTableDesc) extends SegmentFlatTable(sparkSession, tableDesc) {
-
-  override protected def applyPartitionDesc(originDS: Dataset[Row]): Dataset[Row] = {
-    // Multi level partition.
-    val descMLP = dataModel.getMultiPartitionDesc
-    require(Objects.nonNull(descMLP))
-    // Date range partition.
-    val descDRP = dataModel.getPartitionDesc
-    val condition = descMLP.getPartitionConditionBuilder
-      .buildMultiPartitionCondition(descDRP, descMLP, //
-        new util.LinkedList[lang.Long](tableDesc.getPartitions), null, segmentRange)
-    if (StringUtils.isBlank(condition)) {
-      logInfo(s"Segment $segmentId no available partition condition.")
-      return originDS
-    }
-    logInfo(s"Segment $segmentId apply partition condition $condition.")
-    originDS.where(condition)
-  }
-
-  def getPartitionDS(partitionId: Long): Dataset[Row] = {
-    val columnIds = tableDesc.getColumnIds.asScala
-    val columnName2Id = tableDesc.getColumns //
-      .asScala //
-      .map(column => column.getIdentity) //
-      .zip(columnIds) //
-    val column2IdMap = columnName2Id.toMap
-
-    val partitionColumnIds = dataModel.getMultiPartitionDesc.getColumnRefs.asScala //
-      .map(_.getIdentity).map(x => column2IdMap.apply(x))
-    val values = dataModel.getMultiPartitionDesc.getPartitionInfo(partitionId).getValues.toSeq
-
-    val converted = partitionColumnIds.zip(values).map { case (k, v) =>
-      s"`$k` = '$v'"
-    }.mkString(" and ")
-
-    logInfo(s"Segment $segmentId single partition condition: $converted")
-    FLAT_TABLE.where(converted)
-  }
-
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala
deleted file mode 100644
index e6e41fd9b9..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala
+++ /dev/null
@@ -1,683 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.builder
-
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.{Locale, Objects, Timer, TimerTask}
-
-import org.apache.commons.lang3.StringUtils
-import org.apache.kylin.common.constant.LogConstant
-import org.apache.kylin.common.logging.SetLogCategory
-import org.apache.kylin.common.util.HadoopUtil
-import org.apache.kylin.common.{CustomUtils, KapConfig, KylinConfig}
-import org.apache.kylin.engine.spark.builder.DFBuilderHelper._
-import org.apache.kylin.engine.spark.job.NSparkCubingUtil._
-import org.apache.kylin.engine.spark.job.{FiltersUtil, TableMetaManager}
-import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc
-import org.apache.kylin.engine.spark.utils.LogEx
-import org.apache.kylin.engine.spark.utils.SparkDataSource._
-import org.apache.kylin.metadata.cube.model.NDataSegment
-import org.apache.kylin.metadata.model._
-import org.apache.kylin.query.util.PushDownUtil
-import org.apache.spark.sql._
-import org.apache.spark.sql.functions.{col, expr}
-import org.apache.spark.sql.manager.SparderLookupManager
-import org.apache.spark.sql.types.StructField
-import org.apache.spark.sql.util.SparderTypeUtil
-import org.apache.spark.utils.ProxyThreadUtils
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.collection.parallel.ForkJoinTaskSupport
-import scala.concurrent.duration.{Duration, MILLISECONDS}
-import scala.concurrent.forkjoin.ForkJoinPool
-import scala.util.{Failure, Success, Try}
-
-import com.google.common.collect.Sets
-
-class SegmentFlatTable(private val sparkSession: SparkSession, //
-                       private val tableDesc: SegmentFlatTableDesc) extends 
LogEx {
-
-  import SegmentFlatTable._
-
-  protected final val project = tableDesc.getProject
-  protected final val spanningTree = tableDesc.getSpanningTree
-  protected final val dataSegment = tableDesc.getDataSegment
-  protected final val dataModel = tableDesc.getDataModel
-  protected final val indexPlan = tableDesc.getIndexPlan
-  protected final val segmentRange = tableDesc.getSegmentRange
-  protected lazy final val flatTablePath = tableDesc.getFlatTablePath
-  protected lazy final val factTableViewPath = tableDesc.getFactTableViewPath
-  protected lazy final val workingDir = tableDesc.getWorkingDir
-  protected lazy final val sampleRowCount = tableDesc.getSampleRowCount
-
-  protected final val segmentId = dataSegment.getId
-
-  protected lazy final val FLAT_TABLE = generateFlatTable()
-  // flat-table without dict columns
-  private lazy final val FLAT_TABLE_PART = generateFlatTablePart()
-
-  protected val rootFactTable: TableRef = dataModel.getRootFactTable
-
-  // Flat table.
-  private lazy val shouldPersistFT = tableDesc.shouldPersistFlatTable()
-  private lazy val isFTReady = dataSegment.isFlatTableReady && 
tableDesc.buildFilesSeparationPathExists(flatTablePath)
-
-  // Fact table view.
-  private lazy val isFTV = rootFactTable.getTableDesc.isView
-  private lazy val shouldPersistFTV = tableDesc.shouldPersistView()
-  private lazy val isFTVReady = dataSegment.isFactViewReady && 
HadoopUtil.getWorkingFileSystem.exists(factTableViewPath)
-
-  private lazy val needJoin = {
-    val join = tableDesc.shouldJoinLookupTables
-    logInfo(s"Segment $segmentId flat table need join: $join")
-    join
-  }
-
-  protected lazy val factTableDS: Dataset[Row] = newFactTableDS()
-  private lazy val fastFactTableDS = newFastFactTableDS()
-
-  // By design, COMPUTED-COLUMN could only be defined on fact table.
-  protected lazy val factTableCCs: Set[TblColRef] = 
rootFactTable.getColumns.asScala
-    .filter(_.getColumnDesc.isComputedColumn)
-    .toSet
-
-  def getFlatTablePartDS: Dataset[Row] = {
-    FLAT_TABLE_PART
-  }
-
-  def getFlatTableDS: Dataset[Row] = {
-    FLAT_TABLE
-  }
-
-  def gatherStatistics(): Statistics = {
-    val stepDesc = s"Segment $segmentId collect flat table statistics."
-    logInfo(stepDesc)
-    sparkSession.sparkContext.setJobDescription(stepDesc)
-    val statistics = gatherStatistics(FLAT_TABLE)
-    logInfo(s"Segment $segmentId collect flat table statistics $statistics.")
-    sparkSession.sparkContext.setJobDescription(null)
-    statistics
-  }
-
-  protected def generateFlatTablePart(): Dataset[Row] = {
-    val recoveredDS = tryRecoverFTDS()
-    if (recoveredDS.nonEmpty) {
-      return recoveredDS.get
-    }
-    var flatTableDS = if (needJoin) {
-      val lookupTableDSMap = generateLookupTables()
-      if (inferFiltersEnabled) {
-        FiltersUtil.initFilters(tableDesc, lookupTableDSMap)
-      }
-      val jointDS = joinFactTableWithLookupTables(fastFactTableDS, 
lookupTableDSMap, dataModel, sparkSession)
-      concatCCs(jointDS, factTableCCs)
-    } else {
-      concatCCs(fastFactTableDS, factTableCCs)
-    }
-    flatTableDS = applyFilterCondition(flatTableDS)
-    changeSchemeToColumnId(flatTableDS, tableDesc)
-  }
-
-  protected def generateFlatTable(): Dataset[Row] = {
-    val recoveredDS = tryRecoverFTDS()
-    if (recoveredDS.nonEmpty) {
-      return recoveredDS.get
-    }
-
-    /**
-     * If need to build and encode dict columns, then
-     * 1. try best to build in fact-table.
-     * 2. try best to build in lookup-tables (without cc dict).
-     * 3. try to build in fact-table.
-     *
-     * CC in lookup-tables MUST be built in flat-table.
-     */
-    val (dictCols, encodeCols, dictColsWithoutCc, encodeColsWithoutCc) = 
prepareForDict()
-    val factTable = buildDictIfNeed(factTableDS, dictCols, encodeCols)
-
-    var flatTable = if (needJoin) {
-
-      val lookupTables = generateLookupTables()
-        .map(lookupTableMap =>
-          (lookupTableMap._1, buildDictIfNeed(lookupTableMap._2, 
dictColsWithoutCc, encodeColsWithoutCc)))
-      if (lookupTables.nonEmpty) {
-        generateLookupTableMeta(project, lookupTables)
-      }
-      if (inferFiltersEnabled) {
-        FiltersUtil.initFilters(tableDesc, lookupTables)
-      }
-
-      val jointTable = joinFactTableWithLookupTables(factTable, lookupTables, 
dataModel, sparkSession)
-      buildDictIfNeed(concatCCs(jointTable, factTableCCs),
-        selectColumnsNotInTables(factTable, lookupTables.values.toSeq, 
dictCols),
-        selectColumnsNotInTables(factTable, lookupTables.values.toSeq, 
encodeCols))
-    } else {
-      factTable
-    }
-
-    DFBuilderHelper.checkPointSegment(dataSegment, (copied: NDataSegment) => 
copied.setDictReady(true))
-
-    flatTable = applyFilterCondition(flatTable)
-    flatTable = changeSchemeToColumnId(flatTable, tableDesc)
-    tryPersistFTDS(flatTable)
-  }
-
-  private def prepareForDict(): (Set[TblColRef], Set[TblColRef], 
Set[TblColRef], Set[TblColRef]) = {
-    val dictCols = 
DictionaryBuilderHelper.extractTreeRelatedGlobalDictToBuild(dataSegment, 
spanningTree.getIndices).asScala.toSet
-    val encodeCols = 
DictionaryBuilderHelper.extractTreeRelatedGlobalDicts(dataSegment, 
spanningTree.getIndices).asScala.toSet
-    val dictColsWithoutCc = dictCols.filter(!_.getColumnDesc.isComputedColumn)
-    val encodeColsWithoutCc = 
encodeCols.filter(!_.getColumnDesc.isComputedColumn)
-    (dictCols, encodeCols, dictColsWithoutCc, encodeColsWithoutCc)
-  }
-
-  private def newFastFactTableDS(): Dataset[Row] = {
-    val partDS = newPartitionedFTDS(needFast = true)
-    fulfillDS(partDS, factTableCCs, rootFactTable)
-  }
-
-  private def newFactTableDS(): Dataset[Row] = {
-    val partDS = newPartitionedFTDS()
-    fulfillDS(partDS, factTableCCs, rootFactTable)
-  }
-
-  private def newPartitionedFTDS(needFast: Boolean = false): Dataset[Row] = {
-    if (isFTVReady) {
-      logInfo(s"Skip FACT-TABLE-VIEW segment $segmentId.")
-      return sparkSession.read.parquet(factTableViewPath.toString)
-    }
-    val tableDS = newTableDS(rootFactTable)
-    val partDS = applyPartitionDesc(tableDS)
-    if (needFast || !isFTV) {
-      return partDS
-    }
-    tryPersistFTVDS(partDS)
-  }
-
-  protected def generateLookupTables(): mutable.LinkedHashMap[JoinTableDesc, 
Dataset[Row]] = {
-    val ret = mutable.LinkedHashMap[JoinTableDesc, Dataset[Row]]()
-    val antiFlattenTableSet = mutable.Set[String]()
-    dataModel.getJoinTables.asScala
-      .filter(isTableToBuild)
-      .foreach { joinDesc =>
-        val fkTableRef = joinDesc.getJoin.getFKSide
-        if (fkTableRef == null) {
-          throw new IllegalArgumentException("FK table cannot be null")
-        }
-        val fkTable = fkTableRef.getTableDesc.getIdentity
-        if (!joinDesc.isFlattenable || antiFlattenTableSet.contains(fkTable)) {
-          antiFlattenTableSet.add(joinDesc.getTable)
-        }
-        if (joinDesc.isFlattenable && 
!antiFlattenTableSet.contains(joinDesc.getTable)) {
-          val tableRef = joinDesc.getTableRef
-          val tableDS = newTableDS(tableRef)
-          ret.put(joinDesc, fulfillDS(tableDS, Set.empty, tableRef))
-        }
-      }
-    ret
-  }
-
-  private def isTableToBuild(joinDesc: JoinTableDesc): Boolean = {
-    !tableDesc.isPartialBuild || (tableDesc.isPartialBuild && 
tableDesc.getRelatedTables.contains(joinDesc.getAlias))
-  }
-
-  protected def applyPartitionDesc(originDS: Dataset[Row]): Dataset[Row] = {
-    // Date range partition.
-    val descDRP = dataModel.getPartitionDesc
-    if (Objects.isNull(descDRP) //
-      || Objects.isNull(descDRP.getPartitionDateColumn) //
-      || Objects.isNull(segmentRange) //
-      || segmentRange.isInfinite) {
-      logInfo(s"No available PARTITION-CONDITION segment $segmentId")
-      return originDS
-    }
-
-    val condition = descDRP.getPartitionConditionBuilder //
-      .buildDateRangeCondition(descDRP, null, segmentRange)
-    logInfo(s"Apply PARTITION-CONDITION $condition segment $segmentId")
-    originDS.where(condition)
-  }
-
-  private def applyFilterCondition(originDS: Dataset[Row]): Dataset[Row] = {
-    if (StringUtils.isBlank(dataModel.getFilterCondition)) {
-      logInfo(s"No available FILTER-CONDITION segment $segmentId")
-      return originDS
-    }
-    val expression = PushDownUtil.massageExpression(dataModel, project, 
dataModel.getFilterCondition, null)
-    val converted = replaceDot(expression, dataModel)
-    val condition = s" (1=1) AND ($converted)"
-    logInfo(s"Apply FILTER-CONDITION: $condition segment $segmentId")
-    originDS.where(condition)
-  }
-
-  private def tryPersistFTVDS(tableDS: Dataset[Row]): Dataset[Row] = {
-    if (!shouldPersistFTV) {
-      return tableDS
-    }
-    logInfo(s"Persist FACT-TABLE-VIEW $factTableViewPath")
-    sparkSession.sparkContext.setJobDescription("Persist FACT-TABLE-VIEW.")
-    tableDS.write.mode(SaveMode.Overwrite).parquet(factTableViewPath.toString)
-    // Checkpoint fact table view.
-    DFBuilderHelper.checkPointSegment(dataSegment, (copied: NDataSegment) => 
copied.setFactViewReady(true))
-    val newDS = sparkSession.read.parquet(factTableViewPath.toString)
-    sparkSession.sparkContext.setJobDescription(null)
-    newDS
-  }
-
-  private def tryPersistFTDS(tableDS: Dataset[Row]): Dataset[Row] = {
-    if (!shouldPersistFT) {
-      return tableDS
-    }
-    if (tableDS.schema.isEmpty) {
-      logInfo("No available flat table schema.")
-      return tableDS
-    }
-    logInfo(s"Segment $segmentId persist flat table: $flatTablePath")
-    sparkSession.sparkContext.setJobDescription(s"Segment $segmentId persist 
flat table.")
-    val coalescePartitionNum = tableDesc.getFlatTableCoalescePartitionNum
-    if (coalescePartitionNum > 0) {
-      logInfo(s"Segment $segmentId flat table coalesce partition num 
$coalescePartitionNum")
-      tableDS.coalesce(coalescePartitionNum) //
-        .write.mode(SaveMode.Overwrite).parquet(flatTablePath.toString)
-    } else {
-      tableDS.write.mode(SaveMode.Overwrite).parquet(flatTablePath.toString)
-    }
-    DFBuilderHelper.checkPointSegment(dataSegment, (copied: NDataSegment) => {
-      copied.setFlatTableReady(true)
-      if (dataSegment.isFlatTableReady) {
-        // KE-14714 if flat table is updated, there might be some data 
inconsistency across indexes
-        copied.setStatus(SegmentStatusEnum.WARNING)
-      }
-    })
-    val newDS = sparkSession.read.parquet(flatTablePath.toString)
-    sparkSession.sparkContext.setJobDescription(null)
-    newDS
-  }
-
-  private def tryRecoverFTDS(): Option[Dataset[Row]] = {
-    if (tableDesc.isPartialBuild) {
-      logInfo(s"Segment $segmentId no need reuse flat table for partial 
build.")
-      return None
-    } else if (!isFTReady) {
-      logInfo(s"Segment $segmentId  no available flat table.")
-      return None
-    }
-    // +----------+---+---+---+---+-----------+-----------+
-    // |         0|  2|  3|  4|  1|2_KE_ENCODE|4_KE_ENCODE|
-    // +----------+---+---+---+---+-----------+-----------+
-    val tableDS: DataFrame = 
Try(sparkSession.read.parquet(flatTablePath.toString)) match {
-      case Success(df) => df
-      case Failure(f) =>
-        logInfo(s"Handled AnalysisException: Unable to infer schema for 
Parquet. Flat table path $flatTablePath is empty.", f)
-        sparkSession.emptyDataFrame
-    }
-    // ([2_KE_ENCODE,4_KE_ENCODE], [0,1,2,3,4])
-    val (coarseEncodes, noneEncodes) = tableDS.schema.map(sf => 
sf.name).partition(_.endsWith(ENCODE_SUFFIX))
-    val encodes = coarseEncodes.map(_.stripSuffix(ENCODE_SUFFIX))
-
-    val noneEncodesFieldMap: Map[String, StructField] = 
tableDS.schema.map(_.name)
-      .zip(tableDS.schema.fields)
-      .filter(p => noneEncodes.contains(p._1))
-      .toMap
-
-    val nones = tableDesc.getColumnIds.asScala //
-      .zip(tableDesc.getColumns.asScala)
-      .map(p => (String.valueOf(p._1), p._2)) //
-      .filterNot(p => {
-        val dataType = SparderTypeUtil.toSparkType(p._2.getType)
-        noneEncodesFieldMap.contains(p._1) && (dataType == 
noneEncodesFieldMap(p._1).dataType)
-      }) ++
-      // [xx_KE_ENCODE]
-      tableDesc.getMeasures.asScala //
-        .map(DictionaryBuilderHelper.needGlobalDict) //
-        .filter(Objects.nonNull) //
-        .map(colRef => dataModel.getColumnIdByColumnName(colRef.getIdentity)) 
//
-        .map(String.valueOf) //
-        .filterNot(encodes.contains)
-        .map(id => id + ENCODE_SUFFIX)
-
-    if (nones.nonEmpty) {
-      // The previous flat table missed some columns.
-      // Flat table would be updated at afterwards step.
-      logInfo(s"Segment $segmentId update flat table, columns should have been 
included " + //
-        s"${nones.mkString("[", ",", "]")}")
-      return None
-    }
-    // The previous flat table could be reusable.
-    logInfo(s"Segment $segmentId skip build flat table.")
-    Some(tableDS)
-  }
-
-  def newTableDS(tableRef: TableRef): Dataset[Row] = {
-    // By design, why not try recovering from table snapshot.
-    // If fact table is a view and its snapshot exists, that will benefit.
-    logInfo(s"Load source table ${tableRef.getTableIdentity}")
-    val tableDescCopy = tableRef.getTableDesc
-    if (tableDescCopy.isTransactional || tableDescCopy.isRangePartition) {
-      val model = tableRef.getModel
-      if (Objects.nonNull(model)) {
-        tableDescCopy.setPartitionDesc(model.getPartitionDesc)
-      }
-
-      if (Objects.nonNull(segmentRange) && 
Objects.nonNull(segmentRange.getStart) && Objects.nonNull(segmentRange.getEnd)) 
{
-        sparkSession.table(tableDescCopy, segmentRange.getStart.toString, 
segmentRange.getEnd.toString).alias(tableRef.getAlias)
-      } else {
-        sparkSession.table(tableDescCopy).alias(tableRef.getAlias)
-      }
-    } else {
-      sparkSession.table(tableDescCopy).alias(tableRef.getAlias)
-    }
-  }
-
-  protected final def gatherStatistics(tableDS: Dataset[Row]): Statistics = {
-    val totalRowCount = tableDS.count()
-    if (!shouldPersistFT) {
-      // By design, evaluating column bytes should be based on existed flat 
table.
-      logInfo(s"Flat table not persisted, only compute row count.")
-      return Statistics(totalRowCount, Map.empty[String, Long])
-    }
-    // zipWithIndex before filter
-    val canonicalIndices = tableDS.columns //
-      .zipWithIndex //
-      .filterNot(_._1.endsWith(ENCODE_SUFFIX)) //
-      .map { case (name, index) =>
-        val canonical = tableDesc.getCanonicalName(Integer.parseInt(name))
-        (canonical, index)
-      }.filterNot(t => Objects.isNull(t._1))
-    logInfo(s"CANONICAL INDICES ${canonicalIndices.mkString("[", ", ", "]")}")
-    // By design, action-take is not sampling.
-    val sampled = tableDS.take(sampleRowCount).flatMap(row => //
-      canonicalIndices.map { case (canonical, index) => //
-        val bytes = utf8Length(row.get(index))
-        (canonical, bytes) //
-      }).groupBy(_._1).mapValues(_.map(_._2).sum)
-    val evaluated = evaluateColumnBytes(totalRowCount, sampled)
-    Statistics(totalRowCount, evaluated)
-  }
-
-  private def evaluateColumnBytes(totalCount: Long, //
-                                  sampled: Map[String, Long]): Map[String, 
Long] = {
-    val multiple = if (totalCount < sampleRowCount) 1f else totalCount.toFloat 
/ sampleRowCount
-    sampled.mapValues(bytes => (bytes * multiple).toLong)
-  }
-
-  // Copied from DFChooser.
-  private def utf8Length(value: Any): Long = {
-    if (Objects.isNull(value)) {
-      return 0L
-    }
-    var i = 0
-    var bytes = 0L
-    val sequence = value.toString
-    while (i < sequence.length) {
-      val c = sequence.charAt(i)
-      if (c <= 0x7F) bytes += 1
-      else if (c <= 0x7FF) bytes += 2
-      else if (Character.isHighSurrogate(c)) {
-        bytes += 4
-        i += 1
-      }
-      else bytes += 3
-      i += 1
-    }
-    bytes
-  }
-
-  // ====================================== Dividing line, till the bottom. 
====================================== //
-  // Historical debt.
-  // Waiting for reconstruction.
-
-  protected def buildDictIfNeed(table: Dataset[Row],
-                                dictCols: Set[TblColRef],
-                                encodeCols: Set[TblColRef]): Dataset[Row] = {
-    if (dictCols.isEmpty && encodeCols.isEmpty) {
-      return table
-    }
-    if (dataSegment.isDictReady) {
-      logInfo(s"Skip DICTIONARY segment $segmentId")
-    } else {
-      // KE-32076 ensure at least one worker was registered before dictionary 
lock added.
-      waitTillWorkerRegistered()
-      buildDict(table, dictCols)
-    }
-    encodeColumn(table, encodeCols)
-  }
-
-  def waitTillWorkerRegistered(): Unit = {
-    val cdl = new CountDownLatch(1)
-    val timer = new Timer("worker-starvation-timer", true)
-    timer.scheduleAtFixedRate(new TimerTask {
-      override def run(): Unit = {
-        if (sparkSession.sparkContext.statusTracker.getExecutorInfos.isEmpty) {
-          logWarning("Ensure at least one worker has been registered before 
building dictionary.")
-        } else {
-          this.cancel()
-          cdl.countDown()
-        }
-      }
-    }, 0, TimeUnit.SECONDS.toMillis(20))
-    cdl.await()
-    timer.cancel()
-  }
-
-  private def concatCCs(table: Dataset[Row], computColumns: Set[TblColRef]): 
Dataset[Row] = {
-    val matchedCols = selectColumnsInTable(table, computColumns)
-    var tableWithCcs = table
-    matchedCols.foreach(m =>
-      tableWithCcs = 
tableWithCcs.withColumn(convertFromDot(m.getBackTickIdentity),
-        expr(convertFromDot(m.getBackTickExp))))
-    tableWithCcs
-  }
-
-  private def buildDict(ds: Dataset[Row], dictCols: Set[TblColRef]): Unit = {
-    var matchedCols = selectColumnsInTable(ds, dictCols)
-    if (dataSegment.getIndexPlan.isSkipEncodeIntegerFamilyEnabled) {
-      matchedCols = matchedCols.filterNot(_.getType.isIntegerFamily)
-    }
-    val builder = new DFDictionaryBuilder(ds, dataSegment, sparkSession, 
Sets.newHashSet(matchedCols.asJavaCollection))
-    builder.buildDictSet()
-  }
-
-  private def encodeColumn(ds: Dataset[Row], encodeCols: Set[TblColRef]): 
Dataset[Row] = {
-    val matchedCols = selectColumnsInTable(ds, encodeCols)
-    var encodeDs = ds
-    if (matchedCols.nonEmpty) {
-      encodeDs = DFTableEncoder.encodeTable(ds, dataSegment, 
matchedCols.asJava)
-    }
-    encodeDs
-  }
-
-}
-
-object SegmentFlatTable extends LogEx {
-
-  import org.apache.kylin.engine.spark.job.NSparkCubingUtil._
-
-  private val conf = KylinConfig.getInstanceFromEnv
-  var inferFiltersEnabled: Boolean = conf.inferFiltersEnabled()
-
-  def fulfillDS(originDS: Dataset[Row], cols: Set[TblColRef], tableRef: 
TableRef): Dataset[Row] = {
-    // wrap computed columns, filter out valid columns
-    val computedColumns = chooseSuitableCols(originDS, cols)
-    // wrap alias
-    val newDS = wrapAlias(originDS, tableRef.getAlias)
-    val selectedColumns = newDS.schema.fields.map(tp => col(tp.name)) ++ 
computedColumns
-    logInfo(s"Table SCHEMA ${tableRef.getTableIdentity} 
${newDS.schema.treeString}")
-    newDS.select(selectedColumns: _*)
-  }
-
-  def wrapAlias(originDS: Dataset[Row], alias: String): Dataset[Row] = {
-    val newFields = originDS.schema.fields.map(f =>
-      convertFromDot("`" + alias + "`" + "." + "`" + f.name + "`")).toSeq
-    val newDS = originDS.toDF(newFields: _*)
-    CustomUtils.tryWithResourceIgnore(new 
SetLogCategory(LogConstant.BUILD_CATEGORY)) {
-      _ => logInfo(s"Wrap ALIAS ${originDS.schema.treeString} TO 
${newDS.schema.treeString}")
-    }
-    newDS
-  }
-
-
-  def joinFactTableWithLookupTables(rootFactDataset: Dataset[Row],
-                                    lookupTableDatasetMap: 
mutable.Map[JoinTableDesc, Dataset[Row]],
-                                    model: NDataModel,
-                                    ss: SparkSession): Dataset[Row] = {
-    lookupTableDatasetMap.foldLeft(rootFactDataset)(
-      (joinedDataset: Dataset[Row], tuple: (JoinTableDesc, Dataset[Row])) =>
-        joinTableDataset(model.getRootFactTable.getTableDesc, tuple._1, 
joinedDataset, tuple._2, ss))
-  }
-
-  def joinTableDataset(rootFactDesc: TableDesc,
-                       lookupDesc: JoinTableDesc,
-                       rootFactDataset: Dataset[Row],
-                       lookupDataset: Dataset[Row],
-                       ss: SparkSession): Dataset[Row] = {
-    var afterJoin = rootFactDataset
-    val join = lookupDesc.getJoin
-    if (join != null && !StringUtils.isEmpty(join.getType)) {
-      val joinType = join.getType.toUpperCase(Locale.ROOT)
-      val pk = join.getPrimaryKeyColumns
-      val fk = join.getForeignKeyColumns
-      if (pk.length != fk.length) {
-        throw new RuntimeException(
-          s"Invalid join condition of fact table: $rootFactDesc,fk: 
${fk.mkString(",")}," +
-            s" lookup table:$lookupDesc, pk: ${pk.mkString(",")}")
-      }
-      val equiConditionColPairs = fk.zip(pk).map(joinKey =>
-        col(convertFromDot(joinKey._1.getBackTickIdentity))
-          .equalTo(col(convertFromDot(joinKey._2.getBackTickIdentity))))
-      CustomUtils.tryWithResourceIgnore(new 
SetLogCategory(LogConstant.BUILD_CATEGORY)) {
-        _ => logInfo(s"Lookup table schema ${lookupDataset.schema.treeString}")
-      }
-
-      if (join.getNonEquiJoinCondition != null) {
-        var condition = 
NonEquiJoinConditionBuilder.convert(join.getNonEquiJoinCondition)
-        if (!equiConditionColPairs.isEmpty) {
-          condition = condition && equiConditionColPairs.reduce(_ && _)
-        }
-        logInfo(s"Root table ${rootFactDesc.getIdentity}, join table 
${lookupDesc.getAlias}, non-equi condition: ${condition.toString()}")
-        afterJoin = afterJoin.join(lookupDataset, condition, joinType)
-      } else {
-        val condition = equiConditionColPairs.reduce(_ && _)
-        logInfo(s"Root table ${rootFactDesc.getIdentity}, join table 
${lookupDesc.getAlias}, condition: ${condition.toString()}")
-        if (inferFiltersEnabled) {
-          afterJoin = afterJoin.join(FiltersUtil.inferFilters(pk, 
lookupDataset), condition, joinType)
-        } else {
-          afterJoin = afterJoin.join(lookupDataset, condition, joinType)
-        }
-      }
-    }
-    afterJoin
-  }
-
-  def changeSchemeToColumnId(ds: Dataset[Row], tableDesc: 
SegmentFlatTableDesc): Dataset[Row] = {
-    val structType = ds.schema
-    val columnIds = tableDesc.getColumnIds.asScala
-    val columnName2Id = tableDesc.getColumns
-      .asScala
-      .map(column => convertFromDot(column.getBackTickIdentity))
-      .zip(columnIds)
-    val columnName2IdMap = columnName2Id.toMap
-    val encodeSeq = structType.filter(_.name.endsWith(ENCODE_SUFFIX)).map {
-      tp =>
-        val columnName = tp.name.stripSuffix(ENCODE_SUFFIX)
-        val columnId = columnName2IdMap.apply(columnName)
-        col(tp.name).alias(columnId.toString + ENCODE_SUFFIX)
-    }
-    val columns = columnName2Id.map(tp => expr("`" + tp._1 + 
"`").alias(tp._2.toString))
-    logInfo(s"Select model column is ${columns.mkString(",")}")
-    logInfo(s"Select model encoding column is ${encodeSeq.mkString(",")}")
-    val selectedColumns = columns ++ encodeSeq
-
-    logInfo(s"Select model all column is ${selectedColumns.mkString(",")}")
-    ds.select(selectedColumns: _*)
-  }
-
-  private def generateLookupTableMeta(project: String,
-                                      lookupTables: 
mutable.LinkedHashMap[JoinTableDesc, Dataset[Row]]): Unit = {
-    val config = KapConfig.getInstanceFromEnv
-    if (config.isRecordSourceUsage) {
-      lookupTables.keySet.foreach { joinTable =>
-        val tableManager = 
NTableMetadataManager.getInstance(config.getKylinConfig, project)
-        val table = tableManager.getOrCreateTableExt(joinTable.getTable)
-        if (table.getTotalRows > 0) {
-          TableMetaManager.putTableMeta(joinTable.getTable, 0, 
table.getTotalRows)
-          logInfo(s"put meta table: ${joinTable.getTable}, count: 
${table.getTotalRows}")
-        }
-      }
-    }
-    val noStatLookupTables = lookupTables.filterKeys(table => 
TableMetaManager.getTableMeta(table.getTable).isEmpty)
-    if (config.getKylinConfig.isNeedCollectLookupTableInfo && 
noStatLookupTables.nonEmpty) {
-      val lookupTablePar = noStatLookupTables.par
-      lookupTablePar.tasksupport = new ForkJoinTaskSupport(new 
ForkJoinPool(lookupTablePar.size))
-      lookupTablePar.foreach { case (joinTableDesc, dataset) =>
-        val tableIdentity = joinTableDesc.getTable
-        logTime(s"count $tableIdentity") {
-          val maxTime = 
Duration(config.getKylinConfig.getCountLookupTableMaxTime, MILLISECONDS)
-          val defaultCount = 
config.getKylinConfig.getLookupTableCountDefaultValue
-          val rowCount = countTableInFiniteTimeOrDefault(dataset, 
tableIdentity, maxTime, defaultCount)
-          TableMetaManager.putTableMeta(tableIdentity, 0L, rowCount)
-          logInfo(s"put meta table: $tableIdentity , count: $rowCount")
-        }
-      }
-    }
-  }
-
-  def countTableInFiniteTimeOrDefault(dataset: Dataset[Row], tableName: String,
-                                      duration: Duration, defaultCount: Long): 
Long = {
-    val countTask = dataset.rdd.countAsync()
-    try {
-      ProxyThreadUtils.awaitResult(countTask, duration)
-    } catch {
-      case e: Exception =>
-        countTask.cancel()
-        logInfo(s"$tableName count fail, and return defaultCount 
$defaultCount", e)
-        defaultCount
-    }
-  }
-
-  def replaceDot(original: String, model: NDataModel): String = {
-    val sb = new StringBuilder(original)
-
-    for (namedColumn <- model.getAllNamedColumns.asScala) {
-      val colName = namedColumn.getAliasDotColumn.toLowerCase(Locale.ROOT)
-      doReplaceDot(sb, colName, namedColumn.getAliasDotColumn)
-
-      // try replacing quoted identifiers if any
-      val quotedColName = colName.split('.').mkString("`", "`.`", "`")
-      if (quotedColName.nonEmpty) {
-        doReplaceDot(sb, quotedColName, 
namedColumn.getAliasDotColumn.split('.').mkString("`", "`.`", "`"))
-      }
-    }
-    sb.toString()
-  }
-
-  private def doReplaceDot(sb: StringBuilder, namedCol: String, 
colAliasDotColumn: String): Unit = {
-    var start = sb.toString.toLowerCase(Locale.ROOT).indexOf(namedCol)
-    while (start != -1) {
-      sb.replace(start,
-        start + namedCol.length,
-        "`" + convertFromDot(colAliasDotColumn) + "`")
-      start = sb.toString.toLowerCase(Locale.ROOT)
-        .indexOf(namedCol)
-    }
-  }
-
-  case class Statistics(totalCount: Long, columnBytes: Map[String, Long])
-
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala
index d643cd6c05..6fe76fe447 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala
@@ -19,11 +19,11 @@
 package org.apache.kylin.engine.spark.job
 
 import io.kyligence.kap.guava20.shaded.common.collect.{Maps, Sets}
+import io.kyligence.kap.guava20.shaded.common.collect.Maps
 import org.apache.hadoop.fs.Path
-import org.apache.kylin.engine.spark.builder.PartitionFlatTable
-import org.apache.kylin.engine.spark.model.PartitionFlatTableDesc
-import org.apache.kylin.metadata.cube.cuboid.PartitionSpanningTree
-import org.apache.kylin.metadata.cube.cuboid.PartitionSpanningTree.{PartitionTreeBuilder, PartitionTreeNode}
+import org.apache.kylin.engine.spark.job.stage.BuildParam
+import org.apache.kylin.engine.spark.job.stage.build.partition.PartitionFlatTableAndDictBase
+import org.apache.kylin.metadata.cube.cuboid.PartitionSpanningTree.PartitionTreeNode
 import org.apache.kylin.metadata.cube.model.NDataSegment
 import org.apache.spark.sql.SparderEnv
 import org.apache.spark.sql.datasource.storage.StorageStoreUtils
@@ -32,31 +32,18 @@ import org.apache.spark.sql.hive.utils.ResourceDetectUtils
 import java.io.IOException
 import scala.collection.JavaConverters._
 
-class RDPartitionBuildExec(private val jobContext: RDSegmentBuildJob, //
-                           private val dataSegment: NDataSegment) extends RDSegmentBuildExec(jobContext, dataSegment) {
-
-  private val newBuckets =
-    jobContext.getReadOnlyBuckets.asScala.filter(_.getSegmentId.equals(segmentId)).toSeq
-
-  protected final lazy val partitions = {
-    val distincted = newBuckets.map(_.getPartitionId).distinct.sorted
-    logInfo(s"Segment $segmentId partitions: ${distincted.mkString("[", ",", "]")}")
-    scala.collection.JavaConverters.seqAsJavaList(distincted.map(java.lang.Long.valueOf))
-  }
-
-  private lazy val spanningTree = new PartitionSpanningTree(config, //
-    new PartitionTreeBuilder(dataSegment, readOnlyLayouts, jobId, partitions, Sets.newHashSet(newBuckets.asJava)))
-
-  private lazy val flatTableDesc = new PartitionFlatTableDesc(config, dataSegment, spanningTree, jobId, partitions)
-
-  private lazy val flatTable = new PartitionFlatTable(sparkSession, flatTableDesc)
+class RDPartitionBuildExec(private val jobContext: SegmentJob, //
+                           private val dataSegment: NDataSegment, private val buildParam: BuildParam)
+  extends PartitionFlatTableAndDictBase(jobContext, dataSegment, buildParam) {
 
+  protected final val rdSharedPath = jobContext.getRdSharedPath
 
   @throws(classOf[IOException])
-  override def detectResource(): Unit = {
+  def detectResource(): Unit = {
+    initFlatTableOnDetectResource()
 
     val flatTableExecutions = if (spanningTree.fromFlatTable()) {
-      Seq((-1L, Seq(flatTable.getFlatTablePartDS.queryExecution)))
+      Seq((-1L, Seq(getFlatTablePartDS.queryExecution)))
     } else {
       Seq.empty
     }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala
index a2c272dec6..bfaddaa11d 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala
@@ -20,12 +20,9 @@ package org.apache.kylin.engine.spark.job
 
 import com.google.common.collect.Maps
 import org.apache.hadoop.fs.Path
-import org.apache.kylin.engine.spark.builder.SegmentFlatTable
-import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc
-import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree
-import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree.AdaptiveTreeBuilder
+import org.apache.kylin.engine.spark.job.stage.BuildParam
+import org.apache.kylin.engine.spark.job.stage.build.FlatTableAndDictBase
 import org.apache.kylin.metadata.cube.model.NDataSegment
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparderEnv
 import org.apache.spark.sql.datasource.storage.StorageStoreUtils
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils
@@ -33,33 +30,20 @@ import org.apache.spark.sql.hive.utils.ResourceDetectUtils
 import java.io.IOException
 import scala.collection.JavaConverters._
 
-class RDSegmentBuildExec(private val jobContext: RDSegmentBuildJob, //
-                         private val dataSegment: NDataSegment) extends Logging {
+class RDSegmentBuildExec(private val jobContext: SegmentJob, //
+                         private val dataSegment: NDataSegment, private val buildParam: BuildParam
+                        )
+  extends FlatTableAndDictBase(jobContext, dataSegment, buildParam) {
   // Resource detect segment build exec.
 
-  // Needed variables from job context.
-  protected final val jobId = jobContext.getJobId
-  protected final val config = jobContext.getConfig
-  protected final val dataflowId = jobContext.getDataflowId
-  protected final val sparkSession = jobContext.getSparkSession
   protected final val rdSharedPath = jobContext.getRdSharedPath
-  protected final val readOnlyLayouts = jobContext.getReadOnlyLayouts
-
-  // Needed variables from data segment.
-  protected final val segmentId = dataSegment.getId
-  protected final val project = dataSegment.getProject
-
-  private lazy val spanningTree = new AdaptiveSpanningTree(config, new AdaptiveTreeBuilder(dataSegment, readOnlyLayouts))
-
-  private lazy val flatTableDesc = new SegmentFlatTableDesc(config, dataSegment, spanningTree)
-
-  private lazy val flatTable = new SegmentFlatTable(sparkSession, flatTableDesc)
 
   @throws(classOf[IOException])
   def detectResource(): Unit = {
+    initFlatTableOnDetectResource()
 
     val flatTableExecutions = if (spanningTree.fromFlatTable()) {
-      Seq((-1L, flatTable.getFlatTablePartDS.queryExecution))
+      Seq((-1L, getFlatTablePartDS.queryExecution))
     } else {
       Seq.empty
     }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildJob.java
index f4809375ab..2fbf4652dc 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildJob.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildJob.java
@@ -24,9 +24,14 @@ import org.apache.hadoop.fs.Path;
 import org.apache.kylin.engine.spark.job.LogJobInfoUtils;
 import org.apache.kylin.engine.spark.job.RDPartitionBuildExec;
 import org.apache.kylin.engine.spark.job.RDSegmentBuildExec;
+import org.apache.kylin.engine.spark.job.ResourceDetect;
+import org.apache.kylin.engine.spark.job.SegmentJob;
+import org.apache.kylin.engine.spark.job.stage.BuildParam;
 import org.apache.kylin.metadata.cube.model.NDataSegment;
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
 
+import lombok.val;
+
 public class RDSegmentBuildJob extends SegmentJob implements ResourceDetect {
 
     public static void main(String[] args) {
@@ -46,14 +51,16 @@ public class RDSegmentBuildJob extends SegmentJob implements ResourceDetect {
 
     private void detectPartition() throws IOException {
         for (NDataSegment dataSegment : readOnlySegments) {
-            RDSegmentBuildExec exec = new RDPartitionBuildExec(this, dataSegment);
+            val buildParam = new BuildParam();
+            val exec = new RDPartitionBuildExec(this, dataSegment, buildParam);
             exec.detectResource();
         }
     }
 
     private void detect() throws IOException {
         for (NDataSegment dataSegment : readOnlySegments) {
-            RDSegmentBuildExec exec = new RDSegmentBuildExec(this, dataSegment);
+            val buildParam = new BuildParam();
+            val exec = new RDSegmentBuildExec(this, dataSegment, buildParam);
             exec.detectResource();
         }
     }
@@ -65,6 +72,6 @@ public class RDSegmentBuildJob extends SegmentJob implements ResourceDetect {
 
     private void writeCountDistinct() {
         ResourceDetectUtils.write(new Path(rdSharedPath, ResourceDetectUtils.countDistinctSuffix()), //
-                ResourceDetectUtils.findCountDistinctMeasure(getReadOnlyLayouts()));
+                ResourceDetectUtils.findCountDistinctMeasure(readOnlyLayouts));
     }
 }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildStage.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildStage.scala
index dba42cca96..2cfe5b7e2d 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildStage.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildStage.scala
@@ -559,4 +559,9 @@ abstract class BuildStage(private val jobContext: SegmentJob,
 
   // ----------------------------- Beta feature: Inferior Flat Table. ----------------------------- //
 
+  override def getStageName: String = ""
+
+  override def execute(): Unit = {
+    // parent class is empty
+  }
 }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
index a8b28d283b..0599fc95ae 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala
@@ -31,9 +31,12 @@ import org.apache.kylin.engine.spark.job.NSparkCubingUtil.convertFromDot
 import org.apache.kylin.engine.spark.job.stage.{BuildParam, StageExec}
 import org.apache.kylin.engine.spark.job.{FiltersUtil, SegmentJob, TableMetaManager}
 import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc
+import org.apache.kylin.engine.spark.smarter.IndexDependencyParser
 import org.apache.kylin.engine.spark.model.planner.{CuboIdToLayoutUtils, FlatTableToCostUtils}
 import org.apache.kylin.engine.spark.utils.LogEx
 import org.apache.kylin.engine.spark.utils.SparkDataSource._
+import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree
+import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree.AdaptiveTreeBuilder
 import org.apache.kylin.metadata.cube.model.NDataSegment
 import org.apache.kylin.metadata.cube.planner.CostBasePlannerUtils
 import org.apache.kylin.metadata.model._
@@ -485,29 +488,7 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
               total = tableSizeMap(tableName).longValue()
               logInfo(s"Find $column's table $tableName count $total from cache")
             } else {
-              val catalogStatistics = TableMetaManager.getTableMeta(tableDesc.getTableName(column))
-              if (catalogStatistics.isDefined) {
-                total = catalogStatistics.get.rowCount.get.longValue()
-                logInfo(s"Find $column's table $tableName count $total from catalog")
-              } else {
-                val tableMetadataDesc = tableMetadataManager.getTableDesc(tableDesc.getTableName(column))
-                if (tableMetadataDesc != null) {
-                  val tableExtDesc = tableMetadataManager.getTableExtIfExists(tableMetadataDesc)
-                  if (tableExtDesc.getTotalRows > 0) {
-                    total = tableExtDesc.getTotalRows
-                    logInfo(s"Find $column's table $tableName count $total from table ext")
-                  } else if (tableMetadataDesc.getLastSnapshotPath != null) {
-                    val baseDir = KapConfig.getInstanceFromEnv.getMetadataWorkingDirectory
-                    val fs = HadoopUtil.getWorkingFileSystem
-                    val path = new Path(baseDir, tableMetadataDesc.getLastSnapshotPath)
-                    if (fs.exists(path)) {
-                      total = sparkSession.read.parquet(path.toString).count()
-                      logInfo(s"Calculate $column's table $tableName count $total " +
-                        s"from parquet ${tableMetadataDesc.getLastSnapshotPath}")
-                    }
-                  }
-                }
-              }
+              total = evaluateColumnTotalFromTableDesc(tableMetadataManager, totalCount, tableName, column)
             }
           }
         } catch {
@@ -528,6 +509,29 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
     }
   }
 
+  def evaluateColumnTotalFromTableDesc(tableMetadataManager: NTableMetadataManager, totalCount: Long, //
+                                       tableName: String, column: String): Long = {
+    var total: Long = totalCount
+    val tableMetadataDesc = tableMetadataManager.getTableDesc(tableDesc.getTableName(column))
+    if (tableMetadataDesc != null) {
+      val tableExtDesc = tableMetadataManager.getTableExtIfExists(tableMetadataDesc)
+      if (tableExtDesc.getTotalRows > 0) {
+        total = tableExtDesc.getTotalRows
+        logInfo(s"Find $column's table $tableName count $total from table ext")
+      } else if (tableMetadataDesc.getLastSnapshotPath != null) {
+        val baseDir = KapConfig.getInstanceFromEnv.getMetadataWorkingDirectory
+        val fs = HadoopUtil.getWorkingFileSystem
+        val path = new Path(baseDir, tableMetadataDesc.getLastSnapshotPath)
+        if (fs.exists(path)) {
+          total = sparkSession.read.parquet(path.toString).count()
+          logInfo(s"Calculate $column's table $tableName count $total " +
+            s"from parquet ${tableMetadataDesc.getLastSnapshotPath}")
+        }
+      }
+    }
+    total
+  }
+
   // Copied from DFChooser.
   private def utf8Length(value: Any): Long = {
     if (Objects.isNull(value)) {
@@ -648,6 +652,42 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob,
     }
     encodeDs
   }
+
+  protected def initSpanningTree(): Unit = {
+    val spanTree = new AdaptiveSpanningTree(config, new AdaptiveTreeBuilder(dataSegment, readOnlyLayouts))
+    buildParam.setSpanningTree(spanTree)
+  }
+
+  protected def initFlatTableDesc(): Unit = {
+    val flatTableDesc: SegmentFlatTableDesc = if (jobContext.isPartialBuild) {
+      val parser = new IndexDependencyParser(dataModel)
+      val relatedTableAlias =
+        parser.getRelatedTablesAlias(jobContext.getReadOnlyLayouts)
+      new SegmentFlatTableDesc(config, dataSegment, spanningTree, relatedTableAlias)
+    } else {
+      new SegmentFlatTableDesc(config, dataSegment, spanningTree)
+    }
+    buildParam.setFlatTableDesc(flatTableDesc)
+  }
+
+  def initFactTable(): Unit = {
+    val factTableDS: Dataset[Row] = newFactTableDS()
+    buildParam.setFactTableDS(factTableDS)
+    val fastFactTableDS: Dataset[Row] = newFastFactTableDS()
+    buildParam.setFastFactTableDS(fastFactTableDS)
+  }
+
+  def materializedFactTableView(): Unit = {
+    initSpanningTree()
+    initFlatTableDesc()
+    initFactTable()
+  }
+
+  def initFlatTableOnDetectResource(): Unit = {
+    materializedFactTableView()
+    val flatTablePart: Dataset[Row] = generateFlatTablePart()
+    buildParam.setFlatTablePart(flatTablePart)
+  }
 }
 
 object FlatTableAndDictBase extends LogEx {
@@ -707,10 +747,7 @@ object FlatTableAndDictBase extends LogEx {
       logInfo(s"Lookup table schema ${lookupDataset.schema.treeString}")
 
       if (join.getNonEquiJoinCondition != null) {
-        var condition = NonEquiJoinConditionBuilder.convert(join.getNonEquiJoinCondition)
-        if (!equiConditionColPairs.isEmpty) {
-          condition = condition && equiConditionColPairs.reduce(_ && _)
-        }
+        val condition: Column = getCondition(join, equiConditionColPairs)
         logInfo(s"Root table ${rootFactDesc.getIdentity}, join table ${lookupDesc.getAlias}, non-equi condition: ${condition.toString()}")
         afterJoin = afterJoin.join(lookupDataset, condition, joinType)
       } else {
@@ -726,6 +763,14 @@ object FlatTableAndDictBase extends LogEx {
     afterJoin
   }
 
+  def getCondition(join: JoinDesc, equiConditionColPairs: Array[Column]): Column = {
+    var condition = NonEquiJoinConditionBuilder.convert(join.getNonEquiJoinCondition)
+    if (!equiConditionColPairs.isEmpty) {
+      condition = condition && equiConditionColPairs.reduce(_ && _)
+    }
+    condition
+  }
+
   def changeSchemeToColumnId(ds: Dataset[Row], tableDesc: SegmentFlatTableDesc): Dataset[Row] = {
     val structType = ds.schema
     val columnIds = tableDesc.getColumnIds.asScala
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/MaterializedFactTableView.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/MaterializedFactTableView.scala
index 632ae0cd1c..19b5cd073c 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/MaterializedFactTableView.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/MaterializedFactTableView.scala
@@ -20,36 +20,14 @@ package org.apache.kylin.engine.spark.job.stage.build
 
 import org.apache.kylin.engine.spark.job.SegmentJob
 import org.apache.kylin.engine.spark.job.stage.BuildParam
-import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc
-import org.apache.kylin.engine.spark.smarter.IndexDependencyParser
-import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree
-import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree.AdaptiveTreeBuilder
 import org.apache.kylin.metadata.cube.model.NDataSegment
-import org.apache.spark.sql.{Dataset, Row}
 
 class MaterializedFactTableView(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: BuildParam)
   extends FlatTableAndDictBase(jobContext, dataSegment, buildParam) {
 
   override def execute(): Unit = {
     logInfo(s"Build SEGMENT $segmentId")
-    val spanTree = new AdaptiveSpanningTree(config, new AdaptiveTreeBuilder(dataSegment, readOnlyLayouts))
-    buildParam.setSpanningTree(spanTree)
-
-    val flatTableDesc: SegmentFlatTableDesc = if (jobContext.isPartialBuild) {
-      val parser = new IndexDependencyParser(dataModel)
-      val relatedTableAlias =
-        parser.getRelatedTablesAlias(readOnlyLayouts)
-      new SegmentFlatTableDesc(config, dataSegment, spanningTree, relatedTableAlias)
-    } else {
-      new SegmentFlatTableDesc(config, dataSegment, spanningTree)
-    }
-    buildParam.setFlatTableDesc(flatTableDesc)
-
-    val factTableDS: Dataset[Row] = newFactTableDS()
-    buildParam.setFactTableDS(factTableDS)
-
-    val fastFactTableDS: Dataset[Row] = newFastFactTableDS()
-    buildParam.setFastFactTableDS(fastFactTableDS)
+    materializedFactTableView()
     if (buildParam.isSkipMaterializedFactTableView) {
       onStageSkipped()
     }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionFlatTableAndDictBase.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionFlatTableAndDictBase.scala
index 4b9a7bdfc0..af1d8251c7 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionFlatTableAndDictBase.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionFlatTableAndDictBase.scala
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.engine.spark.job.stage.build.partition
 
+import io.kyligence.kap.guava20.shaded.common.collect.Sets
 import org.apache.commons.lang3.StringUtils
 import org.apache.kylin.engine.spark.builder.{DictionaryBuilderHelper, 
PartitionDictionaryBuilderHelper}
 import org.apache.kylin.engine.spark.job.stage.BuildParam
@@ -25,6 +26,9 @@ import 
org.apache.kylin.engine.spark.job.stage.build.FlatTableAndDictBase
 import 
org.apache.kylin.engine.spark.job.stage.build.FlatTableAndDictBase.Statistics
 import org.apache.kylin.engine.spark.job.{PartitionExec, SegmentJob}
 import org.apache.kylin.engine.spark.model.PartitionFlatTableDesc
+import org.apache.kylin.engine.spark.smarter.IndexDependencyParser
+import org.apache.kylin.metadata.cube.cuboid.PartitionSpanningTree
+import 
org.apache.kylin.metadata.cube.cuboid.PartitionSpanningTree.PartitionTreeBuilder
 import org.apache.kylin.metadata.cube.model.NDataSegment
 import org.apache.kylin.metadata.model.TblColRef
 import org.apache.spark.sql.{Dataset, Row}
@@ -99,4 +103,22 @@ abstract class PartitionFlatTableAndDictBase(private val 
jobContext: SegmentJob,
     val encodeColsWithoutCc = 
encodeCols.filter(!_.getColumnDesc.isComputedColumn)
     (dictCols, encodeCols, dictColsWithoutCc, encodeColsWithoutCc)
   }
+
+  override def initSpanningTree(): Unit = {
+    val spanTree = new PartitionSpanningTree(config, //
+      new PartitionTreeBuilder(dataSegment, readOnlyLayouts, jobId, 
partitions, Sets.newHashSet(newBuckets.asJava)))
+    buildParam.setPartitionSpanningTree(spanTree)
+  }
+
+  override def initFlatTableDesc(): Unit = {
+    val tableDesc = if (jobContext.isPartialBuild) {
+      val parser = new IndexDependencyParser(dataModel)
+      val relatedTableAlias =
+        parser.getRelatedTablesAlias(jobContext.getReadOnlyLayouts)
+      new PartitionFlatTableDesc(config, dataSegment, spanningTree, 
relatedTableAlias, jobId, partitions)
+    } else {
+      new PartitionFlatTableDesc(config, dataSegment, spanningTree, jobId, 
partitions)
+    }
+    buildParam.setTableDesc(tableDesc)
+  }
 }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionMaterializedFactTableView.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionMaterializedFactTableView.scala
index deb1c604bb..3f04461b15 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionMaterializedFactTableView.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionMaterializedFactTableView.scala
@@ -20,15 +20,7 @@ package 
org.apache.kylin.engine.spark.job.stage.build.partition
 
 import org.apache.kylin.engine.spark.job.SegmentJob
 import org.apache.kylin.engine.spark.job.stage.BuildParam
-import org.apache.kylin.engine.spark.model.PartitionFlatTableDesc
-import org.apache.kylin.engine.spark.smarter.IndexDependencyParser
-import org.apache.kylin.metadata.cube.cuboid.PartitionSpanningTree
-import 
org.apache.kylin.metadata.cube.cuboid.PartitionSpanningTree.PartitionTreeBuilder
 import org.apache.kylin.metadata.cube.model.NDataSegment
-import org.apache.kylin.metadata.job.JobBucket
-import org.apache.spark.sql.{Dataset, Row}
-
-import java.util.stream.Collectors
 
 class PartitionMaterializedFactTableView(jobContext: SegmentJob, dataSegment: 
NDataSegment, buildParam: BuildParam)
   extends PartitionFlatTableAndDictBase(jobContext, dataSegment, buildParam) {
@@ -36,27 +28,7 @@ class PartitionMaterializedFactTableView(jobContext: 
SegmentJob, dataSegment: ND
 
   override def execute(): Unit = {
     logInfo(s"Build SEGMENT $segmentId")
-    val spanTree = new PartitionSpanningTree(config,
-      new PartitionTreeBuilder(dataSegment, readOnlyLayouts, jobId, partitions,
-        
jobContext.getReadOnlyBuckets.stream.filter(_.getSegmentId.equals(segmentId)).collect(Collectors.toSet[JobBucket])))
-    buildParam.setPartitionSpanningTree(spanTree)
-
-    val tableDesc = if (jobContext.isPartialBuild) {
-      val parser = new IndexDependencyParser(dataModel)
-      val relatedTableAlias =
-        parser.getRelatedTablesAlias(readOnlyLayouts)
-      new PartitionFlatTableDesc(config, dataSegment, spanTree, 
relatedTableAlias, jobId, partitions)
-    } else {
-      new PartitionFlatTableDesc(config, dataSegment, spanTree, jobId, 
partitions)
-    }
-    buildParam.setTableDesc(tableDesc)
-
-    val factTableDS: Dataset[Row] = newFactTableDS()
-    buildParam.setFactTableDS(factTableDS)
-
-    val fastFactTableDS: Dataset[Row] = newFastFactTableDS()
-    buildParam.setFastFactTableDS(fastFactTableDS)
-
+    materializedFactTableView()
     if (buildParam.isSkipMaterializedFactTableView) {
       onStageSkipped()
     }
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/smarter/IndexDependencyParser.scala
 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/smarter/IndexDependencyParser.scala
index e1335978fc..04234294c9 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/smarter/IndexDependencyParser.scala
+++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/smarter/IndexDependencyParser.scala
@@ -17,25 +17,23 @@
  */
 package org.apache.kylin.engine.spark.smarter
 
-import java.util
-import java.util.Collections
-
+import com.google.common.collect.{Lists, Maps, Sets}
 import org.apache.commons.collections.CollectionUtils
 import org.apache.commons.lang3.StringUtils
-import org.apache.kylin.engine.spark.builder.SegmentFlatTable
 import org.apache.kylin.engine.spark.job.NSparkCubingUtil
+import org.apache.kylin.engine.spark.job.stage.build.FlatTableAndDictBase
 import org.apache.kylin.metadata.cube.model.LayoutEntity
-import org.apache.kylin.metadata.model.{FunctionDesc, JoinTableDesc, 
NDataModel, TableRef, TblColRef}
+import org.apache.kylin.metadata.model._
 import org.apache.kylin.query.util.PushDownUtil
 import org.apache.spark.sql.execution.utils.SchemaProcessor
 import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.{Dataset, Row, SparderEnv, SparkSession}
 
+import java.util
+import java.util.Collections
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import com.google.common.collect.{Lists, Maps, Sets}
-
 class IndexDependencyParser(val model: NDataModel) {
 
   private val ccTableNameAliasMap = Maps.newHashMap[String, util.Set[String]]
@@ -113,7 +111,7 @@ class IndexDependencyParser(val model: NDataModel) {
     model.getJoinTables.asScala.map((joinTable: JoinTableDesc) => {
       joinTableDFMap.put(joinTable, generateDatasetOnTable(ss, 
joinTable.getTableRef))
     })
-    val df = SegmentFlatTable.joinFactTableWithLookupTables(rootDF, 
joinTableDFMap, model, ss)
+    val df = FlatTableAndDictBase.joinFactTableWithLookupTables(rootDF, 
joinTableDFMap, model, ss)
     val filterCondition = model.getFilterCondition
     if (StringUtils.isNotEmpty(filterCondition)) {
       val massagedCondition = PushDownUtil.massageExpression(model, 
model.getProject, filterCondition, null)
@@ -127,7 +125,7 @@ class IndexDependencyParser(val model: NDataModel) {
     val structType = SchemaProcessor.buildSchemaWithRawTable(tableCols)
     val alias = tableRef.getAlias
     val dataset = ss.createDataFrame(Lists.newArrayList[Row], 
structType).alias(alias)
-    SegmentFlatTable.wrapAlias(dataset, alias)
+    FlatTableAndDictBase.wrapAlias(dataset, alias)
   }
 
   private def initTableNames(): Unit = {
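
With SegmentFlatTable removed, IndexDependencyParser now reaches the same static helpers through the FlatTableAndDictBase companion object. The call shapes, as they appear at the call sites above (variable names taken from this file; a sketch, not new behaviour):

    // Build an aliased empty dataset per table, then run the standard
    // fact/lookup join used for dependency parsing.
    val aliased: Dataset[Row] = FlatTableAndDictBase.wrapAlias(dataset, tableRef.getAlias)
    val joined: Dataset[Row] =
      FlatTableAndDictBase.joinFactTableWithLookupTables(rootDF, joinTableDFMap, model, ss)
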
diff --git 
a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestDimensionTableStat.scala
 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestDimensionTableStat.scala
index 0a16a971f0..b4ee2087a4 100644
--- 
a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestDimensionTableStat.scala
+++ 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestDimensionTableStat.scala
@@ -19,13 +19,14 @@
 package org.apache.kylin.engine.spark.builder
 
 import org.apache.kylin.common.KylinConfig
-import org.apache.kylin.engine.spark.job.TableMetaManager
-import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc
+import org.apache.kylin.engine.spark.job.stage.BuildParam
+import org.apache.kylin.engine.spark.job.{SegmentJob, TableMetaManager}
 import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree
 import 
org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree.AdaptiveTreeBuilder
 import org.apache.kylin.metadata.cube.model._
 import org.apache.kylin.metadata.model.SegmentRange
 import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, 
SparderBaseFunSuite}
+import org.mockito.Mockito
 
 import scala.collection.JavaConverters._
 
@@ -54,9 +55,11 @@ class TestDimensionTableStat extends SparderBaseFunSuite 
with SharedSparkSession
 
     val seg = dfMgr.appendSegment(df, new 
SegmentRange.TimePartitionedSegmentRange(0L, 1356019200000L))
     val toBuildTree = new AdaptiveSpanningTree(getTestConfig, new 
AdaptiveTreeBuilder(seg, seg.getIndexPlan.getAllLayouts))
-    val flatTableDesc = new SegmentFlatTableDesc(getTestConfig, seg, 
toBuildTree)
-    val flatTable = new SegmentFlatTable(spark, flatTableDesc)
-    flatTable.getFlatTableDS
+    val segmentJob = Mockito.mock(classOf[SegmentJob])
+    Mockito.when(segmentJob.getSparkSession).thenReturn(spark)
+    val buildParam = new BuildParam()
+    new TestFlatTable(segmentJob, seg, buildParam).test(getTestConfig, 
toBuildTree)
+
 
     df.getModel.getJoinTables.asScala.foreach { joinTable =>
       val dimCount = 
TableMetaManager.getTableMeta(joinTable.getTable).get.rowCount.get
diff --git 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/MaterializedFactTableView.scala
 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestFlatTable.scala
similarity index 57%
copy from 
src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/MaterializedFactTableView.scala
copy to 
src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestFlatTable.scala
index 632ae0cd1c..61cbaf61b0 100644
--- 
a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/MaterializedFactTableView.scala
+++ 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestFlatTable.scala
@@ -16,44 +16,37 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.engine.spark.job.stage.build
+package org.apache.kylin.engine.spark.builder
 
+import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.engine.spark.job.SegmentJob
 import org.apache.kylin.engine.spark.job.stage.BuildParam
+import org.apache.kylin.engine.spark.job.stage.build.FlatTableAndDictBase
 import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc
-import org.apache.kylin.engine.spark.smarter.IndexDependencyParser
 import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree
-import 
org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree.AdaptiveTreeBuilder
 import org.apache.kylin.metadata.cube.model.NDataSegment
+import org.apache.kylin.metadata.model.TableRef
 import org.apache.spark.sql.{Dataset, Row}
 
-class MaterializedFactTableView(jobContext: SegmentJob, dataSegment: 
NDataSegment, buildParam: BuildParam)
+class TestFlatTable(jobContext: SegmentJob, dataSegment: NDataSegment, 
buildParam: BuildParam)
   extends FlatTableAndDictBase(jobContext, dataSegment, buildParam) {
-
-  override def execute(): Unit = {
-    logInfo(s"Build SEGMENT $segmentId")
-    val spanTree = new AdaptiveSpanningTree(config, new 
AdaptiveTreeBuilder(dataSegment, readOnlyLayouts))
-    buildParam.setSpanningTree(spanTree)
-
-    val flatTableDesc: SegmentFlatTableDesc = if (jobContext.isPartialBuild) {
-      val parser = new IndexDependencyParser(dataModel)
-      val relatedTableAlias =
-        parser.getRelatedTablesAlias(readOnlyLayouts)
-      new SegmentFlatTableDesc(config, dataSegment, spanningTree, 
relatedTableAlias)
-    } else {
-      new SegmentFlatTableDesc(config, dataSegment, spanningTree)
-    }
+  def test(kylinConfig: KylinConfig, toBuildTree: AdaptiveSpanningTree): Unit 
= {
+    buildParam.setSpanningTree(toBuildTree)
+    val flatTableDesc = new SegmentFlatTableDesc(kylinConfig, dataSegment, 
toBuildTree)
     buildParam.setFlatTableDesc(flatTableDesc)
-
     val factTableDS: Dataset[Row] = newFactTableDS()
     buildParam.setFactTableDS(factTableDS)
-
     val fastFactTableDS: Dataset[Row] = newFastFactTableDS()
     buildParam.setFastFactTableDS(fastFactTableDS)
-    if (buildParam.isSkipMaterializedFactTableView) {
-      onStageSkipped()
-    }
+    val dict: Dataset[Row] = buildDictIfNeed()
+    buildParam.setDict(dict)
+    val flatTable: Dataset[Row] = generateFlatTable()
+    buildParam.setFlatTable(flatTable)
+    val flatTablePart: Dataset[Row] = generateFlatTablePart()
+    buildParam.setFlatTablePart(flatTablePart)
   }
 
-  override def getStageName: String = "MaterializedFactTableView"
+  def testNewTableDS(ref: TableRef): Dataset[Row] = {
+    newTableDS(ref)
+  }
 }
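
TestFlatTable above is the new test-side driver that replaces direct SegmentFlatTable construction in the suites touched by this patch. The shared call pattern, assembled from the test diffs (`seg` and `toBuildTree` come from each suite's own setup):

    val segmentJob = Mockito.mock(classOf[SegmentJob])
    Mockito.when(segmentJob.getSparkSession).thenReturn(spark)
    val buildParam = new BuildParam()
    new TestFlatTable(segmentJob, seg, buildParam).test(getTestConfig, toBuildTree)
    // test() leaves the materialized datasets on buildParam for assertions, e.g.:
    val flatTable: Dataset[Row] = buildParam.getFlatTable
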
diff --git 
a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestInferFilters.scala
 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestInferFilters.scala
index 561521127d..100246e112 100644
--- 
a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestInferFilters.scala
+++ 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestInferFilters.scala
@@ -19,8 +19,9 @@
 package org.apache.kylin.engine.spark.builder
 
 import org.apache.kylin.common.KylinConfig
-import org.apache.kylin.engine.spark.job.FiltersUtil
-import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc
+import org.apache.kylin.engine.spark.job.stage.BuildParam
+import org.apache.kylin.engine.spark.job.stage.build.FlatTableAndDictBase
+import org.apache.kylin.engine.spark.job.{FiltersUtil, SegmentJob}
 import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree
 import 
org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree.AdaptiveTreeBuilder
 import org.apache.kylin.metadata.cube.model._
@@ -30,6 +31,7 @@ import org.apache.spark.sql.common.{LocalMetadata, 
SharedSparkSession, SparderBa
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.{FilterExec, SparkPlan}
 import org.junit.Assert
+import org.mockito.Mockito
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Set
@@ -46,11 +48,11 @@ class TestInferFilters extends SparderBaseFunSuite with 
AdaptiveSparkPlanHelper
   }
 
   override def beforeEach(): Unit = {
-    SegmentFlatTable.inferFiltersEnabled = true
+    FlatTableAndDictBase.inferFiltersEnabled = true
   }
 
   override def afterEach(): Unit = {
-    SegmentFlatTable.inferFiltersEnabled = false
+    FlatTableAndDictBase.inferFiltersEnabled = false
   }
 
   test("infer filters from join desc") {
@@ -64,10 +66,12 @@ class TestInferFilters extends SparderBaseFunSuite with 
AdaptiveSparkPlanHelper
 
     val seg = dsMgr.appendSegment(df, new 
SegmentRange.TimePartitionedSegmentRange(0L, 1356019200000L))
     val toBuildTree = new AdaptiveSpanningTree(getTestConfig, new 
AdaptiveTreeBuilder(seg, seg.getIndexPlan.getAllLayouts))
-    val flatTableDesc = new SegmentFlatTableDesc(getTestConfig, seg, 
toBuildTree)
-    val flatTable = new SegmentFlatTable(spark, flatTableDesc)
+    val segmentJob = Mockito.mock(classOf[SegmentJob])
+    Mockito.when(segmentJob.getSparkSession).thenReturn(spark)
+    val buildParam = new BuildParam()
+    new TestFlatTable(segmentJob, seg, buildParam).test(getTestConfig, 
toBuildTree)
 
-    val filters = 
getFilterPlan(flatTable.getFlatTableDS.queryExecution.executedPlan)
+    val filters = 
getFilterPlan(buildParam.getFlatTable.queryExecution.executedPlan)
 
     Assert.assertTrue(Set("EDW.TEST_CAL_DT.CAL_DT", 
"DEFAULT.TEST_KYLIN_FACT.CAL_DT",
       
"DEFAULT.TEST_ORDER.TEST_DATE_ENC").subsetOf(FiltersUtil.getAllEqualColSets))
diff --git 
a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestSegmentFlatTable.scala
 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestSegmentFlatTable.scala
index fda8cc0248..8cf81315c3 100644
--- 
a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestSegmentFlatTable.scala
+++ 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestSegmentFlatTable.scala
@@ -19,9 +19,8 @@
 package org.apache.kylin.engine.spark.builder
 
 import org.apache.kylin.common.KylinConfig
-import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc
-import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree
-import 
org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree.AdaptiveTreeBuilder
+import org.apache.kylin.engine.spark.job.SegmentJob
+import org.apache.kylin.engine.spark.job.stage.BuildParam
 import org.apache.kylin.metadata.cube.model._
 import org.apache.kylin.metadata.model.{SegmentRange, TableDesc, TableRef}
 import org.apache.spark.SparkExecutorInfo
@@ -53,16 +52,17 @@ class TestSegmentFlatTable extends SparderBaseFunSuite with 
SharedSparkSession w
     dfMgr.updateDataflow(update)
 
     val seg = dfMgr.appendSegment(df, new 
SegmentRange.TimePartitionedSegmentRange(0L, 1356019200000L))
-    val toBuildTree = new AdaptiveSpanningTree(getTestConfig, new 
AdaptiveTreeBuilder(seg, seg.getIndexPlan.getAllLayouts))
-    val flatTableDesc = new SegmentFlatTableDesc(getTestConfig, seg, 
toBuildTree)
-    val flatTable = new SegmentFlatTable(spark, flatTableDesc)
-    assert(flatTable.newTableDS(df.getModel.getAllTables.iterator().next()) != 
null)
+    val segmentJob = Mockito.mock(classOf[SegmentJob])
+    Mockito.when(segmentJob.getSparkSession).thenReturn(spark)
+    val buildParam = new BuildParam()
+    val testFlatTable = new TestFlatTable(segmentJob, seg, buildParam)
+    
assert(testFlatTable.testNewTableDS(df.getModel.getAllTables.iterator().next()) 
!= null)
 
     val tableRef: TableRef = df.getModel.getAllTables.iterator().next()
     val tableDesc: TableDesc = tableRef.getTableDesc
     tableDesc.setRangePartition(true)
     val ref = new TableRef(df.getModel, tableDesc.getName, tableDesc, false)
-    assert(flatTable.newTableDS(ref) != null)
+    assert(testFlatTable.testNewTableDS(ref) != null)
   }
 
   test("testSegmentFlatTableWithChineseAndSpecialChar") {
@@ -78,16 +78,17 @@ class TestSegmentFlatTable extends SparderBaseFunSuite with 
SharedSparkSession w
     dfMgr.updateDataflow(update)
 
     val seg = dfMgr.appendSegment(df, new 
SegmentRange.TimePartitionedSegmentRange(0L, 1356019200000L))
-    val toBuildTree = new AdaptiveSpanningTree(getTestConfig, new 
AdaptiveTreeBuilder(seg, seg.getIndexPlan.getAllLayouts))
-    val flatTableDesc = new SegmentFlatTableDesc(getTestConfig, seg, 
toBuildTree)
-    val flatTable = new SegmentFlatTable(spark, flatTableDesc)
-    assert(flatTable.newTableDS(df.getModel.getAllTables.iterator().next()) != 
null)
+    val segmentJob = Mockito.mock(classOf[SegmentJob])
+    Mockito.when(segmentJob.getSparkSession).thenReturn(spark)
+    val buildParam = new BuildParam()
+    val testFlatTable = new TestFlatTable(segmentJob, seg, buildParam)
+    
assert(testFlatTable.testNewTableDS(df.getModel.getAllTables.iterator().next()) 
!= null)
 
     val tableRef: TableRef = df.getModel.getAllTables.iterator().next()
     val tableDesc: TableDesc = tableRef.getTableDesc
     tableDesc.setRangePartition(true)
     val ref = new TableRef(df.getModel, tableDesc.getName, tableDesc, false)
-    assert(flatTable.newTableDS(ref) != null)
+    assert(testFlatTable.testNewTableDS(ref) != null)
   }
 
   test("waitTillWorkerRegistered") {
@@ -106,13 +107,13 @@ class TestSegmentFlatTable extends SparderBaseFunSuite 
with SharedSparkSession w
     dfMgr.updateDataflow(update)
 
     val seg = dfMgr.appendSegment(df, new 
SegmentRange.TimePartitionedSegmentRange(0L, 1356019200000L))
-    val toBuildTree = new AdaptiveSpanningTree(getTestConfig, new 
AdaptiveTreeBuilder(seg, seg.getIndexPlan.getAllLayouts))
-    val flatTableDesc = new SegmentFlatTableDesc(getTestConfig, seg, 
toBuildTree)
-
     val spiedSparkSession = Mockito.spy(spark)
     val spiedSparkContext = Mockito.spy(spark.sparkContext)
     val spiedTracker = Mockito.spy(spark.sparkContext.statusTracker)
-    val flatTable = new SegmentFlatTable(spiedSparkSession, flatTableDesc)
+    val segmentJob = Mockito.mock(classOf[SegmentJob])
+    Mockito.when(segmentJob.getSparkSession).thenReturn(spark)
+    val buildParam = new BuildParam()
+    val flatTable = new TestFlatTable(segmentJob, seg, buildParam)
     Mockito.when(spiedSparkSession.sparkContext).thenReturn(spiedSparkContext)
     Mockito.when(spiedSparkContext.statusTracker).thenReturn(spiedTracker)
     Mockito.when(spiedTracker.getExecutorInfos)
diff --git 
a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/TestRDSegmentBuildExec.scala
 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/TestRDSegmentBuildExec.scala
new file mode 100644
index 0000000000..66dcfdf43b
--- /dev/null
+++ 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/TestRDSegmentBuildExec.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.job
+
+import com.google.common.collect.Lists
+import org.apache.kylin.common.util.TestUtils.getTestConfig
+import org.apache.kylin.common.{KapConfig, KylinConfig}
+import org.apache.kylin.engine.spark.job.stage.BuildParam
+import org.apache.kylin.metadata.cube.model._
+import org.apache.kylin.metadata.model.{NTableMetadataManager, SegmentRange, 
TableDesc, TableExtDesc}
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, 
SparderBaseFunSuite}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.mockito.Mockito
+import org.scalatest.PrivateMethodTester
+
+import java.util
+import scala.collection.JavaConverters._
+
+class TestRDSegmentBuildExec extends SparderBaseFunSuite with 
PrivateMethodTester
+  with AdaptiveSparkPlanHelper with SharedSparkSession with LocalMetadata {
+  private val PROJECT = "infer_filter"
+  private val MODEL_NAME1 = "89af4ee2-2cdb-4b07-b39e-4c29856309ab"
+
+  test("test evaluateColumnTotalFromTableDesc") {
+    val dsMgr: NDataflowManager = NDataflowManager.getInstance(getTestConfig, 
PROJECT)
+    val df: NDataflow = dsMgr.getDataflow(MODEL_NAME1)
+    // cleanup all segments first
+    val update = new NDataflowUpdate(df.getUuid)
+    update.setToRemoveSegsWithArray(df.getSegments.asScala.toArray)
+    dsMgr.updateDataflow(update)
+
+    val seg = dsMgr.appendSegment(df, new 
SegmentRange.TimePartitionedSegmentRange(0L, 1356019200000L))
+    val segmentJob = Mockito.mock(classOf[SegmentJob])
+    Mockito.when(segmentJob.getConfig).thenReturn(getTestConfig)
+    Mockito.when(segmentJob.getSparkSession).thenReturn(spark)
+    val buildParam = new BuildParam()
+    val exec = new RDSegmentBuildExec(segmentJob, seg, buildParam)
+    exec.initFlatTableOnDetectResource()
+
+    val tableMetadataManager = 
NTableMetadataManager.getInstance(getTestConfig, PROJECT)
+    var result = exec.evaluateColumnTotalFromTableDesc(tableMetadataManager, 
0, "", "UUID")
+    assertEquals(0, result)
+
+    val flatTableDesc = buildParam.getFlatTableDesc
+    val columns = flatTableDesc.getColumns.asScala.toArray
+    var totalRows = 0
+    val columnsIdentity = columns.map(col => col.getIdentity)
+    columns.map(colRef => 
tableMetadataManager.getTableDesc(flatTableDesc.getTableName(colRef.toString)))
+      .filter(tableMetadataDesc => tableMetadataDesc != 
null).distinct.foreach(tableMetadataDesc => {
+      val tableName = tableMetadataDesc.getName
+      mockTableStats(tableMetadataDesc, totalRows)
+      if (totalRows % 2 != 0) {
+        val baseDir = KapConfig.getInstanceFromEnv.getMetadataWorkingDirectory
+        val tmp = spark.range(totalRows)
+        tmp.write.mode(SaveMode.Overwrite).parquet(baseDir + tableName + 
".parquet")
+        tableMetadataDesc.setLastSnapshotPath(tableName + ".parquet")
+        tableMetadataDesc.setLastSnapshotSize(totalRows)
+      }
+      val colRef = tableMetadataDesc.getColumns.find(columnDesc => 
columnsIdentity.contains(columnDesc.getIdentity)).get
+      val colIdentity = tableMetadataDesc.getIdentity + "." + colRef.getName
+      result = exec.evaluateColumnTotalFromTableDesc(tableMetadataManager, 0, 
tableMetadataDesc.getName, colIdentity)
+      assertEquals(totalRows, result)
+      totalRows += 1
+    })
+  }
+
+  private def mockTableStats(tableDesc: TableDesc, totalRows: Int): 
TableExtDesc = {
+    val tableMetadataManager: NTableMetadataManager = 
NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv, PROJECT)
+    var tableExt: TableExtDesc = 
tableMetadataManager.getOrCreateTableExt(tableDesc)
+    tableExt = tableMetadataManager.copyForWrite(tableExt)
+    val columnStats: util.List[TableExtDesc.ColumnStats] = 
Lists.newArrayList[TableExtDesc.ColumnStats]
+    for (columnDesc <- tableDesc.getColumns) {
+      if (!columnDesc.isComputedColumn) {
+        var colStats: TableExtDesc.ColumnStats = 
tableExt.getColumnStatsByName(columnDesc.getName)
+        if (colStats == null) {
+          colStats = new TableExtDesc.ColumnStats
+          colStats.setColumnName(columnDesc.getName)
+        }
+        if ("CAL_DT" == columnDesc.getName) colStats.setCardinality(1000)
+        else if ("TRANS_ID" == columnDesc.getName) 
colStats.setCardinality(10000)
+        else colStats.setCardinality(100)
+        columnStats.add(colStats)
+      }
+    }
+    if (totalRows % 2 == 0) {
+      tableExt.setTotalRows(totalRows)
+    }
+
+    tableExt.setColumnStats(columnStats)
+    tableMetadataManager.saveTableExt(tableExt)
+    tableExt
+  }
+}
diff --git 
a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionFlatTableAndDictBaseTest.scala
 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionFlatTableAndDictBaseTest.scala
new file mode 100644
index 0000000000..1356d19a22
--- /dev/null
+++ 
b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionFlatTableAndDictBaseTest.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.job.stage.build.partition
+
+import org.apache.kylin.common.util.RandomUtil
+import org.apache.kylin.common.util.TestUtils.getTestConfig
+import org.apache.kylin.engine.spark.job.SegmentJob
+import org.apache.kylin.engine.spark.job.stage.BuildParam
+import org.apache.kylin.metadata.cube.model.{NDataflow, NDataflowManager, 
NDataflowUpdate}
+import org.apache.kylin.metadata.model.SegmentRange
+import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, 
SparderBaseFunSuite}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.junit.jupiter.api.Assertions.{assertNotNull, assertNull}
+import org.mockito.Mockito
+import org.scalatest.PrivateMethodTester
+
+import scala.collection.JavaConverters._
+
+class PartitionFlatTableAndDictBaseTest extends SparderBaseFunSuite with 
PrivateMethodTester
+  with AdaptiveSparkPlanHelper with SharedSparkSession with LocalMetadata {
+  val modelId = "b780e4e4-69af-449e-b09f-05c90dfa04b6"
+  val project = "default"
+
+  test("test materializedFactTableView") {
+    val dsMgr: NDataflowManager = NDataflowManager.getInstance(getTestConfig, 
project)
+    val df: NDataflow = dsMgr.getDataflow(modelId)
+    val update = new NDataflowUpdate(df.getUuid)
+    update.setToRemoveSegsWithArray(df.getSegments.asScala.toArray)
+    dsMgr.updateDataflow(update)
+
+    val seg = dsMgr.appendSegment(df, new 
SegmentRange.TimePartitionedSegmentRange(0L, 1356019200000L))
+    val segmentJob = Mockito.mock(classOf[SegmentJob])
+    Mockito.when(segmentJob.getConfig).thenReturn(getTestConfig)
+    Mockito.when(segmentJob.getSparkSession).thenReturn(spark)
+    Mockito.when(segmentJob.getJobId).thenReturn(RandomUtil.randomUUIDStr())
+    Mockito.when(segmentJob.isPartialBuild).thenReturn(true)
+    val buildParam = new BuildParam()
+    val exec = new PartitionMaterializedFactTableView(segmentJob, seg, 
buildParam)
+    exec.materializedFactTableView()
+    assertNull(buildParam.getFlatTableDesc)
+    assertNotNull(buildParam.getTableDesc)
+    assertNull(buildParam.getSpanningTree)
+    assertNotNull(buildParam.getPartitionSpanningTree)
+    assertNotNull(buildParam.getFactTableDS)
+    assertNotNull(buildParam.getFastFactTableDS)
+  }
+
+  test("test materializedFactTableView with isPartialBuild{false}") {
+    val dsMgr: NDataflowManager = NDataflowManager.getInstance(getTestConfig, 
project)
+    val df: NDataflow = dsMgr.getDataflow(modelId)
+    val update = new NDataflowUpdate(df.getUuid)
+    update.setToRemoveSegsWithArray(df.getSegments.asScala.toArray)
+    dsMgr.updateDataflow(update)
+
+    val seg = dsMgr.appendSegment(df, new 
SegmentRange.TimePartitionedSegmentRange(0L, 1356019200000L))
+    val segmentJob = Mockito.mock(classOf[SegmentJob])
+    Mockito.when(segmentJob.getConfig).thenReturn(getTestConfig)
+    Mockito.when(segmentJob.getSparkSession).thenReturn(spark)
+    Mockito.when(segmentJob.getJobId).thenReturn(RandomUtil.randomUUIDStr())
+    Mockito.when(segmentJob.isPartialBuild).thenReturn(false)
+    val buildParam = new BuildParam()
+    val exec = new PartitionMaterializedFactTableView(segmentJob, seg, 
buildParam)
+
+    exec.materializedFactTableView()
+    assertNull(buildParam.getFlatTableDesc)
+    assertNotNull(buildParam.getTableDesc)
+    assertNull(buildParam.getSpanningTree)
+    assertNotNull(buildParam.getPartitionSpanningTree)
+    assertNotNull(buildParam.getFactTableDS)
+    assertNotNull(buildParam.getFastFactTableDS)
+  }
+}

