This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit e39b34cb4667430134d6054123acf34eb9992daf
Author: jlf <longfei.ji...@kyligence.io>
AuthorDate: Fri Feb 24 22:08:06 2023 +0800

    KYLIN-5531 remove redundancy code
---
 .../kylin/rest/service/ModelServiceBuildTest.java  |   9 +-
 .../engine/spark/builder/PartitionFlatTable.scala  |  69 ---
 .../engine/spark/builder/SegmentFlatTable.scala    | 683 ---------------------
 .../engine/spark/job/RDPartitionBuildExec.scala    |  35 +-
 .../engine/spark/job/RDSegmentBuildExec.scala      |  32 +-
 .../kylin/engine/spark/job/RDSegmentBuildJob.java  |  13 +-
 .../engine/spark/job/stage/build/BuildStage.scala  |   5 +
 .../job/stage/build/FlatTableAndDictBase.scala     |  99 ++-
 .../stage/build/MaterializedFactTableView.scala    |  24 +-
 .../partition/PartitionFlatTableAndDictBase.scala  |  22 +
 .../PartitionMaterializedFactTableView.scala       |  30 +-
 .../spark/smarter/IndexDependencyParser.scala      |  16 +-
 .../spark/builder/TestDimensionTableStat.scala     |  13 +-
 .../engine/spark/builder/TestFlatTable.scala}      |  41 +-
 .../engine/spark/builder/TestInferFilters.scala    |  18 +-
 .../spark/builder/TestSegmentFlatTable.scala       |  35 +-
 .../engine/spark/job/TestRDSegmentBuildExec.scala  | 111 ++++
 .../PartitionFlatTableAndDictBaseTest.scala        |  88 +++
 18 files changed, 394 insertions(+), 949 deletions(-)

diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
index cb2896da8f..78ba0133e9 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
@@ -19,7 +19,6 @@ package org.apache.kylin.rest.service;
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CONCURRENT_SUBMIT_LIMIT;
-import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_FAIL;
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_MULTI_PARTITION_ABANDON;
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.JOB_CREATE_CHECK_MULTI_PARTITION_DUPLICATE;
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.SEGMENT_LOCKED;
@@ -505,7 +504,6 @@ public class ModelServiceBuildTest extends SourceTestCase {
     }
 
     @Test
-    @Ignore("Sorry, I don't know")
     public void testBuildSegmentsManually_NoPartition_FullSegExisted() throws Exception {
         val modelId = "89af4ee2-2cdb-4b07-b39e-4c29856309aa";
         val project = "default";
@@ -527,13 +525,14 @@ public class ModelServiceBuildTest extends SourceTestCase {
         modelService.updateDataModelSemantic(project, request);
         try {
             modelBuildService.buildSegmentsManually("default", "89af4ee2-2cdb-4b07-b39e-4c29856309aa", "", "");
+            Assert.fail();
         } catch (TransactionException exception) {
            Assert.assertTrue(exception.getCause() instanceof KylinException);
-            Assert.assertEquals(JOB_CREATE_CHECK_FAIL.getErrorMsg().getLocalizedString(),
-                    exception.getCause().getMessage());
+            Assert.assertEquals(SEGMENT_STATUS.getErrorMsg().getCodeString(),
+                    ((KylinException) exception.getCause()).getErrorCode().getCodeString());
         }
         val executables = getRunningExecutables(project, modelId);
-        Assert.assertEquals(2, executables.size());
+        Assert.assertEquals(1, executables.size());
         Assert.assertTrue(((NSparkCubingJob) executables.get(0)).getHandler() instanceof ExecutableAddCuboidHandler);
     }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/PartitionFlatTable.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/PartitionFlatTable.scala
deleted file mode 100644
index 76c386d8c2..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/PartitionFlatTable.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.builder
-
-import org.apache.kylin.engine.spark.model.PartitionFlatTableDesc
-import org.apache.commons.lang3.StringUtils
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
-
-import java.util.Objects
-import java.{lang, util}
-import scala.collection.JavaConverters._
-
-class PartitionFlatTable(private val sparkSession: SparkSession, //
-                         private val tableDesc: PartitionFlatTableDesc) extends SegmentFlatTable(sparkSession, tableDesc) {
-
-  override protected def applyPartitionDesc(originDS: Dataset[Row]): Dataset[Row] = {
-    // Multi level partition.
-    val descMLP = dataModel.getMultiPartitionDesc
-    require(Objects.nonNull(descMLP))
-    // Date range partition.
-    val descDRP = dataModel.getPartitionDesc
-    val condition = descMLP.getPartitionConditionBuilder
-      .buildMultiPartitionCondition(descDRP, descMLP, //
-        new util.LinkedList[lang.Long](tableDesc.getPartitions), null, segmentRange)
-    if (StringUtils.isBlank(condition)) {
-      logInfo(s"Segment $segmentId no available partition condition.")
-      return originDS
-    }
-    logInfo(s"Segment $segmentId apply partition condition $condition.")
-    originDS.where(condition)
-  }
-
-  def getPartitionDS(partitionId: Long): Dataset[Row] = {
-    val columnIds = tableDesc.getColumnIds.asScala
-    val columnName2Id = tableDesc.getColumns //
-      .asScala //
-      .map(column => column.getIdentity) //
-      .zip(columnIds) //
-    val column2IdMap = columnName2Id.toMap
-
-    val partitionColumnIds = dataModel.getMultiPartitionDesc.getColumnRefs.asScala //
-      .map(_.getIdentity).map(x => column2IdMap.apply(x))
-    val values = dataModel.getMultiPartitionDesc.getPartitionInfo(partitionId).getValues.toSeq
-
-    val converted = partitionColumnIds.zip(values).map { case (k, v) =>
-      s"`$k` = '$v'"
-    }.mkString(" and ")
-
-    logInfo(s"Segment $segmentId single partition condition: $converted")
-    FLAT_TABLE.where(converted)
-  }
-
-}
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala
deleted file mode 100644
index e6e41fd9b9..0000000000
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SegmentFlatTable.scala
+++ /dev/null
@@ -1,683 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.kylin.engine.spark.builder - -import java.util.concurrent.{CountDownLatch, TimeUnit} -import java.util.{Locale, Objects, Timer, TimerTask} - -import org.apache.commons.lang3.StringUtils -import org.apache.kylin.common.constant.LogConstant -import org.apache.kylin.common.logging.SetLogCategory -import org.apache.kylin.common.util.HadoopUtil -import org.apache.kylin.common.{CustomUtils, KapConfig, KylinConfig} -import org.apache.kylin.engine.spark.builder.DFBuilderHelper._ -import org.apache.kylin.engine.spark.job.NSparkCubingUtil._ -import org.apache.kylin.engine.spark.job.{FiltersUtil, TableMetaManager} -import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc -import org.apache.kylin.engine.spark.utils.LogEx -import org.apache.kylin.engine.spark.utils.SparkDataSource._ -import org.apache.kylin.metadata.cube.model.NDataSegment -import org.apache.kylin.metadata.model._ -import org.apache.kylin.query.util.PushDownUtil -import org.apache.spark.sql._ -import org.apache.spark.sql.functions.{col, expr} -import org.apache.spark.sql.manager.SparderLookupManager -import org.apache.spark.sql.types.StructField -import org.apache.spark.sql.util.SparderTypeUtil -import org.apache.spark.utils.ProxyThreadUtils - -import scala.collection.JavaConverters._ -import scala.collection.mutable -import scala.collection.parallel.ForkJoinTaskSupport -import scala.concurrent.duration.{Duration, MILLISECONDS} -import scala.concurrent.forkjoin.ForkJoinPool -import scala.util.{Failure, Success, Try} - -import com.google.common.collect.Sets - -class SegmentFlatTable(private val sparkSession: SparkSession, // - private val tableDesc: SegmentFlatTableDesc) extends LogEx { - - import SegmentFlatTable._ - - protected final val project = tableDesc.getProject - protected final val spanningTree = tableDesc.getSpanningTree - protected final val dataSegment = tableDesc.getDataSegment - protected final val dataModel = tableDesc.getDataModel - protected final val indexPlan = tableDesc.getIndexPlan - protected final val segmentRange = tableDesc.getSegmentRange - protected lazy final val flatTablePath = tableDesc.getFlatTablePath - protected lazy final val factTableViewPath = tableDesc.getFactTableViewPath - protected lazy final val workingDir = tableDesc.getWorkingDir - protected lazy final val sampleRowCount = tableDesc.getSampleRowCount - - protected final val segmentId = dataSegment.getId - - protected lazy final val FLAT_TABLE = generateFlatTable() - // flat-table without dict columns - private lazy final val FLAT_TABLE_PART = generateFlatTablePart() - - protected val rootFactTable: TableRef = dataModel.getRootFactTable - - // Flat table. - private lazy val shouldPersistFT = tableDesc.shouldPersistFlatTable() - private lazy val isFTReady = dataSegment.isFlatTableReady && tableDesc.buildFilesSeparationPathExists(flatTablePath) - - // Fact table view. - private lazy val isFTV = rootFactTable.getTableDesc.isView - private lazy val shouldPersistFTV = tableDesc.shouldPersistView() - private lazy val isFTVReady = dataSegment.isFactViewReady && HadoopUtil.getWorkingFileSystem.exists(factTableViewPath) - - private lazy val needJoin = { - val join = tableDesc.shouldJoinLookupTables - logInfo(s"Segment $segmentId flat table need join: $join") - join - } - - protected lazy val factTableDS: Dataset[Row] = newFactTableDS() - private lazy val fastFactTableDS = newFastFactTableDS() - - // By design, COMPUTED-COLUMN could only be defined on fact table. 
- protected lazy val factTableCCs: Set[TblColRef] = rootFactTable.getColumns.asScala - .filter(_.getColumnDesc.isComputedColumn) - .toSet - - def getFlatTablePartDS: Dataset[Row] = { - FLAT_TABLE_PART - } - - def getFlatTableDS: Dataset[Row] = { - FLAT_TABLE - } - - def gatherStatistics(): Statistics = { - val stepDesc = s"Segment $segmentId collect flat table statistics." - logInfo(stepDesc) - sparkSession.sparkContext.setJobDescription(stepDesc) - val statistics = gatherStatistics(FLAT_TABLE) - logInfo(s"Segment $segmentId collect flat table statistics $statistics.") - sparkSession.sparkContext.setJobDescription(null) - statistics - } - - protected def generateFlatTablePart(): Dataset[Row] = { - val recoveredDS = tryRecoverFTDS() - if (recoveredDS.nonEmpty) { - return recoveredDS.get - } - var flatTableDS = if (needJoin) { - val lookupTableDSMap = generateLookupTables() - if (inferFiltersEnabled) { - FiltersUtil.initFilters(tableDesc, lookupTableDSMap) - } - val jointDS = joinFactTableWithLookupTables(fastFactTableDS, lookupTableDSMap, dataModel, sparkSession) - concatCCs(jointDS, factTableCCs) - } else { - concatCCs(fastFactTableDS, factTableCCs) - } - flatTableDS = applyFilterCondition(flatTableDS) - changeSchemeToColumnId(flatTableDS, tableDesc) - } - - protected def generateFlatTable(): Dataset[Row] = { - val recoveredDS = tryRecoverFTDS() - if (recoveredDS.nonEmpty) { - return recoveredDS.get - } - - /** - * If need to build and encode dict columns, then - * 1. try best to build in fact-table. - * 2. try best to build in lookup-tables (without cc dict). - * 3. try to build in fact-table. - * - * CC in lookup-tables MUST be built in flat-table. - */ - val (dictCols, encodeCols, dictColsWithoutCc, encodeColsWithoutCc) = prepareForDict() - val factTable = buildDictIfNeed(factTableDS, dictCols, encodeCols) - - var flatTable = if (needJoin) { - - val lookupTables = generateLookupTables() - .map(lookupTableMap => - (lookupTableMap._1, buildDictIfNeed(lookupTableMap._2, dictColsWithoutCc, encodeColsWithoutCc))) - if (lookupTables.nonEmpty) { - generateLookupTableMeta(project, lookupTables) - } - if (inferFiltersEnabled) { - FiltersUtil.initFilters(tableDesc, lookupTables) - } - - val jointTable = joinFactTableWithLookupTables(factTable, lookupTables, dataModel, sparkSession) - buildDictIfNeed(concatCCs(jointTable, factTableCCs), - selectColumnsNotInTables(factTable, lookupTables.values.toSeq, dictCols), - selectColumnsNotInTables(factTable, lookupTables.values.toSeq, encodeCols)) - } else { - factTable - } - - DFBuilderHelper.checkPointSegment(dataSegment, (copied: NDataSegment) => copied.setDictReady(true)) - - flatTable = applyFilterCondition(flatTable) - flatTable = changeSchemeToColumnId(flatTable, tableDesc) - tryPersistFTDS(flatTable) - } - - private def prepareForDict(): (Set[TblColRef], Set[TblColRef], Set[TblColRef], Set[TblColRef]) = { - val dictCols = DictionaryBuilderHelper.extractTreeRelatedGlobalDictToBuild(dataSegment, spanningTree.getIndices).asScala.toSet - val encodeCols = DictionaryBuilderHelper.extractTreeRelatedGlobalDicts(dataSegment, spanningTree.getIndices).asScala.toSet - val dictColsWithoutCc = dictCols.filter(!_.getColumnDesc.isComputedColumn) - val encodeColsWithoutCc = encodeCols.filter(!_.getColumnDesc.isComputedColumn) - (dictCols, encodeCols, dictColsWithoutCc, encodeColsWithoutCc) - } - - private def newFastFactTableDS(): Dataset[Row] = { - val partDS = newPartitionedFTDS(needFast = true) - fulfillDS(partDS, factTableCCs, rootFactTable) - } - - private def 
newFactTableDS(): Dataset[Row] = { - val partDS = newPartitionedFTDS() - fulfillDS(partDS, factTableCCs, rootFactTable) - } - - private def newPartitionedFTDS(needFast: Boolean = false): Dataset[Row] = { - if (isFTVReady) { - logInfo(s"Skip FACT-TABLE-VIEW segment $segmentId.") - return sparkSession.read.parquet(factTableViewPath.toString) - } - val tableDS = newTableDS(rootFactTable) - val partDS = applyPartitionDesc(tableDS) - if (needFast || !isFTV) { - return partDS - } - tryPersistFTVDS(partDS) - } - - protected def generateLookupTables(): mutable.LinkedHashMap[JoinTableDesc, Dataset[Row]] = { - val ret = mutable.LinkedHashMap[JoinTableDesc, Dataset[Row]]() - val antiFlattenTableSet = mutable.Set[String]() - dataModel.getJoinTables.asScala - .filter(isTableToBuild) - .foreach { joinDesc => - val fkTableRef = joinDesc.getJoin.getFKSide - if (fkTableRef == null) { - throw new IllegalArgumentException("FK table cannot be null") - } - val fkTable = fkTableRef.getTableDesc.getIdentity - if (!joinDesc.isFlattenable || antiFlattenTableSet.contains(fkTable)) { - antiFlattenTableSet.add(joinDesc.getTable) - } - if (joinDesc.isFlattenable && !antiFlattenTableSet.contains(joinDesc.getTable)) { - val tableRef = joinDesc.getTableRef - val tableDS = newTableDS(tableRef) - ret.put(joinDesc, fulfillDS(tableDS, Set.empty, tableRef)) - } - } - ret - } - - private def isTableToBuild(joinDesc: JoinTableDesc): Boolean = { - !tableDesc.isPartialBuild || (tableDesc.isPartialBuild && tableDesc.getRelatedTables.contains(joinDesc.getAlias)) - } - - protected def applyPartitionDesc(originDS: Dataset[Row]): Dataset[Row] = { - // Date range partition. - val descDRP = dataModel.getPartitionDesc - if (Objects.isNull(descDRP) // - || Objects.isNull(descDRP.getPartitionDateColumn) // - || Objects.isNull(segmentRange) // - || segmentRange.isInfinite) { - logInfo(s"No available PARTITION-CONDITION segment $segmentId") - return originDS - } - - val condition = descDRP.getPartitionConditionBuilder // - .buildDateRangeCondition(descDRP, null, segmentRange) - logInfo(s"Apply PARTITION-CONDITION $condition segment $segmentId") - originDS.where(condition) - } - - private def applyFilterCondition(originDS: Dataset[Row]): Dataset[Row] = { - if (StringUtils.isBlank(dataModel.getFilterCondition)) { - logInfo(s"No available FILTER-CONDITION segment $segmentId") - return originDS - } - val expression = PushDownUtil.massageExpression(dataModel, project, dataModel.getFilterCondition, null) - val converted = replaceDot(expression, dataModel) - val condition = s" (1=1) AND ($converted)" - logInfo(s"Apply FILTER-CONDITION: $condition segment $segmentId") - originDS.where(condition) - } - - private def tryPersistFTVDS(tableDS: Dataset[Row]): Dataset[Row] = { - if (!shouldPersistFTV) { - return tableDS - } - logInfo(s"Persist FACT-TABLE-VIEW $factTableViewPath") - sparkSession.sparkContext.setJobDescription("Persist FACT-TABLE-VIEW.") - tableDS.write.mode(SaveMode.Overwrite).parquet(factTableViewPath.toString) - // Checkpoint fact table view. 
- DFBuilderHelper.checkPointSegment(dataSegment, (copied: NDataSegment) => copied.setFactViewReady(true)) - val newDS = sparkSession.read.parquet(factTableViewPath.toString) - sparkSession.sparkContext.setJobDescription(null) - newDS - } - - private def tryPersistFTDS(tableDS: Dataset[Row]): Dataset[Row] = { - if (!shouldPersistFT) { - return tableDS - } - if (tableDS.schema.isEmpty) { - logInfo("No available flat table schema.") - return tableDS - } - logInfo(s"Segment $segmentId persist flat table: $flatTablePath") - sparkSession.sparkContext.setJobDescription(s"Segment $segmentId persist flat table.") - val coalescePartitionNum = tableDesc.getFlatTableCoalescePartitionNum - if (coalescePartitionNum > 0) { - logInfo(s"Segment $segmentId flat table coalesce partition num $coalescePartitionNum") - tableDS.coalesce(coalescePartitionNum) // - .write.mode(SaveMode.Overwrite).parquet(flatTablePath.toString) - } else { - tableDS.write.mode(SaveMode.Overwrite).parquet(flatTablePath.toString) - } - DFBuilderHelper.checkPointSegment(dataSegment, (copied: NDataSegment) => { - copied.setFlatTableReady(true) - if (dataSegment.isFlatTableReady) { - // KE-14714 if flat table is updated, there might be some data inconsistency across indexes - copied.setStatus(SegmentStatusEnum.WARNING) - } - }) - val newDS = sparkSession.read.parquet(flatTablePath.toString) - sparkSession.sparkContext.setJobDescription(null) - newDS - } - - private def tryRecoverFTDS(): Option[Dataset[Row]] = { - if (tableDesc.isPartialBuild) { - logInfo(s"Segment $segmentId no need reuse flat table for partial build.") - return None - } else if (!isFTReady) { - logInfo(s"Segment $segmentId no available flat table.") - return None - } - // +----------+---+---+---+---+-----------+-----------+ - // | 0| 2| 3| 4| 1|2_KE_ENCODE|4_KE_ENCODE| - // +----------+---+---+---+---+-----------+-----------+ - val tableDS: DataFrame = Try(sparkSession.read.parquet(flatTablePath.toString)) match { - case Success(df) => df - case Failure(f) => - logInfo(s"Handled AnalysisException: Unable to infer schema for Parquet. Flat table path $flatTablePath is empty.", f) - sparkSession.emptyDataFrame - } - // ([2_KE_ENCODE,4_KE_ENCODE], [0,1,2,3,4]) - val (coarseEncodes, noneEncodes) = tableDS.schema.map(sf => sf.name).partition(_.endsWith(ENCODE_SUFFIX)) - val encodes = coarseEncodes.map(_.stripSuffix(ENCODE_SUFFIX)) - - val noneEncodesFieldMap: Map[String, StructField] = tableDS.schema.map(_.name) - .zip(tableDS.schema.fields) - .filter(p => noneEncodes.contains(p._1)) - .toMap - - val nones = tableDesc.getColumnIds.asScala // - .zip(tableDesc.getColumns.asScala) - .map(p => (String.valueOf(p._1), p._2)) // - .filterNot(p => { - val dataType = SparderTypeUtil.toSparkType(p._2.getType) - noneEncodesFieldMap.contains(p._1) && (dataType == noneEncodesFieldMap(p._1).dataType) - }) ++ - // [xx_KE_ENCODE] - tableDesc.getMeasures.asScala // - .map(DictionaryBuilderHelper.needGlobalDict) // - .filter(Objects.nonNull) // - .map(colRef => dataModel.getColumnIdByColumnName(colRef.getIdentity)) // - .map(String.valueOf) // - .filterNot(encodes.contains) - .map(id => id + ENCODE_SUFFIX) - - if (nones.nonEmpty) { - // The previous flat table missed some columns. - // Flat table would be updated at afterwards step. - logInfo(s"Segment $segmentId update flat table, columns should have been included " + // - s"${nones.mkString("[", ",", "]")}") - return None - } - // The previous flat table could be reusable. 
- logInfo(s"Segment $segmentId skip build flat table.") - Some(tableDS) - } - - def newTableDS(tableRef: TableRef): Dataset[Row] = { - // By design, why not try recovering from table snapshot. - // If fact table is a view and its snapshot exists, that will benefit. - logInfo(s"Load source table ${tableRef.getTableIdentity}") - val tableDescCopy = tableRef.getTableDesc - if (tableDescCopy.isTransactional || tableDescCopy.isRangePartition) { - val model = tableRef.getModel - if (Objects.nonNull(model)) { - tableDescCopy.setPartitionDesc(model.getPartitionDesc) - } - - if (Objects.nonNull(segmentRange) && Objects.nonNull(segmentRange.getStart) && Objects.nonNull(segmentRange.getEnd)) { - sparkSession.table(tableDescCopy, segmentRange.getStart.toString, segmentRange.getEnd.toString).alias(tableRef.getAlias) - } else { - sparkSession.table(tableDescCopy).alias(tableRef.getAlias) - } - } else { - sparkSession.table(tableDescCopy).alias(tableRef.getAlias) - } - } - - protected final def gatherStatistics(tableDS: Dataset[Row]): Statistics = { - val totalRowCount = tableDS.count() - if (!shouldPersistFT) { - // By design, evaluating column bytes should be based on existed flat table. - logInfo(s"Flat table not persisted, only compute row count.") - return Statistics(totalRowCount, Map.empty[String, Long]) - } - // zipWithIndex before filter - val canonicalIndices = tableDS.columns // - .zipWithIndex // - .filterNot(_._1.endsWith(ENCODE_SUFFIX)) // - .map { case (name, index) => - val canonical = tableDesc.getCanonicalName(Integer.parseInt(name)) - (canonical, index) - }.filterNot(t => Objects.isNull(t._1)) - logInfo(s"CANONICAL INDICES ${canonicalIndices.mkString("[", ", ", "]")}") - // By design, action-take is not sampling. - val sampled = tableDS.take(sampleRowCount).flatMap(row => // - canonicalIndices.map { case (canonical, index) => // - val bytes = utf8Length(row.get(index)) - (canonical, bytes) // - }).groupBy(_._1).mapValues(_.map(_._2).sum) - val evaluated = evaluateColumnBytes(totalRowCount, sampled) - Statistics(totalRowCount, evaluated) - } - - private def evaluateColumnBytes(totalCount: Long, // - sampled: Map[String, Long]): Map[String, Long] = { - val multiple = if (totalCount < sampleRowCount) 1f else totalCount.toFloat / sampleRowCount - sampled.mapValues(bytes => (bytes * multiple).toLong) - } - - // Copied from DFChooser. - private def utf8Length(value: Any): Long = { - if (Objects.isNull(value)) { - return 0L - } - var i = 0 - var bytes = 0L - val sequence = value.toString - while (i < sequence.length) { - val c = sequence.charAt(i) - if (c <= 0x7F) bytes += 1 - else if (c <= 0x7FF) bytes += 2 - else if (Character.isHighSurrogate(c)) { - bytes += 4 - i += 1 - } - else bytes += 3 - i += 1 - } - bytes - } - - // ====================================== Dividing line, till the bottom. ====================================== // - // Historical debt. - // Waiting for reconstruction. - - protected def buildDictIfNeed(table: Dataset[Row], - dictCols: Set[TblColRef], - encodeCols: Set[TblColRef]): Dataset[Row] = { - if (dictCols.isEmpty && encodeCols.isEmpty) { - return table - } - if (dataSegment.isDictReady) { - logInfo(s"Skip DICTIONARY segment $segmentId") - } else { - // KE-32076 ensure at least one worker was registered before dictionary lock added. 
- waitTillWorkerRegistered() - buildDict(table, dictCols) - } - encodeColumn(table, encodeCols) - } - - def waitTillWorkerRegistered(): Unit = { - val cdl = new CountDownLatch(1) - val timer = new Timer("worker-starvation-timer", true) - timer.scheduleAtFixedRate(new TimerTask { - override def run(): Unit = { - if (sparkSession.sparkContext.statusTracker.getExecutorInfos.isEmpty) { - logWarning("Ensure at least one worker has been registered before building dictionary.") - } else { - this.cancel() - cdl.countDown() - } - } - }, 0, TimeUnit.SECONDS.toMillis(20)) - cdl.await() - timer.cancel() - } - - private def concatCCs(table: Dataset[Row], computColumns: Set[TblColRef]): Dataset[Row] = { - val matchedCols = selectColumnsInTable(table, computColumns) - var tableWithCcs = table - matchedCols.foreach(m => - tableWithCcs = tableWithCcs.withColumn(convertFromDot(m.getBackTickIdentity), - expr(convertFromDot(m.getBackTickExp)))) - tableWithCcs - } - - private def buildDict(ds: Dataset[Row], dictCols: Set[TblColRef]): Unit = { - var matchedCols = selectColumnsInTable(ds, dictCols) - if (dataSegment.getIndexPlan.isSkipEncodeIntegerFamilyEnabled) { - matchedCols = matchedCols.filterNot(_.getType.isIntegerFamily) - } - val builder = new DFDictionaryBuilder(ds, dataSegment, sparkSession, Sets.newHashSet(matchedCols.asJavaCollection)) - builder.buildDictSet() - } - - private def encodeColumn(ds: Dataset[Row], encodeCols: Set[TblColRef]): Dataset[Row] = { - val matchedCols = selectColumnsInTable(ds, encodeCols) - var encodeDs = ds - if (matchedCols.nonEmpty) { - encodeDs = DFTableEncoder.encodeTable(ds, dataSegment, matchedCols.asJava) - } - encodeDs - } - -} - -object SegmentFlatTable extends LogEx { - - import org.apache.kylin.engine.spark.job.NSparkCubingUtil._ - - private val conf = KylinConfig.getInstanceFromEnv - var inferFiltersEnabled: Boolean = conf.inferFiltersEnabled() - - def fulfillDS(originDS: Dataset[Row], cols: Set[TblColRef], tableRef: TableRef): Dataset[Row] = { - // wrap computed columns, filter out valid columns - val computedColumns = chooseSuitableCols(originDS, cols) - // wrap alias - val newDS = wrapAlias(originDS, tableRef.getAlias) - val selectedColumns = newDS.schema.fields.map(tp => col(tp.name)) ++ computedColumns - logInfo(s"Table SCHEMA ${tableRef.getTableIdentity} ${newDS.schema.treeString}") - newDS.select(selectedColumns: _*) - } - - def wrapAlias(originDS: Dataset[Row], alias: String): Dataset[Row] = { - val newFields = originDS.schema.fields.map(f => - convertFromDot("`" + alias + "`" + "." 
+ "`" + f.name + "`")).toSeq - val newDS = originDS.toDF(newFields: _*) - CustomUtils.tryWithResourceIgnore(new SetLogCategory(LogConstant.BUILD_CATEGORY)) { - _ => logInfo(s"Wrap ALIAS ${originDS.schema.treeString} TO ${newDS.schema.treeString}") - } - newDS - } - - - def joinFactTableWithLookupTables(rootFactDataset: Dataset[Row], - lookupTableDatasetMap: mutable.Map[JoinTableDesc, Dataset[Row]], - model: NDataModel, - ss: SparkSession): Dataset[Row] = { - lookupTableDatasetMap.foldLeft(rootFactDataset)( - (joinedDataset: Dataset[Row], tuple: (JoinTableDesc, Dataset[Row])) => - joinTableDataset(model.getRootFactTable.getTableDesc, tuple._1, joinedDataset, tuple._2, ss)) - } - - def joinTableDataset(rootFactDesc: TableDesc, - lookupDesc: JoinTableDesc, - rootFactDataset: Dataset[Row], - lookupDataset: Dataset[Row], - ss: SparkSession): Dataset[Row] = { - var afterJoin = rootFactDataset - val join = lookupDesc.getJoin - if (join != null && !StringUtils.isEmpty(join.getType)) { - val joinType = join.getType.toUpperCase(Locale.ROOT) - val pk = join.getPrimaryKeyColumns - val fk = join.getForeignKeyColumns - if (pk.length != fk.length) { - throw new RuntimeException( - s"Invalid join condition of fact table: $rootFactDesc,fk: ${fk.mkString(",")}," + - s" lookup table:$lookupDesc, pk: ${pk.mkString(",")}") - } - val equiConditionColPairs = fk.zip(pk).map(joinKey => - col(convertFromDot(joinKey._1.getBackTickIdentity)) - .equalTo(col(convertFromDot(joinKey._2.getBackTickIdentity)))) - CustomUtils.tryWithResourceIgnore(new SetLogCategory(LogConstant.BUILD_CATEGORY)) { - _ => logInfo(s"Lookup table schema ${lookupDataset.schema.treeString}") - } - - if (join.getNonEquiJoinCondition != null) { - var condition = NonEquiJoinConditionBuilder.convert(join.getNonEquiJoinCondition) - if (!equiConditionColPairs.isEmpty) { - condition = condition && equiConditionColPairs.reduce(_ && _) - } - logInfo(s"Root table ${rootFactDesc.getIdentity}, join table ${lookupDesc.getAlias}, non-equi condition: ${condition.toString()}") - afterJoin = afterJoin.join(lookupDataset, condition, joinType) - } else { - val condition = equiConditionColPairs.reduce(_ && _) - logInfo(s"Root table ${rootFactDesc.getIdentity}, join table ${lookupDesc.getAlias}, condition: ${condition.toString()}") - if (inferFiltersEnabled) { - afterJoin = afterJoin.join(FiltersUtil.inferFilters(pk, lookupDataset), condition, joinType) - } else { - afterJoin = afterJoin.join(lookupDataset, condition, joinType) - } - } - } - afterJoin - } - - def changeSchemeToColumnId(ds: Dataset[Row], tableDesc: SegmentFlatTableDesc): Dataset[Row] = { - val structType = ds.schema - val columnIds = tableDesc.getColumnIds.asScala - val columnName2Id = tableDesc.getColumns - .asScala - .map(column => convertFromDot(column.getBackTickIdentity)) - .zip(columnIds) - val columnName2IdMap = columnName2Id.toMap - val encodeSeq = structType.filter(_.name.endsWith(ENCODE_SUFFIX)).map { - tp => - val columnName = tp.name.stripSuffix(ENCODE_SUFFIX) - val columnId = columnName2IdMap.apply(columnName) - col(tp.name).alias(columnId.toString + ENCODE_SUFFIX) - } - val columns = columnName2Id.map(tp => expr("`" + tp._1 + "`").alias(tp._2.toString)) - logInfo(s"Select model column is ${columns.mkString(",")}") - logInfo(s"Select model encoding column is ${encodeSeq.mkString(",")}") - val selectedColumns = columns ++ encodeSeq - - logInfo(s"Select model all column is ${selectedColumns.mkString(",")}") - ds.select(selectedColumns: _*) - } - - private def 
generateLookupTableMeta(project: String, - lookupTables: mutable.LinkedHashMap[JoinTableDesc, Dataset[Row]]): Unit = { - val config = KapConfig.getInstanceFromEnv - if (config.isRecordSourceUsage) { - lookupTables.keySet.foreach { joinTable => - val tableManager = NTableMetadataManager.getInstance(config.getKylinConfig, project) - val table = tableManager.getOrCreateTableExt(joinTable.getTable) - if (table.getTotalRows > 0) { - TableMetaManager.putTableMeta(joinTable.getTable, 0, table.getTotalRows) - logInfo(s"put meta table: ${joinTable.getTable}, count: ${table.getTotalRows}") - } - } - } - val noStatLookupTables = lookupTables.filterKeys(table => TableMetaManager.getTableMeta(table.getTable).isEmpty) - if (config.getKylinConfig.isNeedCollectLookupTableInfo && noStatLookupTables.nonEmpty) { - val lookupTablePar = noStatLookupTables.par - lookupTablePar.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(lookupTablePar.size)) - lookupTablePar.foreach { case (joinTableDesc, dataset) => - val tableIdentity = joinTableDesc.getTable - logTime(s"count $tableIdentity") { - val maxTime = Duration(config.getKylinConfig.getCountLookupTableMaxTime, MILLISECONDS) - val defaultCount = config.getKylinConfig.getLookupTableCountDefaultValue - val rowCount = countTableInFiniteTimeOrDefault(dataset, tableIdentity, maxTime, defaultCount) - TableMetaManager.putTableMeta(tableIdentity, 0L, rowCount) - logInfo(s"put meta table: $tableIdentity , count: $rowCount") - } - } - } - } - - def countTableInFiniteTimeOrDefault(dataset: Dataset[Row], tableName: String, - duration: Duration, defaultCount: Long): Long = { - val countTask = dataset.rdd.countAsync() - try { - ProxyThreadUtils.awaitResult(countTask, duration) - } catch { - case e: Exception => - countTask.cancel() - logInfo(s"$tableName count fail, and return defaultCount $defaultCount", e) - defaultCount - } - } - - def replaceDot(original: String, model: NDataModel): String = { - val sb = new StringBuilder(original) - - for (namedColumn <- model.getAllNamedColumns.asScala) { - val colName = namedColumn.getAliasDotColumn.toLowerCase(Locale.ROOT) - doReplaceDot(sb, colName, namedColumn.getAliasDotColumn) - - // try replacing quoted identifiers if any - val quotedColName = colName.split('.').mkString("`", "`.`", "`") - if (quotedColName.nonEmpty) { - doReplaceDot(sb, quotedColName, namedColumn.getAliasDotColumn.split('.').mkString("`", "`.`", "`")) - } - } - sb.toString() - } - - private def doReplaceDot(sb: StringBuilder, namedCol: String, colAliasDotColumn: String): Unit = { - var start = sb.toString.toLowerCase(Locale.ROOT).indexOf(namedCol) - while (start != -1) { - sb.replace(start, - start + namedCol.length, - "`" + convertFromDot(colAliasDotColumn) + "`") - start = sb.toString.toLowerCase(Locale.ROOT) - .indexOf(namedCol) - } - } - - case class Statistics(totalCount: Long, columnBytes: Map[String, Long]) - -} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala index d643cd6c05..6fe76fe447 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDPartitionBuildExec.scala @@ -19,11 +19,11 @@ package org.apache.kylin.engine.spark.job import io.kyligence.kap.guava20.shaded.common.collect.{Maps, Sets} +import 
io.kyligence.kap.guava20.shaded.common.collect.Maps import org.apache.hadoop.fs.Path -import org.apache.kylin.engine.spark.builder.PartitionFlatTable -import org.apache.kylin.engine.spark.model.PartitionFlatTableDesc -import org.apache.kylin.metadata.cube.cuboid.PartitionSpanningTree -import org.apache.kylin.metadata.cube.cuboid.PartitionSpanningTree.{PartitionTreeBuilder, PartitionTreeNode} +import org.apache.kylin.engine.spark.job.stage.BuildParam +import org.apache.kylin.engine.spark.job.stage.build.partition.PartitionFlatTableAndDictBase +import org.apache.kylin.metadata.cube.cuboid.PartitionSpanningTree.PartitionTreeNode import org.apache.kylin.metadata.cube.model.NDataSegment import org.apache.spark.sql.SparderEnv import org.apache.spark.sql.datasource.storage.StorageStoreUtils @@ -32,31 +32,18 @@ import org.apache.spark.sql.hive.utils.ResourceDetectUtils import java.io.IOException import scala.collection.JavaConverters._ -class RDPartitionBuildExec(private val jobContext: RDSegmentBuildJob, // - private val dataSegment: NDataSegment) extends RDSegmentBuildExec(jobContext, dataSegment) { - - private val newBuckets = - jobContext.getReadOnlyBuckets.asScala.filter(_.getSegmentId.equals(segmentId)).toSeq - - protected final lazy val partitions = { - val distincted = newBuckets.map(_.getPartitionId).distinct.sorted - logInfo(s"Segment $segmentId partitions: ${distincted.mkString("[", ",", "]")}") - scala.collection.JavaConverters.seqAsJavaList(distincted.map(java.lang.Long.valueOf)) - } - - private lazy val spanningTree = new PartitionSpanningTree(config, // - new PartitionTreeBuilder(dataSegment, readOnlyLayouts, jobId, partitions, Sets.newHashSet(newBuckets.asJava))) - - private lazy val flatTableDesc = new PartitionFlatTableDesc(config, dataSegment, spanningTree, jobId, partitions) - - private lazy val flatTable = new PartitionFlatTable(sparkSession, flatTableDesc) +class RDPartitionBuildExec(private val jobContext: SegmentJob, // + private val dataSegment: NDataSegment, private val buildParam: BuildParam) + extends PartitionFlatTableAndDictBase(jobContext, dataSegment, buildParam) { + protected final val rdSharedPath = jobContext.getRdSharedPath @throws(classOf[IOException]) - override def detectResource(): Unit = { + def detectResource(): Unit = { + initFlatTableOnDetectResource() val flatTableExecutions = if (spanningTree.fromFlatTable()) { - Seq((-1L, Seq(flatTable.getFlatTablePartDS.queryExecution))) + Seq((-1L, Seq(getFlatTablePartDS.queryExecution))) } else { Seq.empty } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala index a2c272dec6..bfaddaa11d 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildExec.scala @@ -20,12 +20,9 @@ package org.apache.kylin.engine.spark.job import com.google.common.collect.Maps import org.apache.hadoop.fs.Path -import org.apache.kylin.engine.spark.builder.SegmentFlatTable -import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc -import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree -import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree.AdaptiveTreeBuilder +import org.apache.kylin.engine.spark.job.stage.BuildParam +import 
org.apache.kylin.engine.spark.job.stage.build.FlatTableAndDictBase import org.apache.kylin.metadata.cube.model.NDataSegment -import org.apache.spark.internal.Logging import org.apache.spark.sql.SparderEnv import org.apache.spark.sql.datasource.storage.StorageStoreUtils import org.apache.spark.sql.hive.utils.ResourceDetectUtils @@ -33,33 +30,20 @@ import org.apache.spark.sql.hive.utils.ResourceDetectUtils import java.io.IOException import scala.collection.JavaConverters._ -class RDSegmentBuildExec(private val jobContext: RDSegmentBuildJob, // - private val dataSegment: NDataSegment) extends Logging { +class RDSegmentBuildExec(private val jobContext: SegmentJob, // + private val dataSegment: NDataSegment, private val buildParam: BuildParam + ) + extends FlatTableAndDictBase(jobContext, dataSegment, buildParam) { // Resource detect segment build exec. - // Needed variables from job context. - protected final val jobId = jobContext.getJobId - protected final val config = jobContext.getConfig - protected final val dataflowId = jobContext.getDataflowId - protected final val sparkSession = jobContext.getSparkSession protected final val rdSharedPath = jobContext.getRdSharedPath - protected final val readOnlyLayouts = jobContext.getReadOnlyLayouts - - // Needed variables from data segment. - protected final val segmentId = dataSegment.getId - protected final val project = dataSegment.getProject - - private lazy val spanningTree = new AdaptiveSpanningTree(config, new AdaptiveTreeBuilder(dataSegment, readOnlyLayouts)) - - private lazy val flatTableDesc = new SegmentFlatTableDesc(config, dataSegment, spanningTree) - - private lazy val flatTable = new SegmentFlatTable(sparkSession, flatTableDesc) @throws(classOf[IOException]) def detectResource(): Unit = { + initFlatTableOnDetectResource() val flatTableExecutions = if (spanningTree.fromFlatTable()) { - Seq((-1L, flatTable.getFlatTablePartDS.queryExecution)) + Seq((-1L, getFlatTablePartDS.queryExecution)) } else { Seq.empty } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildJob.java index f4809375ab..2fbf4652dc 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildJob.java +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/RDSegmentBuildJob.java @@ -24,9 +24,14 @@ import org.apache.hadoop.fs.Path; import org.apache.kylin.engine.spark.job.LogJobInfoUtils; import org.apache.kylin.engine.spark.job.RDPartitionBuildExec; import org.apache.kylin.engine.spark.job.RDSegmentBuildExec; +import org.apache.kylin.engine.spark.job.ResourceDetect; +import org.apache.kylin.engine.spark.job.SegmentJob; +import org.apache.kylin.engine.spark.job.stage.BuildParam; import org.apache.kylin.metadata.cube.model.NDataSegment; import org.apache.spark.sql.hive.utils.ResourceDetectUtils; +import lombok.val; + public class RDSegmentBuildJob extends SegmentJob implements ResourceDetect { public static void main(String[] args) { @@ -46,14 +51,16 @@ public class RDSegmentBuildJob extends SegmentJob implements ResourceDetect { private void detectPartition() throws IOException { for (NDataSegment dataSegment : readOnlySegments) { - RDSegmentBuildExec exec = new RDPartitionBuildExec(this, dataSegment); + val buildParam = new BuildParam(); + val exec = new RDPartitionBuildExec(this, dataSegment, buildParam); exec.detectResource(); } } private 
void detect() throws IOException { for (NDataSegment dataSegment : readOnlySegments) { - RDSegmentBuildExec exec = new RDSegmentBuildExec(this, dataSegment); + val buildParam = new BuildParam(); + val exec = new RDSegmentBuildExec(this, dataSegment, buildParam); exec.detectResource(); } } @@ -65,6 +72,6 @@ public class RDSegmentBuildJob extends SegmentJob implements ResourceDetect { private void writeCountDistinct() { ResourceDetectUtils.write(new Path(rdSharedPath, ResourceDetectUtils.countDistinctSuffix()), // - ResourceDetectUtils.findCountDistinctMeasure(getReadOnlyLayouts())); + ResourceDetectUtils.findCountDistinctMeasure(readOnlyLayouts)); } } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildStage.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildStage.scala index dba42cca96..2cfe5b7e2d 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildStage.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/BuildStage.scala @@ -559,4 +559,9 @@ abstract class BuildStage(private val jobContext: SegmentJob, // ----------------------------- Beta feature: Inferior Flat Table. ----------------------------- // + override def getStageName: String = "" + + override def execute(): Unit = { + // parent class is empty + } } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala index a8b28d283b..0599fc95ae 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/FlatTableAndDictBase.scala @@ -31,9 +31,12 @@ import org.apache.kylin.engine.spark.job.NSparkCubingUtil.convertFromDot import org.apache.kylin.engine.spark.job.stage.{BuildParam, StageExec} import org.apache.kylin.engine.spark.job.{FiltersUtil, SegmentJob, TableMetaManager} import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc +import org.apache.kylin.engine.spark.smarter.IndexDependencyParser import org.apache.kylin.engine.spark.model.planner.{CuboIdToLayoutUtils, FlatTableToCostUtils} import org.apache.kylin.engine.spark.utils.LogEx import org.apache.kylin.engine.spark.utils.SparkDataSource._ +import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree +import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree.AdaptiveTreeBuilder import org.apache.kylin.metadata.cube.model.NDataSegment import org.apache.kylin.metadata.cube.planner.CostBasePlannerUtils import org.apache.kylin.metadata.model._ @@ -485,29 +488,7 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob, total = tableSizeMap(tableName).longValue() logInfo(s"Find $column's table $tableName count $total from cache") } else { - val catalogStatistics = TableMetaManager.getTableMeta(tableDesc.getTableName(column)) - if (catalogStatistics.isDefined) { - total = catalogStatistics.get.rowCount.get.longValue() - logInfo(s"Find $column's table $tableName count $total from catalog") - } else { - val tableMetadataDesc = tableMetadataManager.getTableDesc(tableDesc.getTableName(column)) - if (tableMetadataDesc != null) { - val tableExtDesc = 
tableMetadataManager.getTableExtIfExists(tableMetadataDesc) - if (tableExtDesc.getTotalRows > 0) { - total = tableExtDesc.getTotalRows - logInfo(s"Find $column's table $tableName count $total from table ext") - } else if (tableMetadataDesc.getLastSnapshotPath != null) { - val baseDir = KapConfig.getInstanceFromEnv.getMetadataWorkingDirectory - val fs = HadoopUtil.getWorkingFileSystem - val path = new Path(baseDir, tableMetadataDesc.getLastSnapshotPath) - if (fs.exists(path)) { - total = sparkSession.read.parquet(path.toString).count() - logInfo(s"Calculate $column's table $tableName count $total " + - s"from parquet ${tableMetadataDesc.getLastSnapshotPath}") - } - } - } - } + total = evaluateColumnTotalFromTableDesc(tableMetadataManager, totalCount, tableName, column) } } } catch { @@ -528,6 +509,29 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob, } } + def evaluateColumnTotalFromTableDesc(tableMetadataManager: NTableMetadataManager, totalCount: Long, // + tableName: String, column: String): Long = { + var total: Long = totalCount + val tableMetadataDesc = tableMetadataManager.getTableDesc(tableDesc.getTableName(column)) + if (tableMetadataDesc != null) { + val tableExtDesc = tableMetadataManager.getTableExtIfExists(tableMetadataDesc) + if (tableExtDesc.getTotalRows > 0) { + total = tableExtDesc.getTotalRows + logInfo(s"Find $column's table $tableName count $total from table ext") + } else if (tableMetadataDesc.getLastSnapshotPath != null) { + val baseDir = KapConfig.getInstanceFromEnv.getMetadataWorkingDirectory + val fs = HadoopUtil.getWorkingFileSystem + val path = new Path(baseDir, tableMetadataDesc.getLastSnapshotPath) + if (fs.exists(path)) { + total = sparkSession.read.parquet(path.toString).count() + logInfo(s"Calculate $column's table $tableName count $total " + + s"from parquet ${tableMetadataDesc.getLastSnapshotPath}") + } + } + } + total + } + // Copied from DFChooser. 
private def utf8Length(value: Any): Long = { if (Objects.isNull(value)) { @@ -648,6 +652,42 @@ abstract class FlatTableAndDictBase(private val jobContext: SegmentJob, } encodeDs } + + protected def initSpanningTree(): Unit = { + val spanTree = new AdaptiveSpanningTree(config, new AdaptiveTreeBuilder(dataSegment, readOnlyLayouts)) + buildParam.setSpanningTree(spanTree) + } + + protected def initFlatTableDesc(): Unit = { + val flatTableDesc: SegmentFlatTableDesc = if (jobContext.isPartialBuild) { + val parser = new IndexDependencyParser(dataModel) + val relatedTableAlias = + parser.getRelatedTablesAlias(jobContext.getReadOnlyLayouts) + new SegmentFlatTableDesc(config, dataSegment, spanningTree, relatedTableAlias) + } else { + new SegmentFlatTableDesc(config, dataSegment, spanningTree) + } + buildParam.setFlatTableDesc(flatTableDesc) + } + + def initFactTable(): Unit = { + val factTableDS: Dataset[Row] = newFactTableDS() + buildParam.setFactTableDS(factTableDS) + val fastFactTableDS: Dataset[Row] = newFastFactTableDS() + buildParam.setFastFactTableDS(fastFactTableDS) + } + + def materializedFactTableView(): Unit = { + initSpanningTree() + initFlatTableDesc() + initFactTable() + } + + def initFlatTableOnDetectResource(): Unit = { + materializedFactTableView() + val flatTablePart: Dataset[Row] = generateFlatTablePart() + buildParam.setFlatTablePart(flatTablePart) + } } object FlatTableAndDictBase extends LogEx { @@ -707,10 +747,7 @@ object FlatTableAndDictBase extends LogEx { logInfo(s"Lookup table schema ${lookupDataset.schema.treeString}") if (join.getNonEquiJoinCondition != null) { - var condition = NonEquiJoinConditionBuilder.convert(join.getNonEquiJoinCondition) - if (!equiConditionColPairs.isEmpty) { - condition = condition && equiConditionColPairs.reduce(_ && _) - } + val condition: Column = getCondition(join, equiConditionColPairs) logInfo(s"Root table ${rootFactDesc.getIdentity}, join table ${lookupDesc.getAlias}, non-equi condition: ${condition.toString()}") afterJoin = afterJoin.join(lookupDataset, condition, joinType) } else { @@ -726,6 +763,14 @@ object FlatTableAndDictBase extends LogEx { afterJoin } + def getCondition(join: JoinDesc, equiConditionColPairs: Array[Column]): Column = { + var condition = NonEquiJoinConditionBuilder.convert(join.getNonEquiJoinCondition) + if (!equiConditionColPairs.isEmpty) { + condition = condition && equiConditionColPairs.reduce(_ && _) + } + condition + } + def changeSchemeToColumnId(ds: Dataset[Row], tableDesc: SegmentFlatTableDesc): Dataset[Row] = { val structType = ds.schema val columnIds = tableDesc.getColumnIds.asScala diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/MaterializedFactTableView.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/MaterializedFactTableView.scala index 632ae0cd1c..19b5cd073c 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/MaterializedFactTableView.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/MaterializedFactTableView.scala @@ -20,36 +20,14 @@ package org.apache.kylin.engine.spark.job.stage.build import org.apache.kylin.engine.spark.job.SegmentJob import org.apache.kylin.engine.spark.job.stage.BuildParam -import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc -import org.apache.kylin.engine.spark.smarter.IndexDependencyParser -import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree 
-import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree.AdaptiveTreeBuilder import org.apache.kylin.metadata.cube.model.NDataSegment -import org.apache.spark.sql.{Dataset, Row} class MaterializedFactTableView(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: BuildParam) extends FlatTableAndDictBase(jobContext, dataSegment, buildParam) { override def execute(): Unit = { logInfo(s"Build SEGMENT $segmentId") - val spanTree = new AdaptiveSpanningTree(config, new AdaptiveTreeBuilder(dataSegment, readOnlyLayouts)) - buildParam.setSpanningTree(spanTree) - - val flatTableDesc: SegmentFlatTableDesc = if (jobContext.isPartialBuild) { - val parser = new IndexDependencyParser(dataModel) - val relatedTableAlias = - parser.getRelatedTablesAlias(readOnlyLayouts) - new SegmentFlatTableDesc(config, dataSegment, spanningTree, relatedTableAlias) - } else { - new SegmentFlatTableDesc(config, dataSegment, spanningTree) - } - buildParam.setFlatTableDesc(flatTableDesc) - - val factTableDS: Dataset[Row] = newFactTableDS() - buildParam.setFactTableDS(factTableDS) - - val fastFactTableDS: Dataset[Row] = newFastFactTableDS() - buildParam.setFastFactTableDS(fastFactTableDS) + materializedFactTableView() if (buildParam.isSkipMaterializedFactTableView) { onStageSkipped() } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionFlatTableAndDictBase.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionFlatTableAndDictBase.scala index 4b9a7bdfc0..af1d8251c7 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionFlatTableAndDictBase.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionFlatTableAndDictBase.scala @@ -18,6 +18,7 @@ package org.apache.kylin.engine.spark.job.stage.build.partition +import io.kyligence.kap.guava20.shaded.common.collect.Sets import org.apache.commons.lang3.StringUtils import org.apache.kylin.engine.spark.builder.{DictionaryBuilderHelper, PartitionDictionaryBuilderHelper} import org.apache.kylin.engine.spark.job.stage.BuildParam @@ -25,6 +26,9 @@ import org.apache.kylin.engine.spark.job.stage.build.FlatTableAndDictBase import org.apache.kylin.engine.spark.job.stage.build.FlatTableAndDictBase.Statistics import org.apache.kylin.engine.spark.job.{PartitionExec, SegmentJob} import org.apache.kylin.engine.spark.model.PartitionFlatTableDesc +import org.apache.kylin.engine.spark.smarter.IndexDependencyParser +import org.apache.kylin.metadata.cube.cuboid.PartitionSpanningTree +import org.apache.kylin.metadata.cube.cuboid.PartitionSpanningTree.PartitionTreeBuilder import org.apache.kylin.metadata.cube.model.NDataSegment import org.apache.kylin.metadata.model.TblColRef import org.apache.spark.sql.{Dataset, Row} @@ -99,4 +103,22 @@ abstract class PartitionFlatTableAndDictBase(private val jobContext: SegmentJob, val encodeColsWithoutCc = encodeCols.filter(!_.getColumnDesc.isComputedColumn) (dictCols, encodeCols, dictColsWithoutCc, encodeColsWithoutCc) } + + override def initSpanningTree(): Unit = { + val spanTree = new PartitionSpanningTree(config, // + new PartitionTreeBuilder(dataSegment, readOnlyLayouts, jobId, partitions, Sets.newHashSet(newBuckets.asJava))) + buildParam.setPartitionSpanningTree(spanTree) + } + + override def initFlatTableDesc(): Unit = { + val tableDesc = if (jobContext.isPartialBuild) { + val 
parser = new IndexDependencyParser(dataModel) + val relatedTableAlias = + parser.getRelatedTablesAlias(jobContext.getReadOnlyLayouts) + new PartitionFlatTableDesc(config, dataSegment, spanningTree, relatedTableAlias, jobId, partitions) + } else { + new PartitionFlatTableDesc(config, dataSegment, spanningTree, jobId, partitions) + } + buildParam.setTableDesc(tableDesc) + } } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionMaterializedFactTableView.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionMaterializedFactTableView.scala index deb1c604bb..3f04461b15 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionMaterializedFactTableView.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionMaterializedFactTableView.scala @@ -20,15 +20,7 @@ package org.apache.kylin.engine.spark.job.stage.build.partition import org.apache.kylin.engine.spark.job.SegmentJob import org.apache.kylin.engine.spark.job.stage.BuildParam -import org.apache.kylin.engine.spark.model.PartitionFlatTableDesc -import org.apache.kylin.engine.spark.smarter.IndexDependencyParser -import org.apache.kylin.metadata.cube.cuboid.PartitionSpanningTree -import org.apache.kylin.metadata.cube.cuboid.PartitionSpanningTree.PartitionTreeBuilder import org.apache.kylin.metadata.cube.model.NDataSegment -import org.apache.kylin.metadata.job.JobBucket -import org.apache.spark.sql.{Dataset, Row} - -import java.util.stream.Collectors class PartitionMaterializedFactTableView(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: BuildParam) extends PartitionFlatTableAndDictBase(jobContext, dataSegment, buildParam) { @@ -36,27 +28,7 @@ class PartitionMaterializedFactTableView(jobContext: SegmentJob, dataSegment: ND override def execute(): Unit = { logInfo(s"Build SEGMENT $segmentId") - val spanTree = new PartitionSpanningTree(config, - new PartitionTreeBuilder(dataSegment, readOnlyLayouts, jobId, partitions, - jobContext.getReadOnlyBuckets.stream.filter(_.getSegmentId.equals(segmentId)).collect(Collectors.toSet[JobBucket]))) - buildParam.setPartitionSpanningTree(spanTree) - - val tableDesc = if (jobContext.isPartialBuild) { - val parser = new IndexDependencyParser(dataModel) - val relatedTableAlias = - parser.getRelatedTablesAlias(readOnlyLayouts) - new PartitionFlatTableDesc(config, dataSegment, spanTree, relatedTableAlias, jobId, partitions) - } else { - new PartitionFlatTableDesc(config, dataSegment, spanTree, jobId, partitions) - } - buildParam.setTableDesc(tableDesc) - - val factTableDS: Dataset[Row] = newFactTableDS() - buildParam.setFactTableDS(factTableDS) - - val fastFactTableDS: Dataset[Row] = newFastFactTableDS() - buildParam.setFastFactTableDS(fastFactTableDS) - + materializedFactTableView() if (buildParam.isSkipMaterializedFactTableView) { onStageSkipped() } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/smarter/IndexDependencyParser.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/smarter/IndexDependencyParser.scala index e1335978fc..04234294c9 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/smarter/IndexDependencyParser.scala +++ 
b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/smarter/IndexDependencyParser.scala @@ -17,25 +17,23 @@ */ package org.apache.kylin.engine.spark.smarter -import java.util -import java.util.Collections - +import com.google.common.collect.{Lists, Maps, Sets} import org.apache.commons.collections.CollectionUtils import org.apache.commons.lang3.StringUtils -import org.apache.kylin.engine.spark.builder.SegmentFlatTable import org.apache.kylin.engine.spark.job.NSparkCubingUtil +import org.apache.kylin.engine.spark.job.stage.build.FlatTableAndDictBase import org.apache.kylin.metadata.cube.model.LayoutEntity -import org.apache.kylin.metadata.model.{FunctionDesc, JoinTableDesc, NDataModel, TableRef, TblColRef} +import org.apache.kylin.metadata.model._ import org.apache.kylin.query.util.PushDownUtil import org.apache.spark.sql.execution.utils.SchemaProcessor import org.apache.spark.sql.types.StructField import org.apache.spark.sql.{Dataset, Row, SparderEnv, SparkSession} +import java.util +import java.util.Collections import scala.collection.JavaConverters._ import scala.collection.mutable -import com.google.common.collect.{Lists, Maps, Sets} - class IndexDependencyParser(val model: NDataModel) { private val ccTableNameAliasMap = Maps.newHashMap[String, util.Set[String]] @@ -113,7 +111,7 @@ class IndexDependencyParser(val model: NDataModel) { model.getJoinTables.asScala.map((joinTable: JoinTableDesc) => { joinTableDFMap.put(joinTable, generateDatasetOnTable(ss, joinTable.getTableRef)) }) - val df = SegmentFlatTable.joinFactTableWithLookupTables(rootDF, joinTableDFMap, model, ss) + val df = FlatTableAndDictBase.joinFactTableWithLookupTables(rootDF, joinTableDFMap, model, ss) val filterCondition = model.getFilterCondition if (StringUtils.isNotEmpty(filterCondition)) { val massagedCondition = PushDownUtil.massageExpression(model, model.getProject, filterCondition, null) @@ -127,7 +125,7 @@ class IndexDependencyParser(val model: NDataModel) { val structType = SchemaProcessor.buildSchemaWithRawTable(tableCols) val alias = tableRef.getAlias val dataset = ss.createDataFrame(Lists.newArrayList[Row], structType).alias(alias) - SegmentFlatTable.wrapAlias(dataset, alias) + FlatTableAndDictBase.wrapAlias(dataset, alias) } private def initTableNames(): Unit = { diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestDimensionTableStat.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestDimensionTableStat.scala index 0a16a971f0..b4ee2087a4 100644 --- a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestDimensionTableStat.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestDimensionTableStat.scala @@ -19,13 +19,14 @@ package org.apache.kylin.engine.spark.builder import org.apache.kylin.common.KylinConfig -import org.apache.kylin.engine.spark.job.TableMetaManager -import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc +import org.apache.kylin.engine.spark.job.stage.BuildParam +import org.apache.kylin.engine.spark.job.{SegmentJob, TableMetaManager} import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree.AdaptiveTreeBuilder import org.apache.kylin.metadata.cube.model._ import org.apache.kylin.metadata.model.SegmentRange import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, SparderBaseFunSuite} +import 
org.mockito.Mockito import scala.collection.JavaConverters._ @@ -54,9 +55,11 @@ class TestDimensionTableStat extends SparderBaseFunSuite with SharedSparkSession val seg = dfMgr.appendSegment(df, new SegmentRange.TimePartitionedSegmentRange(0L, 1356019200000L)) val toBuildTree = new AdaptiveSpanningTree(getTestConfig, new AdaptiveTreeBuilder(seg, seg.getIndexPlan.getAllLayouts)) - val flatTableDesc = new SegmentFlatTableDesc(getTestConfig, seg, toBuildTree) - val flatTable = new SegmentFlatTable(spark, flatTableDesc) - flatTable.getFlatTableDS + val segmentJob = Mockito.mock(classOf[SegmentJob]) + Mockito.when(segmentJob.getSparkSession).thenReturn(spark) + val buildParam = new BuildParam() + new TestFlatTable(segmentJob, seg, buildParam).test(getTestConfig, toBuildTree) + df.getModel.getJoinTables.asScala.foreach { joinTable => val dimCount = TableMetaManager.getTableMeta(joinTable.getTable).get.rowCount.get diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/MaterializedFactTableView.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestFlatTable.scala similarity index 57% copy from src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/MaterializedFactTableView.scala copy to src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestFlatTable.scala index 632ae0cd1c..61cbaf61b0 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/MaterializedFactTableView.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestFlatTable.scala @@ -16,44 +16,37 @@ * limitations under the License. */ -package org.apache.kylin.engine.spark.job.stage.build +package org.apache.kylin.engine.spark.builder +import org.apache.kylin.common.KylinConfig import org.apache.kylin.engine.spark.job.SegmentJob import org.apache.kylin.engine.spark.job.stage.BuildParam +import org.apache.kylin.engine.spark.job.stage.build.FlatTableAndDictBase import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc -import org.apache.kylin.engine.spark.smarter.IndexDependencyParser import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree -import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree.AdaptiveTreeBuilder import org.apache.kylin.metadata.cube.model.NDataSegment +import org.apache.kylin.metadata.model.TableRef import org.apache.spark.sql.{Dataset, Row} -class MaterializedFactTableView(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: BuildParam) +class TestFlatTable(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: BuildParam) extends FlatTableAndDictBase(jobContext, dataSegment, buildParam) { - - override def execute(): Unit = { - logInfo(s"Build SEGMENT $segmentId") - val spanTree = new AdaptiveSpanningTree(config, new AdaptiveTreeBuilder(dataSegment, readOnlyLayouts)) - buildParam.setSpanningTree(spanTree) - - val flatTableDesc: SegmentFlatTableDesc = if (jobContext.isPartialBuild) { - val parser = new IndexDependencyParser(dataModel) - val relatedTableAlias = - parser.getRelatedTablesAlias(readOnlyLayouts) - new SegmentFlatTableDesc(config, dataSegment, spanningTree, relatedTableAlias) - } else { - new SegmentFlatTableDesc(config, dataSegment, spanningTree) - } + def test(kylinConfig: KylinConfig, toBuildTree: AdaptiveSpanningTree): Unit = { + buildParam.setSpanningTree(toBuildTree) + val flatTableDesc = new 
SegmentFlatTableDesc(kylinConfig, dataSegment, toBuildTree) buildParam.setFlatTableDesc(flatTableDesc) - val factTableDS: Dataset[Row] = newFactTableDS() buildParam.setFactTableDS(factTableDS) - val fastFactTableDS: Dataset[Row] = newFastFactTableDS() buildParam.setFastFactTableDS(fastFactTableDS) - if (buildParam.isSkipMaterializedFactTableView) { - onStageSkipped() - } + val dict: Dataset[Row] = buildDictIfNeed() + buildParam.setDict(dict) + val flatTable: Dataset[Row] = generateFlatTable() + buildParam.setFlatTable(flatTable) + val flatTablePart: Dataset[Row] = generateFlatTablePart() + buildParam.setFlatTablePart(flatTablePart) } - override def getStageName: String = "MaterializedFactTableView" + def testNewTableDS(ref: TableRef): Dataset[Row] = { + newTableDS(ref) + } } diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestInferFilters.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestInferFilters.scala index 561521127d..100246e112 100644 --- a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestInferFilters.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestInferFilters.scala @@ -19,8 +19,9 @@ package org.apache.kylin.engine.spark.builder import org.apache.kylin.common.KylinConfig -import org.apache.kylin.engine.spark.job.FiltersUtil -import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc +import org.apache.kylin.engine.spark.job.stage.BuildParam +import org.apache.kylin.engine.spark.job.stage.build.FlatTableAndDictBase +import org.apache.kylin.engine.spark.job.{FiltersUtil, SegmentJob} import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree.AdaptiveTreeBuilder import org.apache.kylin.metadata.cube.model._ @@ -30,6 +31,7 @@ import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, SparderBa import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.{FilterExec, SparkPlan} import org.junit.Assert +import org.mockito.Mockito import scala.collection.JavaConverters._ import scala.collection.mutable.Set @@ -46,11 +48,11 @@ class TestInferFilters extends SparderBaseFunSuite with AdaptiveSparkPlanHelper } override def beforeEach(): Unit = { - SegmentFlatTable.inferFiltersEnabled = true + FlatTableAndDictBase.inferFiltersEnabled = true } override def afterEach(): Unit = { - SegmentFlatTable.inferFiltersEnabled = false + FlatTableAndDictBase.inferFiltersEnabled = false } test("infer filters from join desc") { @@ -64,10 +66,12 @@ class TestInferFilters extends SparderBaseFunSuite with AdaptiveSparkPlanHelper val seg = dsMgr.appendSegment(df, new SegmentRange.TimePartitionedSegmentRange(0L, 1356019200000L)) val toBuildTree = new AdaptiveSpanningTree(getTestConfig, new AdaptiveTreeBuilder(seg, seg.getIndexPlan.getAllLayouts)) - val flatTableDesc = new SegmentFlatTableDesc(getTestConfig, seg, toBuildTree) - val flatTable = new SegmentFlatTable(spark, flatTableDesc) + val segmentJob = Mockito.mock(classOf[SegmentJob]) + Mockito.when(segmentJob.getSparkSession).thenReturn(spark) + val buildParam = new BuildParam() + new TestFlatTable(segmentJob, seg, buildParam).test(getTestConfig, toBuildTree) - val filters = getFilterPlan(flatTable.getFlatTableDS.queryExecution.executedPlan) + val filters = getFilterPlan(buildParam.getFlatTable.queryExecution.executedPlan) 
Assert.assertTrue(Set("EDW.TEST_CAL_DT.CAL_DT", "DEFAULT.TEST_KYLIN_FACT.CAL_DT", "DEFAULT.TEST_ORDER.TEST_DATE_ENC").subsetOf(FiltersUtil.getAllEqualColSets)) diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestSegmentFlatTable.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestSegmentFlatTable.scala index fda8cc0248..8cf81315c3 100644 --- a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestSegmentFlatTable.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/builder/TestSegmentFlatTable.scala @@ -19,9 +19,8 @@ package org.apache.kylin.engine.spark.builder import org.apache.kylin.common.KylinConfig -import org.apache.kylin.engine.spark.model.SegmentFlatTableDesc -import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree -import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree.AdaptiveTreeBuilder +import org.apache.kylin.engine.spark.job.SegmentJob +import org.apache.kylin.engine.spark.job.stage.BuildParam import org.apache.kylin.metadata.cube.model._ import org.apache.kylin.metadata.model.{SegmentRange, TableDesc, TableRef} import org.apache.spark.SparkExecutorInfo @@ -53,16 +52,17 @@ class TestSegmentFlatTable extends SparderBaseFunSuite with SharedSparkSession w dfMgr.updateDataflow(update) val seg = dfMgr.appendSegment(df, new SegmentRange.TimePartitionedSegmentRange(0L, 1356019200000L)) - val toBuildTree = new AdaptiveSpanningTree(getTestConfig, new AdaptiveTreeBuilder(seg, seg.getIndexPlan.getAllLayouts)) - val flatTableDesc = new SegmentFlatTableDesc(getTestConfig, seg, toBuildTree) - val flatTable = new SegmentFlatTable(spark, flatTableDesc) - assert(flatTable.newTableDS(df.getModel.getAllTables.iterator().next()) != null) + val segmentJob = Mockito.mock(classOf[SegmentJob]) + Mockito.when(segmentJob.getSparkSession).thenReturn(spark) + val buildParam = new BuildParam() + val testFlatTable = new TestFlatTable(segmentJob, seg, buildParam) + assert(testFlatTable.testNewTableDS(df.getModel.getAllTables.iterator().next()) != null) val tableRef: TableRef = df.getModel.getAllTables.iterator().next() val tableDesc: TableDesc = tableRef.getTableDesc tableDesc.setRangePartition(true) val ref = new TableRef(df.getModel, tableDesc.getName, tableDesc, false) - assert(flatTable.newTableDS(ref) != null) + assert(testFlatTable.testNewTableDS(ref) != null) } test("testSegmentFlatTableWithChineseAndSpecialChar") { @@ -78,16 +78,17 @@ class TestSegmentFlatTable extends SparderBaseFunSuite with SharedSparkSession w dfMgr.updateDataflow(update) val seg = dfMgr.appendSegment(df, new SegmentRange.TimePartitionedSegmentRange(0L, 1356019200000L)) - val toBuildTree = new AdaptiveSpanningTree(getTestConfig, new AdaptiveTreeBuilder(seg, seg.getIndexPlan.getAllLayouts)) - val flatTableDesc = new SegmentFlatTableDesc(getTestConfig, seg, toBuildTree) - val flatTable = new SegmentFlatTable(spark, flatTableDesc) - assert(flatTable.newTableDS(df.getModel.getAllTables.iterator().next()) != null) + val segmentJob = Mockito.mock(classOf[SegmentJob]) + Mockito.when(segmentJob.getSparkSession).thenReturn(spark) + val buildParam = new BuildParam() + val testFlatTable = new TestFlatTable(segmentJob, seg, buildParam) + assert(testFlatTable.testNewTableDS(df.getModel.getAllTables.iterator().next()) != null) val tableRef: TableRef = df.getModel.getAllTables.iterator().next() val tableDesc: TableDesc = tableRef.getTableDesc 
tableDesc.setRangePartition(true) val ref = new TableRef(df.getModel, tableDesc.getName, tableDesc, false) - assert(flatTable.newTableDS(ref) != null) + assert(testFlatTable.testNewTableDS(ref) != null) } test("waitTillWorkerRegistered") { @@ -106,13 +107,13 @@ class TestSegmentFlatTable extends SparderBaseFunSuite with SharedSparkSession w dfMgr.updateDataflow(update) val seg = dfMgr.appendSegment(df, new SegmentRange.TimePartitionedSegmentRange(0L, 1356019200000L)) - val toBuildTree = new AdaptiveSpanningTree(getTestConfig, new AdaptiveTreeBuilder(seg, seg.getIndexPlan.getAllLayouts)) - val flatTableDesc = new SegmentFlatTableDesc(getTestConfig, seg, toBuildTree) - val spiedSparkSession = Mockito.spy(spark) val spiedSparkContext = Mockito.spy(spark.sparkContext) val spiedTracker = Mockito.spy(spark.sparkContext.statusTracker) - val flatTable = new SegmentFlatTable(spiedSparkSession, flatTableDesc) + val segmentJob = Mockito.mock(classOf[SegmentJob]) + Mockito.when(segmentJob.getSparkSession).thenReturn(spark) + val buildParam = new BuildParam() + val flatTable = new TestFlatTable(segmentJob, seg, buildParam) Mockito.when(spiedSparkSession.sparkContext).thenReturn(spiedSparkContext) Mockito.when(spiedSparkContext.statusTracker).thenReturn(spiedTracker) Mockito.when(spiedTracker.getExecutorInfos) diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/TestRDSegmentBuildExec.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/TestRDSegmentBuildExec.scala new file mode 100644 index 0000000000..66dcfdf43b --- /dev/null +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/TestRDSegmentBuildExec.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.engine.spark.job + +import com.google.common.collect.Lists +import org.apache.kylin.common.util.TestUtils.getTestConfig +import org.apache.kylin.common.{KapConfig, KylinConfig} +import org.apache.kylin.engine.spark.job.stage.BuildParam +import org.apache.kylin.metadata.cube.model._ +import org.apache.kylin.metadata.model.{NTableMetadataManager, SegmentRange, TableDesc, TableExtDesc} +import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, SparderBaseFunSuite} +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.junit.jupiter.api.Assertions.assertEquals +import org.mockito.Mockito +import org.scalatest.PrivateMethodTester + +import java.util +import scala.collection.JavaConverters._ + +class TestRDSegmentBuildExec extends SparderBaseFunSuite with PrivateMethodTester + with AdaptiveSparkPlanHelper with SharedSparkSession with LocalMetadata { + private val PROJECT = "infer_filter" + private val MODEL_NAME1 = "89af4ee2-2cdb-4b07-b39e-4c29856309ab" + + test("test evaluateColumnTotalFromTableDesc") { + val dsMgr: NDataflowManager = NDataflowManager.getInstance(getTestConfig, PROJECT) + val df: NDataflow = dsMgr.getDataflow(MODEL_NAME1) + // cleanup all segments first + val update = new NDataflowUpdate(df.getUuid) + update.setToRemoveSegsWithArray(df.getSegments.asScala.toArray) + dsMgr.updateDataflow(update) + + val seg = dsMgr.appendSegment(df, new SegmentRange.TimePartitionedSegmentRange(0L, 1356019200000L)) + val segmentJob = Mockito.mock(classOf[SegmentJob]) + Mockito.when(segmentJob.getConfig).thenReturn(getTestConfig) + Mockito.when(segmentJob.getSparkSession).thenReturn(spark) + val buildParam = new BuildParam() + val exec = new RDSegmentBuildExec(segmentJob, seg, buildParam) + exec.initFlatTableOnDetectResource() + + val tableMetadataManager = NTableMetadataManager.getInstance(getTestConfig, PROJECT) + var result = exec.evaluateColumnTotalFromTableDesc(tableMetadataManager, 0, "", "UUID") + assertEquals(0, result) + + val flatTableDesc = buildParam.getFlatTableDesc + val columns = flatTableDesc.getColumns.asScala.toArray + var totalRows = 0 + val columnsIdentity = columns.map(col => col.getIdentity) + columns.map(colRef => tableMetadataManager.getTableDesc(flatTableDesc.getTableName(colRef.toString))) + .filter(tableMetadataDesc => tableMetadataDesc != null).distinct.foreach(tableMetadataDesc => { + val tableName = tableMetadataDesc.getName + mockTableStats(tableMetadataDesc, totalRows) + if (totalRows % 2 != 0) { + val baseDir = KapConfig.getInstanceFromEnv.getMetadataWorkingDirectory + val tmp = spark.range(totalRows) + tmp.write.mode(SaveMode.Overwrite).parquet(baseDir + tableName + ".parquet") + tableMetadataDesc.setLastSnapshotPath(tableName + ".parquet") + tableMetadataDesc.setLastSnapshotSize(totalRows) + } + val colRef = tableMetadataDesc.getColumns.find(columnDesc => columnsIdentity.contains(columnDesc.getIdentity)).get + val colIdentity = tableMetadataDesc.getIdentity + "." 
+ colRef.getName + result = exec.evaluateColumnTotalFromTableDesc(tableMetadataManager, 0, tableMetadataDesc.getName, colIdentity) + assertEquals(totalRows, result) + totalRows += 1 + }) + } + + private def mockTableStats(tableDesc: TableDesc, totalRows: Int): TableExtDesc = { + val tableMetadataManager: NTableMetadataManager = NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv, PROJECT) + var tableExt: TableExtDesc = tableMetadataManager.getOrCreateTableExt(tableDesc) + tableExt = tableMetadataManager.copyForWrite(tableExt) + val columnStats: util.List[TableExtDesc.ColumnStats] = Lists.newArrayList[TableExtDesc.ColumnStats] + for (columnDesc <- tableDesc.getColumns) { + if (!columnDesc.isComputedColumn) { + var colStats: TableExtDesc.ColumnStats = tableExt.getColumnStatsByName(columnDesc.getName) + if (colStats == null) { + colStats = new TableExtDesc.ColumnStats + colStats.setColumnName(columnDesc.getName) + } + if ("CAL_DT" == columnDesc.getName) colStats.setCardinality(1000) + else if ("TRANS_ID" == columnDesc.getName) colStats.setCardinality(10000) + else colStats.setCardinality(100) + columnStats.add(colStats) + } + } + if (totalRows % 2 == 0) { + tableExt.setTotalRows(totalRows) + } + + tableExt.setColumnStats(columnStats) + tableMetadataManager.saveTableExt(tableExt) + tableExt + } +} diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionFlatTableAndDictBaseTest.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionFlatTableAndDictBaseTest.scala new file mode 100644 index 0000000000..1356d19a22 --- /dev/null +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/kylin/engine/spark/job/stage/build/partition/PartitionFlatTableAndDictBaseTest.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.engine.spark.job.stage.build.partition + +import org.apache.kylin.common.util.RandomUtil +import org.apache.kylin.common.util.TestUtils.getTestConfig +import org.apache.kylin.engine.spark.job.SegmentJob +import org.apache.kylin.engine.spark.job.stage.BuildParam +import org.apache.kylin.metadata.cube.model.{NDataflow, NDataflowManager, NDataflowUpdate} +import org.apache.kylin.metadata.model.SegmentRange +import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, SparderBaseFunSuite} +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.junit.jupiter.api.Assertions.{assertNotNull, assertNull} +import org.mockito.Mockito +import org.scalatest.PrivateMethodTester + +import scala.collection.JavaConverters._ + +class PartitionFlatTableAndDictBaseTest extends SparderBaseFunSuite with PrivateMethodTester + with AdaptiveSparkPlanHelper with SharedSparkSession with LocalMetadata { + val modelId = "b780e4e4-69af-449e-b09f-05c90dfa04b6" + val project = "default" + + test("test materializedFactTableView") { + val dsMgr: NDataflowManager = NDataflowManager.getInstance(getTestConfig, project) + val df: NDataflow = dsMgr.getDataflow(modelId) + val update = new NDataflowUpdate(df.getUuid) + update.setToRemoveSegsWithArray(df.getSegments.asScala.toArray) + dsMgr.updateDataflow(update) + + val seg = dsMgr.appendSegment(df, new SegmentRange.TimePartitionedSegmentRange(0L, 1356019200000L)) + val segmentJob = Mockito.mock(classOf[SegmentJob]) + Mockito.when(segmentJob.getConfig).thenReturn(getTestConfig) + Mockito.when(segmentJob.getSparkSession).thenReturn(spark) + Mockito.when(segmentJob.getJobId).thenReturn(RandomUtil.randomUUIDStr()) + Mockito.when(segmentJob.isPartialBuild).thenReturn(true) + val buildParam = new BuildParam() + val exec = new PartitionMaterializedFactTableView(segmentJob, seg, buildParam) + exec.materializedFactTableView() + assertNull(buildParam.getFlatTableDesc) + assertNotNull(buildParam.getTableDesc) + assertNull(buildParam.getSpanningTree) + assertNotNull(buildParam.getPartitionSpanningTree) + assertNotNull(buildParam.getFactTableDS) + assertNotNull(buildParam.getFastFactTableDS) + } + + test("test materializedFactTableView with isPartialBuild{false}") { + val dsMgr: NDataflowManager = NDataflowManager.getInstance(getTestConfig, project) + val df: NDataflow = dsMgr.getDataflow(modelId) + val update = new NDataflowUpdate(df.getUuid) + update.setToRemoveSegsWithArray(df.getSegments.asScala.toArray) + dsMgr.updateDataflow(update) + + val seg = dsMgr.appendSegment(df, new SegmentRange.TimePartitionedSegmentRange(0L, 1356019200000L)) + val segmentJob = Mockito.mock(classOf[SegmentJob]) + Mockito.when(segmentJob.getConfig).thenReturn(getTestConfig) + Mockito.when(segmentJob.getSparkSession).thenReturn(spark) + Mockito.when(segmentJob.getJobId).thenReturn(RandomUtil.randomUUIDStr()) + Mockito.when(segmentJob.isPartialBuild).thenReturn(false) + val buildParam = new BuildParam() + val exec = new PartitionMaterializedFactTableView(segmentJob, seg, buildParam) + + exec.materializedFactTableView() + assertNull(buildParam.getFlatTableDesc) + assertNotNull(buildParam.getTableDesc) + assertNull(buildParam.getSpanningTree) + assertNotNull(buildParam.getPartitionSpanningTree) + assertNotNull(buildParam.getFactTableDS) + assertNotNull(buildParam.getFastFactTableDS) + } +}
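For reviewers, a minimal sketch of the test pattern this commit converges on: stages are no longer exercised through SegmentFlatTable directly, but through a Mockito-mocked SegmentJob plus a BuildParam that carries the intermediate artifacts (spanning tree, table desc, fact table datasets). The snippet mirrors PartitionFlatTableAndDictBaseTest above; it is an illustrative sketch only, and `spark` / `getTestConfig` are assumed to come from the usual SharedSparkSession / LocalMetadata test fixture rather than being defined here.

    import org.apache.kylin.common.util.RandomUtil
    import org.apache.kylin.common.util.TestUtils.getTestConfig
    import org.apache.kylin.engine.spark.job.SegmentJob
    import org.apache.kylin.engine.spark.job.stage.BuildParam
    import org.apache.kylin.engine.spark.job.stage.build.partition.PartitionMaterializedFactTableView
    import org.apache.kylin.metadata.cube.model.{NDataflow, NDataflowManager}
    import org.apache.kylin.metadata.model.SegmentRange
    import org.mockito.Mockito

    // Segment setup as in the test: append a fresh segment to the sample dataflow.
    val dsMgr = NDataflowManager.getInstance(getTestConfig, "default")
    val df: NDataflow = dsMgr.getDataflow("b780e4e4-69af-449e-b09f-05c90dfa04b6")
    val seg = dsMgr.appendSegment(df,
      new SegmentRange.TimePartitionedSegmentRange(0L, 1356019200000L))

    // Mock the SegmentJob so the stage can run without a full build job;
    // `spark` is assumed to be the shared test SparkSession.
    val segmentJob = Mockito.mock(classOf[SegmentJob])
    Mockito.when(segmentJob.getConfig).thenReturn(getTestConfig)
    Mockito.when(segmentJob.getSparkSession).thenReturn(spark)
    Mockito.when(segmentJob.getJobId).thenReturn(RandomUtil.randomUUIDStr())
    Mockito.when(segmentJob.isPartialBuild).thenReturn(false)

    // BuildParam is the carrier the refactored stages populate.
    val buildParam = new BuildParam()
    val stage = new PartitionMaterializedFactTableView(segmentJob, seg, buildParam)
    stage.materializedFactTableView()

    // The intermediate artifacts now live on BuildParam instead of being
    // held by a SegmentFlatTable instance.
    assert(buildParam.getPartitionSpanningTree != null)
    assert(buildParam.getTableDesc != null)
    assert(buildParam.getFactTableDS != null)
    assert(buildParam.getFastFactTableDS != null)

The same pattern applies to the non-partition path: MaterializedFactTableView calls the shared materializedFactTableView() helper, and the new TestFlatTable class drives buildDictIfNeed(), generateFlatTable() and generateFlatTablePart() through the same BuildParam, which is why TestDimensionTableStat and TestInferFilters could drop their SegmentFlatTable usages.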