This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 5764e47  KYLIN-4660 Normalize variable and file naming
5764e47 is described below

commit 5764e4797c90b3c9225c5c5c8c1d4fea27fe59a4
Author: rupengwang <wangrup...@live.cn>
AuthorDate: Tue Jul 28 14:56:32 2020 +0800

    KYLIN-4660 Normalize variable and file naming
---
 .../kylin/common/util/TempMetadataBuilder.java | 1 -
 .../kylin/spark/classloader/TomcatClassLoader.java | 2 +-
 ...ateTimeUtils.scala => KylinDateTimeUtils.scala} | 2 +-
 .../org/apache/spark/dict/NBucketDictionary.java | 2 +-
 .../spark/dict/NGlobalDictBuilderAssist.scala | 4 +--
 ...balDictionaryV2.java => NGlobalDictionary.java} | 8 ++---
 .../apache/spark/sql/KylinDataFrameManager.scala | 3 +-
 .../org/apache/spark/sql/KylinFunctions.scala | 34 +++++++++++-----------
 .../sql/catalyst/expressions/DictEncodeImpl.scala | 4 +--
 ...pExpresssions.scala => KylinExpresssions.scala} | 32 ++++++++++----------
 .../catalyst/expressions/TimestampAddImpl.scala | 2 +-
 .../catalyst/expressions/TimestampDiffImpl.scala | 10 +++----
 .../sql/execution/KylinFileSourceScanExec.scala | 1 -
 .../datasource/ResetShufflePartition.scala | 6 ++--
 .../engine/spark/builder/NBuildSourceInfo.java | 8 -----
 .../kylin/engine/spark/job/NSparkExecutable.java | 3 +-
 .../kylin/engine/spark/merger/MetadataMerger.java | 2 +-
 .../kylin/engine/spark/source/CsvSource.java | 4 +--
 .../engine/spark/builder/CreateFlatTable.scala | 6 ++--
 ...BuilderHelper.scala => CubeBuilderHelper.scala} | 4 +--
 ...ryBuilder.scala => CubeDictionaryBuilder.scala} | 14 ++++-----
 ...LayoutMergeAssist.java => CubeMergeAssist.java} | 4 +--
 ...shotBuilder.scala => CubeSnapshotBuilder.scala} | 2 +-
 ...DFTableEncoder.scala => CubeTableEncoder.scala} | 10 +++----
 .../kylin/engine/spark/builder/DictHelper.scala | 4 +--
 .../spark/builder/DictionaryBuilderHelper.java | 4 +--
 .../kylin/engine/spark/job/CubeBuildJob.java | 6 ++--
 .../kylin/engine/spark/job/CubeMergeJob.java | 24 +++++++--------
 .../kylin/engine/spark/job/CuboidAggregator.scala | 2 +-
 .../engine/spark/job/ParentSourceChooser.scala | 4 +--
 .../spark/job/ResourceDetectBeforeMergingJob.java | 6 ++--
 .../kylin/query/runtime/ExpressionConverter.scala | 4 +--
 .../apache/spark/sql/udf/TimestampAddImpl.scala | 2 +-
 .../apache/spark/sql/udf/TimestampDiffImpl.scala | 10 +++----
 .../engine/spark/LocalWithSparkSessionTest.java | 2 +-
 ...onaryV2Test.java => NGlobalDictionaryTest.java} | 16 +++++-----
 .../engine/spark/builder/TestCreateFlatTable.scala | 2 +-
 .../engine/spark/builder/TestGlobalDictBuild.scala | 6 ++--
 .../engine/spark/builder/TestSnapshotBuilder.scala | 6 ++--
 .../kylin/engine/spark/job/TestUdfManager.scala | 2 +-
 .../kylin/query/runtime/DerivedProcess.scala | 1 -
 .../kylin/query/runtime/ExpressionConverter.scala | 4 +--
 .../apache/kylin/query/runtime/RuntimeHelper.scala | 1 -
 .../kylin/query/runtime/SparderRexVisitor.scala | 4 +--
 .../kylin/query/runtime/plans/ProjectPlan.scala | 2 +-
 .../kylin/query/runtime/plans/ResultPlan.scala | 6 ++--
 .../apache/kylin/engine/spark2/NExecAndComp.java | 28 +++++++++---------
 .../spark2/file_pruning/NFilePruningTest.java | 4 +--
 .../kylin/engine/spark2/utils/QueryUtil.java | 6 ++--
 webapp/app/js/model/cubeConfig.js | 2 +-
 .../partials/cubeDesigner/advanced_settings.html | 2 +-
 51 files changed, 153 insertions(+), 175 deletions(-)

diff --git a/core-common/src/test/java/org/apache/kylin/common/util/TempMetadataBuilder.java b/core-common/src/test/java/org/apache/kylin/common/util/TempMetadataBuilder.java
index
1b92532..3653bf3 100644 --- a/core-common/src/test/java/org/apache/kylin/common/util/TempMetadataBuilder.java +++ b/core-common/src/test/java/org/apache/kylin/common/util/TempMetadataBuilder.java @@ -83,7 +83,6 @@ public class TempMetadataBuilder { FileUtils.deleteQuietly(new File(dst)); - // KAP files will overwrite Kylin files for (String metaSrc : metaSrcs) { FileUtils.copyDirectory(new File(metaSrc), new File(dst)); } diff --git a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/TomcatClassLoader.java b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/TomcatClassLoader.java index f826356..45007c5 100644 --- a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/TomcatClassLoader.java +++ b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/TomcatClassLoader.java @@ -137,7 +137,7 @@ public class TomcatClassLoader extends ParallelWebappClassLoader { if (sparkClassLoader.classNeedPreempt(name)) { return sparkClassLoader.loadClass(name); } - // tomcat classpath include KAP_HOME/lib , ensure this classload can load kap class + // tomcat classpath include KYLIN_HOME/lib , ensure this classload can load kylin class if (isParentCLPrecedent(name) && !isThisCLPrecedent(name)) { logger.debug("delegate " + name + " directly to parent"); return parent.loadClass(name); diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KapDateTimeUtils.scala b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KylinDateTimeUtils.scala similarity index 99% rename from kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KapDateTimeUtils.scala rename to kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KylinDateTimeUtils.scala index ca882a5..a5a6451 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KapDateTimeUtils.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KylinDateTimeUtils.scala @@ -21,7 +21,7 @@ package org.apache.kylin.engine.spark.common.util import org.apache.calcite.avatica.util.TimeUnitRange import org.apache.spark.sql.catalyst.util.DateTimeUtils -object KapDateTimeUtils { +object KylinDateTimeUtils { val MICROS_PER_MILLIS: Long = 1000L val MILLIS_PER_SECOND: Long = 1000L val MILLIS_PER_MINUTE: Long = MILLIS_PER_SECOND * 60L diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java index 7d08bef..bf2a351 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java @@ -27,7 +27,7 @@ import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; public class NBucketDictionary { - protected static final Logger logger = LoggerFactory.getLogger(NGlobalDictionaryV2.class); + protected static final Logger logger = LoggerFactory.getLogger(NGlobalDictionary.class); private String workingDir; diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala index 64e9bad..962cb40 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala @@ -35,7 +35,7 @@ object NGlobalDictBuilderAssist extends Logging { @throws[IOException] def resize(ref: ColumnDesc, desc: SegmentInfo, bucketPartitionSize: Int, ss: SparkSession): Unit = { - val globalDict = new NGlobalDictionaryV2(desc.project, ref.tableAliasName, ref.columnName, desc.kylinconf.getHdfsWorkingDirectory) + val globalDict = new NGlobalDictionary(desc.project, ref.tableAliasName, ref.columnName, desc.kylinconf.getHdfsWorkingDirectory) val broadcastDict = ss.sparkContext.broadcast(globalDict) globalDict.prepareWrite() @@ -44,7 +44,7 @@ object NGlobalDictBuilderAssist extends Logging { val existsDictDs = ss.createDataset(0 to bucketPartitionSize) .flatMap { bucketId => - val gDict: NGlobalDictionaryV2 = broadcastDict.value + val gDict: NGlobalDictionary = broadcastDict.value val bucketDict: NBucketDictionary = gDict.loadBucketDictionary(bucketId) val tupleList = new util.ArrayList[(String, Long)](bucketDict.getAbsoluteDictMap.size) bucketDict.getAbsoluteDictMap.object2LongEntrySet.asScala diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionaryV2.java b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionary.java similarity index 94% rename from kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionaryV2.java rename to kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionary.java index 5278dbf..651387d 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionaryV2.java +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionary.java @@ -24,9 +24,9 @@ import org.apache.kylin.common.util.HadoopUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class NGlobalDictionaryV2 implements Serializable { +public class NGlobalDictionary implements Serializable { - protected static final Logger logger = LoggerFactory.getLogger(NGlobalDictionaryV2.class); + protected static final Logger logger = LoggerFactory.getLogger(NGlobalDictionary.class); private final static String WORKING_DIR = "working"; @@ -48,7 +48,7 @@ public class NGlobalDictionaryV2 implements Serializable { return baseDir + WORKING_DIR; } - public NGlobalDictionaryV2(String project, String sourceTable, String sourceColumn, String baseDir) + public NGlobalDictionary(String project, String sourceTable, String sourceColumn, String baseDir) throws IOException { this.project = project; this.sourceTable = sourceTable; @@ -60,7 +60,7 @@ public class NGlobalDictionaryV2 implements Serializable { } } - public NGlobalDictionaryV2(String dictParams) throws IOException { + public NGlobalDictionary(String dictParams) throws IOException { String[] dictInfo = dictParams.split(SEPARATOR); this.project = dictInfo[0]; this.sourceTable = dictInfo[1]; diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala index fb90f43..153786c 100644 --- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala @@ -21,7 +21,6 @@ package org.apache.spark.sql import org.apache.kylin.cube.CubeInstance import org.apache.kylin.cube.cuboid.Cuboid import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasource.FilePruner import org.apache.spark.sql.execution.datasources.HadoopFsRelation import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat @@ -68,7 +67,7 @@ class KylinDataFrameManager(sparkSession: SparkSession) { def cuboidTable(cubeInstance: CubeInstance, layout: Cuboid): DataFrame = { option("project", cubeInstance.getProject) - option("dataflowId", cubeInstance.getUuid) + option("cubeId", cubeInstance.getUuid) option("cuboidId", layout.getId) val indexCatalog = new FilePruner(cubeInstance, layout, sparkSession, options = extraOptions.toMap) sparkSession.baseRelationToDataFrame( diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala index 04d4a68..330ded2 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala @@ -17,12 +17,12 @@ */ package org.apache.spark.sql -import org.apache.kylin.engine.spark.common.util.KapDateTimeUtils +import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.types.{AbstractDataType, DataType, DateType, IntegerType} -import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, DictEncode, Expression, ExpressionInfo, ExpressionUtils, ImplicitCastInputTypes, In, KapAddMonths, Like, Literal, RoundBase, SplitPart, Sum0, TimestampAdd, TimestampDiff, Truncate, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, DictEncode, Expression, ExpressionInfo, ExpressionUtils, ImplicitCastInputTypes, In, KylinAddMonths, Like, Literal, RoundBase, SplitPart, Sum0, TimestampAdd, TimestampDiff, Truncate, UnaryExpression} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction import org.apache.spark.sql.udaf.{ApproxCountDistinct, IntersectCount, PreciseCountDistinct} @@ -33,15 +33,15 @@ object KylinFunctions { Column(func.toAggregateExpression(isDistinct)) } - def kap_add_months(startDate: Column, numMonths: Column): Column = { - Column(KapAddMonths(startDate.expr, numMonths.expr)) + def kylin_add_months(startDate: Column, numMonths: Column): Column = { + Column(KylinAddMonths(startDate.expr, numMonths.expr)) } def dict_encode(column: Column, dictParams: Column, bucketSize: Column): Column = { Column(DictEncode(column.expr, dictParams.expr, bucketSize.expr)) } - // special lit for KE. + // special lit for KYLIN. 
def k_lit(literal: Any): Column = literal match { case c: Column => c case s: Symbol => new ColumnName(s.name) @@ -52,14 +52,14 @@ object KylinFunctions { def in(value: Expression, list: Seq[Expression]): Column = Column(In(value, list)) - def kap_day_of_week(date: Column): Column = Column(KapDayOfWeek(date.expr)) + def kylin_day_of_week(date: Column): Column = Column(KylinDayOfWeek(date.expr)) - def kap_truncate(column: Column, scale: Int): Column = { + def kylin_truncate(column: Column, scale: Int): Column = { Column(TRUNCATE(column.expr, Literal(scale))) } - def kap_subtract_months(date0: Column, date1: Column): Column = { - Column(KapSubtractMonths(date0.expr, date1.expr)) + def kylin_subtract_months(date0: Column, date1: Column): Column = { + Column(KylinSubtractMonths(date0.expr, date1.expr)) } def precise_count_distinct(column: Column): Column = @@ -99,7 +99,7 @@ case class TRUNCATE(child: Expression, scale: Expression) } // scalastyle:on line.size.limit -case class KapSubtractMonths(a: Expression, b: Expression) +case class KylinSubtractMonths(a: Expression, b: Expression) extends BinaryExpression with ImplicitCastInputTypes { @@ -112,21 +112,21 @@ case class KapSubtractMonths(a: Expression, b: Expression) override def dataType: DataType = IntegerType override def nullSafeEval(date0: Any, date1: Any): Any = { - KapDateTimeUtils.dateSubtractMonths(date0.asInstanceOf[Int], + KylinDateTimeUtils.dateSubtractMonths(date0.asInstanceOf[Int], date1.asInstanceOf[Int]) } override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - val dtu = KapDateTimeUtils.getClass.getName.stripSuffix("$") + val dtu = KylinDateTimeUtils.getClass.getName.stripSuffix("$") defineCodeGen(ctx, ev, (d0, d1) => { s"""$dtu.dateSubtractMonths($d0, $d1)""" }) } - override def prettyName: String = "kap_months_between" + override def prettyName: String = "kylin_months_between" } -case class KapDayOfWeek(a: Expression) +case class KylinDayOfWeek(a: Expression) extends UnaryExpression with ImplicitCastInputTypes { @@ -137,19 +137,19 @@ case class KapDayOfWeek(a: Expression) override protected def doGenCode( ctx: CodegenContext, ev: ExprCode): ExprCode = { - val dtu = KapDateTimeUtils.getClass.getName.stripSuffix("$") + val dtu = KylinDateTimeUtils.getClass.getName.stripSuffix("$") defineCodeGen(ctx, ev, (d) => { s"""$dtu.dayOfWeek($d)""" }) } override def nullSafeEval(date: Any): Any = { - KapDateTimeUtils.dayOfWeek(date.asInstanceOf[Int]) + KylinDateTimeUtils.dayOfWeek(date.asInstanceOf[Int]) } override def dataType: DataType = IntegerType - override def prettyName: String = "kap_day_of_week" + override def prettyName: String = "kylin_day_of_week" } object FunctionEntity { diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/DictEncodeImpl.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/DictEncodeImpl.scala index 356bd6d..1df301e 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/DictEncodeImpl.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/DictEncodeImpl.scala @@ -22,7 +22,7 @@ import org.apache.spark.TaskContext import java.util import org.apache.spark.util.TaskCompletionListener -import org.apache.spark.dict.{NBucketDictionary, NGlobalDictionaryV2} +import org.apache.spark.dict.{NBucketDictionary, NGlobalDictionary} object DictEncodeImpl { @@ -42,7 +42,7 @@ object 
DictEncodeImpl { private def initBucketDict(dictParams: String, bucketSize: String): NBucketDictionary = { val partitionID = TaskContext.get.partitionId val encodeBucketId = partitionID % bucketSize.toInt - val globalDict = new NGlobalDictionaryV2(dictParams) + val globalDict = new NGlobalDictionary(dictParams) val cachedBucketDict = globalDict.loadBucketDictionary(encodeBucketId) DictEncodeImpl.cacheBucketDict.get.put(dictParams, cachedBucketDict) diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala similarity index 93% rename from kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala rename to kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala index 573f4c4..1bb5be2 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala @@ -17,8 +17,8 @@ */ package org.apache.spark.sql.catalyst.expressions -import org.apache.kylin.engine.spark.common.util.KapDateTimeUtils -import org.apache.spark.dict.{NBucketDictionary, NGlobalDictionaryV2} +import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils +import org.apache.spark.dict.{NBucketDictionary, NGlobalDictionary} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} @@ -37,7 +37,7 @@ import org.apache.spark.sql.types._ """ ) // scalastyle:on line.size.limit -case class KapAddMonths(startDate: Expression, numMonths: Expression) +case class KylinAddMonths(startDate: Expression, numMonths: Expression) extends BinaryExpression with ImplicitCastInputTypes { @@ -52,17 +52,17 @@ case class KapAddMonths(startDate: Expression, numMonths: Expression) override def nullSafeEval(start: Any, months: Any): Any = { val time = start.asInstanceOf[Long] val month = months.asInstanceOf[Int] - KapDateTimeUtils.addMonths(time, month) + KylinDateTimeUtils.addMonths(time, month) } override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - val dtu = KapDateTimeUtils.getClass.getName.stripSuffix("$") + val dtu = KylinDateTimeUtils.getClass.getName.stripSuffix("$") defineCodeGen(ctx, ev, (sd, m) => { s"""$dtu.addMonths($sd, $m)""" }) } - override def prettyName: String = "kap_add_months" + override def prettyName: String = "kylin_add_months" } // Returns the date that is num_months after start_date. 
@@ -78,7 +78,7 @@ case class KapAddMonths(startDate: Expression, numMonths: Expression) """ ) // scalastyle:on line.size.limit -case class KapSubtractMonths(a: Expression, b: Expression) +case class KylinSubtractMonths(a: Expression, b: Expression) extends BinaryExpression with ImplicitCastInputTypes { @@ -91,18 +91,18 @@ case class KapSubtractMonths(a: Expression, b: Expression) override def dataType: DataType = IntegerType override def nullSafeEval(date0: Any, date1: Any): Any = { - KapDateTimeUtils.dateSubtractMonths(date0.asInstanceOf[Int], + KylinDateTimeUtils.dateSubtractMonths(date0.asInstanceOf[Int], date1.asInstanceOf[Int]) } override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - val dtu = KapDateTimeUtils.getClass.getName.stripSuffix("$") + val dtu = KylinDateTimeUtils.getClass.getName.stripSuffix("$") defineCodeGen(ctx, ev, (d0, d1) => { s"""$dtu.dateSubtractMonths($d0, $d1)""" }) } - override def prettyName: String = "kap_months_between" + override def prettyName: String = "kylin_months_between" } import org.apache.spark.sql.catalyst.analysis.TypeCheckResult @@ -174,7 +174,7 @@ case class Sum0(child: Expression) override lazy val evaluateExpression: Expression = sum } -case class KapDayOfWeek(a: Expression) +case class KylinDayOfWeek(a: Expression) extends UnaryExpression with ImplicitCastInputTypes { @@ -185,19 +185,19 @@ case class KapDayOfWeek(a: Expression) override protected def doGenCode( ctx: CodegenContext, ev: ExprCode): ExprCode = { - val dtu = KapDateTimeUtils.getClass.getName.stripSuffix("$") + val dtu = KylinDateTimeUtils.getClass.getName.stripSuffix("$") defineCodeGen(ctx, ev, (d) => { s"""$dtu.dayOfWeek($d)""" }) } override def nullSafeEval(date: Any): Any = { - KapDateTimeUtils.dayOfWeek(date.asInstanceOf[Int]) + KylinDateTimeUtils.dayOfWeek(date.asInstanceOf[Int]) } override def dataType: DataType = IntegerType - override def prettyName: String = "kap_day_of_week" + override def prettyName: String = "kylin_day_of_week" } case class TimestampAdd(left: Expression, mid: Expression, right: Expression) extends TernaryExpression with ExpectsInputTypes { @@ -293,7 +293,7 @@ case class DictEncode(left: Expression, mid: Expression, right: Expression) exte override protected def doGenCode( ctx: CodegenContext, ev: ExprCode): ExprCode = { - val globalDictClass = classOf[NGlobalDictionaryV2].getName + val globalDictClass = classOf[NGlobalDictionary].getName val bucketDictClass = classOf[NBucketDictionary].getName val globalDictTerm = ctx.addMutableState(globalDictClass, s"${mid.simpleString.replace("[", "").replace("]", "")}_globalDict") @@ -307,7 +307,7 @@ case class DictEncode(left: Expression, mid: Expression, right: Expression) exte | private void init${bucketDictTerm.replace("[", "").replace("]", "")}BucketDict(int idx) { | try { | int bucketId = idx % $bucketSizeTerm; - | $globalDictTerm = new org.apache.spark.dict.NGlobalDictionaryV2("$dictParamsTerm"); + | $globalDictTerm = new org.apache.spark.dict.NGlobalDictionary("$dictParamsTerm"); | $bucketDictTerm = $globalDictTerm.loadBucketDictionary(bucketId); | } catch (Exception e) { | throw new RuntimeException(e); diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampAddImpl.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampAddImpl.scala index 1a32b34..55f683c 100644 --- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampAddImpl.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampAddImpl.scala @@ -17,7 +17,7 @@ */ package org.apache.spark.sql.catalyst.expressions -import org.apache.kylin.engine.spark.common.util.KapDateTimeUtils._ +import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils._ import java.util.{Calendar, Locale, TimeZone} import org.apache.spark.sql.catalyst.util.DateTimeUtils diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampDiffImpl.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampDiffImpl.scala index c0c9055..e85abe0 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampDiffImpl.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampDiffImpl.scala @@ -20,8 +20,8 @@ package org.apache.spark.sql.catalyst.expressions import java.util.Locale -import org.apache.kylin.engine.spark.common.util.KapDateTimeUtils._ -import org.apache.kylin.engine.spark.common.util.KapDateTimeUtils +import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils._ +import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils import org.apache.spark.sql.catalyst.util.DateTimeUtils object TimestampDiffImpl { @@ -69,11 +69,11 @@ object TimestampDiffImpl { case "WEEK" | "SQL_TSI_WEEK" => (aMillis - bMillis) / MILLIS_PER_DAY / DAYS_PER_WEEK case "MONTH" | "SQL_TSI_MONTH" => - KapDateTimeUtils.subtractMonths(aMillis, bMillis) + KylinDateTimeUtils.subtractMonths(aMillis, bMillis) case "QUARTER" | "SQL_TSI_QUARTER" => - KapDateTimeUtils.subtractMonths(aMillis, bMillis) / MONTHS_PER_QUARTER + KylinDateTimeUtils.subtractMonths(aMillis, bMillis) / MONTHS_PER_QUARTER case "YEAR" | "SQL_TSI_YEAR" => - KapDateTimeUtils.subtractMonths(aMillis, bMillis) / MONTHS_PER_QUARTER / QUARTERS_PER_YEAR + KylinDateTimeUtils.subtractMonths(aMillis, bMillis) / MONTHS_PER_QUARTER / QUARTERS_PER_YEAR case _ => throw new IllegalArgumentException(s"Illegal unit: $unit," + s" only support [YEAR, SQL_TSI_YEAR, QUARTER, SQL_TSI_QUARTER, MONTH, SQL_TSI_MONTH, WEEK, SQL_TSI_WEEK, DAY, SQL_TSI_DAY," + diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala index 4b5341e..9bc8801 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasource.{FilePruner, ShardSpec} import org.apache.spark.sql.types.StructType diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala index d02be70..9308717 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala @@ -26,9 +26,9 @@ trait ResetShufflePartition extends Logging { def setShufflePartitions(bytes: Long, sparkSession: SparkSession): Unit = { QueryContextFacade.current().addAndGetSourceScanBytes(bytes) val defaultParallelism = sparkSession.sparkContext.defaultParallelism - val kapConfig = KylinConfig.getInstanceFromEnv - val partitionsNum = if (kapConfig.getSparkSqlShufflePartitions != -1) { - kapConfig.getSparkSqlShufflePartitions + val kylinConfig = KylinConfig.getInstanceFromEnv + val partitionsNum = if (kylinConfig.getSparkSqlShufflePartitions != -1) { + kylinConfig.getSparkSqlShufflePartitions } else { Math.min(QueryContextFacade.current().getSourceScanBytes / ( KylinConfig.getInstanceFromEnv.getQueryPartitionSplitSizeMB * 1024 * 1024 * 2) + 1, diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/builder/NBuildSourceInfo.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/builder/NBuildSourceInfo.java index fe1c619..092dc34 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/builder/NBuildSourceInfo.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/builder/NBuildSourceInfo.java @@ -84,14 +84,6 @@ public class NBuildSourceInfo { this.viewFactTablePath = viewFactTablePath; } - public void setCount(long count) { - this.count = count; - } - - public long getCount() { - return count; - } - public void setLayoutId(long layoutId) { this.layoutId = layoutId; } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java index a9c735b..7923605 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java @@ -304,8 +304,7 @@ public class NSparkExecutable extends AbstractExecutable { protected String generateSparkCmd(KylinConfig config, String hadoopConf, String jars, String kylinJobJar, String appArgs) { StringBuilder sb = new StringBuilder(); - sb.append( - "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.engine.spark.application.SparkEntry "); + sb.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.engine.spark.application.SparkEntry "); Map<String, String> sparkConfs = getSparkConfigOverride(config); for (Entry<String, String> entry : sparkConfs.entrySet()) { diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/MetadataMerger.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/MetadataMerger.java index 9ff561d..b5e2d9e 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/MetadataMerger.java +++ 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/MetadataMerger.java @@ -33,7 +33,7 @@ public abstract class MetadataMerger { return config; } - public abstract void merge(String dataflowId, String segmentIds, ResourceStore remoteResourceStore, String jobType); + public abstract void merge(String cubeId, String segmentIds, ResourceStore remoteResourceStore, String jobType); public abstract void merge(AbstractExecutable abstractExecutable); diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/source/CsvSource.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/source/CsvSource.java index d0cb8b4..0d67265 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/source/CsvSource.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/source/CsvSource.java @@ -57,9 +57,7 @@ public class CsvSource implements ISource { boolean withHeader = false; if (kylinConfig.getDeployEnv().equals("UT") && (parameters != null && parameters.get("separator") == null)) { - path = "file:///" + new File(getUtMetaDir(), - "../../examples/test_case_data/parquet_test/data/" + table.identity() + ".csv") - .getAbsolutePath(); + path = "file:///" + new File(getUtMetaDir(), "data/" + table.identity() + ".csv").getAbsolutePath(); separator = ""; } else if (kylinConfig.getDeployEnv().equals("LOCAL")) { path = "file:///" + new File(kylinConfig.getMetadataUrlPrefix(), diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala index 1e2138b..8d11974 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala @@ -22,7 +22,7 @@ import java.util.Locale import com.google.common.collect.Sets import org.apache.commons.lang3.StringUtils -import org.apache.kylin.engine.spark.builder.DFBuilderHelper.{ENCODE_SUFFIX, _} +import org.apache.kylin.engine.spark.builder.CubeBuilderHelper.{ENCODE_SUFFIX, _} import org.apache.kylin.engine.spark.job.NSparkCubingUtil._ import org.apache.kylin.engine.spark.metadata._ import org.apache.kylin.engine.spark.metadata.cube.model.SpanningTree @@ -94,7 +94,7 @@ class CreateFlatTable(val seg: SegmentInfo, private def buildDict(ds: Dataset[Row], dictCols: Set[ColumnDesc]): Unit = { val matchedCols = filterCols(ds, dictCols) if (!matchedCols.isEmpty) { - val builder = new DFDictionaryBuilder(ds, seg, ss, Sets.newHashSet(matchedCols.asJavaCollection)) + val builder = new CubeDictionaryBuilder(ds, seg, ss, Sets.newHashSet(matchedCols.asJavaCollection)) builder.buildDictSet() } } @@ -103,7 +103,7 @@ class CreateFlatTable(val seg: SegmentInfo, val matchedCols = filterCols(ds, encodeCols) var encodeDs = ds if (!matchedCols.isEmpty) { - encodeDs = DFTableEncoder.encodeTable(ds, seg, matchedCols.asJava) + encodeDs = CubeTableEncoder.encodeTable(ds, seg, matchedCols.asJava) } encodeDs } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFBuilderHelper.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeBuilderHelper.scala similarity index 97% rename 
from kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFBuilderHelper.scala rename to kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeBuilderHelper.scala index 5f37bb4..985f8a5 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFBuilderHelper.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeBuilderHelper.scala @@ -26,9 +26,9 @@ import org.apache.spark.sql.{Column, Dataset, Row} import scala.util.{Failure, Success, Try} -object DFBuilderHelper extends Logging { +object CubeBuilderHelper extends Logging { - val ENCODE_SUFFIX = "_KE_ENCODE" + val ENCODE_SUFFIX = "_KYLIN_ENCODE" def filterCols(dsSeq: Seq[Dataset[Row]], needCheckCols: Set[ColumnDesc]): Set[ColumnDesc] = { needCheckCols -- dsSeq.flatMap(ds => filterCols(ds, needCheckCols)) diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala similarity index 88% rename from kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala rename to kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala index 2adac10..009be80 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala @@ -23,10 +23,10 @@ import java.util import org.apache.kylin.common.KylinConfig import org.apache.kylin.common.lock.DistributedLock import org.apache.kylin.common.util.HadoopUtil -import org.apache.kylin.engine.spark.builder.DFBuilderHelper._ +import org.apache.kylin.engine.spark.builder.CubeBuilderHelper._ import org.apache.kylin.engine.spark.job.NSparkCubingUtil import org.apache.kylin.engine.spark.metadata.{ColumnDesc, SegmentInfo} -import org.apache.spark.dict.NGlobalDictionaryV2 +import org.apache.spark.dict.NGlobalDictionary import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.functions.{col, expr} @@ -35,10 +35,10 @@ import org.apache.spark.sql.{Column, Dataset, Row, SparkSession} import scala.collection.JavaConverters._ -class DFDictionaryBuilder(val dataset: Dataset[Row], - val seg: SegmentInfo, - val ss: SparkSession, - val colRefSet: util.Set[ColumnDesc]) extends Logging with Serializable { +class CubeDictionaryBuilder(val dataset: Dataset[Row], + val seg: SegmentInfo, + val ss: SparkSession, + val colRefSet: util.Set[ColumnDesc]) extends Logging with Serializable { @transient val lock: DistributedLock = KylinConfig.getInstanceFromEnv.getDistributedLockFactory.lockForCurrentThread @@ -70,7 +70,7 @@ class DFDictionaryBuilder(val dataset: Dataset[Row], val columnName = ref.identity logInfo(s"Start building global dictionaries V2 for column $columnName.") - val globalDict = new NGlobalDictionaryV2(seg.project, ref.tableAliasName, ref.columnName, seg.kylinconf.getHdfsWorkingDirectory) + val globalDict = new NGlobalDictionary(seg.project, ref.tableAliasName, ref.columnName, seg.kylinconf.getHdfsWorkingDirectory) globalDict.prepareWrite() val broadcastDict = 
ss.sparkContext.broadcast(globalDict) diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFLayoutMergeAssist.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeMergeAssist.java similarity index 97% rename from kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFLayoutMergeAssist.java rename to kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeMergeAssist.java index c1d56ce..c2b59e7 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFLayoutMergeAssist.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeMergeAssist.java @@ -34,8 +34,8 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; -public class DFLayoutMergeAssist implements Serializable { - protected static final Logger logger = LoggerFactory.getLogger(DFLayoutMergeAssist.class); +public class CubeMergeAssist implements Serializable { + protected static final Logger logger = LoggerFactory.getLogger(CubeMergeAssist.class); private static final int DEFAULT_BUFFER_SIZE = 256; private LayoutEntity layout; private SegmentInfo newSegment; diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFSnapshotBuilder.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeSnapshotBuilder.scala similarity index 99% rename from kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFSnapshotBuilder.scala rename to kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeSnapshotBuilder.scala index 535e716..557f89a 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFSnapshotBuilder.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeSnapshotBuilder.scala @@ -41,7 +41,7 @@ import scala.concurrent.{ExecutionContext, Future} import scala.util.control.Breaks._ import scala.util.{Failure, Success, Try} -class DFSnapshotBuilder extends Logging { +class CubeSnapshotBuilder extends Logging { var ss: SparkSession = _ var seg: SegmentInfo = _ diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFTableEncoder.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala similarity index 87% rename from kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFTableEncoder.scala rename to kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala index c11370f..e4a77c3 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFTableEncoder.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala @@ -19,10 +19,10 @@ package org.apache.kylin.engine.spark.builder import java.util -import org.apache.kylin.engine.spark.builder.DFBuilderHelper.ENCODE_SUFFIX +import org.apache.kylin.engine.spark.builder.CubeBuilderHelper.ENCODE_SUFFIX import org.apache.kylin.engine.spark.job.NSparkCubingUtil._ -import 
org.apache.kylin.engine.spark.metadata.{SegmentInfo, ColumnDesc} -import org.apache.spark.dict.NGlobalDictionaryV2 +import org.apache.kylin.engine.spark.metadata.{ColumnDesc, SegmentInfo} +import org.apache.spark.dict.NGlobalDictionary import org.apache.spark.internal.Logging import org.apache.spark.sql.KylinFunctions._ import org.apache.spark.sql.functions.{col, _} @@ -32,7 +32,7 @@ import org.apache.spark.sql.{Dataset, Row} import scala.collection.JavaConverters._ import scala.collection.mutable._ -object DFTableEncoder extends Logging { +object CubeTableEncoder extends Logging { def encodeTable(ds: Dataset[Row], seg: SegmentInfo, cols: util.Set[ColumnDesc]): Dataset[Row] = { val structType = ds.schema @@ -45,7 +45,7 @@ object DFTableEncoder extends Logging { cols.asScala.foreach( ref => { - val globalDict = new NGlobalDictionaryV2(seg.project, ref.tableAliasName, ref.columnName, seg.kylinconf.getHdfsWorkingDirectory) + val globalDict = new NGlobalDictionary(seg.project, ref.tableAliasName, ref.columnName, seg.kylinconf.getHdfsWorkingDirectory) val bucketSize = globalDict.getBucketSizeOrDefault(seg.kylinconf.getGlobalDictV2MinHashPartitions) val enlargedBucketSize = (((minBucketSize / bucketSize) + 1) * bucketSize).toInt diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DictHelper.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DictHelper.scala index d43b56b..18041c8 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DictHelper.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DictHelper.scala @@ -19,16 +19,16 @@ package org.apache.kylin.engine.spark.builder import org.apache.spark.broadcast.Broadcast -import org.apache.spark.dict.NGlobalDictionaryV2 import org.apache.spark.sql.Row import org.apache.spark.TaskContext +import org.apache.spark.dict.NGlobalDictionary import org.apache.spark.internal.Logging import scala.collection.mutable.ListBuffer object DictHelper extends Logging{ - def genDict(columnName: String, broadcastDict: Broadcast[NGlobalDictionaryV2], iter: Iterator[Row]) = { + def genDict(columnName: String, broadcastDict: Broadcast[NGlobalDictionary], iter: Iterator[Row]) = { val partitionID = TaskContext.get().partitionId() logInfo(s"Build partition dict col: ${columnName}, partitionId: $partitionID") val broadcastGlobalDict = broadcastDict.value diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java index 0ff3906..c42b437 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java @@ -22,7 +22,7 @@ import java.io.IOException; import org.apache.kylin.engine.spark.metadata.SegmentInfo; import org.apache.kylin.engine.spark.metadata.ColumnDesc; import org.apache.spark.dict.NGlobalDictMetaInfo; -import org.apache.spark.dict.NGlobalDictionaryV2; +import org.apache.spark.dict.NGlobalDictionary; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.slf4j.Logger; @@ -43,7 +43,7 @@ public class DictionaryBuilderHelper { * than the threshold multiplied 
by KylinConfigBase.getGlobalDictV2BucketOverheadFactor */ public static int calculateBucketSize(SegmentInfo desc, ColumnDesc col, Dataset<Row> afterDistinct) throws IOException { - NGlobalDictionaryV2 globalDict = new NGlobalDictionaryV2(desc.project(), col.tableAliasName(), col.columnName(), + NGlobalDictionary globalDict = new NGlobalDictionary(desc.project(), col.tableAliasName(), col.columnName(), desc.kylinconf().getHdfsWorkingDirectory()); int bucketPartitionSize = globalDict.getBucketSizeOrDefault(desc.kylinconf().getGlobalDictV2MinHashPartitions()); int bucketThreshold = desc.kylinconf().getGlobalDictV2ThresholdBucketSize(); diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java index a554c0a..e55dfcf 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java @@ -78,8 +78,8 @@ public class CubeBuildJob extends SparkApplication { private BuildLayoutWithUpdate buildLayoutWithUpdate; private Map<Long, Short> cuboidShardNum = Maps.newHashMap(); public static void main(String[] args) { - CubeBuildJob nDataflowBuildJob = new CubeBuildJob(); - nDataflowBuildJob.execute(args); + CubeBuildJob cubeBuildJob = new CubeBuildJob(); + cubeBuildJob.execute(args); } @Override @@ -120,7 +120,7 @@ public class CubeBuildJob extends SparkApplication { infos.recordSpanningTree(segId, spanningTree); logger.info("Updating segment info"); - updateSegmentInfo(getParam(MetadataConstants.P_CUBE_ID), seg, buildFromFlatTable.getCount()); + updateSegmentInfo(getParam(MetadataConstants.P_CUBE_ID), seg, buildFromFlatTable.getFlattableDS().count()); } updateSegmentSourceBytesSize(getParam(MetadataConstants.P_CUBE_ID), ResourceDetectUtils.getSegmentSourceSize(shareDir)); diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java index a64e5b8..aaa3c21 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java @@ -19,7 +19,6 @@ package org.apache.kylin.engine.spark.job; import java.io.IOException; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; @@ -27,7 +26,6 @@ import java.util.UUID; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.CubeUpdate; import org.apache.kylin.engine.spark.metadata.SegmentInfo; import org.apache.kylin.engine.spark.metadata.cube.ManagerHub; import org.apache.kylin.engine.spark.metadata.cube.PathManager; @@ -48,7 +46,7 @@ import com.google.common.collect.Maps; import org.apache.kylin.engine.spark.NSparkCubingEngine; import org.apache.kylin.engine.spark.application.SparkApplication; -import org.apache.kylin.engine.spark.builder.DFLayoutMergeAssist; +import org.apache.kylin.engine.spark.builder.CubeMergeAssist; import org.apache.kylin.engine.spark.utils.BuildUtils; import org.apache.kylin.engine.spark.utils.JobMetrics; import 
org.apache.kylin.engine.spark.utils.JobMetricsUtils; @@ -85,8 +83,8 @@ public class CubeMergeJob extends SparkApplication { CubeSegment mergedSeg = cube.getSegmentById(segmentId); SegmentInfo mergedSegInfo = ManagerHub.getSegmentInfo(config, getParam(MetadataConstants.P_CUBE_ID), mergedSeg.getUuid()); - Map<Long, DFLayoutMergeAssist> mergeCuboidsAssist = generateMergeAssist(mergingSegInfos, ss); - for (DFLayoutMergeAssist assist : mergeCuboidsAssist.values()) { + Map<Long, CubeMergeAssist> mergeCuboidsAssist = generateMergeAssist(mergingSegInfos, ss); + for (CubeMergeAssist assist : mergeCuboidsAssist.values()) { SpanningTree spanningTree = new ForestSpanningTree(JavaConversions.asJavaCollection(mergedSegInfo.toBuildLayouts())); Dataset<Row> afterMerge = assist.merge(config, cube.getName()); LayoutEntity layout = assist.getLayout(); @@ -116,19 +114,19 @@ public class CubeMergeJob extends SparkApplication { } } - public static Map<Long, DFLayoutMergeAssist> generateMergeAssist(List<SegmentInfo> mergingSegments, - SparkSession ss) { + public static Map<Long, CubeMergeAssist> generateMergeAssist(List<SegmentInfo> mergingSegments, + SparkSession ss) { // collect layouts need to merge - Map<Long, DFLayoutMergeAssist> mergeCuboidsAssist = Maps.newConcurrentMap(); + Map<Long, CubeMergeAssist> mergeCuboidsAssist = Maps.newConcurrentMap(); for (SegmentInfo seg : mergingSegments) { scala.collection.immutable.List<LayoutEntity> cuboids = seg.layouts(); for (int i = 0; i < cuboids.size(); i++) { LayoutEntity cuboid = cuboids.apply(i); long layoutId = cuboid.getId(); - DFLayoutMergeAssist assist = mergeCuboidsAssist.get(layoutId); + CubeMergeAssist assist = mergeCuboidsAssist.get(layoutId); if (assist == null) { - assist = new DFLayoutMergeAssist(); + assist = new CubeMergeAssist(); assist.addCuboid(cuboid); assist.setSs(ss); assist.setLayout(cuboid); @@ -144,7 +142,7 @@ public class CubeMergeJob extends SparkApplication { } private LayoutEntity saveAndUpdateCuboid(Dataset<Row> dataset, SegmentInfo seg, LayoutEntity layout, - DFLayoutMergeAssist assist) throws IOException { + CubeMergeAssist assist) throws IOException { long layoutId = layout.getId(); long sourceCount = 0L; @@ -191,8 +189,8 @@ public class CubeMergeJob extends SparkApplication { } public static void main(String[] args) { - CubeMergeJob nDataflowBuildJob = new CubeMergeJob(); - nDataflowBuildJob.execute(args); + CubeMergeJob cubeMergeJob = new CubeMergeJob(); + cubeMergeJob.execute(args); } } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala index a854242..1da2af5 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala @@ -21,7 +21,7 @@ package org.apache.kylin.engine.spark.job import java.util import java.util.Locale -import org.apache.kylin.engine.spark.builder.DFBuilderHelper.ENCODE_SUFFIX +import org.apache.kylin.engine.spark.builder.CubeBuilderHelper.ENCODE_SUFFIX import org.apache.kylin.engine.spark.metadata.cube.model.SpanningTree import org.apache.kylin.engine.spark.metadata.{ColumnDesc, DTType, FunctionDesc, LiteralColumnDesc} import org.apache.kylin.measure.bitmap.BitmapMeasureType diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala index d533970..b5a7830 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala @@ -68,7 +68,7 @@ class ParentSourceChooser( // hacked, for some case, you do not want to trigger buildSnapshot // eg: resource detect // Move this to a more suitable place - val builder = new DFSnapshotBuilder(seg, ss) + val builder = new CubeSnapshotBuilder(seg, ss) seg = builder.buildSnapshot } flatTableSource = getFlatTable() @@ -143,7 +143,6 @@ class ParentSourceChooser( val buildSource = new NBuildSourceInfo buildSource.setParentStoragePath("NSparkCubingUtil.getStoragePath(dataCuboid)") buildSource.setSparkSession(ss) - buildSource.setCount(layout.getRows) buildSource.setLayoutId(layout.getId) buildSource.setLayout(layout) buildSource.setByteSize(layout.getByteSize) @@ -163,7 +162,6 @@ class ParentSourceChooser( val flatTable = new CreateFlatTable(seg, toBuildTree, ss, sourceInfo) val afterJoin: Dataset[Row] = flatTable.generateDataset(needEncoding, true) sourceInfo.setFlattableDS(afterJoin) - sourceInfo.setCount(afterJoin.count()) logInfo("No suitable ready layouts could be reused, generate dataset from flat table.") sourceInfo diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java index 6f04932..4e771f9 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java @@ -28,7 +28,7 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.spark.application.SparkApplication; -import org.apache.kylin.engine.spark.builder.DFLayoutMergeAssist; +import org.apache.kylin.engine.spark.builder.CubeMergeAssist; import org.apache.kylin.engine.spark.metadata.MetadataConverter; import org.apache.kylin.engine.spark.metadata.SegmentInfo; import org.apache.kylin.metadata.MetadataConstants; @@ -70,14 +70,14 @@ public class ResourceDetectBeforeMergingJob extends SparkApplication { } infos.clearMergingSegments(); infos.recordMergingSegments(segmentInfos); - Map<Long, DFLayoutMergeAssist> mergeCuboidsAssist = CubeMergeJob.generateMergeAssist(segmentInfos, ss); + Map<Long, CubeMergeAssist> mergeCuboidsAssist = CubeMergeJob.generateMergeAssist(segmentInfos, ss); ResourceDetectUtils.write( new Path(config.getJobTmpShareDir(project, jobId), ResourceDetectUtils.countDistinctSuffix()), ResourceDetectUtils .findCountDistinctMeasure(JavaConversions.asJavaCollection(mergedSegInfo.toBuildLayouts()))); Map<String, List<String>> resourcePaths = Maps.newHashMap(); infos.clearSparkPlans(); - for (Map.Entry<Long, DFLayoutMergeAssist> entry : mergeCuboidsAssist.entrySet()) { + for (Map.Entry<Long, CubeMergeAssist> entry : mergeCuboidsAssist.entrySet()) { Dataset<Row> afterMerge = entry.getValue().merge(config, 
getParam(MetadataConstants.P_CUBE_NAME)); infos.recordSparkPlan(afterMerge.queryExecution().sparkPlan()); List<Path> paths = JavaConversions diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala index 7db98fb..49aa30f 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala @@ -107,7 +107,7 @@ object ExpressionConverter { case "WEEK" => weekofyear(k_lit(inputAsTS)) case "DOY" => dayofyear(k_lit(inputAsTS)) case "DAY" => dayofmonth(k_lit(inputAsTS)) - case "DOW" => kap_day_of_week(k_lit(inputAsTS)) + case "DOW" => kylin_day_of_week(k_lit(inputAsTS)) case "HOUR" => hour(k_lit(inputAsTS)) case "MINUTE" => minute(k_lit(inputAsTS)) case "SECOND" => second(k_lit(inputAsTS)) @@ -158,7 +158,7 @@ object ExpressionConverter { k_lit(children.head), children.apply(1).asInstanceOf[Int]) case "truncate" => - kap_truncate(k_lit(children.head), children.apply(1).asInstanceOf[Int]) + kylin_truncate(k_lit(children.head), children.apply(1).asInstanceOf[Int]) case "cot" => k_lit(1).divide(tan(k_lit(children.head))) // null handling funcs diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampAddImpl.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampAddImpl.scala index 38f0903..7413e4b 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampAddImpl.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampAddImpl.scala @@ -21,7 +21,7 @@ package org.apache.spark.sql.udf import java.util.{Calendar, Locale, TimeZone} import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.kylin.engine.spark.common.util.KapDateTimeUtils.{MICROS_PER_MILLIS, MONTHS_PER_QUARTER} +import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils.{MICROS_PER_MILLIS, MONTHS_PER_QUARTER} object TimestampAddImpl { private val localCalendar = new ThreadLocal[Calendar] { diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampDiffImpl.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampDiffImpl.scala index 56733b5..f0d4d9e 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampDiffImpl.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampDiffImpl.scala @@ -20,8 +20,8 @@ package org.apache.spark.sql.udf import java.util.Locale -import org.apache.kylin.engine.spark.common.util.KapDateTimeUtils -import org.apache.kylin.engine.spark.common.util.KapDateTimeUtils._ +import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils +import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils._ import org.apache.spark.sql.catalyst.util.DateTimeUtils object TimestampDiffImpl { @@ -69,11 +69,11 @@ object TimestampDiffImpl { case "WEEK" | "SQL_TSI_WEEK" => (aMillis - bMillis) / MILLIS_PER_DAY / DAYS_PER_WEEK case "MONTH" | "SQL_TSI_MONTH" => - KapDateTimeUtils.subtractMonths(aMillis, bMillis) + KylinDateTimeUtils.subtractMonths(aMillis, bMillis) case "QUARTER" | "SQL_TSI_QUARTER" => - 
KapDateTimeUtils.subtractMonths(aMillis, bMillis) / MONTHS_PER_QUARTER + KylinDateTimeUtils.subtractMonths(aMillis, bMillis) / MONTHS_PER_QUARTER case "YEAR" | "SQL_TSI_YEAR" => - KapDateTimeUtils.subtractMonths(aMillis, bMillis) / MONTHS_PER_QUARTER / QUARTERS_PER_YEAR + KylinDateTimeUtils.subtractMonths(aMillis, bMillis) / MONTHS_PER_QUARTER / QUARTERS_PER_YEAR case _ => throw new IllegalArgumentException(s"Illegal unit: $unit," + s" only support [YEAR, SQL_TSI_YEAR, QUARTER, SQL_TSI_QUARTER, MONTH, SQL_TSI_MONTH, WEEK, SQL_TSI_WEEK, DAY, SQL_TSI_DAY," + diff --git a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java index 5b758c1..7ca1cca 100644 --- a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java +++ b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java @@ -256,7 +256,7 @@ public class LocalWithSparkSessionTest extends LocalFileMetadataTestCase impleme if (type.isBoolean()) return DataTypes.BooleanType; - throw new IllegalArgumentException("KAP data type: " + type + " can not be converted to spark's type."); + throw new IllegalArgumentException("Kylin data type: " + type + " can not be converted to spark's type."); } public void buildMultiSegs(String cubeName) throws Exception { diff --git a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/dict/NGlobalDictionaryV2Test.java b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/dict/NGlobalDictionaryTest.java similarity index 89% rename from kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/dict/NGlobalDictionaryV2Test.java rename to kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/dict/NGlobalDictionaryTest.java index 1516c99..46b0905 100644 --- a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/dict/NGlobalDictionaryV2Test.java +++ b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/dict/NGlobalDictionaryTest.java @@ -34,7 +34,7 @@ import org.apache.spark.dict.NBucketDictionary; import org.apache.spark.dict.NGlobalDictHDFSStore; import org.apache.spark.dict.NGlobalDictMetaInfo; import org.apache.spark.dict.NGlobalDictStore; -import org.apache.spark.dict.NGlobalDictionaryV2; +import org.apache.spark.dict.NGlobalDictionary; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; @@ -52,7 +52,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -public class NGlobalDictionaryV2Test extends LocalWithSparkSessionTest { +public class NGlobalDictionaryTest extends LocalWithSparkSessionTest { private final static int BUCKET_SIZE = 10; @@ -84,8 +84,8 @@ public class NGlobalDictionaryV2Test extends LocalWithSparkSessionTest { private void roundTest(int size) throws IOException { System.out.println("GlobalDictionaryV2Test -> roundTest -> " + System.currentTimeMillis()); KylinConfig config = KylinConfig.getInstanceFromEnv(); - NGlobalDictionaryV2 dict1 = new NGlobalDictionaryV2("t1", "a", "spark", config.getHdfsWorkingDirectory()); - NGlobalDictionaryV2 dict2 = new NGlobalDictionaryV2("t2", "a", "local", config.getHdfsWorkingDirectory()); + NGlobalDictionary dict1 = new 
NGlobalDictionary("t1", "a", "spark", config.getHdfsWorkingDirectory()); + NGlobalDictionary dict2 = new NGlobalDictionary("t2", "a", "local", config.getHdfsWorkingDirectory()); List<String> stringList = generateRandomData(size); Collections.sort(stringList); runWithSparkBuildGlobalDict(dict1, stringList); @@ -102,7 +102,7 @@ public class NGlobalDictionaryV2Test extends LocalWithSparkSessionTest { return stringList; } - private void runWithSparkBuildGlobalDict(NGlobalDictionaryV2 dict, List<String> stringSet) throws IOException { + private void runWithSparkBuildGlobalDict(NGlobalDictionary dict, List<String> stringSet) throws IOException { KylinConfig config = KylinConfig.getInstanceFromEnv(); dict.prepareWrite(); List<Row> rowList = Lists.newLinkedList(); @@ -129,7 +129,7 @@ public class NGlobalDictionaryV2Test extends LocalWithSparkSessionTest { dict.writeMetaDict(BUCKET_SIZE, config.getGlobalDictV2MaxVersions(), config.getGlobalDictV2VersionTTL()); } - private void runWithLocalBuildGlobalDict(NGlobalDictionaryV2 dict, List<String> stringSet) throws IOException { + private void runWithLocalBuildGlobalDict(NGlobalDictionary dict, List<String> stringSet) throws IOException { KylinConfig config = KylinConfig.getInstanceFromEnv(); dict.prepareWrite(); HashPartitioner partitioner = new HashPartitioner(BUCKET_SIZE); @@ -153,13 +153,13 @@ public class NGlobalDictionaryV2Test extends LocalWithSparkSessionTest { dict.writeMetaDict(BUCKET_SIZE, config.getGlobalDictV2MaxVersions(), config.getGlobalDictV2VersionTTL()); } - private void compareTwoModeVersionNum(NGlobalDictionaryV2 dict1, NGlobalDictionaryV2 dict2) throws IOException { + private void compareTwoModeVersionNum(NGlobalDictionary dict1, NGlobalDictionary dict2) throws IOException { NGlobalDictStore store1 = new NGlobalDictHDFSStore(dict1.getResourceDir()); NGlobalDictStore store2 = new NGlobalDictHDFSStore(dict2.getResourceDir()); Assert.assertEquals(store1.listAllVersions().length, store2.listAllVersions().length); } - private void compareTwoVersionDict(NGlobalDictionaryV2 dict1, NGlobalDictionaryV2 dict2) throws IOException { + private void compareTwoVersionDict(NGlobalDictionary dict1, NGlobalDictionary dict2) throws IOException { NGlobalDictMetaInfo metadata1 = dict1.getMetaInfo(); NGlobalDictMetaInfo metadata2 = dict2.getMetaInfo(); // compare dict meta info diff --git a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala index 9fd613e..fe84f82 100644 --- a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala +++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala @@ -114,7 +114,7 @@ class TestCreateFlatTable extends SparderBaseFunSuite with SharedSparkSession wi private def checkEncodeCols(ds: Dataset[Row], segment: CubeSegment, needEncode: Boolean) = { val seg = MetadataConverter.getSegmentInfo(segment.getCubeInstance, segment.getUuid, segment.getName, segment.getStorageLocationIdentifier) val globalDictSet = seg.toBuildDictColumns - val actualEncodeDictSize = ds.schema.count(_.name.endsWith(DFBuilderHelper.ENCODE_SUFFIX)) + val actualEncodeDictSize = ds.schema.count(_.name.endsWith(CubeBuilderHelper.ENCODE_SUFFIX)) if (needEncode) { Assert.assertEquals(globalDictSet.size, actualEncodeDictSize) } else { diff --git 
a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala index 003aefa..a0b33dc 100644 --- a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala +++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala @@ -32,7 +32,7 @@ import org.apache.kylin.job.impl.threadpool.DefaultScheduler import org.apache.kylin.job.lock.MockJobLock import org.apache.kylin.metadata.model.SegmentRange.TSRange import org.apache.spark.TaskContext -import org.apache.spark.dict.{NGlobalDictMetaInfo, NGlobalDictionaryV2} +import org.apache.spark.dict.{NGlobalDictMetaInfo, NGlobalDictionary} import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, SparderBaseFunSuite} import org.apache.spark.sql.functions.col @@ -112,12 +112,12 @@ class TestGlobalDictBuild extends SparderBaseFunSuite with SharedSparkSession wi } def buildDict(segInfo: SegmentInfo, seg: CubeSegment, randomDataSet: Dataset[Row], dictColSet: Set[ColumnDesc]): NGlobalDictMetaInfo = { - val dictionaryBuilder = new DFDictionaryBuilder(randomDataSet, segInfo, randomDataSet.sparkSession, dictColSet) + val dictionaryBuilder = new CubeDictionaryBuilder(randomDataSet, segInfo, randomDataSet.sparkSession, dictColSet) val col = dictColSet.iterator().next() val ds = randomDataSet.select("26").distinct() val bucketPartitionSize = DictionaryBuilderHelper.calculateBucketSize(segInfo, col, ds) dictionaryBuilder.build(col, bucketPartitionSize, ds) - val dict = new NGlobalDictionaryV2(seg.getProject, col.tableName, col.columnName, + val dict = new NGlobalDictionary(seg.getProject, col.tableName, col.columnName, seg.getConfig.getHdfsWorkingDirectory) dict.getMetaInfo } diff --git a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestSnapshotBuilder.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestSnapshotBuilder.scala index cae9d1d..dab19a8 100644 --- a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestSnapshotBuilder.scala +++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestSnapshotBuilder.scala @@ -167,7 +167,7 @@ class TestSnapshotBuilder extends SparderBaseFunSuite with SharedSparkSession wi val segCopy = cubeCopy.getSegmentById(seg.getUuid) segCopy.setSnapshots(new ConcurrentHashMap()) - var snapshotBuilder = new DFSnapshotBuilder( + var snapshotBuilder = new CubeSnapshotBuilder( MetadataConverter.getSegmentInfo(segCopy.getCubeInstance, segCopy.getUuid, segCopy.getName, segCopy.getStorageLocationIdentifier), spark) val snapshot = snapshotBuilder.buildSnapshot cube.getSegments.asScala.foreach(_.getConfig.setProperty("kylin.snapshot.parallel-build-enabled", "true")) @@ -183,7 +183,7 @@ class TestSnapshotBuilder extends SparderBaseFunSuite with SharedSparkSession wi val segCopy = cubeCopy.getSegmentById(segment.getUuid) segCopy.setSnapshots(new ConcurrentHashMap()) val segInfo = MetadataConverter.getSegmentInfo(segCopy.getCubeInstance, segCopy.getUuid, segCopy.getName, segCopy.getStorageLocationIdentifier) - var snapshotBuilder = new DFSnapshotBuilder(segInfo, spark) + var snapshotBuilder = new 
CubeSnapshotBuilder(segInfo, spark) snapshotBuilder.buildSnapshot } @@ -201,7 +201,7 @@ class TestSnapshotBuilder extends SparderBaseFunSuite with SharedSparkSession wi val cubeCopy = segment.getCubeInstance.latestCopyForWrite() val segCopy = cubeCopy.getSegmentById(segment.getUuid) segCopy.setSnapshots(new ConcurrentHashMap()) - var snapshotBuilder = new DFSnapshotBuilder( + var snapshotBuilder = new CubeSnapshotBuilder( MetadataConverter.getSegmentInfo(segCopy.getCubeInstance, segCopy.getUuid, segCopy.getName, segCopy.getStorageLocationIdentifier), spark) snapshotBuilder.buildSnapshot } diff --git a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestUdfManager.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestUdfManager.scala index de2c24a..0d70aef 100644 --- a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestUdfManager.scala +++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestUdfManager.scala @@ -35,7 +35,7 @@ class TestUdfManager extends SparderBaseFunSuite with SharedSparkSession with Be } // ignore for dev, if you want to run this case, modify udfCache maximum to Long.MaxValue - // and if (funcName == "TOP_N") to if (funcName.startsWith("TOP_N")) in io.kyligence.kap.engine.spark.job.UdfManager + // and if (funcName == "TOP_N") to if (funcName.startsWith("TOP_N")) in org.apache.kylin.engine.spark.job.UdfManager ignore("test register udf in multi-thread") { import functions.udf val testFunc = udf(() => "test") diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/DerivedProcess.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/DerivedProcess.scala index 6c64e00..bd4cd46 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/DerivedProcess.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/DerivedProcess.scala @@ -64,7 +64,6 @@ object DerivedProcess { } val hostFkIdx = hostFkCols.map(hostCol => indexOnTheGTValues(hostCol)) - // fix for test src/kap-it/src/test/resources/query/sql_rawtable/query03.sql if (!hostFkIdx.exists(_ >= 0)) { return } diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala index 2e620a3..bad566f 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala @@ -109,7 +109,7 @@ object ExpressionConverter { case "WEEK" => weekofyear(k_lit(inputAsTS)) case "DOY" => dayofyear(k_lit(inputAsTS)) case "DAY" => dayofmonth(k_lit(inputAsTS)) - case "DOW" => kap_day_of_week(k_lit(inputAsTS)) + case "DOW" => kylin_day_of_week(k_lit(inputAsTS)) case "HOUR" => hour(k_lit(inputAsTS)) case "MINUTE" => minute(k_lit(inputAsTS)) case "SECOND" => second(k_lit(inputAsTS)) @@ -160,7 +160,7 @@ object ExpressionConverter { k_lit(children.head), children.apply(1).asInstanceOf[Int]) case "truncate" => - kap_truncate(k_lit(children.head), children.apply(1).asInstanceOf[Int]) + kylin_truncate(k_lit(children.head), children.apply(1).asInstanceOf[Int]) case "cot" => 
k_lit(1).divide(tan(k_lit(children.head))) // null handling funcs diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/RuntimeHelper.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/RuntimeHelper.scala index c01170b..a617cf1 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/RuntimeHelper.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/RuntimeHelper.scala @@ -117,7 +117,6 @@ object RuntimeHelper { } else if (deriveMap.contains(index)) { deriveMap.apply(index) } else if (DataType.DATETIME_FAMILY.contains(column.getType.getName)) { - // https://github.com/Kyligence/KAP/issues/14561 literalTs.as(s"${factTableName}_${columnName}") } else { literalOne.as(s"${factTableName}_${columnName}") diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala index bfd2a39..dcb4f14 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala @@ -160,7 +160,7 @@ class SparderRexVisitor( case num: MonthNum => { // both add_month and add_year case val ts = k_lit(children.head).cast(TimestampType) - return k_lit(kap_add_months(k_lit(ts), num.num)) + return k_lit(kylin_add_months(k_lit(ts), num.num)) } case _ => } @@ -242,7 +242,7 @@ class SparderRexVisitor( val ts1 = k_lit(children.head).cast(TimestampType) val ts2 = k_lit(children.last).cast(TimestampType) - kap_subtract_months(ts1, ts2) + kylin_subtract_months(ts1, ts2) } else { throw new IllegalStateException( diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ProjectPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ProjectPlan.scala index 13c8c7c..f1348da 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ProjectPlan.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ProjectPlan.scala @@ -52,7 +52,7 @@ object ProjectPlan extends Logging { k_lit(c._1._1).as(s"${System.identityHashCode(rel)}_prj${c._2}") } }) - .map(c => { // find and rename the duplicated columns KAP#16751 + .map(c => { // find and rename the duplicated columns if (!(duplicatedColumnsCount contains c)) { duplicatedColumnsCount += (c -> 0) c diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala index 73d7424..f0077af 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala @@ -62,11 +62,11 @@ object ResultPlan extends Logging { val resultTypes = rowType.getFieldList.asScala val jobGroup = Thread.currentThread().getName val sparkContext = SparderContext.getSparkSession.sparkContext - val kapConfig = KylinConfig.getInstanceFromEnv + val kylinConfig = KylinConfig.getInstanceFromEnv var pool = "heavy_tasks" val partitionsNum = - if 
(kapConfig.getSparkSqlShufflePartitions != -1) { - kapConfig.getSparkSqlShufflePartitions + if (kylinConfig.getSparkSqlShufflePartitions != -1) { + kylinConfig.getSparkSqlShufflePartitions } else { Math.min(QueryContextFacade.current().getSourceScanBytes / PARTITION_SPLIT_BYTES + 1, SparderContext.getTotalCore).toInt diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NExecAndComp.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NExecAndComp.java index ddcc79f..113d530 100644 --- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NExecAndComp.java +++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NExecAndComp.java @@ -81,13 +81,13 @@ public class NExecAndComp { appendLimitQueries++; } - Dataset<Row> kapResult = (recAndQueryResult == null) ? queryWithKap(prj, joinType, sqlAndAddedLimitSql) - : queryWithKap(prj, joinType, sqlAndAddedLimitSql, recAndQueryResult); + Dataset<Row> kylinResult = (recAndQueryResult == null) ? queryWithKylin(prj, joinType, sqlAndAddedLimitSql) + : queryWithKylin(prj, joinType, sqlAndAddedLimitSql, recAndQueryResult); addQueryPath(recAndQueryResult, query, sql); Dataset<Row> sparkResult = queryWithSpark(prj, sql, query.getFirst()); - List<Row> kapRows = SparkQueryTest.castDataType(kapResult, sparkResult).toJavaRDD().collect(); + List<Row> kylinRows = SparkQueryTest.castDataType(kylinResult, sparkResult).toJavaRDD().collect(); List<Row> sparkRows = sparkResult.toJavaRDD().collect(); - if (!compareResults(normRows(sparkRows), normRows(kapRows), CompareLevel.SUBSET)) { + if (!compareResults(normRows(sparkRows), normRows(kylinRows), CompareLevel.SUBSET)) { throw new IllegalArgumentException("Result not match"); } } @@ -115,8 +115,8 @@ public class NExecAndComp { // Query from Cube long startTime = System.currentTimeMillis(); - Dataset<Row> cubeResult = (recAndQueryResult == null) ? queryWithKap(prj, joinType, Pair.newPair(sql, sql)) - : queryWithKap(prj, joinType, Pair.newPair(sql, sql), recAndQueryResult); + Dataset<Row> cubeResult = (recAndQueryResult == null) ? 
queryWithKylin(prj, joinType, Pair.newPair(sql, sql)) + : queryWithKylin(prj, joinType, Pair.newPair(sql, sql), recAndQueryResult); addQueryPath(recAndQueryResult, query, sql); if (compareLevel == CompareLevel.SAME) { Dataset<Row> sparkResult = queryWithSpark(prj, sql, query.getFirst()); @@ -145,7 +145,7 @@ public class NExecAndComp { } } - public static boolean execAndCompareQueryResult(Pair<String, String> queryForKap, + public static boolean execAndCompareQueryResult(Pair<String, String> queryForKylin, Pair<String, String> queryForSpark, String joinType, String prj, Map<String, CompareEntity> recAndQueryResult) { String sqlForSpark = changeJoinType(queryForSpark.getSecond(), joinType); @@ -153,11 +153,11 @@ public class NExecAndComp { Dataset<Row> sparkResult = queryWithSpark(prj, queryForSpark.getSecond(), queryForSpark.getFirst()); List<Row> sparkRows = sparkResult.toJavaRDD().collect(); - String sqlForKap = changeJoinType(queryForKap.getSecond(), joinType); - Dataset<Row> cubeResult = queryWithKap(prj, joinType, Pair.newPair(sqlForKap, sqlForKap)); - List<Row> kapRows = SparkQueryTest.castDataType(cubeResult, sparkResult).toJavaRDD().collect(); + String sqlForKylin = changeJoinType(queryForKylin.getSecond(), joinType); + Dataset<Row> cubeResult = queryWithKylin(prj, joinType, Pair.newPair(sqlForKylin, sqlForKylin)); + List<Row> kylinRows = SparkQueryTest.castDataType(cubeResult, sparkResult).toJavaRDD().collect(); - return sparkRows.equals(kapRows); + return sparkRows.equals(kylinRows); } private static List<Row> normRows(List<Row> rows) { @@ -184,8 +184,8 @@ public class NExecAndComp { "The method has deprecated, please call org.apache.kylin.engine.spark2.NExecAndComp.execAndCompareNew"); } - private static Dataset<Row> queryWithKap(String prj, String joinType, Pair<String, String> pair, - Map<String, CompareEntity> compareEntityMap) { + private static Dataset<Row> queryWithKylin(String prj, String joinType, Pair<String, String> pair, + Map<String, CompareEntity> compareEntityMap) { compareEntityMap.putIfAbsent(pair.getFirst(), new CompareEntity()); final CompareEntity entity = compareEntityMap.get(pair.getFirst()); @@ -196,7 +196,7 @@ public class NExecAndComp { return rowDataset; } - private static Dataset<Row> queryWithKap(String prj, String joinType, Pair<String, String> sql) { + private static Dataset<Row> queryWithKylin(String prj, String joinType, Pair<String, String> sql) { return queryFromCube(prj, changeJoinType(sql.getSecond(), joinType)); } diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/file_pruning/NFilePruningTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/file_pruning/NFilePruningTest.java index 897cef2..cf58cac 100644 --- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/file_pruning/NFilePruningTest.java +++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/file_pruning/NFilePruningTest.java @@ -264,7 +264,7 @@ public class NFilePruningTest extends LocalWithSparkSessionTest { //pruningWithVariousTypesScenario(); } finally { - System.clearProperty("kap.storage.columnar.shard-rowcount"); + System.clearProperty("kylin.storage.columnar.shard-rowcount"); } } @@ -287,7 +287,7 @@ public class NFilePruningTest extends LocalWithSparkSessionTest { NExecAndComp.execAndCompare(query, getProject(), NExecAndComp.CompareLevel.SAME, "left"); } finally { - System.clearProperty("kap.storage.columnar.shard-rowcount"); + 
System.clearProperty("kylin.storage.columnar.shard-rowcount"); } } diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/utils/QueryUtil.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/utils/QueryUtil.java index bfa1d1c..ba17302 100644 --- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/utils/QueryUtil.java +++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/utils/QueryUtil.java @@ -28,7 +28,6 @@ import org.apache.kylin.query.util.QueryUtil.IQueryTransformer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Locale; @@ -70,15 +69,14 @@ public class QueryUtil { String[] currentTransformers = queryTransformers.stream().map(Object::getClass).map(Class::getCanonicalName) .toArray(String[]::new); String[] configTransformers = kylinConfig.getQueryTransformers(); - boolean containsCCTransformer = Arrays.stream(configTransformers).anyMatch(t -> t.equals("io.kyligence.kap.query.util.ConvertToComputedColumn")); boolean transformersEqual = Objects.deepEquals(currentTransformers, configTransformers); - if (transformersEqual && (isCCNeeded || !containsCCTransformer)) { + if (transformersEqual && (isCCNeeded)) { return; } List<IQueryTransformer> transformers = Lists.newArrayList(); for (String clz : configTransformers) { - if (!isCCNeeded && clz.equals("io.kyligence.kap.query.util.ConvertToComputedColumn")) + if (!isCCNeeded) continue; try { diff --git a/webapp/app/js/model/cubeConfig.js b/webapp/app/js/model/cubeConfig.js index 80f0ddd..6d57c42 100644 --- a/webapp/app/js/model/cubeConfig.js +++ b/webapp/app/js/model/cubeConfig.js @@ -26,7 +26,7 @@ KylinApp.constant('cubeConfig', { engineType:[ //{name:'MapReduce',value: 2}, //{name:'Spark',value: 4}, - {name:'Spark2',value: 6} + {name:'Spark',value: 6} //{name:'Flink',value: 5} ], joinTypes: [ diff --git a/webapp/app/partials/cubeDesigner/advanced_settings.html b/webapp/app/partials/cubeDesigner/advanced_settings.html index 733b341..98bc1ab 100755 --- a/webapp/app/partials/cubeDesigner/advanced_settings.html +++ b/webapp/app/partials/cubeDesigner/advanced_settings.html @@ -382,7 +382,7 @@ </select> <span ng-if="state.mode=='view'&&cubeMetaFrame.engine_type==2">MapReduce</span> <span ng-if="state.mode=='view'&&cubeMetaFrame.engine_type==4">Spark</span> - <span ng-if="state.mode=='view'&&cubeMetaFrame.engine_type==6">Spark2</span> + <span ng-if="state.mode=='view'&&cubeMetaFrame.engine_type==6">Spark</span> <span ng-if="state.mode=='view'&&cubeMetaFrame.engine_type==5">Flink</span> </div> </div>