This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new ed241cd  KYLIN-4925 Use Spark 3.1 for Kylin 4.0 (#1601)
ed241cd is described below

commit ed241cd753efd40720d205d90e0ea54c46b0f9ee
Author: Congling Xia <xiacongl...@xiaomi.com>
AuthorDate: Sat Jun 26 23:32:49 2021 +0800

    KYLIN-4925 Use Spark 3.1 for Kylin 4.0 (#1601)
---
 .travis.yml                                        |   3 +-
 assembly/pom.xml                                   |   2 +-
 engine-flink/pom.xml                               |   4 +-
 engine-spark/pom.xml                               |   6 +-
 kylin-it/pom.xml                                   |   8 +-
 kylin-spark-project/kylin-spark-common/pom.xml     |  60 ++++-
 .../spark/common/util/KylinDateTimeUtils.scala     |   8 +-
 .../spark/dict/NGlobalDictBuilderAssist.scala      |   2 +-
 .../org/apache/spark/sql/KylinFunctions.scala      |   2 +-
 .../catalyst/expressions/KylinExpresssions.scala   |   8 +-
 .../catalyst/expressions/TimestampAddImpl.scala    |   6 +-
 .../catalyst/expressions/TimestampDiffImpl.scala   |  11 +-
 .../sql/execution/datasource/FilePruner.scala      |  34 ++-
 .../execution/datasource/KylinSourceStrategy.scala |  29 ++-
 .../org/apache/spark/utils/KylinReflectUtils.scala |  24 +-
 .../spark/monitor/MonitorExecutorExtension.scala   |   0
 .../sql/catalyst/expressions/ExpressionUtils.scala |  10 +-
 .../sql/execution/KylinFileSourceScanExec.scala    |   9 +-
 .../spark/sql/execution/KylinJoinSelection.scala   |   0
 .../sql/execution/datasource/FilterExt.scala}      |  22 +-
 .../spark/sql/hive/utils/QueryMetricUtils.scala    |   1 +
 .../spark/monitor/MonitorExecutorExtension.scala   |   8 +-
 .../sql/catalyst/expressions/ExpressionUtils.scala |  12 +-
 .../sql/execution/KylinFileSourceScanExec.scala    |  55 +++--
 .../spark/sql/execution/KylinJoinSelection.scala   | 249 +++++++++++++++++++++
 .../spark/sql/hive/utils/QueryMetricUtils.scala    |  52 +++--
 kylin-spark-project/kylin-spark-engine/pom.xml     |  32 ++-
 .../spark/SparkBatchCubingEngineParquet.java       |   2 +-
 .../kylin/engine/spark/job/NSparkCubingJob.java    |   2 +-
 .../kylin/engine/spark/job/NSparkCubingUtil.java   |   2 +-
 .../kylin/cluster/ClusterInfoFetcherFactory.scala  |   2 +-
 .../spark/builder/CubeDictionaryBuilder.scala      |   2 +-
 .../kylin/engine/spark/job/CuboidAggregator.scala  |  18 +-
 .../kylin/query/runtime/ExpressionConverter.scala  |   5 +-
 .../apache/spark/sql/udf/TimestampAddImpl.scala    |   5 +-
 .../apache/spark/sql/udf/TimestampDiffImpl.scala   |  11 +-
 .../org/apache/spark/util/KylinReflectUtils.scala  |  58 -----
 .../engine/spark/LocalWithSparkSessionTest.java    |   2 +-
 .../kylin/engine/spark/job/JobStepFactoryTest.java |   2 +-
 .../kylin/engine/spark/job/SparkCubingJobTest.java |   2 +-
 .../engine/spark/builder/TestCreateFlatTable.scala |  27 ++-
 kylin-spark-project/kylin-spark-metadata/pom.xml   |  38 +++-
 .../org/apache/spark/sql/utils/SparkTypeUtil.scala |  17 +-
 .../engine/spark/cross/CrossDateTimeUtils.scala    |  52 +++++
 .../engine/spark/cross/CrossDateTimeUtils.scala    |  55 +++++
 kylin-spark-project/kylin-spark-query/pom.xml      |  77 ++++---
 .../kylin/query/runtime/ExpressionConverter.scala  |   6 +-
 .../kylin/query/runtime/SparderRexVisitor.scala    |   6 +-
 .../sql/hive/KylinHiveSessionStateBuilder.scala    |   0
 .../sql/hive/KylinHiveSessionStateBuilder.scala    |  14 +-
 kylin-spark-project/kylin-spark-test/pom.xml       |  10 +-
 .../kylin/engine/spark2/NBuildAndQueryTest.java    |   2 +-
 .../spark2/NManualBuildAndQueryCuboidTest.java     |   2 +-
 kylin-spark-project/pom.xml                        |  12 +-
 metastore-hbase/pom.xml                            |   2 +-
 metrics-reporter-kafka/pom.xml                     |   2 +-
 parquet-assembly/pom.xml                           |   8 +
 pom.xml                                            |  77 ++++++-
 server-base/pom.xml                                |   4 +-
 .../apache/kylin/rest/service/QueryService.java    |   2 +-
 server/pom.xml                                     |   8 +-
 source-kafka/pom.xml                               |   2 +-
 storage-hbase/pom.xml                              |   2 +-
 stream-receiver/pom.xml                            |   2 +-
 stream-source-kafka/pom.xml                        |   2 +-
 65 files changed, 904 insertions(+), 295 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 3dcb586..46236bc 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -46,7 +46,8 @@ before_script:
 script:
   # mvn clean org.jacoco:jacoco-maven-plugin:prepare-agent test coveralls:report -e
   # Skip coveralls temporarily, fix it asap
-  - mvn clean test
+  - mvn clean test -q
+  - mvn clean test -q -Psandbox -Pspark3
   - if [[ -n "${TRAVIS_PULL_REQUEST_SLUG}" && "${TRAVIS_PULL_REQUEST_SLUG}" != "${TRAVIS_REPO_SLUG}" ]]; then
         echo "The pull request from ${TRAVIS_PULL_REQUEST_SLUG} is an EXTERNAL pull request. Skip sonar analysis.";
     else
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 041b2c2..2035e84 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -169,7 +169,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.11</artifactId>
+            <artifactId>kafka_${scala.binary.version}</artifactId>
             <scope>provided</scope>
         </dependency>
 
diff --git a/engine-flink/pom.xml b/engine-flink/pom.xml
index b10f3e5..2a99fe3 100644
--- a/engine-flink/pom.xml
+++ b/engine-flink/pom.xml
@@ -60,13 +60,13 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-scala_2.11</artifactId>
+            <artifactId>flink-scala_${scala.binary.version}</artifactId>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
+            <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
         </dependency>
 
         <!-- Hadoop dependency -->
diff --git a/engine-spark/pom.xml b/engine-spark/pom.xml
index cc1d9d6..af6486c 100644
--- a/engine-spark/pom.xml
+++ b/engine-spark/pom.xml
@@ -46,19 +46,19 @@
         <!-- Spark dependency -->
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.11</artifactId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_2.11</artifactId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-hive_2.11</artifactId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
             <scope>provided</scope>
         </dependency>
 
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index b7986ab..56a3dfb 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -272,7 +272,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.11</artifactId>
+            <artifactId>kafka_${scala.binary.version}</artifactId>
             <scope>provided</scope>
         </dependency>
 
@@ -292,7 +292,7 @@
         <!-- Spark dependency -->
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.11</artifactId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
             <scope>provided</scope>
             <exclusions>
@@ -305,7 +305,7 @@
 
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_2.11</artifactId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
             <scope>provided</scope>
             <exclusions>
@@ -318,7 +318,7 @@
 
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-hive_2.11</artifactId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
             <scope>provided</scope>
             <exclusions>
diff --git a/kylin-spark-project/kylin-spark-common/pom.xml 
b/kylin-spark-project/kylin-spark-common/pom.xml
index 44be54b..b027793 100644
--- a/kylin-spark-project/kylin-spark-common/pom.xml
+++ b/kylin-spark-project/kylin-spark-common/pom.xml
@@ -17,7 +17,8 @@
  limitations under the License.
 -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
     <name>Apache Kylin 4.X - Common</name>
@@ -41,6 +42,7 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-core-common</artifactId>
             <type>test-jar</type>
+            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.kylin</groupId>
@@ -48,6 +50,60 @@
             <version>${project.version}</version>
         </dependency>
     </dependencies>
-    
 
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>src/main/${spark.version.dir}</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>compile-version-dependent-source</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                        <configuration>
+                            <sourceDir>${spark.version.dir}</sourceDir>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>compile-common-scala-source</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                        <configuration>
+                            <sourceDir>scala</sourceDir>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KylinDateTimeUtils.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KylinDateTimeUtils.scala
index a5a6451..c03c1d0 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KylinDateTimeUtils.scala
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KylinDateTimeUtils.scala
@@ -19,7 +19,7 @@
 package org.apache.kylin.engine.spark.common.util
 
 import org.apache.calcite.avatica.util.TimeUnitRange
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils
 
 object KylinDateTimeUtils {
   val MICROS_PER_MILLIS: Long = 1000L
@@ -34,7 +34,7 @@ object KylinDateTimeUtils {
   def addMonths(timestamp: Long, m: Int): Long = {
     // spark ts unit is microsecond
     val ms = timestamp / 1000
-    val day0 = DateTimeUtils.millisToDays(ms)
+    val day0 = CrossDateTimeUtils.millisToDays(ms)
     val millis = ms - day0 * MILLIS_PER_DAY
     val x = dateAddMonths(day0, m)
     (x * MILLIS_PER_DAY + millis) * 1000
@@ -63,9 +63,9 @@ object KylinDateTimeUtils {
 
   def subtractMonths(t0: Long, t1: Long): Int = {
     val millis0 = floorMod(t0, MILLIS_PER_DAY)
-    val d0 = DateTimeUtils.millisToDays(t0)
+    val d0 = CrossDateTimeUtils.millisToDays(t0)
     val millis1 = floorMod(t1, MILLIS_PER_DAY)
-    val d1 = DateTimeUtils.millisToDays(t1)
+    val d1 = CrossDateTimeUtils.millisToDays(t1)
     var x = dateSubtractMonths(d0, d1)
     val d2 = dateAddMonths(d1, x)
     if (x > 0 && d2 == d0 && millis0 < millis1) x -= 1
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
index dbac8b8..3ce9bca 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala
@@ -55,7 +55,7 @@ object NGlobalDictBuilderAssist extends Logging {
     existsDictDs
       .repartition(bucketPartitionSize, 
col(existsDictDs.schema.head.name).cast(StringType))
       .foreachPartition {
-        iter =>
+        iter: Iterator[(String, Long)] =>
           val partitionID = TaskContext.get().partitionId()
           logInfo(s"Rebuild partition dict col: ${ref.identity}, partitionId: 
$partitionID")
           val d = broadcastDict.value
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
index 22cfb0d..8e112c1 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
@@ -51,7 +51,7 @@ object KylinFunctions {
     case _ => Column(Literal(literal))
   }
 
-  def k_like(left: Column, right: Column): Column = Column(Like(left.expr, 
right.expr))
+  def k_like(left: Column, right: Column): Column = Column(new Like(left.expr, 
right.expr))
 
   def in(value: Expression, list: Seq[Expression]): Column = Column(In(value, 
list))
 
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
index 331309a..a1a45fa 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
@@ -302,11 +302,11 @@ case class DictEncode(left: Expression, mid: Expression, 
right: Expression) exte
     val globalDictClass = classOf[NGlobalDictionary].getName
     val bucketDictClass = classOf[NBucketDictionary].getName
 
-    val globalDictTerm = ctx.addMutableState(globalDictClass, 
s"${mid.simpleString.replace("[", "").replace("]", "")}_globalDict")
-    val bucketDictTerm = ctx.addMutableState(bucketDictClass, 
s"${mid.simpleString.replace("[", "").replace("]", "")}_bucketDict")
+    val globalDictTerm = ctx.addMutableState(globalDictClass, 
s"${ExpressionUtils.simpleString(mid).replace("[", "").replace("]", 
"")}_globalDict")
+    val bucketDictTerm = ctx.addMutableState(bucketDictClass, 
s"${ExpressionUtils.simpleString(mid).replace("[", "").replace("]", 
"")}_bucketDict")
 
-    val dictParamsTerm = mid.simpleString
-    val bucketSizeTerm = right.simpleString.toInt
+    val dictParamsTerm = ExpressionUtils.simpleString(mid)
+    val bucketSizeTerm = ExpressionUtils.simpleString(right).toInt
 
     val initBucketDictFuncName = 
ctx.addNewFunction(s"init${bucketDictTerm.replace("[", "").replace("]", 
"")}BucketDict",
       s"""
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampAddImpl.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampAddImpl.scala
index 55f683c..e1a4d35 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampAddImpl.scala
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampAddImpl.scala
@@ -18,9 +18,9 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils._
-import java.util.{Calendar, Locale, TimeZone}
+import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils
 
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import java.util.{Calendar, Locale, TimeZone}
 
 object TimestampAddImpl {
   private val localCalendar = new ThreadLocal[Calendar] {
@@ -34,7 +34,7 @@ object TimestampAddImpl {
     calendar.clear()
     addTime("DAY", time, calendar)
     addTime(unit, increment, calendar)
-    DateTimeUtils.millisToDays(calendar.getTimeInMillis)
+    CrossDateTimeUtils.millisToDays(calendar.getTimeInMillis)
   }
 
   // add long on DateType
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampDiffImpl.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampDiffImpl.scala
index e85abe0..cbf42cf 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampDiffImpl.scala
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampDiffImpl.scala
@@ -19,17 +19,16 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import java.util.Locale
-
 import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils._
 import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils
 
 object TimestampDiffImpl {
 
   // TimestampType -> DateType
   def evaluate(unit: String, timestamp: Long, date: Int): Long = {
     val before = timestamp / MICROS_PER_MILLIS
-    val after = DateTimeUtils.daysToMillis(date)
+    val after = CrossDateTimeUtils.daysToMillis(date)
     convertDuration(unit, before, after)
   }
 
@@ -42,14 +41,14 @@ object TimestampDiffImpl {
 
   // DateType -> DateType
   def evaluate(unit: String, date1: Int, date2: Int): Long = {
-    val before = DateTimeUtils.daysToMillis(date1)
-    val after = DateTimeUtils.daysToMillis(date2)
+    val before = CrossDateTimeUtils.daysToMillis(date1)
+    val after = CrossDateTimeUtils.daysToMillis(date2)
     convertDuration(unit, before, after)
   }
 
   // DateType -> TimestampType
   def evaluate(unit: String, date: Int, timestamp: Long): Long = {
-    val before = DateTimeUtils.daysToMillis(date)
+    val before = CrossDateTimeUtils.daysToMillis(date)
     val after = timestamp / MICROS_PER_MILLIS
     convertDuration(unit, before, after)
   }
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index 91faab4..2784170 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -19,7 +19,6 @@
 package org.apache.spark.sql.execution.datasource
 
 import java.sql.{Date, Timestamp}
-
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.kylin.common.util.DateFormat
 import org.apache.kylin.cube.cuboid.Cuboid
@@ -29,7 +28,7 @@ import 
org.apache.kylin.engine.spark.metadata.MetadataConverter
 import org.apache.kylin.metadata.model.{PartitionDesc, SegmentStatusEnum}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.analysis.Resolver
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
EmptyRow, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
EmptyRow, Expression, ExpressionUtils, Literal}
 import org.apache.spark.sql.catalyst.{InternalRow, expressions}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
@@ -346,7 +345,7 @@ class FilePruner(cubeInstance: CubeInstance,
       segDirs
     } else {
       val translatedFilter = filters.map(filter => convertCastFilter(filter))
-        .flatMap(DataSourceStrategy.translateFilter)
+        .flatMap(ExpressionUtils.translateFilter)
       if (translatedFilter.isEmpty) {
         logInfo("Can not use filters to prune segments.")
         segDirs
@@ -357,8 +356,8 @@ class FilePruner(cubeInstance: CubeInstance,
             val tsRange = cubeInstance.getSegment(e.segmentName, 
SegmentStatusEnum.READY).getTSRange
             SegFilters(tsRange.startValue, tsRange.endValue, pattern)
               .foldFilter(reducedFilter) match {
-              case Trivial(true) => true
-              case Trivial(false) => false
+              case AlwaysTrue => true
+              case AlwaysFalse => false
             }
           }
         }
@@ -555,22 +554,20 @@ case class SegFilters(start: Long, end: Long, pattern: 
String) extends Logging {
         }
       case And(left: Filter, right: Filter) =>
         And(foldFilter(left), foldFilter(right)) match {
-          case And(Trivial(false), _) => Trivial(false)
-          case And(_, Trivial(false)) => Trivial(false)
-          case And(Trivial(true), right) => right
-          case And(left, Trivial(true)) => left
+          case And(AlwaysFalse, _) => Trivial(false)
+          case And(_, AlwaysFalse) => Trivial(false)
+          case And(AlwaysTrue, right) => right
+          case And(left, AlwaysTrue) => left
           case other => other
         }
       case Or(left: Filter, right: Filter) =>
         Or(foldFilter(left), foldFilter(right)) match {
-          case Or(Trivial(true), _) => Trivial(true)
-          case Or(_, Trivial(true)) => Trivial(true)
-          case Or(Trivial(false), right) => right
-          case Or(left, Trivial(false)) => left
+          case Or(AlwaysTrue, _) => Trivial(true)
+          case Or(_, AlwaysTrue) => Trivial(true)
+          case Or(AlwaysFalse, right) => right
+          case Or(left, AlwaysFalse) => left
           case other => other
         }
-      case trivial: Trivial =>
-        trivial
       case unsupportedFilter =>
         // return 'true' to scan all partitions
         // currently unsupported filters are:
@@ -581,8 +578,7 @@ case class SegFilters(start: Long, end: Long, pattern: 
String) extends Logging {
         Trivial(true)
     }
   }
-}
-
-case class Trivial(value: Boolean) extends Filter {
-  override def references: Array[String] = findReferences(value)
+  def Trivial(value: Boolean): Filter = {
+    if (value) AlwaysTrue else AlwaysFalse
+  }
 }
\ No newline at end of file
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinSourceStrategy.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinSourceStrategy.scala
index 9e9a29c..0713b7c 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinSourceStrategy.scala
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinSourceStrategy.scala
@@ -17,14 +17,16 @@
  */
 package org.apache.spark.sql.execution.datasource
 
+import org.apache.spark.SPARK_VERSION
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.{execution, Strategy}
+import org.apache.spark.sql.{Strategy, execution}
 import org.apache.spark.sql.execution.{KylinFileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
AttributeSet, ExpressionSet, NamedExpression, SubqueryExpression}
+import org.apache.spark.utils.KylinReflectUtils
 
 /**
  * A strategy for planning scans over collections of files that might be 
partitioned or bucketed
@@ -119,15 +121,34 @@ object KylinSourceStrategy extends Strategy with Logging {
       val outputAttributes = readDataColumns ++ partitionColumns
       // to trigger setShufflePartitions
       filePruner.listFiles(partitionKeyFilters.iterator.toSeq, 
dataFilters.iterator.toSeq)
-      val scan =
-        new KylinFileSourceScanExec(
+      val className = "org.apache.spark.sql.execution.KylinFileSourceScanExec"
+      val (scan: KylinFileSourceScanExec, ignored: Class[_]) = if 
(SPARK_VERSION.startsWith("2.4")) {
+        KylinReflectUtils.createObject(
+          className,
           fsRelation,
           outputAttributes,
           outputSchema,
           partitionKeyFilters.toSeq,
           filePruner.getShardSpec,
           dataFilters,
-          table.map(_.identifier))
+          table.map(_.identifier)
+        )
+      } else if (SPARK_VERSION.startsWith("3.1")) {
+        KylinReflectUtils.createObject(
+          className,
+          fsRelation,
+          outputAttributes,
+          outputSchema,
+          partitionKeyFilters.toSeq,
+          filePruner.getShardSpec,
+          None,
+          dataFilters,
+          table.map(_.identifier),
+          java.lang.Boolean.TRUE
+        )
+      } else {
+        throw new UnsupportedOperationException(s"Spark version 
${SPARK_VERSION} is not supported.")
+      }
 
       val afterScanFilter = 
afterScanFilters.toSeq.reduceOption(expressions.And)
       val withFilter = afterScanFilter.map(execution.FilterExec(_, 
scan)).getOrElse(scan)
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/utils/KylinReflectUtils.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/utils/KylinReflectUtils.scala
index 783eeb4..28b3a6e 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/utils/KylinReflectUtils.scala
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/utils/KylinReflectUtils.scala
@@ -27,19 +27,21 @@ object KylinReflectUtils {
   private val rm = universe.runtimeMirror(getClass.getClassLoader)
 
   def getSessionState(sparkContext: SparkContext, kylinSession: Object): Any = 
{
-    if (SPARK_VERSION.startsWith("2.4")) {
-      var className: String =
-        "org.apache.spark.sql.hive.KylinHiveSessionStateBuilder"
-      if (!"hive".equals(sparkContext.getConf
-        .get(CATALOG_IMPLEMENTATION.key, "in-memory"))) {
-        className = "org.apache.spark.sql.hive.KylinSessionStateBuilder"
-      }
-      val tuple = createObject(className, kylinSession, None)
-      val method = tuple._2.getMethod("build")
-      method.invoke(tuple._1)
+    var className: String =
+      "org.apache.spark.sql.hive.KylinHiveSessionStateBuilder"
+    if (!"hive".equals(sparkContext.getConf
+      .get(CATALOG_IMPLEMENTATION.key, "in-memory"))) {
+      className = "org.apache.spark.sql.hive.KylinSessionStateBuilder"
+    }
+
+    val (instance, clazz) = if (SPARK_VERSION.startsWith("2.4")) {
+      createObject(className, kylinSession, None)
+    } else if (SPARK_VERSION.startsWith("3.1")) {
+      createObject(className, kylinSession, None, Map.empty)
     } else {
-      throw new UnsupportedOperationException("Spark version not supported")
+      throw new UnsupportedOperationException(s"Spark version ${SPARK_VERSION} 
not supported")
     }
+    clazz.getMethod("build").invoke(instance)
   }
 
   def createObject(className: String, conArgs: Object*): (Any, Class[_]) = {
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/monitor/MonitorExecutorExtension.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/monitor/MonitorExecutorExtension.scala
similarity index 100%
copy from 
kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/monitor/MonitorExecutorExtension.scala
copy to 
kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/monitor/MonitorExecutorExtension.scala
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
similarity index 91%
copy from 
kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
copy to 
kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
index d9bdcc5..232f6cc 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
@@ -21,7 +21,9 @@ package org.apache.spark.sql.catalyst.expressions
 import scala.util.{Failure, Success, Try}
 import scala.reflect.ClassTag
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import 
org.apache.spark.sql.catalyst.analysis.FunctionRegistry.{FunctionBuilder, 
expressions}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.Filter
 
 object
 ExpressionUtils {
@@ -78,6 +80,12 @@ ExpressionUtils {
     (name, (expressionInfo[T](name), builder))
   }
 
+  /**
+   * Returns the one-line textual form of `expression` by delegating to the
+   * expression's own zero-argument `simpleString`.
+   *
+   * @param expression the Catalyst expression to render
+   * @return whatever `expression.simpleString` yields
+   */
+  def simpleString(expression: Expression): String = {
+    expression.simpleString
+  }
+
+  /**
+   * Converts a Catalyst expression into a data source `Filter` by forwarding to the
+   * single-argument `DataSourceStrategy.translateFilter`.
+   *
+   * @param expression the predicate to translate
+   * @return the result of `DataSourceStrategy.translateFilter(expression)`
+   */
+  def translateFilter(expression: Expression): Option[Filter] =
+    DataSourceStrategy.translateFilter(expression)
+
   private def expressionInfo[T <: Expression : ClassTag](name: String): 
ExpressionInfo = {
     val clazz = scala.reflect.classTag[T].runtimeClass
     val df = clazz.getAnnotation(classOf[ExpressionDescription])
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
similarity index 98%
copy from 
kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
copy to 
kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
index 90ff597..0fbb39d 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
@@ -21,7 +21,7 @@ package org.apache.spark.sql.execution
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, 
Path}
 import org.apache.kylin.common.KylinConfig
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
Expression, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
Expression, ExpressionUtils, SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.datasources._
@@ -57,7 +57,7 @@ class KylinFileSourceScanExec(
     ret
   }
 
-  private lazy val inputRDD: RDD[InternalRow] = {
+  private lazy val _inputRDD: RDD[InternalRow] = {
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
@@ -77,11 +77,12 @@ class KylinFileSourceScanExec(
   }
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    inputRDD :: Nil
+    _inputRDD :: Nil
   }
 
   @transient
-  private val pushedDownFilters = 
dataFilters.flatMap(DataSourceStrategy.translateFilter)
+  private val pushedDownFilters = dataFilters
+    .flatMap(ExpressionUtils.translateFilter)
   logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
 
   override lazy val (outputPartitioning, outputOrdering): (Partitioning, 
Seq[SortOrder]) = {
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinJoinSelection.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinJoinSelection.scala
similarity index 100%
rename from 
kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinJoinSelection.scala
rename to 
kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinJoinSelection.scala
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/cluster/ClusterInfoFetcherFactory.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/datasource/FilterExt.scala
similarity index 68%
copy from 
kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/cluster/ClusterInfoFetcherFactory.scala
copy to 
kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/datasource/FilterExt.scala
index 9f709f4..d9ca8ab 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/cluster/ClusterInfoFetcherFactory.scala
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/datasource/FilterExt.scala
@@ -14,17 +14,23 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
-package org.apache.kylin.cluster
+package org.apache.spark.sql.execution.datasource
 
-import org.apache.kylin.common.KylinConfig
-import org.apache.spark.util.KylinReflectUtils
+import org.apache.spark.sql.sources.Filter
 
+/**
+ * A `Filter` that references no columns.
+ * NOTE(review): presumably stands for a predicate that always evaluates to true and
+ * exists so that callers have an `AlwaysTrue` filter under the spark24 source set;
+ * confirm against the callers.
+ */
+case class AlwaysTrue() extends Filter {
+  // No attribute is read by this filter.
+  override def references: Array[String] = Array.empty
+}
+
+// Companion that is itself an instance of the case class, so `AlwaysTrue` can be
+// used directly as a value.
+object AlwaysTrue extends AlwaysTrue {
+}
 
-object ClusterInfoFetcherFactory {
 
-  def create(kylinConfig: KylinConfig): ClusterInfoFetcher = {
-    
KylinReflectUtils.createObject(kylinConfig.getClusterInfoFetcherClassName)._1.asInstanceOf[ClusterInfoFetcher]
-  }
+// A `Filter` that references no columns.
+// NOTE(review): presumably stands for a predicate that always evaluates to false;
+// confirm against the callers.
+case class AlwaysFalse() extends Filter {
+  // No attribute is read by this filter.
+  override def references: Array[String] = Array.empty
 }
+
+// Companion that is itself an instance of the case class, so `AlwaysFalse` can be
+// used directly as a value.
+object AlwaysFalse extends AlwaysFalse {
+}
\ No newline at end of file
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
similarity index 99%
copy from 
kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
copy to 
kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
index c928e84..cabe9c6 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
@@ -42,6 +42,7 @@ object QueryMetricUtils extends Logging {
           // There is only 'numOutputRows' metric in HiveTableScanExec
           (exec.metrics.apply("numOutputRows").value, -1l, -1l, -1l, -1l)
       }
+
       val scanRows = metrics.map(metric => java.lang.Long.valueOf(metric._1))
         .filter(_ >= 0L).toList.asJava
       val scanFiles = metrics.map(metrics => 
java.lang.Long.valueOf(metrics._2))
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/monitor/MonitorExecutorExtension.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/monitor/MonitorExecutorExtension.scala
similarity index 90%
rename from 
kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/monitor/MonitorExecutorExtension.scala
rename to 
kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/monitor/MonitorExecutorExtension.scala
index d291faf..984fe45 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/monitor/MonitorExecutorExtension.scala
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/monitor/MonitorExecutorExtension.scala
@@ -18,10 +18,12 @@
 
 package org.apache.spark.memory
 
+import java.util
+
+import org.apache.spark.api.plugin.{ExecutorPlugin, PluginContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
-import org.apache.spark.util.RpcUtils
-import org.apache.spark.{ExecutorPlugin, SparkConf, SparkEnv}
+import org.apache.spark.{SparkConf, SparkEnv}
 
 class MonitorExecutorExtension extends ExecutorPlugin with Logging {
 
@@ -31,7 +33,7 @@ class MonitorExecutorExtension extends ExecutorPlugin with 
Logging {
 
   val sparkConf: SparkConf = env.conf
 
-  override def init(): Unit = {
+  override def init(pluginContext: PluginContext, extraConf: util.Map[String, 
String]): Unit = {
 
     initMonitorEnv()
 
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
similarity index 91%
rename from 
kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
rename to 
kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
index d9bdcc5..8405493 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala
@@ -22,6 +22,8 @@ import scala.util.{Failure, Success, Try}
 import scala.reflect.ClassTag
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.Filter
 
 object
 ExpressionUtils {
@@ -78,6 +80,12 @@ ExpressionUtils {
     (name, (expressionInfo[T](name), builder))
   }
 
+  /**
+   * Returns the one-line textual form of `expr` by calling `expr.simpleString(1)`.
+   * NOTE(review): the literal 1 is presumably the max-fields truncation limit of
+   * Spark 3.x `simpleString` - confirm against the Spark 3.1 API.
+   *
+   * @param expr the Catalyst expression to render
+   * @return whatever `expr.simpleString(1)` yields
+   */
+  def simpleString(expr: Expression): String = {
+    expr.simpleString(1)
+  }
+
+  /**
+   * Converts a Catalyst expression into a data source `Filter` by forwarding to
+   * `DataSourceStrategy.translateFilter` with `supportNestedPredicatePushdown`
+   * set to true.
+   *
+   * @param expr the predicate to translate
+   * @return the result of the delegated `translateFilter` call
+   */
+  def translateFilter(expr: Expression): Option[Filter] =
+    DataSourceStrategy.translateFilter(expr, supportNestedPredicatePushdown = true)
+
   private def expressionInfo[T <: Expression : ClassTag](name: String): 
ExpressionInfo = {
     val clazz = scala.reflect.classTag[T].runtimeClass
     val df = clazz.getAnnotation(classOf[ExpressionDescription])
@@ -91,7 +99,9 @@ ExpressionUtils {
           df.arguments(),
           df.examples(),
           df.note(),
-          df.since())
+          df.group(),
+          df.since(),
+          df.deprecated())
       } else {
         // This exists for the backward compatibility with old 
`ExpressionDescription`s defining
         // the extended description in `extended()`.
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
similarity index 86%
rename from 
kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
rename to 
kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
index 90ff597..957944b 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala
@@ -21,43 +21,55 @@ package org.apache.spark.sql.execution
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, 
Path}
 import org.apache.kylin.common.KylinConfig
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
Expression, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
Expression, ExpressionUtils, SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.execution.datasource.{FilePruner, ShardSpec}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.execution.datasource.{FilePruner, ShardSpec}
 import org.apache.spark.sql.types.StructType
 
-import scala.collection.mutable.ArrayBuffer
+import java.util.concurrent.TimeUnit.NANOSECONDS
+import scala.collection.mutable.{ArrayBuffer, HashMap}
 
 class KylinFileSourceScanExec(
   @transient override val relation: HadoopFsRelation,
   override val output: Seq[Attribute],
   override val requiredSchema: StructType,
   override val partitionFilters: Seq[Expression],
-  val optionalShardSpec: Option[ShardSpec],
+  optionalShardSpec: Option[ShardSpec],
+  ignoredNumCoalescedBuckets: Option[Int],
   override val dataFilters: Seq[Expression],
-  override val tableIdentifier: Option[TableIdentifier]) extends 
FileSourceScanExec(
-  relation, output, requiredSchema, partitionFilters, None, dataFilters, 
tableIdentifier) {
+  override val tableIdentifier: Option[TableIdentifier],
+  ignoredDisableBucketedScan: Boolean = true) extends FileSourceScanExec(
+  relation, output, requiredSchema, partitionFilters, None, None, dataFilters, 
tableIdentifier, true) {
+
+  private lazy val driverMetrics: HashMap[String, Long] = HashMap.empty
 
-  @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
+  /**
+   * Copies every value buffered in `driverMetrics` into the SQL metric of the same
+   * name, then posts those metrics as driver-side updates for the execution id found
+   * in the Spark context's local properties.
+   */
+  private def sendDriverMetrics(): Unit = {
+    driverMetrics.foreach { case (name, value) => metrics(name).add(value) }
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    // Only the metrics that were populated on the driver are posted.
+    val updated = metrics.filter { case (name, _) => driverMetrics.contains(name) }.values.toSeq
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, updated)
+  }
+
+  @transient lazy val _selectedPartitions: Seq[PartitionDirectory] = {
     val optimizerMetadataTimeNs = 
relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()
     val ret = relation.location.listFiles(partitionFilters, dataFilters)
-    val timeTakenMs = ((System.nanoTime() - startTime) + 
optimizerMetadataTimeNs) / 1000 / 1000
-
-    metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
-    metrics("metadataTime").add(timeTakenMs)
 
-    val executionId = 
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
-      metrics("numFiles") :: metrics("metadataTime") :: Nil)
+    driverMetrics("numFiles") = ret.map(_.files.size.toLong).sum
+    driverMetrics("filesSize") = ret.map(_.files.map(_.getLen).sum).sum
+    if (relation.partitionSchemaOption.isDefined) {
+      driverMetrics("numPartitions") = ret.length
+    }
 
+    val timeTakenMs = NANOSECONDS.toMillis((System.nanoTime() - startTime) + 
optimizerMetadataTimeNs)
+    driverMetrics("metadataTime") = timeTakenMs
     ret
   }
 
-  private lazy val inputRDD: RDD[InternalRow] = {
+  private lazy val _inputRDD: RDD[InternalRow] = {
     val readFile: (PartitionedFile) => Iterator[InternalRow] =
       relation.fileFormat.buildReaderWithPartitionValues(
         sparkSession = relation.sparkSession,
@@ -68,20 +80,23 @@ class KylinFileSourceScanExec(
         options = relation.options,
         hadoopConf = 
relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
 
-    optionalShardSpec match {
+    val readRDD = optionalShardSpec match {
       case Some(spec) if 
KylinConfig.getInstanceFromEnv.isShardingJoinOptEnabled =>
-        createShardingReadRDD(spec, readFile, selectedPartitions, relation)
+        createShardingReadRDD(spec, readFile, _selectedPartitions, relation)
       case _ =>
-        createNonShardingReadRDD(readFile, selectedPartitions, relation)
+        createNonShardingReadRDD(readFile, _selectedPartitions, relation)
     }
+    sendDriverMetrics()
+    readRDD
   }
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    inputRDD :: Nil
+    _inputRDD :: Nil
   }
 
   @transient
-  private val pushedDownFilters = 
dataFilters.flatMap(DataSourceStrategy.translateFilter)
+  private val pushedDownFilters = dataFilters
+    .flatMap(ExpressionUtils.translateFilter)
   logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
 
   override lazy val (outputPartitioning, outputOrdering): (Partitioning, 
Seq[SortOrder]) = {
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinJoinSelection.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinJoinSelection.scala
new file mode 100644
index 0000000..243ffd6
--- /dev/null
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinJoinSelection.scala
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.spark.sql.execution
+
+import org.apache.kylin.common.KylinConfig
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{PredicateHelper, RowOrdering}
+import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, 
JoinSelectionHelper}
+import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, 
ExtractSingleColumnNullAwareAntiJoin}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.{SparkSession, Strategy}
+
+import javax.annotation.concurrent.GuardedBy
+
+/**
+ * Planner strategy that selects the physical join operator for a logical join.
+ *
+ * `apply` handles three shapes of plan: equi-joins (via `ExtractEquiJoinKeys`),
+ * single-column null-aware anti joins (via `ExtractSingleColumnNullAwareAntiJoin`)
+ * and every other `logical.Join`; any other plan yields `Nil`.
+ *
+ * The only behaviour defined locally on top of `JoinSelectionHelper` is
+ * `canBroadcastBySize`, which additionally requires `JoinMemoryManager` to grant
+ * the plan's estimated size before a side counts as broadcastable.
+ *
+ * NOTE(review): the selection rules appear to mirror Spark 3.1's built-in
+ * `JoinSelection` strategy - confirm when upgrading Spark.
+ *
+ * @param session session whose `sessionState.conf` drives the selection
+ */
+case class KylinJoinSelection(session: SparkSession) extends Strategy
+  with JoinSelectionHelper
+  with PredicateHelper
+  with Logging {
+
+  // SQL configuration of the owning session, passed to the JoinSelectionHelper calls.
+  val conf: SQLConf = session.sessionState.conf
+
+  /**
+   * Plans `plan` if it is a join.
+   *
+   * @param plan logical plan to convert
+   * @return a single-element sequence holding the chosen join operator, or `Nil`
+   *         when `plan` is not matched by any of the join cases below
+   */
+  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+
+    // If it is an equi-join, we first look at the join hints w.r.t. the 
following order:
+    //   1. broadcast hint: pick broadcast hash join if the join type is 
supported. If both sides
+    //      have the broadcast hints, choose the smaller side (based on stats) 
to broadcast.
+    //   2. sort merge hint: pick sort merge join if join keys are sortable.
+    //   3. shuffle hash hint: We pick shuffle hash join if the join type is 
supported. If both
+    //      sides have the shuffle hash hints, choose the smaller side (based 
on stats) as the
+    //      build side.
+    //   4. shuffle replicate NL hint: pick cartesian product if join type is 
inner like.
+    //
+    // If there is no hint or the hints are not applicable, we follow these 
rules one by one:
+    //   1. Pick broadcast hash join if one side is small enough to broadcast, 
and the join type
+    //      is supported. If both sides are small, choose the smaller side 
(based on stats)
+    //      to broadcast.
+    //   2. Pick shuffle hash join if one side is small enough to build local 
hash map, and is
+    //      much smaller than the other side, and 
`spark.sql.join.preferSortMergeJoin` is false.
+    //   3. Pick sort merge join if the join keys are sortable.
+    //   4. Pick cartesian product if join type is inner like.
+    //   5. Pick broadcast nested loop join as the final solution. It may OOM 
but we don't have
+    //      other choice.
+    case j@ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, 
left, right, hint) =>
+      def createBroadcastHashJoin(onlyLookingAtHint: Boolean) = {
+        getBroadcastBuildSide(left, right, joinType, hint, onlyLookingAtHint, 
conf).map {
+          buildSide =>
+            Seq(joins.BroadcastHashJoinExec(
+              leftKeys,
+              rightKeys,
+              joinType,
+              buildSide,
+              nonEquiCond,
+              planLater(left),
+              planLater(right)))
+        }
+      }
+
+      def createShuffleHashJoin(onlyLookingAtHint: Boolean) = {
+        getShuffleHashJoinBuildSide(left, right, joinType, hint, 
onlyLookingAtHint, conf).map {
+          buildSide =>
+            Seq(joins.ShuffledHashJoinExec(
+              leftKeys,
+              rightKeys,
+              joinType,
+              buildSide,
+              nonEquiCond,
+              planLater(left),
+              planLater(right)))
+        }
+      }
+
+      def createSortMergeJoin() = {
+        if (RowOrdering.isOrderable(leftKeys)) {
+          Some(Seq(joins.SortMergeJoinExec(
+            leftKeys, rightKeys, joinType, nonEquiCond, planLater(left), 
planLater(right))))
+        } else {
+          None
+        }
+      }
+
+      def createCartesianProduct() = {
+        if (joinType.isInstanceOf[InnerLike]) {
+          // `CartesianProductExec` can't implicitly evaluate equal join 
condition, here we should
+          // pass the original condition which includes both equal and 
non-equal conditions.
+          Some(Seq(joins.CartesianProductExec(planLater(left), 
planLater(right), j.condition)))
+        } else {
+          None
+        }
+      }
+
+      def createJoinWithoutHint() = {
+        createBroadcastHashJoin(false)
+          .orElse {
+            if (!conf.preferSortMergeJoin) {
+              createShuffleHashJoin(false)
+            } else {
+              None
+            }
+          }
+          .orElse(createSortMergeJoin())
+          .orElse(createCartesianProduct())
+          .getOrElse {
+            // This join could be very slow or OOM
+            val buildSide = getSmallerSide(left, right)
+            Seq(joins.BroadcastNestedLoopJoinExec(
+              planLater(left), planLater(right), buildSide, joinType, 
nonEquiCond))
+          }
+      }
+
+      // Hint-driven choices are tried first; the hint-free rules are the fallback.
+      createBroadcastHashJoin(true)
+        .orElse {
+          if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None
+        }
+        .orElse(createShuffleHashJoin(true))
+        .orElse {
+          if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else 
None
+        }
+        .getOrElse(createJoinWithoutHint())
+
+    // Null-aware anti join: always a broadcast hash join building the right side.
+    case j@ExtractSingleColumnNullAwareAntiJoin(leftKeys, rightKeys) =>
+      Seq(joins.BroadcastHashJoinExec(leftKeys, rightKeys, LeftAnti, 
BuildRight,
+        None, planLater(j.left), planLater(j.right), isNullAwareAntiJoin = 
true))
+
+    // If it is not an equi-join, we first look at the join hints w.r.t. the 
following order:
+    //   1. broadcast hint: pick broadcast nested loop join. If both sides 
have the broadcast
+    //      hints, choose the smaller side (based on stats) to broadcast for 
inner and full joins,
+    //      choose the left side for right join, and choose right side for 
left join.
+    //   2. shuffle replicate NL hint: pick cartesian product if join type is 
inner like.
+    //
+    // If there is no hint or the hints are not applicable, we follow these 
rules one by one:
+    //   1. Pick broadcast nested loop join if one side is small enough to 
broadcast. If only left
+    //      side is broadcast-able and it's left join, or only right side is 
broadcast-able and
+    //      it's right join, we skip this rule. If both sides are small, 
broadcasts the smaller
+    //      side for inner and full joins, broadcasts the left side for right 
join, and broadcasts
+    //      right side for left join.
+    //   2. Pick cartesian product if join type is inner like.
+    //   3. Pick broadcast nested loop join as the final solution. It may OOM 
but we don't have
+    //      other choice. It broadcasts the smaller side for inner and full 
joins, broadcasts the
+    //      left side for right join, and broadcasts right side for left join.
+    case logical.Join(left, right, joinType, condition, hint) =>
+      val desiredBuildSide = if (joinType.isInstanceOf[InnerLike] || joinType 
== FullOuter) {
+        getSmallerSide(left, right)
+      } else {
+        // For perf reasons, `BroadcastNestedLoopJoinExec` prefers to 
broadcast left side if
+        // it's a right join, and broadcast right side if it's a left join.
+        // TODO: revisit it. If left side is much smaller than the right side, 
it may be better
+        // to broadcast the left side even if it's a left join.
+        if (canBuildBroadcastLeft(joinType)) BuildLeft else BuildRight
+      }
+
+      def createBroadcastNLJoin(buildLeft: Boolean, buildRight: Boolean) = {
+        val maybeBuildSide = if (buildLeft && buildRight) {
+          Some(desiredBuildSide)
+        } else if (buildLeft) {
+          Some(BuildLeft)
+        } else if (buildRight) {
+          Some(BuildRight)
+        } else {
+          None
+        }
+
+        maybeBuildSide.map { buildSide =>
+          Seq(joins.BroadcastNestedLoopJoinExec(
+            planLater(left), planLater(right), buildSide, joinType, condition))
+        }
+      }
+
+      def createCartesianProduct() = {
+        if (joinType.isInstanceOf[InnerLike]) {
+          Some(Seq(joins.CartesianProductExec(planLater(left), 
planLater(right), condition)))
+        } else {
+          None
+        }
+      }
+
+      def createJoinWithoutHint() = {
+        // Both size checks are evaluated before the call, so every side that passes
+        // reserves its size in JoinMemoryManager (see canBroadcastBySize below).
+        createBroadcastNLJoin(canBroadcastBySize(left, conf), 
canBroadcastBySize(right, conf))
+          .orElse(createCartesianProduct())
+          .getOrElse {
+            // This join could be very slow or OOM
+            Seq(joins.BroadcastNestedLoopJoinExec(
+              planLater(left), planLater(right), desiredBuildSide, joinType, 
condition))
+          }
+      }
+
+      createBroadcastNLJoin(hintToBroadcastLeft(hint), 
hintToBroadcastRight(hint))
+        .orElse {
+          if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else 
None
+        }
+        .getOrElse(createJoinWithoutHint())
+
+    // --- Cases where this strategy does not apply 
---------------------------------------------
+    case _ => Nil
+  }
+
+  /**
+   * Decides whether `plan` is small enough to broadcast.
+   *
+   * A plan qualifies when its estimated size is non-negative, does not exceed
+   * `conf.autoBroadcastJoinThreshold`, and `JoinMemoryManager.acquireMemory` grants
+   * that many bytes. The memory check runs last (short-circuit `&&`), so it is only
+   * attempted for plans that already pass the threshold.
+   *
+   * Side effect: a successful check reserves the plan's size in `JoinMemoryManager`;
+   * nothing in this class releases it (`JoinMemoryManager.releaseAllMemory` resets
+   * the counter).
+   *
+   * @param plan logical plan whose `stats.sizeInBytes` is inspected
+   * @param conf configuration supplying the broadcast threshold
+   * @return true when the plan passes both the threshold and the memory check
+   */
+  override def canBroadcastBySize(plan: LogicalPlan, conf: SQLConf): Boolean = 
{
+    val size = plan.stats.sizeInBytes
+    size >= 0 && size <= conf.autoBroadcastJoinThreshold && 
JoinMemoryManager.acquireMemory(size.toLong)
+  }
+}
+
+/**
+ * Tracks how many bytes have been handed out for broadcast hash joins (BHJ) and
+ * refuses requests that would exceed a configured fraction of the JVM max memory.
+ * All mutations of the counter happen inside `synchronized` blocks on this object.
+ */
+object JoinMemoryManager extends Logging {
+
+  // Running total of bytes granted since the last releaseAllMemory().
+  @GuardedBy("this")
+  private[this] var memoryUsed: Long = 0
+
+  /** Budget for joins: JVM max memory scaled by Kylin's join memory fraction. */
+  private def maxMemoryJoinCanUse: Long = {
+    val fraction = KylinConfig.getInstanceFromEnv.getJoinMemoryFraction
+    (Runtime.getRuntime.maxMemory() * fraction).toLong
+  }
+
+  /**
+   * Tries to reserve `numBytesToAcquire` bytes out of the join budget.
+   *
+   * @param numBytesToAcquire number of bytes requested; asserted to be non-negative
+   * @return true if the bytes fit into the remaining budget and were reserved,
+   *         false otherwise (the counter is left unchanged)
+   */
+  def acquireMemory(numBytesToAcquire: Long): Boolean = synchronized {
+    assert(numBytesToAcquire >= 0)
+    val granted = numBytesToAcquire <= (maxMemoryJoinCanUse - memoryUsed)
+    if (!granted) {
+      logInfo("Driver memory is not enough for BHJ.")
+    } else {
+      memoryUsed += numBytesToAcquire
+      logInfo(s"Acquire $numBytesToAcquire bytes for BHJ, memory used 
$memoryUsed, max memory BHJ can use $maxMemoryJoinCanUse.")
+    }
+    granted
+  }
+
+  /** Resets the reserved-bytes counter to zero. */
+  def releaseAllMemory(): Unit = synchronized {
+    memoryUsed = 0
+  }
+
+}
\ No newline at end of file
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
similarity index 61%
rename from 
kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
rename to 
kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
index c928e84..0fc917f 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala
@@ -19,29 +19,22 @@
 package org.apache.spark.sql.hive.utils
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.{FileSourceScanExec, 
KylinFileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.hive.execution.HiveTableScanExec
 
-import scala.collection.JavaConverters._
+import scala.collection.JavaConverters.seqAsJavaListConverter
 
 object QueryMetricUtils extends Logging {
+
   def collectScanMetrics(plan: SparkPlan): (java.util.List[java.lang.Long], 
java.util.List[java.lang.Long],
-          java.util.List[java.lang.Long], java.util.List[java.lang.Long], 
java.util.List[java.lang.Long]) = {
+    java.util.List[java.lang.Long], java.util.List[java.lang.Long], 
java.util.List[java.lang.Long]) = {
     try {
       val metrics = plan.collect {
-        case exec: KylinFileSourceScanExec =>
-          //(exec.metrics.apply("numOutputRows").value, 
exec.metrics.apply("readBytes").value)
-          (exec.metrics.apply("numOutputRows").value, 
exec.metrics.apply("numFiles").value,
-                  exec.metrics.apply("metadataTime").value, 
exec.metrics.apply("scanTime").value, -1l)
-        case exec: FileSourceScanExec =>
-          //(exec.metrics.apply("numOutputRows").value, 
exec.metrics.apply("readBytes").value)
-          (exec.metrics.apply("numOutputRows").value, 
exec.metrics.apply("numFiles").value,
-                  exec.metrics.apply("metadataTime").value, 
exec.metrics.apply("scanTime").value, -1l)
-        case exec: HiveTableScanExec =>
-          //(exec.metrics.apply("numOutputRows").value, 
exec.metrics.apply("readBytes").value)
-          // There is only 'numOutputRows' metric in HiveTableScanExec
-          (exec.metrics.apply("numOutputRows").value, -1l, -1l, -1l, -1l)
+        case exec: AdaptiveSparkPlanExec => 
metricLine(recursiveGetSparkPlan(exec.executedPlan))
+        case exec: SparkPlan => metricLine(exec)
       }
+
       val scanRows = metrics.map(metric => java.lang.Long.valueOf(metric._1))
         .filter(_ >= 0L).toList.asJava
       val scanFiles = metrics.map(metrics => 
java.lang.Long.valueOf(metrics._2))
@@ -58,7 +51,36 @@ object QueryMetricUtils extends Logging {
       case throwable: Throwable =>
         logWarning("Error occurred when collect query scan metrics.", 
throwable)
         (List.empty[java.lang.Long].asJava, List.empty[java.lang.Long].asJava, 
List.empty[java.lang.Long].asJava,
-                List.empty[java.lang.Long].asJava, 
List.empty[java.lang.Long].asJava)
+          List.empty[java.lang.Long].asJava, List.empty[java.lang.Long].asJava)
+    }
+  }
+
+  /**
+   * Reads the scan metrics of one plan node.
+   *
+   * @param exec plan node to inspect; may be null, because `recursiveGetSparkPlan`
+   *             returns null when it finds no scan node
+   * @return the values of `numOutputRows`, `numFiles`, `metadataTime`, `scanTime`
+   *         and `filesSize`, in that order; a metric the node does not expose
+   *         (or every metric, when `exec` is null) is reported as -1
+   */
+  private def metricLine(exec: SparkPlan) = {
+    // A null node used to raise a NullPointerException here, which the caller's
+    // catch-all turned into "no metrics at all" for the whole query.
+    val nodeMetrics = Option(exec).map(_.metrics)
+
+    def metricValue(name: String): Long =
+      nodeMetrics.flatMap(_.get(name)).map(_.value).getOrElse(-1L)
+
+    (
+      metricValue("numOutputRows"),
+      metricValue("numFiles"),
+      metricValue("metadataTime"),
+      metricValue("scanTime"),
+      metricValue("filesSize")
+    )
+  }
+
+  /**
+   * Walks down from `sparkPlan` looking for a scan node.
+   *
+   * A `ShuffleQueryStageExec` is unwrapped through its `plan`; for any other
+   * non-scan node only the first child is followed.
+   *
+   * @param sparkPlan node to start from
+   * @return the first `KylinFileSourceScanExec`, `FileSourceScanExec` or
+   *         `HiveTableScanExec` reached, or null when a node without children
+   *         is reached first
+   */
+  private def recursiveGetSparkPlan(sparkPlan: SparkPlan): SparkPlan = sparkPlan match {
+    case stage: ShuffleQueryStageExec =>
+      recursiveGetSparkPlan(stage.plan)
+    case scan @ (_: KylinFileSourceScanExec | _: FileSourceScanExec | _: HiveTableScanExec) =>
+      scan
+    case other =>
+      // Follow the first child only; a childless node yields null.
+      other.children.headOption.map(recursiveGetSparkPlan).orNull
+  }
 }
diff --git a/kylin-spark-project/kylin-spark-engine/pom.xml 
b/kylin-spark-project/kylin-spark-engine/pom.xml
index 9954afe..de582b8 100644
--- a/kylin-spark-project/kylin-spark-engine/pom.xml
+++ b/kylin-spark-project/kylin-spark-engine/pom.xml
@@ -63,6 +63,21 @@
         <!-- Spark dependency -->
         <dependency>
             <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-client</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-app</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <scope>provided</scope>
         </dependency>
@@ -79,16 +94,29 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.11</artifactId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
             <type>test-jar</type>
             <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
             <groupId>org.scalamock</groupId>
-            <artifactId>scalamock_2.11</artifactId>
+            <artifactId>scalamock_${scala.binary.version}</artifactId>
             <version>4.1.0</version>
             <scope>test</scope>
         </dependency>
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java
 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java
index b6f4bb8..3b353ed 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java
@@ -26,7 +26,7 @@ import org.apache.kylin.engine.IBatchCubingEngine;
 import org.apache.kylin.engine.spark.job.NSparkMergingJob;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.spark_project.guava.collect.Sets;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
 
 public class SparkBatchCubingEngineParquet implements IBatchCubingEngine {
     @Override
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
index da00f6d..886e476 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
@@ -37,9 +37,9 @@ import 
org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.spark_project.guava.base.Preconditions;
 
 /**
  *
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java
 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java
index 146a1f2..2b10518 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java
@@ -21,8 +21,8 @@ package org.apache.kylin.engine.spark.job;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.metadata.model.Segments;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
 import org.apache.spark.sql.Column;
-import org.spark_project.guava.collect.Sets;
 
 import java.util.Collection;
 import java.util.LinkedHashSet;
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/cluster/ClusterInfoFetcherFactory.scala
 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/cluster/ClusterInfoFetcherFactory.scala
index 9f709f4..092eeef 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/cluster/ClusterInfoFetcherFactory.scala
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/cluster/ClusterInfoFetcherFactory.scala
@@ -19,7 +19,7 @@
 package org.apache.kylin.cluster
 
 import org.apache.kylin.common.KylinConfig
-import org.apache.spark.util.KylinReflectUtils
+import org.apache.spark.utils.KylinReflectUtils
 
 
 object ClusterInfoFetcherFactory {
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
index dea6ede..f7ec55d 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala
@@ -83,7 +83,7 @@ class CubeDictionaryBuilder(val dataset: Dataset[Row],
       .filter(dictCol.isNotNull)
       .repartition(bucketPartitionSize, dictCol)
       .foreachPartition {
-        iter =>
+        iter: Iterator[Row] =>
           DictHelper.genDict(columnName, broadcastDict, iter)
       }
 
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
index 84beaf2..35ac1af 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala
@@ -28,10 +28,11 @@ import org.apache.kylin.measure.bitmap.BitmapMeasureType
 import org.apache.kylin.measure.hllc.HLLCMeasureType
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
 import org.apache.spark.sql.functions.{col, _}
-import org.apache.spark.sql.types.{StringType, _}
+import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, 
DoubleType, FloatType, ShortType, StringType, _}
 import org.apache.spark.sql.udaf._
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.unsafe.types.UTF8String
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -72,8 +73,19 @@ object CuboidAggregator {
           val colIndex = dataSet.schema.fieldNames.zipWithIndex.map(tp => 
(tp._2, tp._1)).toMap
           columns.appendAll(measure.pra.map(p =>col(p.id.toString)))
         } else {
-          val value = measure.pra.head.asInstanceOf[LiteralColumnDesc].value
-            columns.append(new Column(Literal.create(value, 
measure.pra.head.dataType)))
+          // Convert the raw literal of the measure parameter into the JVM value that
+          // corresponds to the parameter's Spark data type before building the Literal.
+          var value = measure.pra.head.asInstanceOf[LiteralColumnDesc].value
+          value = measure.pra.head.dataType match {
+            case BooleanType => value.asInstanceOf[String].toBoolean
+            case ByteType => value.asInstanceOf[String].toByte
+            case ShortType => value.asInstanceOf[String].toShort
+            case IntegerType | DateType => value.asInstanceOf[String].toInt
+            case LongType | TimestampType => value.asInstanceOf[String].toLong
+            case FloatType => value.asInstanceOf[String].toFloat
+            case DoubleType => value.asInstanceOf[String].toDouble
+            // A binary literal needs Array[Byte]; String.toArray would yield Array[Char].
+            case BinaryType =>
+              value.asInstanceOf[String].getBytes(java.nio.charset.StandardCharsets.UTF_8)
+            // The raw value is a String in every branch above, so casting it to
+            // UTF8String would fail; wrap a String and keep anything else as is.
+            case StringType => value match {
+              case text: String => UTF8String.fromString(text)
+              case other => other
+            }
+            // Types without a dedicated conversion keep the raw value instead of
+            // aborting with a MatchError.
+            case _ => value
+          }
+          columns.append(new Column(Literal.create(value, measure.pra.head.dataType)))
         }
       }
 
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
index 49aa30f..a08affe 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
@@ -24,11 +24,11 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.sql.SqlKind._
 import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils
 import org.apache.kylin.query.util.UnsupportedSparkFunctionException
 import org.apache.spark.sql.Column
 import org.apache.spark.sql.KylinFunctions._
 import org.apache.spark.sql.catalyst.expressions.{If, IfNull, StringLocate}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.utils.SparkTypeUtil
 
@@ -213,8 +213,7 @@ object ExpressionConverter {
           // time_funcs
           case "current_date" =>
             k_lit(
-              DateTimeUtils.dateToString(
-                DateTimeUtils.millisToDays(System.currentTimeMillis())))
+              CrossDateTimeUtils.dateToString())
           case "current_timestamp" =>
             current_timestamp()
           case "to_timestamp" =>
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampAddImpl.scala
 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampAddImpl.scala
index 7413e4b..3beddcf 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampAddImpl.scala
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampAddImpl.scala
@@ -19,9 +19,8 @@
 package org.apache.spark.sql.udf
 
 import java.util.{Calendar, Locale, TimeZone}
-
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import 
org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils.{MICROS_PER_MILLIS,
 MONTHS_PER_QUARTER}
+import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils
 
 object TimestampAddImpl {
   private val localCalendar = new ThreadLocal[Calendar] {
@@ -35,7 +34,7 @@ object TimestampAddImpl {
     calendar.clear()
     addTime("DAY", time, calendar)
     addTime(unit, increment, calendar)
-    DateTimeUtils.millisToDays(calendar.getTimeInMillis)
+    CrossDateTimeUtils.millisToDays(calendar.getTimeInMillis)
   }
 
   // add long on DateType
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampDiffImpl.scala
 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampDiffImpl.scala
index f0d4d9e..f8453a2 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampDiffImpl.scala
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampDiffImpl.scala
@@ -19,17 +19,16 @@
 package org.apache.spark.sql.udf
 
 import java.util.Locale
-
 import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils
 import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils
 
 object TimestampDiffImpl {
 
   // TimestampType -> DateType
   def evaluate(unit: String, timestamp: Long, date: Int): Long = {
     val before = timestamp / MICROS_PER_MILLIS
-    val after = DateTimeUtils.daysToMillis(date)
+    val after = CrossDateTimeUtils.daysToMillis(date)
     convertDuration(unit, before, after)
   }
 
@@ -42,14 +41,14 @@ object TimestampDiffImpl {
 
   // DateType -> DateType
   def evaluate(unit: String, date1: Int, date2: Int): Long = {
-    val before = DateTimeUtils.daysToMillis(date1)
-    val after = DateTimeUtils.daysToMillis(date2)
+    val before = CrossDateTimeUtils.daysToMillis(date1)
+    val after = CrossDateTimeUtils.daysToMillis(date2)
     convertDuration(unit, before, after)
   }
 
   // DateType -> TimestampType
   def evaluate(unit: String, date: Int, timestamp: Long): Long = {
-    val before = DateTimeUtils.daysToMillis(date)
+    val before = CrossDateTimeUtils.daysToMillis(date)
     val after = timestamp / MICROS_PER_MILLIS
     convertDuration(unit, before, after)
   }
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/util/KylinReflectUtils.scala
 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/util/KylinReflectUtils.scala
deleted file mode 100644
index eef75cb..0000000
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/util/KylinReflectUtils.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.spark.util
-
-import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
-import org.apache.spark.{SPARK_VERSION, SparkContext}
-
-import scala.reflect.runtime.universe
-
-object KylinReflectUtils {
-  private val rm = universe.runtimeMirror(getClass.getClassLoader)
-
-  def getSessionState(sparkContext: SparkContext, kylinSession: Object): Any = 
{
-    if (SPARK_VERSION.startsWith("2.4")) {
-      var className: String =
-        "org.apache.spark.sql.hive.KylinHiveSessionStateBuilder"
-      if (!"hive".equals(sparkContext.getConf
-            .get(CATALOG_IMPLEMENTATION.key, "in-memory"))) {
-        className = "org.apache.spark.sql.hive.KylinSessionStateBuilder"
-      }
-      val tuple = createObject(className, kylinSession, None)
-      val method = tuple._2.getMethod("build")
-      method.invoke(tuple._1)
-    } else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
-  }
-
-  def createObject(className: String, conArgs: Object*): (Any, Class[_]) = {
-    val clazz = Utils.classForName(className)
-    val ctor = clazz.getConstructors.head
-    ctor.setAccessible(true)
-    (ctor.newInstance(conArgs: _*), clazz)
-  }
-
-  def createObject(className: String): (Any, Class[_]) = {
-    val clazz = Utils.classForName(className)
-    val ctor = clazz.getConstructors.head
-    ctor.setAccessible(true)
-    (ctor.newInstance(), clazz)
-  }
-}
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
 
b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
index a0c49e8..cfd84aa 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
+++ 
b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
@@ -55,6 +55,7 @@ import org.apache.kylin.metadata.model.Segments;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.KylinSparkEnv;
@@ -70,7 +71,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.spark_project.guava.collect.Sets;
 
 import java.io.File;
 import java.io.IOException;
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java
 
b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java
index f788d15..8bf5fb8 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java
+++ 
b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java
@@ -30,9 +30,9 @@ import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.SegmentRange;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.Segments;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
 import org.junit.Assert;
 import org.junit.Test;
-import org.spark_project.guava.collect.Sets;
 
 import java.io.IOException;
 import java.util.Set;
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/SparkCubingJobTest.java
 
b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/SparkCubingJobTest.java
index 0cbe4c0..91768ac 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/SparkCubingJobTest.java
+++ 
b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/SparkCubingJobTest.java
@@ -33,6 +33,7 @@ import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
 import org.apache.kylin.storage.StorageFactory;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
@@ -49,7 +50,6 @@ import org.junit.Test;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.spark_project.guava.collect.Sets;
 
 import java.util.HashSet;
 import java.util.Map;
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
 
b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
index 3ecdfb5..f8dd610 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
+++ 
b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
@@ -20,9 +20,9 @@ package org.apache.kylin.engine.spark.builder
 
 import java.text.SimpleDateFormat
 import java.util.{Locale, TimeZone, UUID}
-
 import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.cube.{CubeInstance, CubeManager, CubeSegment}
+import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils
 import org.apache.kylin.engine.spark.job.KylinBuildEnv
 import org.apache.kylin.engine.spark.metadata.MetadataConverter
 import org.apache.kylin.engine.spark.metadata.cube.model.ForestSpanningTree
@@ -30,7 +30,7 @@ import org.apache.kylin.job.engine.JobEngineConfig
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler
 import org.apache.kylin.job.lock.MockJobLock
 import org.apache.kylin.metadata.model.SegmentRange
-import org.apache.spark.InfoHelper
+import org.apache.spark.{InfoHelper, SPARK_VERSION}
 import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, 
SparderBaseFunSuite}
 import org.apache.spark.sql.{Dataset, Row}
 import org.junit.Assert
@@ -96,17 +96,32 @@ class TestCreateFlatTable extends SparderBaseFunSuite with 
SharedSparkSession wi
     val afterJoin1 = generateFlatTable(seg1, cube, true)
     afterJoin1.collect()
 
-    val jobs = helper.getJobsByGroupId(groupId)
-    Assert.assertEquals(jobs.length, 15)
+    if (SPARK_VERSION.startsWith("2.4")) {
+      val jobs = helper.getJobsByGroupId(groupId)
+      Assert.assertEquals(jobs.length, 15)
+    } else if (SPARK_VERSION.startsWith("3.1")) {
+      // in Spark 3.x, BroadcastExchangeExec overwrites job group ID
+      val jobs = helper.getJobsByGroupId(null)
+      Assert.assertEquals(6, 
jobs.count(_.jobGroup.exists(_.endsWith(groupId))))
+      Assert.assertEquals(9, 
jobs.count(_.description.exists(_.contains("broadcast exchange"))))
+    }
     DefaultScheduler.destroyInstance()
   }
 
   private def checkFilterCondition(ds: Dataset[Row], seg: CubeSegment) = {
     val queryExecution = ds.queryExecution.simpleString
-    val startTime = dateFormat.format(seg.getTSRange.start.v)
-    val endTime = dateFormat.format(seg.getTSRange.end.v)
+    var startTime = dateFormat.format(seg.getTSRange.start.v)
+    var endTime = dateFormat.format(seg.getTSRange.end.v)
 
     //Test Filter Condition
+
+    // dates will not be converted to string by default since spark 3.0.0.
+    // see https://issues.apache.org/jira/browse/SPARK-27638 for details.
+    if (SPARK_VERSION.startsWith("3.") && 
conf.get("spark.sql.legacy.typeCoercion.datetimeToString.enabled", "false") == 
"false") {
+      startTime = CrossDateTimeUtils.stringToDate(startTime).get.toString
+      endTime = CrossDateTimeUtils.stringToDate(endTime).get.toString
+    }
+
     Assert.assertTrue(queryExecution.contains(startTime))
     Assert.assertTrue(queryExecution.contains(endTime))
   }
diff --git a/kylin-spark-project/kylin-spark-metadata/pom.xml 
b/kylin-spark-project/kylin-spark-metadata/pom.xml
index 9cdd824..71b8b4e 100644
--- a/kylin-spark-project/kylin-spark-metadata/pom.xml
+++ b/kylin-spark-project/kylin-spark-metadata/pom.xml
@@ -39,21 +39,55 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-core-cube</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-api</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
         <plugins>
             <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>src/main/${spark.version.dir}</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
                 <groupId>net.alchim31.maven</groupId>
                 <artifactId>scala-maven-plugin</artifactId>
                 <executions>
                     <execution>
-                        <id>scala-compile-first</id>
+                        <id>compile-version-dependent-source</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                        <configuration>
+                            <sourceDir>${spark.version.dir}</sourceDir>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>compile-common-scala-source</id>
                         <phase>process-resources</phase>
                         <goals>
-                            <goal>add-source</goal>
                             <goal>compile</goal>
                         </goals>
+                        <configuration>
+                            <sourceDir>scala</sourceDir>
+                        </configuration>
                     </execution>
                     <execution>
                         <id>scala-test-compile</id>
diff --git 
a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/spark/sql/utils/SparkTypeUtil.scala
 
b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/spark/sql/utils/SparkTypeUtil.scala
index 2e189ec..486e358 100644
--- 
a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/spark/sql/utils/SparkTypeUtil.scala
+++ 
b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/spark/sql/utils/SparkTypeUtil.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.utils
 
 import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.spark.unsafe.types.UTF8String
 import org.apache.kylin.common.util.DateFormat
 import org.apache.spark.sql.Column
 import org.apache.spark.internal.Logging
@@ -28,19 +27,23 @@ import java.math.BigDecimal
 import org.apache.calcite.util.NlsString
 import org.apache.calcite.rel.`type`.RelDataType
 import java.sql.{Date, Timestamp, Types}
+import java.time.ZoneId
 import java.util.regex.Pattern
 
 import org.apache.spark.sql.functions.col
 import org.apache.calcite.avatica.util.TimeUnitRange
 import org.apache.calcite.rex.RexLiteral
-import java.util.{GregorianCalendar, Locale, TimeZone}
+import java.util.{GregorianCalendar, Locale}
 
 import org.apache.kylin.engine.spark.metadata.FunctionDesc
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.kylin.metadata.datatype.DataType
 import org.apache.spark.sql.types._
+import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils
 
 object SparkTypeUtil extends Logging {
+  private def defaultZoneId = ZoneId.systemDefault()
+  private def UTC = ZoneId.of("UTC")
+
   val DATETIME_FAMILY = List("time", "date", "timestamp", "datetime")
 
   def isDateTimeFamilyType(dataType: String): Boolean = {
@@ -167,9 +170,9 @@ object SparkTypeUtil extends Logging {
         s.getValue
       case g: GregorianCalendar =>
         if (literal.getTypeName.getName.equals("DATE")) {
-          new 
Date(DateTimeUtils.stringToTimestamp(UTF8String.fromString(literal.toString)).get
 / 1000)
+          new Date(CrossDateTimeUtils.stringToTimestamp(literal).get / 1000)
         } else {
-          new 
Timestamp(DateTimeUtils.stringToTimestamp(UTF8String.fromString(literal.toString)).get
 / 1000)
+          new Timestamp(CrossDateTimeUtils.stringToTimestamp(literal).get / 
1000)
         }
       case range: TimeUnitRange =>
         // Extract(x from y) in where clause
@@ -259,7 +262,7 @@ object SparkTypeUtil extends Logging {
               val time = DateFormat.stringToDate(string).getTime
               if (toCalcite) {
                 //current date is local timezone, 
org.apache.calcite.avatica.util.AbstractCursor.DateFromNumberAccessor need to 
utc
-                DateTimeUtils.stringToDate(UTF8String.fromString(string)).get
+                CrossDateTimeUtils.stringToDate(string).get
               } else {
                 // ms to s
                 time / 1000
@@ -277,7 +280,7 @@ object SparkTypeUtil extends Logging {
             var ts = s.asInstanceOf[Timestamp].toString
             if (toCalcite) {
               // current ts is local timezone 
,org.apache.calcite.avatica.util.AbstractCursor.TimeFromNumberAccessor need to 
utc
-              DateTimeUtils.stringToTimestamp(UTF8String.fromString(ts), 
TimeZone.getTimeZone("UTC")).get / 1000
+              CrossDateTimeUtils.stringToTimestamp(ts, UTC).get / 1000
             } else {
               // ms to s
               s.asInstanceOf[Timestamp].getTime / 1000
diff --git 
a/kylin-spark-project/kylin-spark-metadata/src/main/spark24/org/apache/kylin/engine/spark/cross/CrossDateTimeUtils.scala
 
b/kylin-spark-project/kylin-spark-metadata/src/main/spark24/org/apache/kylin/engine/spark/cross/CrossDateTimeUtils.scala
new file mode 100644
index 0000000..54dd480
--- /dev/null
+++ 
b/kylin-spark-project/kylin-spark-metadata/src/main/spark24/org/apache/kylin/engine/spark/cross/CrossDateTimeUtils.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.spark.cross
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{SQLDate, SQLTimestamp}
+import org.apache.spark.unsafe.types.UTF8String
+
+import java.time.ZoneId
+import java.util.TimeZone
+
+/**
+ * Spark 2.4 flavour of the date/time helpers shared across Spark versions.
+ * Every method delegates to Spark's DateTimeUtils; the JVM default time zone
+ * is looked up on each call that needs it.
+ */
+object CrossDateTimeUtils {
+  // Renders the value with toString and wraps the text as a UTF8String.
+  private def asUtf8(value: Any): UTF8String = UTF8String.fromString(value.toString)
+
+  /** Parses the text form of `value` as a timestamp in the JVM default time zone. */
+  def stringToTimestamp(value: Any): Option[SQLTimestamp] =
+    DateTimeUtils.stringToTimestamp(asUtf8(value), TimeZone.getDefault)
+
+  /** Parses the text form of `value` as a timestamp in the given zone. */
+  def stringToTimestamp(value: Any, zoneId: ZoneId): Option[SQLTimestamp] =
+    DateTimeUtils.stringToTimestamp(asUtf8(value), TimeZone.getTimeZone(zoneId))
+
+  /** Parses the text form of `value` as a date. */
+  def stringToDate(value: Any): Option[SQLDate] =
+    DateTimeUtils.stringToDate(asUtf8(value))
+
+  /** Delegates to DateTimeUtils.millisToDays. */
+  def millisToDays(millis: Long): Int =
+    DateTimeUtils.millisToDays(millis)
+
+  /** Delegates to DateTimeUtils.daysToMillis. */
+  def daysToMillis(days: Int): Long =
+    DateTimeUtils.daysToMillis(days)
+
+  /** Formats the current date, derived from System.currentTimeMillis(), as a string. */
+  def dateToString(): String = {
+    val today = DateTimeUtils.millisToDays(System.currentTimeMillis())
+    DateTimeUtils.dateToString(today)
+  }
+}
diff --git 
a/kylin-spark-project/kylin-spark-metadata/src/main/spark31/org/apache/kylin/engine/spark/cross/CrossDateTimeUtils.scala
 
b/kylin-spark-project/kylin-spark-metadata/src/main/spark31/org/apache/kylin/engine/spark/cross/CrossDateTimeUtils.scala
new file mode 100644
index 0000000..2ef6b18
--- /dev/null
+++ 
b/kylin-spark-project/kylin-spark-metadata/src/main/spark31/org/apache/kylin/engine/spark/cross/CrossDateTimeUtils.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.spark.cross
+
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter}
+import org.apache.spark.unsafe.types.UTF8String
+
+import java.time.ZoneId
+import java.util.TimeZone
+
+/**
+ * Spark 3.1 flavour of the date/time helpers shared across Spark versions.
+ *
+ * Each method delegates to Spark's `DateTimeUtils` / `TimestampFormatter`, passing
+ * `DEFAULT_TZ_ID` wherever the Spark call needs a zone and the caller supplied none.
+ */
+object CrossDateTimeUtils {
+  /** Zone id of the JVM default time zone, read once when this object is initialised. */
+  val DEFAULT_TZ_ID: ZoneId = TimeZone.getDefault.toZoneId
+
+  /** Parses `value.toString` with `DateTimeUtils.stringToTimestamp` in `DEFAULT_TZ_ID`. */
+  def stringToTimestamp(value: Any): Option[Long] = {
+    DateTimeUtils.stringToTimestamp(UTF8String.fromString(value.toString), DEFAULT_TZ_ID)
+  }
+
+  /** Parses `value.toString` with `DateTimeUtils.stringToTimestamp` in the given `zoneId`. */
+  def stringToTimestamp(value: Any, zoneId: ZoneId): Option[Long] = {
+    DateTimeUtils.stringToTimestamp(UTF8String.fromString(value.toString), zoneId)
+  }
+
+  /** Parses `value.toString` with `DateTimeUtils.stringToDate` in `DEFAULT_TZ_ID`. */
+  def stringToDate(value: Any): Option[Int] = {
+    DateTimeUtils.stringToDate(UTF8String.fromString(value.toString), DEFAULT_TZ_ID)
+  }
+
+  /**
+   * Converts epoch milliseconds to a day number in `DEFAULT_TZ_ID`.
+   *
+   * The input is scaled to microseconds because `DateTimeUtils.microsToDays` takes micros.
+   */
+  def millisToDays(millis: Long): Int = {
+    DateTimeUtils.microsToDays(millis * 1000, DEFAULT_TZ_ID)
+  }
+
+  /**
+   * Converts a day number to epoch milliseconds in `DEFAULT_TZ_ID`.
+   *
+   * @return milliseconds, matching the method name and the Spark 2.4 variant of this object
+   */
+  def daysToMillis(days: Int): Long = {
+    // daysToMicros yields microseconds; returning it unscaled would hand callers
+    // a value 1000x too large. floorDiv keeps pre-epoch (negative) values correct.
+    Math.floorDiv(DateTimeUtils.daysToMicros(days, DEFAULT_TZ_ID), 1000L)
+  }
+
+  /** Formats the current timestamp as `yyyy-MM-dd` in `DEFAULT_TZ_ID`. */
+  def dateToString(): String = {
+    TimestampFormatter
+      .apply("yyyy-MM-dd", DEFAULT_TZ_ID, isParsing = false)
+      .format(DateTimeUtils.currentTimestamp())
+  }
+}
diff --git a/kylin-spark-project/kylin-spark-query/pom.xml 
b/kylin-spark-project/kylin-spark-query/pom.xml
index 2e6f7e4..5a6cff0 100644
--- a/kylin-spark-project/kylin-spark-query/pom.xml
+++ b/kylin-spark-project/kylin-spark-query/pom.xml
@@ -71,7 +71,12 @@
         <!-- Spark dependency -->
         <dependency>
             <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-app</artifactId>
             <scope>provided</scope>
         </dependency>
 
@@ -87,16 +92,28 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.11</artifactId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </exclusion>
+            </exclusions>
             <type>test-jar</type>
-            <scope>provided</scope>
+            <scope>test</scope>
         </dependency>
 
         <dependency>
             <groupId>org.scalamock</groupId>
-            <artifactId>scalamock_2.11</artifactId>
+            <artifactId>scalamock_${scala.binary.version}</artifactId>
             <version>4.1.0</version>
             <scope>test</scope>
         </dependency>
@@ -125,36 +142,20 @@
     <build>
         <plugins>
             <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <configuration>
-                    <source>8</source>
-                    <target>8</target>
-                </configuration>
-            </plugin>
-            <!--<plugin>
-                <groupId>net.alchim31.maven</groupId>
-                <artifactId>scala-maven-plugin</artifactId>
-                <version>3.2.2</version>
-            </plugin>-->
-            <plugin>
-                <groupId>org.scalatest</groupId>
-                <artifactId>scalatest-maven-plugin</artifactId>
-                <version>1.0</version>
-                <configuration>
-                    <reportsDirectory>
-                        ${project.build.directory}/surefire-reports
-                    </reportsDirectory>
-                    <junitxml>.</junitxml>
-                    <filereports>SparkTestSuite.txt</filereports>
-                    <stdout>I</stdout>
-                </configuration>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
                 <executions>
                     <execution>
-                        <id>test</id>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
                         <goals>
-                            <goal>test</goal>
+                            <goal>add-source</goal>
                         </goals>
+                        <configuration>
+                            <sources>
+                                <source>src/main/${spark.version.dir}</source>
+                            </sources>
+                        </configuration>
                     </execution>
                 </executions>
             </plugin>
@@ -163,12 +164,24 @@
                 <artifactId>scala-maven-plugin</artifactId>
                 <executions>
                     <execution>
-                        <id>scala-compile-first</id>
+                        <id>compile-version-dependent-source</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                        <configuration>
+                            <sourceDir>${spark.version.dir}</sourceDir>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>compile-common-scala-source</id>
                         <phase>process-resources</phase>
                         <goals>
-                            <goal>add-source</goal>
                             <goal>compile</goal>
                         </goals>
+                        <configuration>
+                            <sourceDir>scala</sourceDir>
+                        </configuration>
                     </execution>
                     <execution>
                         <id>scala-test-compile</id>
diff --git 
a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
 
b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
index bad566f..764807d 100644
--- 
a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
+++ 
b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala
@@ -27,7 +27,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.kylin.query.exception.UnsupportedSparkFunctionException
 import org.apache.spark.sql.Column
 import org.apache.spark.sql.catalyst.expressions.{If, IfNull, StringLocate}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.KylinFunctions._
 import org.apache.spark.sql.utils.SparkTypeUtil
@@ -216,9 +216,7 @@ object ExpressionConverter {
             callUDF("split_part", args: _*)
           // time_funcs
           case "current_date" =>
-            k_lit(
-              DateTimeUtils.dateToString(
-                DateTimeUtils.millisToDays(System.currentTimeMillis())))
+            k_lit(CrossDateTimeUtils.dateToString())
           case "current_timestamp" =>
             current_timestamp()
           case "to_timestamp" =>
diff --git 
a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala
 
b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala
index dcb4f14..a690f45 100644
--- 
a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala
+++ 
b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala
@@ -28,13 +28,13 @@ import org.apache.calcite.sql.SqlKind._
 import org.apache.calcite.sql.`type`.{BasicSqlType, IntervalSqlType, 
SqlTypeFamily, SqlTypeName}
 import org.apache.calcite.sql.fun.SqlDatetimeSubtractionOperator
 import org.apache.kylin.common.util.DateFormat
+import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils
 import org.apache.spark.sql.KylinFunctions._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{DateType, LongType, TimestampType}
 import org.apache.spark.sql.{Column, DataFrame}
 import org.apache.spark.sql.utils.SparkTypeUtil
-import org.apache.spark.unsafe.types.UTF8String
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
@@ -312,9 +312,9 @@ class SparderRexVisitor(
       case literalSql: BasicSqlType => {
         literalSql.getSqlTypeName match {
           case SqlTypeName.DATE =>
-            return Some(DateTimeUtils.stringToTime(literal.toString))
+            return 
Some(DateTimeUtils.toJavaDate(CrossDateTimeUtils.stringToDate(literal).get))
           case SqlTypeName.TIMESTAMP =>
-            return 
Some(DateTimeUtils.toJavaTimestamp(DateTimeUtils.stringToTimestamp(UTF8String.fromString(literal.toString)).head))
+            return 
Some(DateTimeUtils.toJavaTimestamp(CrossDateTimeUtils.stringToTimestamp(literal).head))
           case _ =>
         }
       }
diff --git 
a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
 
b/kylin-spark-project/kylin-spark-query/src/main/spark24/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
similarity index 100%
copy from 
kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
copy to 
kylin-spark-project/kylin-spark-query/src/main/spark24/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
diff --git 
a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
 
b/kylin-spark-project/kylin-spark-query/src/main/spark31/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
similarity index 79%
rename from 
kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
rename to 
kylin-spark-project/kylin-spark-query/src/main/spark31/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
index 21c6372..5641723 100644
--- 
a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
+++ 
b/kylin-spark-project/kylin-spark-query/src/main/spark31/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
@@ -28,14 +28,15 @@ import 
org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionState}
   * @param parentState
   */
 class KylinHiveSessionStateBuilder(sparkSession: SparkSession,
-                                   parentState: Option[SessionState] = None)
-    extends HiveSessionStateBuilder(sparkSession, parentState) {
+                                   parentState: Option[SessionState] = None,
+                                   options: Map[String, String] = Map.empty)
+    extends HiveSessionStateBuilder(sparkSession, parentState, options) {
 
   private def externalCatalog: HiveExternalCatalog =
     session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
 
   override protected def newBuilder: NewBuilder =
-    new KylinHiveSessionStateBuilder(_, _)
+    new KylinHiveSessionStateBuilder(_, _, options)
 
 }
 
@@ -46,10 +47,11 @@ class KylinHiveSessionStateBuilder(sparkSession: 
SparkSession,
   * @param parentState
   */
 class KylinSessionStateBuilder(sparkSession: SparkSession,
-                               parentState: Option[SessionState] = None)
-    extends BaseSessionStateBuilder(sparkSession, parentState) {
+                               parentState: Option[SessionState] = None,
+                               options: Map[String, String] = Map.empty)
+    extends BaseSessionStateBuilder(sparkSession, parentState, options) {
 
   override protected def newBuilder: NewBuilder =
-    new KylinSessionStateBuilder(_, _)
+    new KylinSessionStateBuilder(_, _, options)
 
 }
diff --git a/kylin-spark-project/kylin-spark-test/pom.xml 
b/kylin-spark-project/kylin-spark-test/pom.xml
index 4221983..6f2f101 100644
--- a/kylin-spark-project/kylin-spark-test/pom.xml
+++ b/kylin-spark-project/kylin-spark-test/pom.xml
@@ -73,7 +73,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_2.11</artifactId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
             <exclusions>
                 <exclusion>
                     <groupId>org.apache.calcite</groupId>
@@ -84,7 +84,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-hive_2.11</artifactId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
             <exclusions>
                 <exclusion>
                     <groupId>org.apache.calcite</groupId>
@@ -114,6 +114,12 @@
         <!-- test -->
         <dependency>
             <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-spark-engine</artifactId>
             <version>${project.version}</version>
             <type>test-jar</type>
diff --git 
a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
 
b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
index b91a680..cb1b049 100644
--- 
a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
+++ 
b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.spark2;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.Quadruple;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
 import org.apache.kylin.shaded.com.google.common.collect.Maps;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -37,7 +38,6 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.spark_project.guava.collect.Lists;
 
 import java.util.Map;
 import java.io.File;
diff --git 
a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NManualBuildAndQueryCuboidTest.java
 
b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NManualBuildAndQueryCuboidTest.java
index a02672b..1bc6d11 100644
--- 
a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NManualBuildAndQueryCuboidTest.java
+++ 
b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NManualBuildAndQueryCuboidTest.java
@@ -30,6 +30,7 @@ import 
org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity;
 import org.apache.kylin.job.exception.SchedulerException;
 import org.apache.kylin.metadata.model.IStorageAware;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
 import org.apache.kylin.storage.StorageFactory;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
@@ -47,7 +48,6 @@ import org.junit.Test;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.spark_project.guava.collect.Sets;
 
 import java.util.List;
 import java.util.Map;
diff --git a/kylin-spark-project/pom.xml b/kylin-spark-project/pom.xml
index 3b52556..3d4911f 100644
--- a/kylin-spark-project/pom.xml
+++ b/kylin-spark-project/pom.xml
@@ -63,7 +63,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.11</artifactId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
             <exclusions>
                 <exclusion>
                     <artifactId>jetty-plus</artifactId>
@@ -107,25 +107,25 @@
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_2.11</artifactId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-hive_2.11</artifactId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-unsafe_2.11</artifactId>
+            <artifactId>spark-unsafe_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-yarn_2.11</artifactId>
+            <artifactId>spark-yarn_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
             <scope>provided</scope>
         </dependency>
@@ -140,7 +140,7 @@
         <!--Env & Test-->
         <dependency>
             <groupId>org.scalatest</groupId>
-            <artifactId>scalatest_2.11</artifactId>
+            <artifactId>scalatest_${scala.binary.version}</artifactId>
             <version>3.0.0</version>
             <scope>test</scope>
         </dependency>
diff --git a/metastore-hbase/pom.xml b/metastore-hbase/pom.xml
index 0945cca..209e99e 100644
--- a/metastore-hbase/pom.xml
+++ b/metastore-hbase/pom.xml
@@ -108,7 +108,7 @@
 <!--        &lt;!&ndash; Spark dependency &ndash;&gt;-->
 <!--        <dependency>-->
 <!--            <groupId>org.apache.spark</groupId>-->
-<!--            <artifactId>spark-core_2.11</artifactId>-->
+<!--            <artifactId>spark-core_${scala.binary.version}</artifactId>-->
 <!--            <scope>provided</scope>-->
 <!--        </dependency>-->
     </dependencies>
diff --git a/metrics-reporter-kafka/pom.xml b/metrics-reporter-kafka/pom.xml
index 1c39f08..5c549e3 100644
--- a/metrics-reporter-kafka/pom.xml
+++ b/metrics-reporter-kafka/pom.xml
@@ -39,7 +39,7 @@
 
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.11</artifactId>
+            <artifactId>kafka_${scala.binary.version}</artifactId>
             <scope>provided</scope>
         </dependency>
     </dependencies>
diff --git a/parquet-assembly/pom.xml b/parquet-assembly/pom.xml
index 6b34823..f2c795c 100644
--- a/parquet-assembly/pom.xml
+++ b/parquet-assembly/pom.xml
@@ -105,6 +105,14 @@
                                     <pattern>org.roaringbitmap</pattern>
                                     
<shadedPattern>${shadeBase}.org.roaringbitmap</shadedPattern>
                                 </relocation>
+                                <relocation>
+                                    <pattern>com.fasterxml.jackson</pattern>
+                                    
<shadedPattern>${shadeBase}.com.fasterxml.jackson</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.codahale.metrics</pattern>
+                                    
<shadedPattern>${shadeBase}.com.codahale.metrics</shadedPattern>
+                                </relocation>
                             </relocations>
                             <filters>
                                 <filter>
diff --git a/pom.xml b/pom.xml
index d83b0fc..6885f86 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,6 +80,7 @@
 
     <!-- Spark versions -->
     <spark.version>2.4.7</spark.version>
+    <spark.version.dir>spark24</spark.version.dir>
     <janino.version>3.0.16</janino.version>
 
     <kryo.version>4.0.0</kryo.version>
@@ -92,6 +93,7 @@
 
     <!-- Scala versions -->
     <scala.version>2.11.8</scala.version>
+    <scala.binary.version>2.11</scala.binary.version>
 
     <reflections.version>0.9.10</reflections.version>
 
@@ -519,6 +521,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-yarn-client</artifactId>
+        <version>${hadoop2.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-yarn-api</artifactId>
         <version>${hadoop2.version}</version>
       </dependency>
@@ -729,7 +736,8 @@
         <artifactId>avatica</artifactId>
         <version>${avatica.version}</version>
       </dependency>
-      <!-- Workaround for hive 0.14 avatica dependency -->
+      <!-- Workaround for hive 0.14 avatica dependency
+           WARNING: fasterxml jackson library may conflict
       <dependency>
         <groupId>org.apache.calcite</groupId>
         <artifactId>calcite-avatica</artifactId>
@@ -741,6 +749,7 @@
           </exclusion>
         </exclusions>
       </dependency>
+      -->
       <dependency>
         <groupId>com.fasterxml.jackson.core</groupId>
         <artifactId>jackson-core</artifactId>
@@ -775,9 +784,13 @@
       </dependency>
       <dependency>
         <groupId>org.apache.spark</groupId>
-        <artifactId>spark-core_2.11</artifactId>
+        <artifactId>spark-core_${scala.binary.version}</artifactId>
         <exclusions>
           <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+          </exclusion>
+          <exclusion>
             <artifactId>jetty-plus</artifactId>
             <groupId>org.eclipse.jetty</groupId>
           </exclusion>
@@ -819,20 +832,42 @@
       </dependency>
       <dependency>
         <groupId>org.apache.spark</groupId>
-        <artifactId>spark-sql_2.11</artifactId>
+        <artifactId>spark-sql_${scala.binary.version}</artifactId>
         <version>${spark.version}</version>
         <scope>provided</scope>
       </dependency>
       <dependency>
         <groupId>org.apache.spark</groupId>
-        <artifactId>spark-hive_2.11</artifactId>
+        <artifactId>spark-hive_${scala.binary.version}</artifactId>
         <version>${spark.version}</version>
         <scope>provided</scope>
       </dependency>
       <dependency>
         <groupId>org.apache.spark</groupId>
-        <artifactId>spark-yarn_2.11</artifactId>
+        <artifactId>spark-yarn_${scala.binary.version}</artifactId>
         <version>${spark.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-api</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-common</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-client</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+          </exclusion>
+        </exclusions>
       </dependency>
       <dependency>
         <groupId>com.esotericsoftware</groupId>
@@ -849,19 +884,19 @@
       </dependency>
       <dependency>
         <groupId>org.apache.flink</groupId>
-        <artifactId>flink-scala_2.11</artifactId>
+        <artifactId>flink-scala_${scala.binary.version}</artifactId>
         <version>${flink.version}</version>
         <scope>provided</scope>
       </dependency>
       <dependency>
         <groupId>org.apache.flink</groupId>
-        <artifactId>flink-hadoop-compatibility_2.11</artifactId>
+        
<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
         <version>${flink.version}</version>
       </dependency>
       <!-- Kafka dependency -->
       <dependency>
         <groupId>org.apache.kafka</groupId>
-        <artifactId>kafka_2.11</artifactId>
+        <artifactId>kafka_${scala.binary.version}</artifactId>
         <version>${kafka.version}</version>
       </dependency>
 
@@ -1159,19 +1194,19 @@
       <!-- Spark dependency -->
       <dependency>
         <groupId>org.apache.spark</groupId>
-        <artifactId>spark-sql_2.11</artifactId>
+        <artifactId>spark-sql_${scala.binary.version}</artifactId>
         <version>${spark.version}</version>
         <scope>provided</scope>
       </dependency>
       <dependency>
         <groupId>org.apache.spark</groupId>
-        <artifactId>spark-hive_2.11</artifactId>
+        <artifactId>spark-hive_${scala.binary.version}</artifactId>
         <version>${spark.version}</version>
         <scope>provided</scope>
       </dependency>
       <dependency>
         <groupId>org.apache.spark</groupId>
-        <artifactId>spark-unsafe_2.11</artifactId>
+        <artifactId>spark-unsafe_${scala.binary.version}</artifactId>
         <version>${spark.version}</version>
       </dependency>
       <dependency>
@@ -2085,9 +2120,29 @@
                 </lifecycleMappingMetadata>
               </configuration>
             </plugin>
+            <plugin>
+              <groupId>org.codehaus.mojo</groupId>
+              <artifactId>build-helper-maven-plugin</artifactId>
+              <version>3.2.0</version>
+            </plugin>
           </plugins>
         </pluginManagement>
       </build>
     </profile>
+    <profile>
+      <id>spark3</id>
+      <properties>
+        <scala.version>2.12.10</scala.version>
+        <scala.binary.version>2.12</scala.binary.version>
+        <spark.version>3.1.1</spark.version>
+        <spark.version.dir>spark31</spark.version.dir>
+        <jackson.version>2.10.0</jackson.version>
+      </properties>
+      <!--
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      -->
+    </profile>
   </profiles>
 </project>
diff --git a/server-base/pom.xml b/server-base/pom.xml
index a5411c8..e9fc1b9 100644
--- a/server-base/pom.xml
+++ b/server-base/pom.xml
@@ -210,12 +210,12 @@
         <!-- Spark dependency -->
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.11</artifactId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_2.11</artifactId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
             <scope>provided</scope>
         </dependency>
 
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java 
b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 8a039c0..6af1d53 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -483,7 +483,7 @@ public class QueryService extends BasicService {
                 logger.warn("Write metric error.", th);
             }
             if (sqlResponse.getIsException())
-                throw new 
InternalErrorException(sqlResponse.getExceptionMessage());
+                throw new 
InternalErrorException(sqlResponse.getExceptionMessage(), 
sqlResponse.getThrowable());
 
             return sqlResponse;
 
diff --git a/server/pom.xml b/server/pom.xml
index 82114fb..fc7a60c 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -277,7 +277,7 @@
         <!-- Spark dependency -->
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.11</artifactId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
             <scope>provided</scope>
         </dependency>
 
@@ -325,19 +325,19 @@
         <!-- spark -->
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_2.11</artifactId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-hive_2.11</artifactId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-yarn_2.11</artifactId>
+            <artifactId>spark-yarn_${scala.binary.version}</artifactId>
             <scope>provided</scope>
         </dependency>
     </dependencies>
diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml
index f54e5d7..2b61666 100644
--- a/source-kafka/pom.xml
+++ b/source-kafka/pom.xml
@@ -44,7 +44,7 @@
         <!-- Provided -->
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.11</artifactId>
+            <artifactId>kafka_${scala.binary.version}</artifactId>
         </dependency>
 
         <!-- Env & Test -->
diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml
index 02d08d5..21c39c4 100644
--- a/storage-hbase/pom.xml
+++ b/storage-hbase/pom.xml
@@ -113,7 +113,7 @@
         <!-- Spark dependency -->
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.11</artifactId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
             <scope>provided</scope>
         </dependency>
     </dependencies>
diff --git a/stream-receiver/pom.xml b/stream-receiver/pom.xml
index 0e2c258..48f0e13 100644
--- a/stream-receiver/pom.xml
+++ b/stream-receiver/pom.xml
@@ -53,7 +53,7 @@
                 </exclusion>
                 <exclusion>
                     <groupId>org.apache.kafka</groupId>
-                    <artifactId>kafka_2.11</artifactId>
+                    <artifactId>kafka_${scala.binary.version}</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
diff --git a/stream-source-kafka/pom.xml b/stream-source-kafka/pom.xml
index cdec1d4..6b7f1d2 100644
--- a/stream-source-kafka/pom.xml
+++ b/stream-source-kafka/pom.xml
@@ -59,7 +59,7 @@
 
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.11</artifactId>
+            <artifactId>kafka_${scala.binary.version}</artifactId>
         </dependency>
 
         <dependency>

Reply via email to