This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new 5764e47 KYLIN-4660 Normalize variable and file naming
5764e47 is described below
commit 5764e4797c90b3c9225c5c5c8c1d4fea27fe59a4
Author: rupengwang <[email protected]>
AuthorDate: Tue Jul 28 14:56:32 2020 +0800
KYLIN-4660 Normalize variable and file naming
---
.../kylin/common/util/TempMetadataBuilder.java | 1 -
.../kylin/spark/classloader/TomcatClassLoader.java | 2 +-
...ateTimeUtils.scala => KylinDateTimeUtils.scala} | 2 +-
.../org/apache/spark/dict/NBucketDictionary.java | 2 +-
.../spark/dict/NGlobalDictBuilderAssist.scala | 4 +--
...balDictionaryV2.java => NGlobalDictionary.java} | 8 ++---
.../apache/spark/sql/KylinDataFrameManager.scala | 3 +-
.../org/apache/spark/sql/KylinFunctions.scala | 34 +++++++++++-----------
.../sql/catalyst/expressions/DictEncodeImpl.scala | 4 +--
...pExpresssions.scala => KylinExpresssions.scala} | 32 ++++++++++----------
.../catalyst/expressions/TimestampAddImpl.scala | 2 +-
.../catalyst/expressions/TimestampDiffImpl.scala | 10 +++----
.../sql/execution/KylinFileSourceScanExec.scala | 1 -
.../datasource/ResetShufflePartition.scala | 6 ++--
.../engine/spark/builder/NBuildSourceInfo.java | 8 -----
.../kylin/engine/spark/job/NSparkExecutable.java | 3 +-
.../kylin/engine/spark/merger/MetadataMerger.java | 2 +-
.../kylin/engine/spark/source/CsvSource.java | 4 +--
.../engine/spark/builder/CreateFlatTable.scala | 6 ++--
...BuilderHelper.scala => CubeBuilderHelper.scala} | 4 +--
...ryBuilder.scala => CubeDictionaryBuilder.scala} | 14 ++++-----
...LayoutMergeAssist.java => CubeMergeAssist.java} | 4 +--
...shotBuilder.scala => CubeSnapshotBuilder.scala} | 2 +-
...DFTableEncoder.scala => CubeTableEncoder.scala} | 10 +++----
.../kylin/engine/spark/builder/DictHelper.scala | 4 +--
.../spark/builder/DictionaryBuilderHelper.java | 4 +--
.../kylin/engine/spark/job/CubeBuildJob.java | 6 ++--
.../kylin/engine/spark/job/CubeMergeJob.java | 24 +++++++--------
.../kylin/engine/spark/job/CuboidAggregator.scala | 2 +-
.../engine/spark/job/ParentSourceChooser.scala | 4 +--
.../spark/job/ResourceDetectBeforeMergingJob.java | 6 ++--
.../kylin/query/runtime/ExpressionConverter.scala | 4 +--
.../apache/spark/sql/udf/TimestampAddImpl.scala | 2 +-
.../apache/spark/sql/udf/TimestampDiffImpl.scala | 10 +++----
.../engine/spark/LocalWithSparkSessionTest.java | 2 +-
...onaryV2Test.java => NGlobalDictionaryTest.java} | 16 +++++-----
.../engine/spark/builder/TestCreateFlatTable.scala | 2 +-
.../engine/spark/builder/TestGlobalDictBuild.scala | 6 ++--
.../engine/spark/builder/TestSnapshotBuilder.scala | 6 ++--
.../kylin/engine/spark/job/TestUdfManager.scala | 2 +-
.../kylin/query/runtime/DerivedProcess.scala | 1 -
.../kylin/query/runtime/ExpressionConverter.scala | 4 +--
.../apache/kylin/query/runtime/RuntimeHelper.scala | 1 -
.../kylin/query/runtime/SparderRexVisitor.scala | 4 +--
.../kylin/query/runtime/plans/ProjectPlan.scala | 2 +-
.../kylin/query/runtime/plans/ResultPlan.scala | 6 ++--
.../apache/kylin/engine/spark2/NExecAndComp.java | 28 +++++++++---------
.../spark2/file_pruning/NFilePruningTest.java | 4 +--
.../kylin/engine/spark2/utils/QueryUtil.java | 6 ++--
webapp/app/js/model/cubeConfig.js | 2 +-
.../partials/cubeDesigner/advanced_settings.html | 2 +-
51 files changed, 153 insertions(+), 175 deletions(-)
diff --git
a/core-common/src/test/java/org/apache/kylin/common/util/TempMetadataBuilder.java
b/core-common/src/test/java/org/apache/kylin/common/util/TempMetadataBuilder.java
index 1b92532..3653bf3 100644
---
a/core-common/src/test/java/org/apache/kylin/common/util/TempMetadataBuilder.java
+++
b/core-common/src/test/java/org/apache/kylin/common/util/TempMetadataBuilder.java
@@ -83,7 +83,6 @@ public class TempMetadataBuilder {
FileUtils.deleteQuietly(new File(dst));
- // KAP files will overwrite Kylin files
for (String metaSrc : metaSrcs) {
FileUtils.copyDirectory(new File(metaSrc), new File(dst));
}
diff --git
a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/TomcatClassLoader.java
b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/TomcatClassLoader.java
index f826356..45007c5 100644
---
a/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/TomcatClassLoader.java
+++
b/kylin-spark-project/kylin-spark-classloader/src/main/java/org/apache/kylin/spark/classloader/TomcatClassLoader.java
@@ -137,7 +137,7 @@ public class TomcatClassLoader extends ParallelWebappClassLoader {
if (sparkClassLoader.classNeedPreempt(name)) {
return sparkClassLoader.loadClass(name);
}
- // tomcat classpath include KAP_HOME/lib , ensure this classload can load kap class
+ // tomcat classpath include KYLIN_HOME/lib , ensure this classload can load kylin class
if (isParentCLPrecedent(name) && !isThisCLPrecedent(name)) {
logger.debug("delegate " + name + " directly to parent");
return parent.loadClass(name);
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KapDateTimeUtils.scala
b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KylinDateTimeUtils.scala
similarity index 99%
rename from
kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KapDateTimeUtils.scala
rename to
kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KylinDateTimeUtils.scala
index ca882a5..a5a6451 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KapDateTimeUtils.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KylinDateTimeUtils.scala
@@ -21,7 +21,7 @@ package org.apache.kylin.engine.spark.common.util
import org.apache.calcite.avatica.util.TimeUnitRange
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-object KapDateTimeUtils {
+object KylinDateTimeUtils {
val MICROS_PER_MILLIS: Long = 1000L
val MILLIS_PER_SECOND: Long = 1000L
val MILLIS_PER_MINUTE: Long = MILLIS_PER_SECOND * 60L
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java
index 7d08bef..bf2a351 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java
+++
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java
@@ -27,7 +27,7 @@ import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
public class NBucketDictionary {
- protected static final Logger logger = LoggerFactory.getLogger(NGlobalDictionaryV2.class);
+ protected static final Logger logger = LoggerFactory.getLogger(NGlobalDictionary.class);
private String workingDir;
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
index 64e9bad..962cb40 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
@@ -35,7 +35,7 @@ object NGlobalDictBuilderAssist extends Logging {
@throws[IOException]
def resize(ref: ColumnDesc, desc: SegmentInfo, bucketPartitionSize: Int, ss: SparkSession): Unit = {
- val globalDict = new NGlobalDictionaryV2(desc.project, ref.tableAliasName, ref.columnName, desc.kylinconf.getHdfsWorkingDirectory)
+ val globalDict = new NGlobalDictionary(desc.project, ref.tableAliasName, ref.columnName, desc.kylinconf.getHdfsWorkingDirectory)
val broadcastDict = ss.sparkContext.broadcast(globalDict)
globalDict.prepareWrite()
@@ -44,7 +44,7 @@ object NGlobalDictBuilderAssist extends Logging {
val existsDictDs = ss.createDataset(0 to bucketPartitionSize)
.flatMap {
bucketId =>
- val gDict: NGlobalDictionaryV2 = broadcastDict.value
+ val gDict: NGlobalDictionary = broadcastDict.value
val bucketDict: NBucketDictionary = gDict.loadBucketDictionary(bucketId)
val tupleList = new util.ArrayList[(String, Long)](bucketDict.getAbsoluteDictMap.size)
bucketDict.getAbsoluteDictMap.object2LongEntrySet.asScala
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionaryV2.java
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionary.java
similarity index 94%
rename from
kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionaryV2.java
rename to
kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionary.java
index 5278dbf..651387d 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionaryV2.java
+++
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionary.java
@@ -24,9 +24,9 @@ import org.apache.kylin.common.util.HadoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class NGlobalDictionaryV2 implements Serializable {
+public class NGlobalDictionary implements Serializable {
- protected static final Logger logger = LoggerFactory.getLogger(NGlobalDictionaryV2.class);
+ protected static final Logger logger = LoggerFactory.getLogger(NGlobalDictionary.class);
private final static String WORKING_DIR = "working";
@@ -48,7 +48,7 @@ public class NGlobalDictionaryV2 implements Serializable {
return baseDir + WORKING_DIR;
}
- public NGlobalDictionaryV2(String project, String sourceTable, String sourceColumn, String baseDir)
+ public NGlobalDictionary(String project, String sourceTable, String sourceColumn, String baseDir)
throws IOException {
this.project = project;
this.sourceTable = sourceTable;
@@ -60,7 +60,7 @@ public class NGlobalDictionaryV2 implements Serializable {
}
}
- public NGlobalDictionaryV2(String dictParams) throws IOException {
+ public NGlobalDictionary(String dictParams) throws IOException {
String[] dictInfo = dictParams.split(SEPARATOR);
this.project = dictInfo[0];
this.sourceTable = dictInfo[1];
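[Illustrative sketch, Scala, not part of this commit: how the renamed NGlobalDictionary is constructed and used at the call sites touched later in this diff (e.g. CubeDictionaryBuilder, DictEncodeImpl). The constructor arguments below are placeholders; real call sites pass SegmentInfo/ColumnDesc fields and kylinconf.getHdfsWorkingDirectory.]
import org.apache.spark.dict.{NBucketDictionary, NGlobalDictionary}
// Placeholder project/table/column/base-dir values for illustration only.
val globalDict = new NGlobalDictionary("my_project", "MY_TABLE", "MY_COLUMN", "hdfs:///kylin/working/")
globalDict.prepareWrite()
// Each task loads the bucket dictionary for its partition id.
val bucketDict: NBucketDictionary = globalDict.loadBucketDictionary(0)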
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala
index fb90f43..153786c 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala
@@ -21,7 +21,6 @@ package org.apache.spark.sql
import org.apache.kylin.cube.CubeInstance
import org.apache.kylin.cube.cuboid.Cuboid
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasource.FilePruner
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -68,7 +67,7 @@ class KylinDataFrameManager(sparkSession: SparkSession) {
def cuboidTable(cubeInstance: CubeInstance, layout: Cuboid): DataFrame = {
option("project", cubeInstance.getProject)
- option("dataflowId", cubeInstance.getUuid)
+ option("cubeId", cubeInstance.getUuid)
option("cuboidId", layout.getId)
val indexCatalog = new FilePruner(cubeInstance, layout, sparkSession, options = extraOptions.toMap)
sparkSession.baseRelationToDataFrame(
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
index 04d4a68..330ded2 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
@@ -17,12 +17,12 @@
*/
package org.apache.spark.sql
-import org.apache.kylin.engine.spark.common.util.KapDateTimeUtils
+import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types.{AbstractDataType, DataType, DateType, IntegerType}
-import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, DictEncode, Expression, ExpressionInfo, ExpressionUtils, ImplicitCastInputTypes, In, KapAddMonths, Like, Literal, RoundBase, SplitPart, Sum0, TimestampAdd, TimestampDiff, Truncate, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, DictEncode, Expression, ExpressionInfo, ExpressionUtils, ImplicitCastInputTypes, In, KylinAddMonths, Like, Literal, RoundBase, SplitPart, Sum0, TimestampAdd, TimestampDiff, Truncate, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
import org.apache.spark.sql.udaf.{ApproxCountDistinct, IntersectCount, PreciseCountDistinct}
@@ -33,15 +33,15 @@ object KylinFunctions {
Column(func.toAggregateExpression(isDistinct))
}
- def kap_add_months(startDate: Column, numMonths: Column): Column = {
- Column(KapAddMonths(startDate.expr, numMonths.expr))
+ def kylin_add_months(startDate: Column, numMonths: Column): Column = {
+ Column(KylinAddMonths(startDate.expr, numMonths.expr))
}
def dict_encode(column: Column, dictParams: Column, bucketSize: Column): Column = {
Column(DictEncode(column.expr, dictParams.expr, bucketSize.expr))
}
- // special lit for KE.
+ // special lit for KYLIN.
def k_lit(literal: Any): Column = literal match {
case c: Column => c
case s: Symbol => new ColumnName(s.name)
@@ -52,14 +52,14 @@ object KylinFunctions {
def in(value: Expression, list: Seq[Expression]): Column = Column(In(value, list))
- def kap_day_of_week(date: Column): Column = Column(KapDayOfWeek(date.expr))
+ def kylin_day_of_week(date: Column): Column = Column(KylinDayOfWeek(date.expr))
- def kap_truncate(column: Column, scale: Int): Column = {
+ def kylin_truncate(column: Column, scale: Int): Column = {
Column(TRUNCATE(column.expr, Literal(scale)))
}
- def kap_subtract_months(date0: Column, date1: Column): Column = {
- Column(KapSubtractMonths(date0.expr, date1.expr))
+ def kylin_subtract_months(date0: Column, date1: Column): Column = {
+ Column(KylinSubtractMonths(date0.expr, date1.expr))
}
def precise_count_distinct(column: Column): Column =
@@ -99,7 +99,7 @@ case class TRUNCATE(child: Expression, scale: Expression)
}
// scalastyle:on line.size.limit
-case class KapSubtractMonths(a: Expression, b: Expression)
+case class KylinSubtractMonths(a: Expression, b: Expression)
extends BinaryExpression
with ImplicitCastInputTypes {
@@ -112,21 +112,21 @@ case class KapSubtractMonths(a: Expression, b: Expression)
override def dataType: DataType = IntegerType
override def nullSafeEval(date0: Any, date1: Any): Any = {
- KapDateTimeUtils.dateSubtractMonths(date0.asInstanceOf[Int],
+ KylinDateTimeUtils.dateSubtractMonths(date0.asInstanceOf[Int],
date1.asInstanceOf[Int])
}
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
- val dtu = KapDateTimeUtils.getClass.getName.stripSuffix("$")
+ val dtu = KylinDateTimeUtils.getClass.getName.stripSuffix("$")
defineCodeGen(ctx, ev, (d0, d1) => {
s"""$dtu.dateSubtractMonths($d0, $d1)"""
})
}
- override def prettyName: String = "kap_months_between"
+ override def prettyName: String = "kylin_months_between"
}
-case class KapDayOfWeek(a: Expression)
+case class KylinDayOfWeek(a: Expression)
extends UnaryExpression
with ImplicitCastInputTypes {
@@ -137,19 +137,19 @@ case class KapDayOfWeek(a: Expression)
override protected def doGenCode(
ctx: CodegenContext,
ev: ExprCode): ExprCode = {
- val dtu = KapDateTimeUtils.getClass.getName.stripSuffix("$")
+ val dtu = KylinDateTimeUtils.getClass.getName.stripSuffix("$")
defineCodeGen(ctx, ev, (d) => {
s"""$dtu.dayOfWeek($d)"""
})
}
override def nullSafeEval(date: Any): Any = {
- KapDateTimeUtils.dayOfWeek(date.asInstanceOf[Int])
+ KylinDateTimeUtils.dayOfWeek(date.asInstanceOf[Int])
}
override def dataType: DataType = IntegerType
- override def prettyName: String = "kap_day_of_week"
+ override def prettyName: String = "kylin_day_of_week"
}
object FunctionEntity {
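[Illustrative sketch, Scala, not part of this commit: calling the renamed KylinFunctions helpers defined above on a hypothetical DataFrame df with columns "order_date" and "amount"; signatures follow the definitions shown in this diff.]
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.KylinFunctions.{kylin_add_months, kylin_day_of_week, kylin_truncate}
// df is a hypothetical DataFrame; each helper returns a Column.
val enriched = df.select(
  kylin_add_months(col("order_date"), lit(1)), // date one month later
  kylin_day_of_week(col("order_date")),        // day of week as an integer
  kylin_truncate(col("amount"), 2))            // truncate to scale 2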
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/DictEncodeImpl.scala
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/DictEncodeImpl.scala
index 356bd6d..1df301e 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/DictEncodeImpl.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/DictEncodeImpl.scala
@@ -22,7 +22,7 @@ import org.apache.spark.TaskContext
import java.util
import org.apache.spark.util.TaskCompletionListener
-import org.apache.spark.dict.{NBucketDictionary, NGlobalDictionaryV2}
+import org.apache.spark.dict.{NBucketDictionary, NGlobalDictionary}
object DictEncodeImpl {
@@ -42,7 +42,7 @@ object DictEncodeImpl {
private def initBucketDict(dictParams: String, bucketSize: String): NBucketDictionary = {
val partitionID = TaskContext.get.partitionId
val encodeBucketId = partitionID % bucketSize.toInt
- val globalDict = new NGlobalDictionaryV2(dictParams)
+ val globalDict = new NGlobalDictionary(dictParams)
val cachedBucketDict = globalDict.loadBucketDictionary(encodeBucketId)
DictEncodeImpl.cacheBucketDict.get.put(dictParams, cachedBucketDict)
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
similarity index 93%
rename from
kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala
rename to
kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
index 573f4c4..1bb5be2 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
@@ -17,8 +17,8 @@
*/
package org.apache.spark.sql.catalyst.expressions
-import org.apache.kylin.engine.spark.common.util.KapDateTimeUtils
-import org.apache.spark.dict.{NBucketDictionary, NGlobalDictionaryV2}
+import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils
+import org.apache.spark.dict.{NBucketDictionary, NGlobalDictionary}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
@@ -37,7 +37,7 @@ import org.apache.spark.sql.types._
"""
)
// scalastyle:on line.size.limit
-case class KapAddMonths(startDate: Expression, numMonths: Expression)
+case class KylinAddMonths(startDate: Expression, numMonths: Expression)
extends BinaryExpression
with ImplicitCastInputTypes {
@@ -52,17 +52,17 @@ case class KapAddMonths(startDate: Expression, numMonths: Expression)
override def nullSafeEval(start: Any, months: Any): Any = {
val time = start.asInstanceOf[Long]
val month = months.asInstanceOf[Int]
- KapDateTimeUtils.addMonths(time, month)
+ KylinDateTimeUtils.addMonths(time, month)
}
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
- val dtu = KapDateTimeUtils.getClass.getName.stripSuffix("$")
+ val dtu = KylinDateTimeUtils.getClass.getName.stripSuffix("$")
defineCodeGen(ctx, ev, (sd, m) => {
s"""$dtu.addMonths($sd, $m)"""
})
}
- override def prettyName: String = "kap_add_months"
+ override def prettyName: String = "kylin_add_months"
}
// Returns the date that is num_months after start_date.
@@ -78,7 +78,7 @@ case class KapAddMonths(startDate: Expression, numMonths: Expression)
"""
)
// scalastyle:on line.size.limit
-case class KapSubtractMonths(a: Expression, b: Expression)
+case class KylinSubtractMonths(a: Expression, b: Expression)
extends BinaryExpression
with ImplicitCastInputTypes {
@@ -91,18 +91,18 @@ case class KapSubtractMonths(a: Expression, b: Expression)
override def dataType: DataType = IntegerType
override def nullSafeEval(date0: Any, date1: Any): Any = {
- KapDateTimeUtils.dateSubtractMonths(date0.asInstanceOf[Int],
+ KylinDateTimeUtils.dateSubtractMonths(date0.asInstanceOf[Int],
date1.asInstanceOf[Int])
}
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
- val dtu = KapDateTimeUtils.getClass.getName.stripSuffix("$")
+ val dtu = KylinDateTimeUtils.getClass.getName.stripSuffix("$")
defineCodeGen(ctx, ev, (d0, d1) => {
s"""$dtu.dateSubtractMonths($d0, $d1)"""
})
}
- override def prettyName: String = "kap_months_between"
+ override def prettyName: String = "kylin_months_between"
}
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
@@ -174,7 +174,7 @@ case class Sum0(child: Expression)
override lazy val evaluateExpression: Expression = sum
}
-case class KapDayOfWeek(a: Expression)
+case class KylinDayOfWeek(a: Expression)
extends UnaryExpression
with ImplicitCastInputTypes {
@@ -185,19 +185,19 @@ case class KapDayOfWeek(a: Expression)
override protected def doGenCode(
ctx: CodegenContext,
ev: ExprCode): ExprCode = {
- val dtu = KapDateTimeUtils.getClass.getName.stripSuffix("$")
+ val dtu = KylinDateTimeUtils.getClass.getName.stripSuffix("$")
defineCodeGen(ctx, ev, (d) => {
s"""$dtu.dayOfWeek($d)"""
})
}
override def nullSafeEval(date: Any): Any = {
- KapDateTimeUtils.dayOfWeek(date.asInstanceOf[Int])
+ KylinDateTimeUtils.dayOfWeek(date.asInstanceOf[Int])
}
override def dataType: DataType = IntegerType
- override def prettyName: String = "kap_day_of_week"
+ override def prettyName: String = "kylin_day_of_week"
}
case class TimestampAdd(left: Expression, mid: Expression, right: Expression)
extends TernaryExpression with ExpectsInputTypes {
@@ -293,7 +293,7 @@ case class DictEncode(left: Expression, mid: Expression, right: Expression) exte
override protected def doGenCode(
ctx: CodegenContext,
ev: ExprCode): ExprCode = {
- val globalDictClass = classOf[NGlobalDictionaryV2].getName
+ val globalDictClass = classOf[NGlobalDictionary].getName
val bucketDictClass = classOf[NBucketDictionary].getName
val globalDictTerm = ctx.addMutableState(globalDictClass,
s"${mid.simpleString.replace("[", "").replace("]", "")}_globalDict")
@@ -307,7 +307,7 @@ case class DictEncode(left: Expression, mid: Expression, right: Expression) exte
| private void init${bucketDictTerm.replace("[", "").replace("]", "")}BucketDict(int idx) {
| try {
| int bucketId = idx % $bucketSizeTerm;
- | $globalDictTerm = new org.apache.spark.dict.NGlobalDictionaryV2("$dictParamsTerm");
+ | $globalDictTerm = new org.apache.spark.dict.NGlobalDictionary("$dictParamsTerm");
| $bucketDictTerm = $globalDictTerm.loadBucketDictionary(bucketId);
| } catch (Exception e) {
| throw new RuntimeException(e);
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampAddImpl.scala
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampAddImpl.scala
index 1a32b34..55f683c 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampAddImpl.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampAddImpl.scala
@@ -17,7 +17,7 @@
*/
package org.apache.spark.sql.catalyst.expressions
-import org.apache.kylin.engine.spark.common.util.KapDateTimeUtils._
+import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils._
import java.util.{Calendar, Locale, TimeZone}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampDiffImpl.scala
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampDiffImpl.scala
index c0c9055..e85abe0 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampDiffImpl.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampDiffImpl.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.catalyst.expressions
import java.util.Locale
-import org.apache.kylin.engine.spark.common.util.KapDateTimeUtils._
-import org.apache.kylin.engine.spark.common.util.KapDateTimeUtils
+import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils._
+import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils
object TimestampDiffImpl {
@@ -69,11 +69,11 @@ object TimestampDiffImpl {
case "WEEK" | "SQL_TSI_WEEK" =>
(aMillis - bMillis) / MILLIS_PER_DAY / DAYS_PER_WEEK
case "MONTH" | "SQL_TSI_MONTH" =>
- KapDateTimeUtils.subtractMonths(aMillis, bMillis)
+ KylinDateTimeUtils.subtractMonths(aMillis, bMillis)
case "QUARTER" | "SQL_TSI_QUARTER" =>
- KapDateTimeUtils.subtractMonths(aMillis, bMillis) / MONTHS_PER_QUARTER
+ KylinDateTimeUtils.subtractMonths(aMillis, bMillis) / MONTHS_PER_QUARTER
case "YEAR" | "SQL_TSI_YEAR" =>
- KapDateTimeUtils.subtractMonths(aMillis, bMillis) / MONTHS_PER_QUARTER / QUARTERS_PER_YEAR
+ KylinDateTimeUtils.subtractMonths(aMillis, bMillis) / MONTHS_PER_QUARTER / QUARTERS_PER_YEAR
case _ =>
throw new IllegalArgumentException(s"Illegal unit: $unit," +
s" only support [YEAR, SQL_TSI_YEAR, QUARTER, SQL_TSI_QUARTER,
MONTH, SQL_TSI_MONTH, WEEK, SQL_TSI_WEEK, DAY, SQL_TSI_DAY," +
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
index 4b5341e..9bc8801 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasource.{FilePruner, ShardSpec}
import org.apache.spark.sql.types.StructType
diff --git
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
index d02be70..9308717 100644
---
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
+++
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/ResetShufflePartition.scala
@@ -26,9 +26,9 @@ trait ResetShufflePartition extends Logging {
def setShufflePartitions(bytes: Long, sparkSession: SparkSession): Unit = {
QueryContextFacade.current().addAndGetSourceScanBytes(bytes)
val defaultParallelism = sparkSession.sparkContext.defaultParallelism
- val kapConfig = KylinConfig.getInstanceFromEnv
- val partitionsNum = if (kapConfig.getSparkSqlShufflePartitions != -1) {
- kapConfig.getSparkSqlShufflePartitions
+ val kylinConfig = KylinConfig.getInstanceFromEnv
+ val partitionsNum = if (kylinConfig.getSparkSqlShufflePartitions != -1) {
+ kylinConfig.getSparkSqlShufflePartitions
} else {
Math.min(QueryContextFacade.current().getSourceScanBytes / (
KylinConfig.getInstanceFromEnv.getQueryPartitionSplitSizeMB * 1024 *
1024 * 2) + 1,
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/builder/NBuildSourceInfo.java
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/builder/NBuildSourceInfo.java
index fe1c619..092dc34 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/builder/NBuildSourceInfo.java
+++
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/builder/NBuildSourceInfo.java
@@ -84,14 +84,6 @@ public class NBuildSourceInfo {
this.viewFactTablePath = viewFactTablePath;
}
- public void setCount(long count) {
- this.count = count;
- }
-
- public long getCount() {
- return count;
- }
-
public void setLayoutId(long layoutId) {
this.layoutId = layoutId;
}
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index a9c735b..7923605 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -304,8 +304,7 @@ public class NSparkExecutable extends AbstractExecutable {
protected String generateSparkCmd(KylinConfig config, String hadoopConf,
String jars, String kylinJobJar,
String appArgs) {
StringBuilder sb = new StringBuilder();
- sb.append(
- "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.engine.spark.application.SparkEntry ");
+ sb.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.engine.spark.application.SparkEntry ");
Map<String, String> sparkConfs = getSparkConfigOverride(config);
for (Entry<String, String> entry : sparkConfs.entrySet()) {
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/MetadataMerger.java
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/MetadataMerger.java
index 9ff561d..b5e2d9e 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/MetadataMerger.java
+++
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/MetadataMerger.java
@@ -33,7 +33,7 @@ public abstract class MetadataMerger {
return config;
}
- public abstract void merge(String dataflowId, String segmentIds, ResourceStore remoteResourceStore, String jobType);
+ public abstract void merge(String cubeId, String segmentIds, ResourceStore remoteResourceStore, String jobType);
public abstract void merge(AbstractExecutable abstractExecutable);
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/source/CsvSource.java
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/source/CsvSource.java
index d0cb8b4..0d67265 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/source/CsvSource.java
+++
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/source/CsvSource.java
@@ -57,9 +57,7 @@ public class CsvSource implements ISource {
boolean withHeader = false;
if (kylinConfig.getDeployEnv().equals("UT")
&& (parameters != null && parameters.get("separator")
== null)) {
- path = "file:///" + new File(getUtMetaDir(),
- "../../examples/test_case_data/parquet_test/data/"
+ table.identity() + ".csv")
- .getAbsolutePath();
+ path = "file:///" + new File(getUtMetaDir(), "data/" +
table.identity() + ".csv").getAbsolutePath();
separator = "";
} else if (kylinConfig.getDeployEnv().equals("LOCAL")) {
path = "file:///" + new
File(kylinConfig.getMetadataUrlPrefix(),
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala
index 1e2138b..8d11974 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala
@@ -22,7 +22,7 @@ import java.util.Locale
import com.google.common.collect.Sets
import org.apache.commons.lang3.StringUtils
-import org.apache.kylin.engine.spark.builder.DFBuilderHelper.{ENCODE_SUFFIX, _}
+import org.apache.kylin.engine.spark.builder.CubeBuilderHelper.{ENCODE_SUFFIX, _}
import org.apache.kylin.engine.spark.job.NSparkCubingUtil._
import org.apache.kylin.engine.spark.metadata._
import org.apache.kylin.engine.spark.metadata.cube.model.SpanningTree
@@ -94,7 +94,7 @@ class CreateFlatTable(val seg: SegmentInfo,
private def buildDict(ds: Dataset[Row], dictCols: Set[ColumnDesc]): Unit = {
val matchedCols = filterCols(ds, dictCols)
if (!matchedCols.isEmpty) {
- val builder = new DFDictionaryBuilder(ds, seg, ss, Sets.newHashSet(matchedCols.asJavaCollection))
+ val builder = new CubeDictionaryBuilder(ds, seg, ss, Sets.newHashSet(matchedCols.asJavaCollection))
builder.buildDictSet()
}
}
@@ -103,7 +103,7 @@ class CreateFlatTable(val seg: SegmentInfo,
val matchedCols = filterCols(ds, encodeCols)
var encodeDs = ds
if (!matchedCols.isEmpty) {
- encodeDs = DFTableEncoder.encodeTable(ds, seg, matchedCols.asJava)
+ encodeDs = CubeTableEncoder.encodeTable(ds, seg, matchedCols.asJava)
}
encodeDs
}
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFBuilderHelper.scala
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeBuilderHelper.scala
similarity index 97%
rename from
kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFBuilderHelper.scala
rename to
kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeBuilderHelper.scala
index 5f37bb4..985f8a5 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFBuilderHelper.scala
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeBuilderHelper.scala
@@ -26,9 +26,9 @@ import org.apache.spark.sql.{Column, Dataset, Row}
import scala.util.{Failure, Success, Try}
-object DFBuilderHelper extends Logging {
+object CubeBuilderHelper extends Logging {
- val ENCODE_SUFFIX = "_KE_ENCODE"
+ val ENCODE_SUFFIX = "_KYLIN_ENCODE"
def filterCols(dsSeq: Seq[Dataset[Row]], needCheckCols: Set[ColumnDesc]): Set[ColumnDesc] = {
needCheckCols -- dsSeq.flatMap(ds => filterCols(ds, needCheckCols))
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
similarity index 88%
rename from
kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala
rename to
kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
index 2adac10..009be80 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
@@ -23,10 +23,10 @@ import java.util
import org.apache.kylin.common.KylinConfig
import org.apache.kylin.common.lock.DistributedLock
import org.apache.kylin.common.util.HadoopUtil
-import org.apache.kylin.engine.spark.builder.DFBuilderHelper._
+import org.apache.kylin.engine.spark.builder.CubeBuilderHelper._
import org.apache.kylin.engine.spark.job.NSparkCubingUtil
import org.apache.kylin.engine.spark.metadata.{ColumnDesc, SegmentInfo}
-import org.apache.spark.dict.NGlobalDictionaryV2
+import org.apache.spark.dict.NGlobalDictionary
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.functions.{col, expr}
@@ -35,10 +35,10 @@ import org.apache.spark.sql.{Column, Dataset, Row,
SparkSession}
import scala.collection.JavaConverters._
-class DFDictionaryBuilder(val dataset: Dataset[Row],
- val seg: SegmentInfo,
- val ss: SparkSession,
- val colRefSet: util.Set[ColumnDesc]) extends Logging with Serializable {
+class CubeDictionaryBuilder(val dataset: Dataset[Row],
+ val seg: SegmentInfo,
+ val ss: SparkSession,
+ val colRefSet: util.Set[ColumnDesc]) extends Logging with Serializable {
@transient
val lock: DistributedLock =
KylinConfig.getInstanceFromEnv.getDistributedLockFactory.lockForCurrentThread
@@ -70,7 +70,7 @@ class DFDictionaryBuilder(val dataset: Dataset[Row],
val columnName = ref.identity
logInfo(s"Start building global dictionaries V2 for column $columnName.")
- val globalDict = new NGlobalDictionaryV2(seg.project, ref.tableAliasName, ref.columnName, seg.kylinconf.getHdfsWorkingDirectory)
+ val globalDict = new NGlobalDictionary(seg.project, ref.tableAliasName, ref.columnName, seg.kylinconf.getHdfsWorkingDirectory)
globalDict.prepareWrite()
val broadcastDict = ss.sparkContext.broadcast(globalDict)
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFLayoutMergeAssist.java
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeMergeAssist.java
similarity index 97%
rename from
kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFLayoutMergeAssist.java
rename to
kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeMergeAssist.java
index c1d56ce..c2b59e7 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFLayoutMergeAssist.java
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeMergeAssist.java
@@ -34,8 +34,8 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-public class DFLayoutMergeAssist implements Serializable {
- protected static final Logger logger = LoggerFactory.getLogger(DFLayoutMergeAssist.class);
+public class CubeMergeAssist implements Serializable {
+ protected static final Logger logger = LoggerFactory.getLogger(CubeMergeAssist.class);
private static final int DEFAULT_BUFFER_SIZE = 256;
private LayoutEntity layout;
private SegmentInfo newSegment;
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFSnapshotBuilder.scala
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeSnapshotBuilder.scala
similarity index 99%
rename from
kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFSnapshotBuilder.scala
rename to
kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeSnapshotBuilder.scala
index 535e716..557f89a 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFSnapshotBuilder.scala
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeSnapshotBuilder.scala
@@ -41,7 +41,7 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.Breaks._
import scala.util.{Failure, Success, Try}
-class DFSnapshotBuilder extends Logging {
+class CubeSnapshotBuilder extends Logging {
var ss: SparkSession = _
var seg: SegmentInfo = _
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFTableEncoder.scala
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
similarity index 87%
rename from
kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFTableEncoder.scala
rename to
kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
index c11370f..e4a77c3 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DFTableEncoder.scala
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
@@ -19,10 +19,10 @@ package org.apache.kylin.engine.spark.builder
import java.util
-import org.apache.kylin.engine.spark.builder.DFBuilderHelper.ENCODE_SUFFIX
+import org.apache.kylin.engine.spark.builder.CubeBuilderHelper.ENCODE_SUFFIX
import org.apache.kylin.engine.spark.job.NSparkCubingUtil._
-import org.apache.kylin.engine.spark.metadata.{SegmentInfo, ColumnDesc}
-import org.apache.spark.dict.NGlobalDictionaryV2
+import org.apache.kylin.engine.spark.metadata.{ColumnDesc, SegmentInfo}
+import org.apache.spark.dict.NGlobalDictionary
import org.apache.spark.internal.Logging
import org.apache.spark.sql.KylinFunctions._
import org.apache.spark.sql.functions.{col, _}
@@ -32,7 +32,7 @@ import org.apache.spark.sql.{Dataset, Row}
import scala.collection.JavaConverters._
import scala.collection.mutable._
-object DFTableEncoder extends Logging {
+object CubeTableEncoder extends Logging {
def encodeTable(ds: Dataset[Row], seg: SegmentInfo, cols: util.Set[ColumnDesc]): Dataset[Row] = {
val structType = ds.schema
@@ -45,7 +45,7 @@ object DFTableEncoder extends Logging {
cols.asScala.foreach(
ref => {
- val globalDict = new NGlobalDictionaryV2(seg.project, ref.tableAliasName, ref.columnName, seg.kylinconf.getHdfsWorkingDirectory)
+ val globalDict = new NGlobalDictionary(seg.project, ref.tableAliasName, ref.columnName, seg.kylinconf.getHdfsWorkingDirectory)
val bucketSize = globalDict.getBucketSizeOrDefault(seg.kylinconf.getGlobalDictV2MinHashPartitions)
val enlargedBucketSize = (((minBucketSize / bucketSize) + 1) * bucketSize).toInt
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DictHelper.scala
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DictHelper.scala
index d43b56b..18041c8 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DictHelper.scala
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DictHelper.scala
@@ -19,16 +19,16 @@
package org.apache.kylin.engine.spark.builder
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.dict.NGlobalDictionaryV2
import org.apache.spark.sql.Row
import org.apache.spark.TaskContext
+import org.apache.spark.dict.NGlobalDictionary
import org.apache.spark.internal.Logging
import scala.collection.mutable.ListBuffer
object DictHelper extends Logging{
- def genDict(columnName: String, broadcastDict: Broadcast[NGlobalDictionaryV2], iter: Iterator[Row]) = {
+ def genDict(columnName: String, broadcastDict: Broadcast[NGlobalDictionary], iter: Iterator[Row]) = {
val partitionID = TaskContext.get().partitionId()
logInfo(s"Build partition dict col: ${columnName}, partitionId:
$partitionID")
val broadcastGlobalDict = broadcastDict.value
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java
index 0ff3906..c42b437 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import org.apache.kylin.engine.spark.metadata.SegmentInfo;
import org.apache.kylin.engine.spark.metadata.ColumnDesc;
import org.apache.spark.dict.NGlobalDictMetaInfo;
-import org.apache.spark.dict.NGlobalDictionaryV2;
+import org.apache.spark.dict.NGlobalDictionary;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.slf4j.Logger;
@@ -43,7 +43,7 @@ public class DictionaryBuilderHelper {
* than the threshold multiplied by
KylinConfigBase.getGlobalDictV2BucketOverheadFactor
*/
public static int calculateBucketSize(SegmentInfo desc, ColumnDesc col,
Dataset<Row> afterDistinct) throws IOException {
- NGlobalDictionaryV2 globalDict = new NGlobalDictionaryV2(desc.project(), col.tableAliasName(), col.columnName(),
+ NGlobalDictionary globalDict = new NGlobalDictionary(desc.project(), col.tableAliasName(), col.columnName(),
desc.kylinconf().getHdfsWorkingDirectory());
int bucketPartitionSize = globalDict.getBucketSizeOrDefault(desc.kylinconf().getGlobalDictV2MinHashPartitions());
int bucketThreshold = desc.kylinconf().getGlobalDictV2ThresholdBucketSize();
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index a554c0a..e55dfcf 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -78,8 +78,8 @@ public class CubeBuildJob extends SparkApplication {
private BuildLayoutWithUpdate buildLayoutWithUpdate;
private Map<Long, Short> cuboidShardNum = Maps.newHashMap();
public static void main(String[] args) {
- CubeBuildJob nDataflowBuildJob = new CubeBuildJob();
- nDataflowBuildJob.execute(args);
+ CubeBuildJob cubeBuildJob = new CubeBuildJob();
+ cubeBuildJob.execute(args);
}
@Override
@@ -120,7 +120,7 @@ public class CubeBuildJob extends SparkApplication {
infos.recordSpanningTree(segId, spanningTree);
logger.info("Updating segment info");
- updateSegmentInfo(getParam(MetadataConstants.P_CUBE_ID), seg, buildFromFlatTable.getCount());
+ updateSegmentInfo(getParam(MetadataConstants.P_CUBE_ID), seg, buildFromFlatTable.getFlattableDS().count());
}
updateSegmentSourceBytesSize(getParam(MetadataConstants.P_CUBE_ID),
ResourceDetectUtils.getSegmentSourceSize(shareDir));
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
index a64e5b8..aaa3c21 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
@@ -19,7 +19,6 @@
package org.apache.kylin.engine.spark.job;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -27,7 +26,6 @@ import java.util.UUID;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.engine.spark.metadata.SegmentInfo;
import org.apache.kylin.engine.spark.metadata.cube.ManagerHub;
import org.apache.kylin.engine.spark.metadata.cube.PathManager;
@@ -48,7 +46,7 @@ import com.google.common.collect.Maps;
import org.apache.kylin.engine.spark.NSparkCubingEngine;
import org.apache.kylin.engine.spark.application.SparkApplication;
-import org.apache.kylin.engine.spark.builder.DFLayoutMergeAssist;
+import org.apache.kylin.engine.spark.builder.CubeMergeAssist;
import org.apache.kylin.engine.spark.utils.BuildUtils;
import org.apache.kylin.engine.spark.utils.JobMetrics;
import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
@@ -85,8 +83,8 @@ public class CubeMergeJob extends SparkApplication {
CubeSegment mergedSeg = cube.getSegmentById(segmentId);
SegmentInfo mergedSegInfo = ManagerHub.getSegmentInfo(config,
getParam(MetadataConstants.P_CUBE_ID), mergedSeg.getUuid());
- Map<Long, DFLayoutMergeAssist> mergeCuboidsAssist = generateMergeAssist(mergingSegInfos, ss);
- for (DFLayoutMergeAssist assist : mergeCuboidsAssist.values()) {
+ Map<Long, CubeMergeAssist> mergeCuboidsAssist = generateMergeAssist(mergingSegInfos, ss);
+ for (CubeMergeAssist assist : mergeCuboidsAssist.values()) {
SpanningTree spanningTree = new
ForestSpanningTree(JavaConversions.asJavaCollection(mergedSegInfo.toBuildLayouts()));
Dataset<Row> afterMerge = assist.merge(config, cube.getName());
LayoutEntity layout = assist.getLayout();
@@ -116,19 +114,19 @@ public class CubeMergeJob extends SparkApplication {
}
}
- public static Map<Long, DFLayoutMergeAssist> generateMergeAssist(List<SegmentInfo> mergingSegments,
- SparkSession ss) {
+ public static Map<Long, CubeMergeAssist> generateMergeAssist(List<SegmentInfo> mergingSegments,
+ SparkSession ss) {
// collect layouts need to merge
- Map<Long, DFLayoutMergeAssist> mergeCuboidsAssist = Maps.newConcurrentMap();
+ Map<Long, CubeMergeAssist> mergeCuboidsAssist = Maps.newConcurrentMap();
for (SegmentInfo seg : mergingSegments) {
scala.collection.immutable.List<LayoutEntity> cuboids =
seg.layouts();
for (int i = 0; i < cuboids.size(); i++) {
LayoutEntity cuboid = cuboids.apply(i);
long layoutId = cuboid.getId();
- DFLayoutMergeAssist assist = mergeCuboidsAssist.get(layoutId);
+ CubeMergeAssist assist = mergeCuboidsAssist.get(layoutId);
if (assist == null) {
- assist = new DFLayoutMergeAssist();
+ assist = new CubeMergeAssist();
assist.addCuboid(cuboid);
assist.setSs(ss);
assist.setLayout(cuboid);
@@ -144,7 +142,7 @@ public class CubeMergeJob extends SparkApplication {
}
private LayoutEntity saveAndUpdateCuboid(Dataset<Row> dataset, SegmentInfo seg, LayoutEntity layout,
- DFLayoutMergeAssist assist) throws IOException {
+ CubeMergeAssist assist) throws IOException {
long layoutId = layout.getId();
long sourceCount = 0L;
@@ -191,8 +189,8 @@ public class CubeMergeJob extends SparkApplication {
}
public static void main(String[] args) {
- CubeMergeJob nDataflowBuildJob = new CubeMergeJob();
- nDataflowBuildJob.execute(args);
+ CubeMergeJob cubeMergeJob = new CubeMergeJob();
+ cubeMergeJob.execute(args);
}
}
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
index a854242..1da2af5 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
@@ -21,7 +21,7 @@ package org.apache.kylin.engine.spark.job
import java.util
import java.util.Locale
-import org.apache.kylin.engine.spark.builder.DFBuilderHelper.ENCODE_SUFFIX
+import org.apache.kylin.engine.spark.builder.CubeBuilderHelper.ENCODE_SUFFIX
import org.apache.kylin.engine.spark.metadata.cube.model.SpanningTree
import org.apache.kylin.engine.spark.metadata.{ColumnDesc, DTType,
FunctionDesc, LiteralColumnDesc}
import org.apache.kylin.measure.bitmap.BitmapMeasureType
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
index d533970..b5a7830 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
@@ -68,7 +68,7 @@ class ParentSourceChooser(
// hacked, for some case, you do not want to trigger buildSnapshot
// eg: resource detect
// Move this to a more suitable place
- val builder = new DFSnapshotBuilder(seg, ss)
+ val builder = new CubeSnapshotBuilder(seg, ss)
seg = builder.buildSnapshot
}
flatTableSource = getFlatTable()
@@ -143,7 +143,6 @@ class ParentSourceChooser(
val buildSource = new NBuildSourceInfo
buildSource.setParentStoragePath("NSparkCubingUtil.getStoragePath(dataCuboid)")
buildSource.setSparkSession(ss)
- buildSource.setCount(layout.getRows)
buildSource.setLayoutId(layout.getId)
buildSource.setLayout(layout)
buildSource.setByteSize(layout.getByteSize)
@@ -163,7 +162,6 @@ class ParentSourceChooser(
val flatTable = new CreateFlatTable(seg, toBuildTree, ss, sourceInfo)
val afterJoin: Dataset[Row] = flatTable.generateDataset(needEncoding, true)
sourceInfo.setFlattableDS(afterJoin)
- sourceInfo.setCount(afterJoin.count())
logInfo("No suitable ready layouts could be reused, generate dataset from
flat table.")
sourceInfo
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java
index 6f04932..4e771f9 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ResourceDetectBeforeMergingJob.java
@@ -28,7 +28,7 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.spark.application.SparkApplication;
-import org.apache.kylin.engine.spark.builder.DFLayoutMergeAssist;
+import org.apache.kylin.engine.spark.builder.CubeMergeAssist;
import org.apache.kylin.engine.spark.metadata.MetadataConverter;
import org.apache.kylin.engine.spark.metadata.SegmentInfo;
import org.apache.kylin.metadata.MetadataConstants;
@@ -70,14 +70,14 @@ public class ResourceDetectBeforeMergingJob extends SparkApplication {
}
infos.clearMergingSegments();
infos.recordMergingSegments(segmentInfos);
- Map<Long, DFLayoutMergeAssist> mergeCuboidsAssist = CubeMergeJob.generateMergeAssist(segmentInfos, ss);
+ Map<Long, CubeMergeAssist> mergeCuboidsAssist = CubeMergeJob.generateMergeAssist(segmentInfos, ss);
ResourceDetectUtils.write(
new Path(config.getJobTmpShareDir(project, jobId),
ResourceDetectUtils.countDistinctSuffix()),
ResourceDetectUtils
.findCountDistinctMeasure(JavaConversions.asJavaCollection(mergedSegInfo.toBuildLayouts())));
Map<String, List<String>> resourcePaths = Maps.newHashMap();
infos.clearSparkPlans();
- for (Map.Entry<Long, DFLayoutMergeAssist> entry : mergeCuboidsAssist.entrySet()) {
+ for (Map.Entry<Long, CubeMergeAssist> entry : mergeCuboidsAssist.entrySet()) {
Dataset<Row> afterMerge = entry.getValue().merge(config,
getParam(MetadataConstants.P_CUBE_NAME));
infos.recordSparkPlan(afterMerge.queryExecution().sparkPlan());
List<Path> paths = JavaConversions
diff --git
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
index 7db98fb..49aa30f 100644
---
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
+++
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
@@ -107,7 +107,7 @@ object ExpressionConverter {
case "WEEK" => weekofyear(k_lit(inputAsTS))
case "DOY" => dayofyear(k_lit(inputAsTS))
case "DAY" => dayofmonth(k_lit(inputAsTS))
- case "DOW" => kap_day_of_week(k_lit(inputAsTS))
+ case "DOW" => kylin_day_of_week(k_lit(inputAsTS))
case "HOUR" => hour(k_lit(inputAsTS))
case "MINUTE" => minute(k_lit(inputAsTS))
case "SECOND" => second(k_lit(inputAsTS))
@@ -158,7 +158,7 @@ object ExpressionConverter {
k_lit(children.head),
children.apply(1).asInstanceOf[Int])
case "truncate" =>
- kap_truncate(k_lit(children.head), children.apply(1).asInstanceOf[Int])
+ kylin_truncate(k_lit(children.head), children.apply(1).asInstanceOf[Int])
case "cot" =>
k_lit(1).divide(tan(k_lit(children.head)))
// null handling funcs
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampAddImpl.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampAddImpl.scala
index 38f0903..7413e4b 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampAddImpl.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampAddImpl.scala
@@ -21,7 +21,7 @@ package org.apache.spark.sql.udf
import java.util.{Calendar, Locale, TimeZone}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.kylin.engine.spark.common.util.KapDateTimeUtils.{MICROS_PER_MILLIS, MONTHS_PER_QUARTER}
+import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils.{MICROS_PER_MILLIS, MONTHS_PER_QUARTER}
object TimestampAddImpl {
private val localCalendar = new ThreadLocal[Calendar] {
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampDiffImpl.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampDiffImpl.scala
index 56733b5..f0d4d9e 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampDiffImpl.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampDiffImpl.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.udf
import java.util.Locale
-import org.apache.kylin.engine.spark.common.util.KapDateTimeUtils
-import org.apache.kylin.engine.spark.common.util.KapDateTimeUtils._
+import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils
+import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
object TimestampDiffImpl {
@@ -69,11 +69,11 @@ object TimestampDiffImpl {
case "WEEK" | "SQL_TSI_WEEK" =>
(aMillis - bMillis) / MILLIS_PER_DAY / DAYS_PER_WEEK
case "MONTH" | "SQL_TSI_MONTH" =>
- KapDateTimeUtils.subtractMonths(aMillis, bMillis)
+ KylinDateTimeUtils.subtractMonths(aMillis, bMillis)
case "QUARTER" | "SQL_TSI_QUARTER" =>
- KapDateTimeUtils.subtractMonths(aMillis, bMillis) / MONTHS_PER_QUARTER
+          KylinDateTimeUtils.subtractMonths(aMillis, bMillis) / MONTHS_PER_QUARTER
      case "YEAR" | "SQL_TSI_YEAR" =>
-          KapDateTimeUtils.subtractMonths(aMillis, bMillis) / MONTHS_PER_QUARTER / QUARTERS_PER_YEAR
+          KylinDateTimeUtils.subtractMonths(aMillis, bMillis) / MONTHS_PER_QUARTER / QUARTERS_PER_YEAR
case _ =>
throw new IllegalArgumentException(s"Illegal unit: $unit," +
s" only support [YEAR, SQL_TSI_YEAR, QUARTER, SQL_TSI_QUARTER,
MONTH, SQL_TSI_MONTH, WEEK, SQL_TSI_WEEK, DAY, SQL_TSI_DAY," +
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
index 5b758c1..7ca1cca 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
+++ b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
@@ -256,7 +256,7 @@ public class LocalWithSparkSessionTest extends LocalFileMetadataTestCase impleme
if (type.isBoolean())
return DataTypes.BooleanType;
-        throw new IllegalArgumentException("KAP data type: " + type + " can not be converted to spark's type.");
+        throw new IllegalArgumentException("Kylin data type: " + type + " can not be converted to spark's type.");
}
public void buildMultiSegs(String cubeName) throws Exception {
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/dict/NGlobalDictionaryV2Test.java b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/dict/NGlobalDictionaryTest.java
similarity index 89%
rename from kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/dict/NGlobalDictionaryV2Test.java
rename to kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/dict/NGlobalDictionaryTest.java
index 1516c99..46b0905 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/dict/NGlobalDictionaryV2Test.java
+++ b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/dict/NGlobalDictionaryTest.java
@@ -34,7 +34,7 @@ import org.apache.spark.dict.NBucketDictionary;
import org.apache.spark.dict.NGlobalDictHDFSStore;
import org.apache.spark.dict.NGlobalDictMetaInfo;
import org.apache.spark.dict.NGlobalDictStore;
-import org.apache.spark.dict.NGlobalDictionaryV2;
+import org.apache.spark.dict.NGlobalDictionary;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
@@ -52,7 +52,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
-public class NGlobalDictionaryV2Test extends LocalWithSparkSessionTest {
+public class NGlobalDictionaryTest extends LocalWithSparkSessionTest {
private final static int BUCKET_SIZE = 10;
@@ -84,8 +84,8 @@ public class NGlobalDictionaryV2Test extends LocalWithSparkSessionTest {
private void roundTest(int size) throws IOException {
System.out.println("GlobalDictionaryV2Test -> roundTest -> " +
System.currentTimeMillis());
KylinConfig config = KylinConfig.getInstanceFromEnv();
-        NGlobalDictionaryV2 dict1 = new NGlobalDictionaryV2("t1", "a", "spark", config.getHdfsWorkingDirectory());
-        NGlobalDictionaryV2 dict2 = new NGlobalDictionaryV2("t2", "a", "local", config.getHdfsWorkingDirectory());
+        NGlobalDictionary dict1 = new NGlobalDictionary("t1", "a", "spark", config.getHdfsWorkingDirectory());
+        NGlobalDictionary dict2 = new NGlobalDictionary("t2", "a", "local", config.getHdfsWorkingDirectory());
List<String> stringList = generateRandomData(size);
Collections.sort(stringList);
runWithSparkBuildGlobalDict(dict1, stringList);
@@ -102,7 +102,7 @@ public class NGlobalDictionaryV2Test extends LocalWithSparkSessionTest {
return stringList;
}
-    private void runWithSparkBuildGlobalDict(NGlobalDictionaryV2 dict, List<String> stringSet) throws IOException {
+    private void runWithSparkBuildGlobalDict(NGlobalDictionary dict, List<String> stringSet) throws IOException {
KylinConfig config = KylinConfig.getInstanceFromEnv();
dict.prepareWrite();
List<Row> rowList = Lists.newLinkedList();
@@ -129,7 +129,7 @@ public class NGlobalDictionaryV2Test extends LocalWithSparkSessionTest {
dict.writeMetaDict(BUCKET_SIZE, config.getGlobalDictV2MaxVersions(),
config.getGlobalDictV2VersionTTL());
}
-    private void runWithLocalBuildGlobalDict(NGlobalDictionaryV2 dict, List<String> stringSet) throws IOException {
+    private void runWithLocalBuildGlobalDict(NGlobalDictionary dict, List<String> stringSet) throws IOException {
KylinConfig config = KylinConfig.getInstanceFromEnv();
dict.prepareWrite();
HashPartitioner partitioner = new HashPartitioner(BUCKET_SIZE);
@@ -153,13 +153,13 @@ public class NGlobalDictionaryV2Test extends LocalWithSparkSessionTest {
dict.writeMetaDict(BUCKET_SIZE, config.getGlobalDictV2MaxVersions(),
config.getGlobalDictV2VersionTTL());
}
-    private void compareTwoModeVersionNum(NGlobalDictionaryV2 dict1, NGlobalDictionaryV2 dict2) throws IOException {
+    private void compareTwoModeVersionNum(NGlobalDictionary dict1, NGlobalDictionary dict2) throws IOException {
        NGlobalDictStore store1 = new NGlobalDictHDFSStore(dict1.getResourceDir());
        NGlobalDictStore store2 = new NGlobalDictHDFSStore(dict2.getResourceDir());
Assert.assertEquals(store1.listAllVersions().length,
store2.listAllVersions().length);
}
-    private void compareTwoVersionDict(NGlobalDictionaryV2 dict1, NGlobalDictionaryV2 dict2) throws IOException {
+    private void compareTwoVersionDict(NGlobalDictionary dict1, NGlobalDictionary dict2) throws IOException {
NGlobalDictMetaInfo metadata1 = dict1.getMetaInfo();
NGlobalDictMetaInfo metadata2 = dict2.getMetaInfo();
// compare dict meta info
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
index 9fd613e..fe84f82 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
@@ -114,7 +114,7 @@ class TestCreateFlatTable extends SparderBaseFunSuite with SharedSparkSession wi
private def checkEncodeCols(ds: Dataset[Row], segment: CubeSegment,
needEncode: Boolean) = {
val seg = MetadataConverter.getSegmentInfo(segment.getCubeInstance,
segment.getUuid, segment.getName, segment.getStorageLocationIdentifier)
val globalDictSet = seg.toBuildDictColumns
-    val actualEncodeDictSize = ds.schema.count(_.name.endsWith(DFBuilderHelper.ENCODE_SUFFIX))
+    val actualEncodeDictSize = ds.schema.count(_.name.endsWith(CubeBuilderHelper.ENCODE_SUFFIX))
if (needEncode) {
Assert.assertEquals(globalDictSet.size, actualEncodeDictSize)
} else {
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala
index 003aefa..a0b33dc 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestGlobalDictBuild.scala
@@ -32,7 +32,7 @@ import org.apache.kylin.job.impl.threadpool.DefaultScheduler
import org.apache.kylin.job.lock.MockJobLock
import org.apache.kylin.metadata.model.SegmentRange.TSRange
import org.apache.spark.TaskContext
-import org.apache.spark.dict.{NGlobalDictMetaInfo, NGlobalDictionaryV2}
+import org.apache.spark.dict.{NGlobalDictMetaInfo, NGlobalDictionary}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, SparderBaseFunSuite}
import org.apache.spark.sql.functions.col
@@ -112,12 +112,12 @@ class TestGlobalDictBuild extends SparderBaseFunSuite with SharedSparkSession wi
}
  def buildDict(segInfo: SegmentInfo, seg: CubeSegment, randomDataSet: Dataset[Row], dictColSet: Set[ColumnDesc]): NGlobalDictMetaInfo = {
-    val dictionaryBuilder = new DFDictionaryBuilder(randomDataSet, segInfo, randomDataSet.sparkSession, dictColSet)
+    val dictionaryBuilder = new CubeDictionaryBuilder(randomDataSet, segInfo, randomDataSet.sparkSession, dictColSet)
val col = dictColSet.iterator().next()
val ds = randomDataSet.select("26").distinct()
val bucketPartitionSize =
DictionaryBuilderHelper.calculateBucketSize(segInfo, col, ds)
dictionaryBuilder.build(col, bucketPartitionSize, ds)
-    val dict = new NGlobalDictionaryV2(seg.getProject, col.tableName, col.columnName,
+    val dict = new NGlobalDictionary(seg.getProject, col.tableName, col.columnName,
      seg.getConfig.getHdfsWorkingDirectory)
dict.getMetaInfo
}
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestSnapshotBuilder.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestSnapshotBuilder.scala
index cae9d1d..dab19a8 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestSnapshotBuilder.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestSnapshotBuilder.scala
@@ -167,7 +167,7 @@ class TestSnapshotBuilder extends SparderBaseFunSuite with SharedSparkSession wi
val segCopy = cubeCopy.getSegmentById(seg.getUuid)
segCopy.setSnapshots(new ConcurrentHashMap())
- var snapshotBuilder = new DFSnapshotBuilder(
+ var snapshotBuilder = new CubeSnapshotBuilder(
MetadataConverter.getSegmentInfo(segCopy.getCubeInstance,
segCopy.getUuid, segCopy.getName, segCopy.getStorageLocationIdentifier), spark)
val snapshot = snapshotBuilder.buildSnapshot
cube.getSegments.asScala.foreach(_.getConfig.setProperty("kylin.snapshot.parallel-build-enabled",
"true"))
@@ -183,7 +183,7 @@ class TestSnapshotBuilder extends SparderBaseFunSuite with SharedSparkSession wi
val segCopy = cubeCopy.getSegmentById(segment.getUuid)
segCopy.setSnapshots(new ConcurrentHashMap())
val segInfo = MetadataConverter.getSegmentInfo(segCopy.getCubeInstance,
segCopy.getUuid, segCopy.getName, segCopy.getStorageLocationIdentifier)
- var snapshotBuilder = new DFSnapshotBuilder(segInfo, spark)
+ var snapshotBuilder = new CubeSnapshotBuilder(segInfo, spark)
snapshotBuilder.buildSnapshot
}
@@ -201,7 +201,7 @@ class TestSnapshotBuilder extends SparderBaseFunSuite with SharedSparkSession wi
val cubeCopy = segment.getCubeInstance.latestCopyForWrite()
val segCopy = cubeCopy.getSegmentById(segment.getUuid)
segCopy.setSnapshots(new ConcurrentHashMap())
- var snapshotBuilder = new DFSnapshotBuilder(
+ var snapshotBuilder = new CubeSnapshotBuilder(
MetadataConverter.getSegmentInfo(segCopy.getCubeInstance,
segCopy.getUuid, segCopy.getName, segCopy.getStorageLocationIdentifier), spark)
snapshotBuilder.buildSnapshot
}
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestUdfManager.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestUdfManager.scala
index de2c24a..0d70aef 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestUdfManager.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/job/TestUdfManager.scala
@@ -35,7 +35,7 @@ class TestUdfManager extends SparderBaseFunSuite with SharedSparkSession with Be
}
  // ignore for dev, if you want to run this case, modify udfCache maximum to Long.MaxValue
-  // and if (funcName == "TOP_N") to if (funcName.startsWith("TOP_N")) in io.kyligence.kap.engine.spark.job.UdfManager
+  // and if (funcName == "TOP_N") to if (funcName.startsWith("TOP_N")) in org.apache.kylin.engine.spark.job.UdfManager
ignore("test register udf in multi-thread") {
import functions.udf
val testFunc = udf(() => "test")
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/DerivedProcess.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/DerivedProcess.scala
index 6c64e00..bd4cd46 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/DerivedProcess.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/DerivedProcess.scala
@@ -64,7 +64,6 @@ object DerivedProcess {
}
val hostFkIdx = hostFkCols.map(hostCol => indexOnTheGTValues(hostCol))
-    // fix for test src/kap-it/src/test/resources/query/sql_rawtable/query03.sql
if (!hostFkIdx.exists(_ >= 0)) {
return
}
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
index 2e620a3..bad566f 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
@@ -109,7 +109,7 @@ object ExpressionConverter {
case "WEEK" => weekofyear(k_lit(inputAsTS))
case "DOY" => dayofyear(k_lit(inputAsTS))
case "DAY" => dayofmonth(k_lit(inputAsTS))
- case "DOW" => kap_day_of_week(k_lit(inputAsTS))
+ case "DOW" => kylin_day_of_week(k_lit(inputAsTS))
case "HOUR" => hour(k_lit(inputAsTS))
case "MINUTE" => minute(k_lit(inputAsTS))
case "SECOND" => second(k_lit(inputAsTS))
@@ -160,7 +160,7 @@ object ExpressionConverter {
k_lit(children.head),
children.apply(1).asInstanceOf[Int])
case "truncate" =>
-          kap_truncate(k_lit(children.head), children.apply(1).asInstanceOf[Int])
+          kylin_truncate(k_lit(children.head), children.apply(1).asInstanceOf[Int])
case "cot" =>
k_lit(1).divide(tan(k_lit(children.head)))
// null handling funcs
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/RuntimeHelper.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/RuntimeHelper.scala
index c01170b..a617cf1 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/RuntimeHelper.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/RuntimeHelper.scala
@@ -117,7 +117,6 @@ object RuntimeHelper {
} else if (deriveMap.contains(index)) {
deriveMap.apply(index)
          } else if (DataType.DATETIME_FAMILY.contains(column.getType.getName)) {
- // https://github.com/Kyligence/KAP/issues/14561
literalTs.as(s"${factTableName}_${columnName}")
} else {
literalOne.as(s"${factTableName}_${columnName}")
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala
index bfd2a39..dcb4f14 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala
@@ -160,7 +160,7 @@ class SparderRexVisitor(
case num: MonthNum => {
// both add_month and add_year case
val ts = k_lit(children.head).cast(TimestampType)
- return k_lit(kap_add_months(k_lit(ts), num.num))
+ return k_lit(kylin_add_months(k_lit(ts), num.num))
}
case _ =>
}
@@ -242,7 +242,7 @@ class SparderRexVisitor(
val ts1 = k_lit(children.head).cast(TimestampType)
val ts2 = k_lit(children.last).cast(TimestampType)
- kap_subtract_months(ts1, ts2)
+ kylin_subtract_months(ts1, ts2)
} else {
throw new IllegalStateException(
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ProjectPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ProjectPlan.scala
index 13c8c7c..f1348da 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ProjectPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ProjectPlan.scala
@@ -52,7 +52,7 @@ object ProjectPlan extends Logging {
k_lit(c._1._1).as(s"${System.identityHashCode(rel)}_prj${c._2}")
}
})
- .map(c => { // find and rename the duplicated columns KAP#16751
+ .map(c => { // find and rename the duplicated columns
if (!(duplicatedColumnsCount contains c)) {
duplicatedColumnsCount += (c -> 0)
c
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
index 73d7424..f0077af 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
@@ -62,11 +62,11 @@ object ResultPlan extends Logging {
val resultTypes = rowType.getFieldList.asScala
val jobGroup = Thread.currentThread().getName
val sparkContext = SparderContext.getSparkSession.sparkContext
- val kapConfig = KylinConfig.getInstanceFromEnv
+ val kylinConfig = KylinConfig.getInstanceFromEnv
var pool = "heavy_tasks"
val partitionsNum =
- if (kapConfig.getSparkSqlShufflePartitions != -1) {
- kapConfig.getSparkSqlShufflePartitions
+ if (kylinConfig.getSparkSqlShufflePartitions != -1) {
+ kylinConfig.getSparkSqlShufflePartitions
} else {
        Math.min(QueryContextFacade.current().getSourceScanBytes / PARTITION_SPLIT_BYTES + 1,
SparderContext.getTotalCore).toInt
diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NExecAndComp.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NExecAndComp.java
index ddcc79f..113d530 100644
--- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NExecAndComp.java
+++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NExecAndComp.java
@@ -81,13 +81,13 @@ public class NExecAndComp {
appendLimitQueries++;
}
-            Dataset<Row> kapResult = (recAndQueryResult == null) ? queryWithKap(prj, joinType, sqlAndAddedLimitSql)
-                    : queryWithKap(prj, joinType, sqlAndAddedLimitSql, recAndQueryResult);
+            Dataset<Row> kylinResult = (recAndQueryResult == null) ? queryWithKylin(prj, joinType, sqlAndAddedLimitSql)
+                    : queryWithKylin(prj, joinType, sqlAndAddedLimitSql, recAndQueryResult);
addQueryPath(recAndQueryResult, query, sql);
            Dataset<Row> sparkResult = queryWithSpark(prj, sql, query.getFirst());
-            List<Row> kapRows = SparkQueryTest.castDataType(kapResult, sparkResult).toJavaRDD().collect();
+            List<Row> kylinRows = SparkQueryTest.castDataType(kylinResult, sparkResult).toJavaRDD().collect();
            List<Row> sparkRows = sparkResult.toJavaRDD().collect();
-            if (!compareResults(normRows(sparkRows), normRows(kapRows), CompareLevel.SUBSET)) {
+            if (!compareResults(normRows(sparkRows), normRows(kylinRows), CompareLevel.SUBSET)) {
throw new IllegalArgumentException("Result not match");
}
}
@@ -115,8 +115,8 @@ public class NExecAndComp {
// Query from Cube
long startTime = System.currentTimeMillis();
-        Dataset<Row> cubeResult = (recAndQueryResult == null) ? queryWithKap(prj, joinType, Pair.newPair(sql, sql))
-                : queryWithKap(prj, joinType, Pair.newPair(sql, sql), recAndQueryResult);
+        Dataset<Row> cubeResult = (recAndQueryResult == null) ? queryWithKylin(prj, joinType, Pair.newPair(sql, sql))
+                : queryWithKylin(prj, joinType, Pair.newPair(sql, sql), recAndQueryResult);
addQueryPath(recAndQueryResult, query, sql);
if (compareLevel == CompareLevel.SAME) {
            Dataset<Row> sparkResult = queryWithSpark(prj, sql, query.getFirst());
@@ -145,7 +145,7 @@ public class NExecAndComp {
}
}
-    public static boolean execAndCompareQueryResult(Pair<String, String> queryForKap,
+    public static boolean execAndCompareQueryResult(Pair<String, String> queryForKylin,
                                                     Pair<String, String> queryForSpark, String joinType, String prj,
                                                     Map<String, CompareEntity> recAndQueryResult) {
        String sqlForSpark = changeJoinType(queryForSpark.getSecond(), joinType);
@@ -153,11 +153,11 @@ public class NExecAndComp {
        Dataset<Row> sparkResult = queryWithSpark(prj, queryForSpark.getSecond(), queryForSpark.getFirst());
List<Row> sparkRows = sparkResult.toJavaRDD().collect();
- String sqlForKap = changeJoinType(queryForKap.getSecond(), joinType);
-        Dataset<Row> cubeResult = queryWithKap(prj, joinType, Pair.newPair(sqlForKap, sqlForKap));
-        List<Row> kapRows = SparkQueryTest.castDataType(cubeResult, sparkResult).toJavaRDD().collect();
+        String sqlForKylin = changeJoinType(queryForKylin.getSecond(), joinType);
+        Dataset<Row> cubeResult = queryWithKylin(prj, joinType, Pair.newPair(sqlForKylin, sqlForKylin));
+        List<Row> kylinRows = SparkQueryTest.castDataType(cubeResult, sparkResult).toJavaRDD().collect();
- return sparkRows.equals(kapRows);
+ return sparkRows.equals(kylinRows);
}
private static List<Row> normRows(List<Row> rows) {
@@ -184,8 +184,8 @@ public class NExecAndComp {
"The method has deprecated, please call
org.apache.kylin.engine.spark2.NExecAndComp.execAndCompareNew");
}
-    private static Dataset<Row> queryWithKap(String prj, String joinType, Pair<String, String> pair,
-                                             Map<String, CompareEntity> compareEntityMap) {
+    private static Dataset<Row> queryWithKylin(String prj, String joinType, Pair<String, String> pair,
+                                               Map<String, CompareEntity> compareEntityMap) {
compareEntityMap.putIfAbsent(pair.getFirst(), new CompareEntity());
final CompareEntity entity = compareEntityMap.get(pair.getFirst());
@@ -196,7 +196,7 @@ public class NExecAndComp {
return rowDataset;
}
-    private static Dataset<Row> queryWithKap(String prj, String joinType, Pair<String, String> sql) {
+    private static Dataset<Row> queryWithKylin(String prj, String joinType, Pair<String, String> sql) {
return queryFromCube(prj, changeJoinType(sql.getSecond(), joinType));
}
diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/file_pruning/NFilePruningTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/file_pruning/NFilePruningTest.java
index 897cef2..cf58cac 100644
--- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/file_pruning/NFilePruningTest.java
+++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/file_pruning/NFilePruningTest.java
@@ -264,7 +264,7 @@ public class NFilePruningTest extends LocalWithSparkSessionTest {
//pruningWithVariousTypesScenario();
} finally {
- System.clearProperty("kap.storage.columnar.shard-rowcount");
+ System.clearProperty("kylin.storage.columnar.shard-rowcount");
}
}
@@ -287,7 +287,7 @@ public class NFilePruningTest extends LocalWithSparkSessionTest {
NExecAndComp.execAndCompare(query, getProject(),
NExecAndComp.CompareLevel.SAME, "left");
} finally {
- System.clearProperty("kap.storage.columnar.shard-rowcount");
+ System.clearProperty("kylin.storage.columnar.shard-rowcount");
}
}
diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/utils/QueryUtil.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/utils/QueryUtil.java
index bfa1d1c..ba17302 100644
--- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/utils/QueryUtil.java
+++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/utils/QueryUtil.java
@@ -28,7 +28,6 @@ import org.apache.kylin.query.util.QueryUtil.IQueryTransformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
@@ -70,15 +69,14 @@ public class QueryUtil {
        String[] currentTransformers = queryTransformers.stream().map(Object::getClass).map(Class::getCanonicalName)
.toArray(String[]::new);
String[] configTransformers = kylinConfig.getQueryTransformers();
-        boolean containsCCTransformer = Arrays.stream(configTransformers).anyMatch(t -> t.equals("io.kyligence.kap.query.util.ConvertToComputedColumn"));
boolean transformersEqual = Objects.deepEquals(currentTransformers,
configTransformers);
- if (transformersEqual && (isCCNeeded || !containsCCTransformer)) {
+ if (transformersEqual && (isCCNeeded)) {
return;
}
List<IQueryTransformer> transformers = Lists.newArrayList();
for (String clz : configTransformers) {
-            if (!isCCNeeded && clz.equals("io.kyligence.kap.query.util.ConvertToComputedColumn"))
+ if (!isCCNeeded)
continue;
try {
diff --git a/webapp/app/js/model/cubeConfig.js b/webapp/app/js/model/cubeConfig.js
index 80f0ddd..6d57c42 100644
--- a/webapp/app/js/model/cubeConfig.js
+++ b/webapp/app/js/model/cubeConfig.js
@@ -26,7 +26,7 @@ KylinApp.constant('cubeConfig', {
engineType:[
//{name:'MapReduce',value: 2},
//{name:'Spark',value: 4},
- {name:'Spark2',value: 6}
+ {name:'Spark',value: 6}
//{name:'Flink',value: 5}
],
joinTypes: [
diff --git a/webapp/app/partials/cubeDesigner/advanced_settings.html b/webapp/app/partials/cubeDesigner/advanced_settings.html
index 733b341..98bc1ab 100755
--- a/webapp/app/partials/cubeDesigner/advanced_settings.html
+++ b/webapp/app/partials/cubeDesigner/advanced_settings.html
@@ -382,7 +382,7 @@
</select>
      <span ng-if="state.mode=='view'&&cubeMetaFrame.engine_type==2">MapReduce</span>
      <span ng-if="state.mode=='view'&&cubeMetaFrame.engine_type==4">Spark</span>
-      <span ng-if="state.mode=='view'&&cubeMetaFrame.engine_type==6">Spark2</span>
+      <span ng-if="state.mode=='view'&&cubeMetaFrame.engine_type==6">Spark</span>
      <span ng-if="state.mode=='view'&&cubeMetaFrame.engine_type==5">Flink</span>
</div>
</div>