This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new ed241cd  KYLIN-4925 Use Spark 3.1 for Kylin 4.0 (#1601)

ed241cd is described below

commit ed241cd753efd40720d205d90e0ea54c46b0f9ee
Author: Congling Xia <xiacongl...@xiaomi.com>
AuthorDate: Sat Jun 26 23:32:49 2021 +0800

    KYLIN-4925 Use Spark 3.1 for Kylin 4.0 (#1601)
---
 .travis.yml | 3 +-
 assembly/pom.xml | 2 +-
 engine-flink/pom.xml | 4 +-
 engine-spark/pom.xml | 6 +-
 kylin-it/pom.xml | 8 +-
 kylin-spark-project/kylin-spark-common/pom.xml | 60 ++++-
 .../spark/common/util/KylinDateTimeUtils.scala | 8 +-
 .../spark/dict/NGlobalDictBuilderAssist.scala | 2 +-
 .../org/apache/spark/sql/KylinFunctions.scala | 2 +-
 .../catalyst/expressions/KylinExpresssions.scala | 8 +-
 .../catalyst/expressions/TimestampAddImpl.scala | 6 +-
 .../catalyst/expressions/TimestampDiffImpl.scala | 11 +-
 .../sql/execution/datasource/FilePruner.scala | 34 ++-
 .../execution/datasource/KylinSourceStrategy.scala | 29 ++-
 .../org/apache/spark/utils/KylinReflectUtils.scala | 24 +-
 .../spark/monitor/MonitorExecutorExtension.scala | 0
 .../sql/catalyst/expressions/ExpressionUtils.scala | 10 +-
 .../sql/execution/KylinFileSourceScanExec.scala | 9 +-
 .../spark/sql/execution/KylinJoinSelection.scala | 0
 .../sql/execution/datasource/FilterExt.scala} | 22 +-
 .../spark/sql/hive/utils/QueryMetricUtils.scala | 1 +
 .../spark/monitor/MonitorExecutorExtension.scala | 8 +-
 .../sql/catalyst/expressions/ExpressionUtils.scala | 12 +-
 .../sql/execution/KylinFileSourceScanExec.scala | 55 +++--
 .../spark/sql/execution/KylinJoinSelection.scala | 249 +++++++++++++++++++++
 .../spark/sql/hive/utils/QueryMetricUtils.scala | 52 +++--
 kylin-spark-project/kylin-spark-engine/pom.xml | 32 ++-
 .../spark/SparkBatchCubingEngineParquet.java | 2 +-
 .../kylin/engine/spark/job/NSparkCubingJob.java | 2 +-
 .../kylin/engine/spark/job/NSparkCubingUtil.java | 2 +-
 .../kylin/cluster/ClusterInfoFetcherFactory.scala | 2 +-
 .../spark/builder/CubeDictionaryBuilder.scala | 2 +-
 .../kylin/engine/spark/job/CuboidAggregator.scala | 18 +-
 .../kylin/query/runtime/ExpressionConverter.scala | 5 +-
 .../apache/spark/sql/udf/TimestampAddImpl.scala | 5 +-
 .../apache/spark/sql/udf/TimestampDiffImpl.scala | 11 +-
 .../org/apache/spark/util/KylinReflectUtils.scala | 58 -----
 .../engine/spark/LocalWithSparkSessionTest.java | 2 +-
 .../kylin/engine/spark/job/JobStepFactoryTest.java | 2 +-
 .../kylin/engine/spark/job/SparkCubingJobTest.java | 2 +-
 .../engine/spark/builder/TestCreateFlatTable.scala | 27 ++-
 kylin-spark-project/kylin-spark-metadata/pom.xml | 38 +++-
 .../org/apache/spark/sql/utils/SparkTypeUtil.scala | 17 +-
 .../engine/spark/cross/CrossDateTimeUtils.scala | 52 +++++
 .../engine/spark/cross/CrossDateTimeUtils.scala | 55 +++++
 kylin-spark-project/kylin-spark-query/pom.xml | 77 ++++---
 .../kylin/query/runtime/ExpressionConverter.scala | 6 +-
 .../kylin/query/runtime/SparderRexVisitor.scala | 6 +-
 .../sql/hive/KylinHiveSessionStateBuilder.scala | 0
 .../sql/hive/KylinHiveSessionStateBuilder.scala | 14 +-
 kylin-spark-project/kylin-spark-test/pom.xml | 10 +-
 .../kylin/engine/spark2/NBuildAndQueryTest.java | 2 +-
 .../spark2/NManualBuildAndQueryCuboidTest.java | 2 +-
 kylin-spark-project/pom.xml | 12 +-
 metastore-hbase/pom.xml | 2 +-
 metrics-reporter-kafka/pom.xml | 2 +-
 parquet-assembly/pom.xml | 8 +
 pom.xml | 77 ++++++-
 server-base/pom.xml | 4 +-
 .../apache/kylin/rest/service/QueryService.java | 2 +-
 server/pom.xml | 8 +-
 source-kafka/pom.xml | 2 +-
 storage-hbase/pom.xml | 2 +-
 stream-receiver/pom.xml | 2 +-
stream-source-kafka/pom.xml | 2 +- 65 files changed, 904 insertions(+), 295 deletions(-) diff --git a/.travis.yml b/.travis.yml index 3dcb586..46236bc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -46,7 +46,8 @@ before_script: script: # mvn clean org.jacoco:jacoco-maven-plugin:prepare-agent test coveralls:report -e # Skip coveralls temporarily, fix it asap - - mvn clean test + - mvn clean test -q + - mvn clean test -q -Psandbox -Pspark3 - if [[ -n "${TRAVIS_PULL_REQUEST_SLUG}" && "${TRAVIS_PULL_REQUEST_SLUG}" != "${TRAVIS_REPO_SLUG}" ]]; then echo "The pull request from ${TRAVIS_PULL_REQUEST_SLUG} is an EXTERNAL pull request. Skip sonar analysis."; else diff --git a/assembly/pom.xml b/assembly/pom.xml index 041b2c2..2035e84 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -169,7 +169,7 @@ </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.11</artifactId> + <artifactId>kafka_${scala.binary.version}</artifactId> <scope>provided</scope> </dependency> diff --git a/engine-flink/pom.xml b/engine-flink/pom.xml index b10f3e5..2a99fe3 100644 --- a/engine-flink/pom.xml +++ b/engine-flink/pom.xml @@ -60,13 +60,13 @@ <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-scala_2.11</artifactId> + <artifactId>flink-scala_${scala.binary.version}</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-hadoop-compatibility_2.11</artifactId> + <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId> </dependency> <!-- Hadoop dependency --> diff --git a/engine-spark/pom.xml b/engine-spark/pom.xml index cc1d9d6..af6486c 100644 --- a/engine-spark/pom.xml +++ b/engine-spark/pom.xml @@ -46,19 +46,19 @@ <!-- Spark dependency --> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.11</artifactId> + <artifactId>spark-core_${scala.binary.version}</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.11</artifactId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-hive_2.11</artifactId> + <artifactId>spark-hive_${scala.binary.version}</artifactId> <scope>provided</scope> </dependency> diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml index b7986ab..56a3dfb 100644 --- a/kylin-it/pom.xml +++ b/kylin-it/pom.xml @@ -272,7 +272,7 @@ </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.11</artifactId> + <artifactId>kafka_${scala.binary.version}</artifactId> <scope>provided</scope> </dependency> @@ -292,7 +292,7 @@ <!-- Spark dependency --> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.11</artifactId> + <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> <exclusions> @@ -305,7 +305,7 @@ <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.11</artifactId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> <exclusions> @@ -318,7 +318,7 @@ <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-hive_2.11</artifactId> + <artifactId>spark-hive_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> <exclusions> diff --git a/kylin-spark-project/kylin-spark-common/pom.xml 
b/kylin-spark-project/kylin-spark-common/pom.xml index 44be54b..b027793 100644 --- a/kylin-spark-project/kylin-spark-common/pom.xml +++ b/kylin-spark-project/kylin-spark-common/pom.xml @@ -17,7 +17,8 @@ limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <name>Apache Kylin 4.X - Common</name> @@ -41,6 +42,7 @@ <groupId>org.apache.kylin</groupId> <artifactId>kylin-core-common</artifactId> <type>test-jar</type> + <scope>test</scope> </dependency> <dependency> <groupId>org.apache.kylin</groupId> @@ -48,6 +50,60 @@ <version>${project.version}</version> </dependency> </dependencies> - + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/${spark.version.dir}</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <executions> + <execution> + <id>compile-version-dependent-source</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + <configuration> + <sourceDir>${spark.version.dir}</sourceDir> + </configuration> + </execution> + <execution> + <id>compile-common-scala-source</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + <configuration> + <sourceDir>scala</sourceDir> + </configuration> + </execution> + <execution> + <id>scala-test-compile</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> \ No newline at end of file diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KylinDateTimeUtils.scala b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KylinDateTimeUtils.scala index a5a6451..c03c1d0 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KylinDateTimeUtils.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/util/KylinDateTimeUtils.scala @@ -19,7 +19,7 @@ package org.apache.kylin.engine.spark.common.util import org.apache.calcite.avatica.util.TimeUnitRange -import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils object KylinDateTimeUtils { val MICROS_PER_MILLIS: Long = 1000L @@ -34,7 +34,7 @@ object KylinDateTimeUtils { def addMonths(timestamp: Long, m: Int): Long = { // spark ts unit is microsecond val ms = timestamp / 1000 - val day0 = DateTimeUtils.millisToDays(ms) + val day0 = CrossDateTimeUtils.millisToDays(ms) val millis = ms - day0 * MILLIS_PER_DAY val x = dateAddMonths(day0, m) (x * MILLIS_PER_DAY + millis) * 1000 @@ -63,9 +63,9 @@ object KylinDateTimeUtils { def subtractMonths(t0: Long, t1: Long): Int = { val millis0 = floorMod(t0, 
MILLIS_PER_DAY) - val d0 = DateTimeUtils.millisToDays(t0) + val d0 = CrossDateTimeUtils.millisToDays(t0) val millis1 = floorMod(t1, MILLIS_PER_DAY) - val d1 = DateTimeUtils.millisToDays(t1) + val d1 = CrossDateTimeUtils.millisToDays(t1) var x = dateSubtractMonths(d0, d1) val d2 = dateAddMonths(d1, x) if (x > 0 && d2 == d0 && millis0 < millis1) x -= 1 diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala index dbac8b8..3ce9bca 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictBuilderAssist.scala @@ -55,7 +55,7 @@ object NGlobalDictBuilderAssist extends Logging { existsDictDs .repartition(bucketPartitionSize, col(existsDictDs.schema.head.name).cast(StringType)) .foreachPartition { - iter => + iter: Iterator[(String, Long)] => val partitionID = TaskContext.get().partitionId() logInfo(s"Rebuild partition dict col: ${ref.identity}, partitionId: $partitionID") val d = broadcastDict.value diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala index 22cfb0d..8e112c1 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala @@ -51,7 +51,7 @@ object KylinFunctions { case _ => Column(Literal(literal)) } - def k_like(left: Column, right: Column): Column = Column(Like(left.expr, right.expr)) + def k_like(left: Column, right: Column): Column = Column(new Like(left.expr, right.expr)) def in(value: Expression, list: Seq[Expression]): Column = Column(In(value, list)) diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala index 331309a..a1a45fa 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala @@ -302,11 +302,11 @@ case class DictEncode(left: Expression, mid: Expression, right: Expression) exte val globalDictClass = classOf[NGlobalDictionary].getName val bucketDictClass = classOf[NBucketDictionary].getName - val globalDictTerm = ctx.addMutableState(globalDictClass, s"${mid.simpleString.replace("[", "").replace("]", "")}_globalDict") - val bucketDictTerm = ctx.addMutableState(bucketDictClass, s"${mid.simpleString.replace("[", "").replace("]", "")}_bucketDict") + val globalDictTerm = ctx.addMutableState(globalDictClass, s"${ExpressionUtils.simpleString(mid).replace("[", "").replace("]", "")}_globalDict") + val bucketDictTerm = ctx.addMutableState(bucketDictClass, s"${ExpressionUtils.simpleString(mid).replace("[", "").replace("]", "")}_bucketDict") - val dictParamsTerm = mid.simpleString - val bucketSizeTerm = right.simpleString.toInt + val dictParamsTerm = ExpressionUtils.simpleString(mid) + val bucketSizeTerm = ExpressionUtils.simpleString(right).toInt val 
initBucketDictFuncName = ctx.addNewFunction(s"init${bucketDictTerm.replace("[", "").replace("]", "")}BucketDict", s""" diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampAddImpl.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampAddImpl.scala index 55f683c..e1a4d35 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampAddImpl.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampAddImpl.scala @@ -18,9 +18,9 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils._ -import java.util.{Calendar, Locale, TimeZone} +import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils -import org.apache.spark.sql.catalyst.util.DateTimeUtils +import java.util.{Calendar, Locale, TimeZone} object TimestampAddImpl { private val localCalendar = new ThreadLocal[Calendar] { @@ -34,7 +34,7 @@ object TimestampAddImpl { calendar.clear() addTime("DAY", time, calendar) addTime(unit, increment, calendar) - DateTimeUtils.millisToDays(calendar.getTimeInMillis) + CrossDateTimeUtils.millisToDays(calendar.getTimeInMillis) } // add long on DateType diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampDiffImpl.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampDiffImpl.scala index e85abe0..cbf42cf 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampDiffImpl.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimestampDiffImpl.scala @@ -19,17 +19,16 @@ package org.apache.spark.sql.catalyst.expressions import java.util.Locale - import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils._ import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils -import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils object TimestampDiffImpl { // TimestampType -> DateType def evaluate(unit: String, timestamp: Long, date: Int): Long = { val before = timestamp / MICROS_PER_MILLIS - val after = DateTimeUtils.daysToMillis(date) + val after = CrossDateTimeUtils.daysToMillis(date) convertDuration(unit, before, after) } @@ -42,14 +41,14 @@ object TimestampDiffImpl { // DateType -> DateType def evaluate(unit: String, date1: Int, date2: Int): Long = { - val before = DateTimeUtils.daysToMillis(date1) - val after = DateTimeUtils.daysToMillis(date2) + val before = CrossDateTimeUtils.daysToMillis(date1) + val after = CrossDateTimeUtils.daysToMillis(date2) convertDuration(unit, before, after) } // DateType -> TimestampType def evaluate(unit: String, date: Int, timestamp: Long): Long = { - val before = DateTimeUtils.daysToMillis(date) + val before = CrossDateTimeUtils.daysToMillis(date) val after = timestamp / MICROS_PER_MILLIS convertDuration(unit, before, after) } diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala index 91faab4..2784170 100644 --- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasource import java.sql.{Date, Timestamp} - import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.kylin.common.util.DateFormat import org.apache.kylin.cube.cuboid.Cuboid @@ -29,7 +28,7 @@ import org.apache.kylin.engine.spark.metadata.MetadataConverter import org.apache.kylin.metadata.model.{PartitionDesc, SegmentStatusEnum} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.analysis.Resolver -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, EmptyRow, Expression, Literal} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, EmptyRow, Expression, ExpressionUtils, Literal} import org.apache.spark.sql.catalyst.{InternalRow, expressions} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources._ @@ -346,7 +345,7 @@ class FilePruner(cubeInstance: CubeInstance, segDirs } else { val translatedFilter = filters.map(filter => convertCastFilter(filter)) - .flatMap(DataSourceStrategy.translateFilter) + .flatMap(ExpressionUtils.translateFilter) if (translatedFilter.isEmpty) { logInfo("Can not use filters to prune segments.") segDirs @@ -357,8 +356,8 @@ class FilePruner(cubeInstance: CubeInstance, val tsRange = cubeInstance.getSegment(e.segmentName, SegmentStatusEnum.READY).getTSRange SegFilters(tsRange.startValue, tsRange.endValue, pattern) .foldFilter(reducedFilter) match { - case Trivial(true) => true - case Trivial(false) => false + case AlwaysTrue => true + case AlwaysFalse => false } } } @@ -555,22 +554,20 @@ case class SegFilters(start: Long, end: Long, pattern: String) extends Logging { } case And(left: Filter, right: Filter) => And(foldFilter(left), foldFilter(right)) match { - case And(Trivial(false), _) => Trivial(false) - case And(_, Trivial(false)) => Trivial(false) - case And(Trivial(true), right) => right - case And(left, Trivial(true)) => left + case And(AlwaysFalse, _) => Trivial(false) + case And(_, AlwaysFalse) => Trivial(false) + case And(AlwaysTrue, right) => right + case And(left, AlwaysTrue) => left case other => other } case Or(left: Filter, right: Filter) => Or(foldFilter(left), foldFilter(right)) match { - case Or(Trivial(true), _) => Trivial(true) - case Or(_, Trivial(true)) => Trivial(true) - case Or(Trivial(false), right) => right - case Or(left, Trivial(false)) => left + case Or(AlwaysTrue, _) => Trivial(true) + case Or(_, AlwaysTrue) => Trivial(true) + case Or(AlwaysFalse, right) => right + case Or(left, AlwaysFalse) => left case other => other } - case trivial: Trivial => - trivial case unsupportedFilter => // return 'true' to scan all partitions // currently unsupported filters are: @@ -581,8 +578,7 @@ case class SegFilters(start: Long, end: Long, pattern: String) extends Logging { Trivial(true) } } -} - -case class Trivial(value: Boolean) extends Filter { - override def references: Array[String] = findReferences(value) + def Trivial(value: Boolean): Filter = { + if (value) AlwaysTrue else AlwaysFalse + } } \ No newline at end of file diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinSourceStrategy.scala 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinSourceStrategy.scala index 9e9a29c..0713b7c 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinSourceStrategy.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/KylinSourceStrategy.scala @@ -17,14 +17,16 @@ */ package org.apache.spark.sql.execution.datasource +import org.apache.spark.SPARK_VERSION import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.expressions -import org.apache.spark.sql.{execution, Strategy} +import org.apache.spark.sql.{Strategy, execution} import org.apache.spark.sql.execution.{KylinFileSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, ExpressionSet, NamedExpression, SubqueryExpression} +import org.apache.spark.utils.KylinReflectUtils /** * A strategy for planning scans over collections of files that might be partitioned or bucketed @@ -119,15 +121,34 @@ object KylinSourceStrategy extends Strategy with Logging { val outputAttributes = readDataColumns ++ partitionColumns // to trigger setShufflePartitions filePruner.listFiles(partitionKeyFilters.iterator.toSeq, dataFilters.iterator.toSeq) - val scan = - new KylinFileSourceScanExec( + val className = "org.apache.spark.sql.execution.KylinFileSourceScanExec" + val (scan: KylinFileSourceScanExec, ignored: Class[_]) = if (SPARK_VERSION.startsWith("2.4")) { + KylinReflectUtils.createObject( + className, fsRelation, outputAttributes, outputSchema, partitionKeyFilters.toSeq, filePruner.getShardSpec, dataFilters, - table.map(_.identifier)) + table.map(_.identifier) + ) + } else if (SPARK_VERSION.startsWith("3.1")) { + KylinReflectUtils.createObject( + className, + fsRelation, + outputAttributes, + outputSchema, + partitionKeyFilters.toSeq, + filePruner.getShardSpec, + None, + dataFilters, + table.map(_.identifier), + java.lang.Boolean.TRUE + ) + } else { + throw new UnsupportedOperationException(s"Spark version ${SPARK_VERSION} is not supported.") + } val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And) val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan) diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/utils/KylinReflectUtils.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/utils/KylinReflectUtils.scala index 783eeb4..28b3a6e 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/utils/KylinReflectUtils.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/utils/KylinReflectUtils.scala @@ -27,19 +27,21 @@ object KylinReflectUtils { private val rm = universe.runtimeMirror(getClass.getClassLoader) def getSessionState(sparkContext: SparkContext, kylinSession: Object): Any = { - if (SPARK_VERSION.startsWith("2.4")) { - var className: String = - "org.apache.spark.sql.hive.KylinHiveSessionStateBuilder" - if (!"hive".equals(sparkContext.getConf - .get(CATALOG_IMPLEMENTATION.key, "in-memory"))) { - className = "org.apache.spark.sql.hive.KylinSessionStateBuilder" - } - val tuple = createObject(className, kylinSession, None) - val method = 
tuple._2.getMethod("build") - method.invoke(tuple._1) + var className: String = + "org.apache.spark.sql.hive.KylinHiveSessionStateBuilder" + if (!"hive".equals(sparkContext.getConf + .get(CATALOG_IMPLEMENTATION.key, "in-memory"))) { + className = "org.apache.spark.sql.hive.KylinSessionStateBuilder" + } + + val (instance, clazz) = if (SPARK_VERSION.startsWith("2.4")) { + createObject(className, kylinSession, None) + } else if (SPARK_VERSION.startsWith("3.1")) { + createObject(className, kylinSession, None, Map.empty) } else { - throw new UnsupportedOperationException("Spark version not supported") + throw new UnsupportedOperationException(s"Spark version ${SPARK_VERSION} not supported") } + clazz.getMethod("build").invoke(instance) } def createObject(className: String, conArgs: Object*): (Any, Class[_]) = { diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/monitor/MonitorExecutorExtension.scala b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/monitor/MonitorExecutorExtension.scala similarity index 100% copy from kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/monitor/MonitorExecutorExtension.scala copy to kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/monitor/MonitorExecutorExtension.scala diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala similarity index 91% copy from kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala copy to kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala index d9bdcc5..232f6cc 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala @@ -21,7 +21,9 @@ package org.apache.spark.sql.catalyst.expressions import scala.util.{Failure, Success, Try} import scala.reflect.ClassTag import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder +import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.{FunctionBuilder, expressions} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.Filter object ExpressionUtils { @@ -78,6 +80,12 @@ ExpressionUtils { (name, (expressionInfo[T](name), builder)) } + def simpleString(expression: Expression): String = expression.simpleString + + def translateFilter(expression: Expression): Option[Filter] = { + DataSourceStrategy.translateFilter(expression) + } + private def expressionInfo[T <: Expression : ClassTag](name: String): ExpressionInfo = { val clazz = scala.reflect.classTag[T].runtimeClass val df = clazz.getAnnotation(classOf[ExpressionDescription]) diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala similarity index 98% copy from kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala copy to 
kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala index 90ff597..0fbb39d 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala @@ -21,7 +21,7 @@ package org.apache.spark.sql.execution import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path} import org.apache.kylin.common.KylinConfig import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, ExpressionUtils, SortOrder} import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.execution.datasources._ @@ -57,7 +57,7 @@ class KylinFileSourceScanExec( ret } - private lazy val inputRDD: RDD[InternalRow] = { + private lazy val _inputRDD: RDD[InternalRow] = { val readFile: (PartitionedFile) => Iterator[InternalRow] = relation.fileFormat.buildReaderWithPartitionValues( sparkSession = relation.sparkSession, @@ -77,11 +77,12 @@ class KylinFileSourceScanExec( } override def inputRDDs(): Seq[RDD[InternalRow]] = { - inputRDD :: Nil + _inputRDD :: Nil } @transient - private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter) + private val pushedDownFilters = dataFilters + .flatMap(ExpressionUtils.translateFilter) logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}") override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = { diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinJoinSelection.scala b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinJoinSelection.scala similarity index 100% rename from kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinJoinSelection.scala rename to kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinJoinSelection.scala diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/cluster/ClusterInfoFetcherFactory.scala b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/datasource/FilterExt.scala similarity index 68% copy from kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/cluster/ClusterInfoFetcherFactory.scala copy to kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/datasource/FilterExt.scala index 9f709f4..d9ca8ab 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/cluster/ClusterInfoFetcherFactory.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/datasource/FilterExt.scala @@ -14,17 +14,23 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
-*/ + */ -package org.apache.kylin.cluster +package org.apache.spark.sql.execution.datasource -import org.apache.kylin.common.KylinConfig -import org.apache.spark.util.KylinReflectUtils +import org.apache.spark.sql.sources.Filter +case class AlwaysTrue() extends Filter { + override def references: Array[String] = Array.empty +} + +object AlwaysTrue extends AlwaysTrue { +} -object ClusterInfoFetcherFactory { - def create(kylinConfig: KylinConfig): ClusterInfoFetcher = { - KylinReflectUtils.createObject(kylinConfig.getClusterInfoFetcherClassName)._1.asInstanceOf[ClusterInfoFetcher] - } +case class AlwaysFalse() extends Filter { + override def references: Array[String] = Array.empty } + +object AlwaysFalse extends AlwaysFalse { +} \ No newline at end of file diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala similarity index 99% copy from kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala copy to kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala index c928e84..cabe9c6 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala @@ -42,6 +42,7 @@ object QueryMetricUtils extends Logging { // There is only 'numOutputRows' metric in HiveTableScanExec (exec.metrics.apply("numOutputRows").value, -1l, -1l, -1l, -1l) } + val scanRows = metrics.map(metric => java.lang.Long.valueOf(metric._1)) .filter(_ >= 0L).toList.asJava val scanFiles = metrics.map(metrics => java.lang.Long.valueOf(metrics._2)) diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/monitor/MonitorExecutorExtension.scala b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/monitor/MonitorExecutorExtension.scala similarity index 90% rename from kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/monitor/MonitorExecutorExtension.scala rename to kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/monitor/MonitorExecutorExtension.scala index d291faf..984fe45 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/monitor/MonitorExecutorExtension.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/monitor/MonitorExecutorExtension.scala @@ -18,10 +18,12 @@ package org.apache.spark.memory +import java.util + +import org.apache.spark.api.plugin.{ExecutorPlugin, PluginContext} import org.apache.spark.internal.Logging import org.apache.spark.rpc.{RpcAddress, RpcEnv} -import org.apache.spark.util.RpcUtils -import org.apache.spark.{ExecutorPlugin, SparkConf, SparkEnv} +import org.apache.spark.{SparkConf, SparkEnv} class MonitorExecutorExtension extends ExecutorPlugin with Logging { @@ -31,7 +33,7 @@ class MonitorExecutorExtension extends ExecutorPlugin with Logging { val sparkConf: SparkConf = env.conf - override def init(): Unit = { + override def init(pluginContext: PluginContext, extraConf: util.Map[String, String]): Unit = { initMonitorEnv() diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala 
b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala similarity index 91% rename from kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala rename to kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala index d9bdcc5..8405493 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/catalyst/expressions/ExpressionUtils.scala @@ -22,6 +22,8 @@ import scala.util.{Failure, Success, Try} import scala.reflect.ClassTag import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.Filter object ExpressionUtils { @@ -78,6 +80,12 @@ ExpressionUtils { (name, (expressionInfo[T](name), builder)) } + def simpleString(expr: Expression): String = expr.simpleString(1) + + def translateFilter(expr: Expression): Option[Filter] ={ + DataSourceStrategy.translateFilter(expr, supportNestedPredicatePushdown = true) + } + private def expressionInfo[T <: Expression : ClassTag](name: String): ExpressionInfo = { val clazz = scala.reflect.classTag[T].runtimeClass val df = clazz.getAnnotation(classOf[ExpressionDescription]) @@ -91,7 +99,9 @@ ExpressionUtils { df.arguments(), df.examples(), df.note(), - df.since()) + df.group(), + df.since(), + df.deprecated()) } else { // This exists for the backward compatibility with old `ExpressionDescription`s defining // the extended description in `extended()`. 
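
Note on the two ExpressionUtils variants above: Expression.simpleString gained a maxFields parameter and DataSourceStrategy.translateFilter gained a supportNestedPredicatePushdown flag in Spark 3.x, so the spark24 and spark31 source trees each provide a facade with the same signature, and only one of them is compiled depending on the per-version source directory added via build-helper-maven-plugin earlier in this patch. A minimal sketch of how shared code can stay source-compatible by calling only the facade (the ExpressionUtils methods come from the diff; the surrounding object is illustrative and not part of the patch):

    import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionUtils}
    import org.apache.spark.sql.sources.Filter

    // Hypothetical helper, not in the patch: it compiles unchanged under both
    // the spark24 and spark31 source trees because it only uses the facade.
    object FilterPushdownSketch {
      def pushableFilters(dataFilters: Seq[Expression]): Seq[Filter] = {
        // simpleString hides the extra maxFields argument required by Spark 3.x
        dataFilters.foreach(e => println(s"candidate: ${ExpressionUtils.simpleString(e)}"))
        // translateFilter hides the supportNestedPredicatePushdown flag added in Spark 3.x
        dataFilters.flatMap(ExpressionUtils.translateFilter)
      }
    }
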
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala similarity index 86% rename from kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala rename to kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala index 90ff597..957944b 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinFileSourceScanExec.scala @@ -21,43 +21,55 @@ package org.apache.spark.sql.execution import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path} import org.apache.kylin.common.KylinConfig import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, ExpressionUtils, SortOrder} import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} +import org.apache.spark.sql.execution.datasource.{FilePruner, ShardSpec} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.execution.datasource.{FilePruner, ShardSpec} import org.apache.spark.sql.types.StructType -import scala.collection.mutable.ArrayBuffer +import java.util.concurrent.TimeUnit.NANOSECONDS +import scala.collection.mutable.{ArrayBuffer, HashMap} class KylinFileSourceScanExec( @transient override val relation: HadoopFsRelation, override val output: Seq[Attribute], override val requiredSchema: StructType, override val partitionFilters: Seq[Expression], - val optionalShardSpec: Option[ShardSpec], + optionalShardSpec: Option[ShardSpec], + ignoredNumCoalescedBuckets: Option[Int], override val dataFilters: Seq[Expression], - override val tableIdentifier: Option[TableIdentifier]) extends FileSourceScanExec( - relation, output, requiredSchema, partitionFilters, None, dataFilters, tableIdentifier) { + override val tableIdentifier: Option[TableIdentifier], + ignoredDisableBucketedScan: Boolean = true) extends FileSourceScanExec( + relation, output, requiredSchema, partitionFilters, None, None, dataFilters, tableIdentifier, true) { + + private lazy val driverMetrics: HashMap[String, Long] = HashMap.empty - @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = { + private def sendDriverMetrics(): Unit = { + driverMetrics.foreach(e => metrics(e._1).add(e._2)) + val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) + SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, + metrics.filter(e => driverMetrics.contains(e._1)).values.toSeq) + } + + @transient lazy val _selectedPartitions: Seq[PartitionDirectory] = { val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L) val startTime = System.nanoTime() val ret = relation.location.listFiles(partitionFilters, dataFilters) - val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000 - - metrics("numFiles").add(ret.map(_.files.size.toLong).sum) - 
metrics("metadataTime").add(timeTakenMs) - val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) - SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, - metrics("numFiles") :: metrics("metadataTime") :: Nil) + driverMetrics("numFiles") = ret.map(_.files.size.toLong).sum + driverMetrics("filesSize") = ret.map(_.files.map(_.getLen).sum).sum + if (relation.partitionSchemaOption.isDefined) { + driverMetrics("numPartitions") = ret.length + } + val timeTakenMs = NANOSECONDS.toMillis((System.nanoTime() - startTime) + optimizerMetadataTimeNs) + driverMetrics("metadataTime") = timeTakenMs ret } - private lazy val inputRDD: RDD[InternalRow] = { + private lazy val _inputRDD: RDD[InternalRow] = { val readFile: (PartitionedFile) => Iterator[InternalRow] = relation.fileFormat.buildReaderWithPartitionValues( sparkSession = relation.sparkSession, @@ -68,20 +80,23 @@ class KylinFileSourceScanExec( options = relation.options, hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)) - optionalShardSpec match { + val readRDD = optionalShardSpec match { case Some(spec) if KylinConfig.getInstanceFromEnv.isShardingJoinOptEnabled => - createShardingReadRDD(spec, readFile, selectedPartitions, relation) + createShardingReadRDD(spec, readFile, _selectedPartitions, relation) case _ => - createNonShardingReadRDD(readFile, selectedPartitions, relation) + createNonShardingReadRDD(readFile, _selectedPartitions, relation) } + sendDriverMetrics() + readRDD } override def inputRDDs(): Seq[RDD[InternalRow]] = { - inputRDD :: Nil + _inputRDD :: Nil } @transient - private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter) + private val pushedDownFilters = dataFilters + .flatMap(ExpressionUtils.translateFilter) logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}") override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = { diff --git a/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinJoinSelection.scala b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinJoinSelection.scala new file mode 100644 index 0000000..243ffd6 --- /dev/null +++ b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinJoinSelection.scala @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.spark.sql.execution + +import org.apache.kylin.common.KylinConfig +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{PredicateHelper, RowOrdering} +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, JoinSelectionHelper} +import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractSingleColumnNullAwareAntiJoin} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.{SparkSession, Strategy} + +import javax.annotation.concurrent.GuardedBy + +/** + * . + */ +case class KylinJoinSelection(session: SparkSession) extends Strategy + with JoinSelectionHelper + with PredicateHelper + with Logging { + + val conf: SQLConf = session.sessionState.conf + + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + + // If it is an equi-join, we first look at the join hints w.r.t. the following order: + // 1. broadcast hint: pick broadcast hash join if the join type is supported. If both sides + // have the broadcast hints, choose the smaller side (based on stats) to broadcast. + // 2. sort merge hint: pick sort merge join if join keys are sortable. + // 3. shuffle hash hint: We pick shuffle hash join if the join type is supported. If both + // sides have the shuffle hash hints, choose the smaller side (based on stats) as the + // build side. + // 4. shuffle replicate NL hint: pick cartesian product if join type is inner like. + // + // If there is no hint or the hints are not applicable, we follow these rules one by one: + // 1. Pick broadcast hash join if one side is small enough to broadcast, and the join type + // is supported. If both sides are small, choose the smaller side (based on stats) + // to broadcast. + // 2. Pick shuffle hash join if one side is small enough to build local hash map, and is + // much smaller than the other side, and `spark.sql.join.preferSortMergeJoin` is false. + // 3. Pick sort merge join if the join keys are sortable. + // 4. Pick cartesian product if join type is inner like. + // 5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have + // other choice. + case j@ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) => + def createBroadcastHashJoin(onlyLookingAtHint: Boolean) = { + getBroadcastBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map { + buildSide => + Seq(joins.BroadcastHashJoinExec( + leftKeys, + rightKeys, + joinType, + buildSide, + nonEquiCond, + planLater(left), + planLater(right))) + } + } + + def createShuffleHashJoin(onlyLookingAtHint: Boolean) = { + getShuffleHashJoinBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map { + buildSide => + Seq(joins.ShuffledHashJoinExec( + leftKeys, + rightKeys, + joinType, + buildSide, + nonEquiCond, + planLater(left), + planLater(right))) + } + } + + def createSortMergeJoin() = { + if (RowOrdering.isOrderable(leftKeys)) { + Some(Seq(joins.SortMergeJoinExec( + leftKeys, rightKeys, joinType, nonEquiCond, planLater(left), planLater(right)))) + } else { + None + } + } + + def createCartesianProduct() = { + if (joinType.isInstanceOf[InnerLike]) { + // `CartesianProductExec` can't implicitly evaluate equal join condition, here we should + // pass the original condition which includes both equal and non-equal conditions. 
+ Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), j.condition))) + } else { + None + } + } + + def createJoinWithoutHint() = { + createBroadcastHashJoin(false) + .orElse { + if (!conf.preferSortMergeJoin) { + createShuffleHashJoin(false) + } else { + None + } + } + .orElse(createSortMergeJoin()) + .orElse(createCartesianProduct()) + .getOrElse { + // This join could be very slow or OOM + val buildSide = getSmallerSide(left, right) + Seq(joins.BroadcastNestedLoopJoinExec( + planLater(left), planLater(right), buildSide, joinType, nonEquiCond)) + } + } + + createBroadcastHashJoin(true) + .orElse { + if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None + } + .orElse(createShuffleHashJoin(true)) + .orElse { + if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None + } + .getOrElse(createJoinWithoutHint()) + + case j@ExtractSingleColumnNullAwareAntiJoin(leftKeys, rightKeys) => + Seq(joins.BroadcastHashJoinExec(leftKeys, rightKeys, LeftAnti, BuildRight, + None, planLater(j.left), planLater(j.right), isNullAwareAntiJoin = true)) + + // If it is not an equi-join, we first look at the join hints w.r.t. the following order: + // 1. broadcast hint: pick broadcast nested loop join. If both sides have the broadcast + // hints, choose the smaller side (based on stats) to broadcast for inner and full joins, + // choose the left side for right join, and choose right side for left join. + // 2. shuffle replicate NL hint: pick cartesian product if join type is inner like. + // + // If there is no hint or the hints are not applicable, we follow these rules one by one: + // 1. Pick broadcast nested loop join if one side is small enough to broadcast. If only left + // side is broadcast-able and it's left join, or only right side is broadcast-able and + // it's right join, we skip this rule. If both sides are small, broadcasts the smaller + // side for inner and full joins, broadcasts the left side for right join, and broadcasts + // right side for left join. + // 2. Pick cartesian product if join type is inner like. + // 3. Pick broadcast nested loop join as the final solution. It may OOM but we don't have + // other choice. It broadcasts the smaller side for inner and full joins, broadcasts the + // left side for right join, and broadcasts right side for left join. + case logical.Join(left, right, joinType, condition, hint) => + val desiredBuildSide = if (joinType.isInstanceOf[InnerLike] || joinType == FullOuter) { + getSmallerSide(left, right) + } else { + // For perf reasons, `BroadcastNestedLoopJoinExec` prefers to broadcast left side if + // it's a right join, and broadcast right side if it's a left join. + // TODO: revisit it. If left side is much smaller than the right side, it may be better + // to broadcast the left side even if it's a left join. 
+ if (canBuildBroadcastLeft(joinType)) BuildLeft else BuildRight + } + + def createBroadcastNLJoin(buildLeft: Boolean, buildRight: Boolean) = { + val maybeBuildSide = if (buildLeft && buildRight) { + Some(desiredBuildSide) + } else if (buildLeft) { + Some(BuildLeft) + } else if (buildRight) { + Some(BuildRight) + } else { + None + } + + maybeBuildSide.map { buildSide => + Seq(joins.BroadcastNestedLoopJoinExec( + planLater(left), planLater(right), buildSide, joinType, condition)) + } + } + + def createCartesianProduct() = { + if (joinType.isInstanceOf[InnerLike]) { + Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition))) + } else { + None + } + } + + def createJoinWithoutHint() = { + createBroadcastNLJoin(canBroadcastBySize(left, conf), canBroadcastBySize(right, conf)) + .orElse(createCartesianProduct()) + .getOrElse { + // This join could be very slow or OOM + Seq(joins.BroadcastNestedLoopJoinExec( + planLater(left), planLater(right), desiredBuildSide, joinType, condition)) + } + } + + createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint)) + .orElse { + if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None + } + .getOrElse(createJoinWithoutHint()) + + // --- Cases where this strategy does not apply --------------------------------------------- + case _ => Nil + } + + override def canBroadcastBySize(plan: LogicalPlan, conf: SQLConf): Boolean = { + val size = plan.stats.sizeInBytes + size >= 0 && size <= conf.autoBroadcastJoinThreshold && JoinMemoryManager.acquireMemory(size.toLong) + } +} + +object JoinMemoryManager extends Logging { + + @GuardedBy("this") + private[this] var memoryUsed: Long = 0 + + def acquireMemory(numBytesToAcquire: Long): Boolean = synchronized { + assert(numBytesToAcquire >= 0) + val enoughMemory = numBytesToAcquire <= (maxMemoryJoinCanUse - memoryUsed) + if (enoughMemory) { + memoryUsed += numBytesToAcquire + logInfo(s"Acquire $numBytesToAcquire bytes for BHJ, memory used $memoryUsed, max memory BHJ can use $maxMemoryJoinCanUse.") + } else { + logInfo("Driver memory is not enough for BHJ.") + } + enoughMemory + } + + private def maxMemoryJoinCanUse: Long = { + val joinMemoryFraction = KylinConfig.getInstanceFromEnv.getJoinMemoryFraction + (Runtime.getRuntime.maxMemory() * joinMemoryFraction).toLong + } + + def releaseAllMemory(): Unit = synchronized { + memoryUsed = 0 + } + +} \ No newline at end of file diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala similarity index 61% rename from kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala rename to kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala index c928e84..0fc917f 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/hive/utils/QueryMetricUtils.scala @@ -19,29 +19,22 @@ package org.apache.spark.sql.hive.utils import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec} import org.apache.spark.sql.execution.{FileSourceScanExec, KylinFileSourceScanExec, SparkPlan} import org.apache.spark.sql.hive.execution.HiveTableScanExec 
-import scala.collection.JavaConverters._ +import scala.collection.JavaConverters.seqAsJavaListConverter object QueryMetricUtils extends Logging { + def collectScanMetrics(plan: SparkPlan): (java.util.List[java.lang.Long], java.util.List[java.lang.Long], - java.util.List[java.lang.Long], java.util.List[java.lang.Long], java.util.List[java.lang.Long]) = { + java.util.List[java.lang.Long], java.util.List[java.lang.Long], java.util.List[java.lang.Long]) = { try { val metrics = plan.collect { - case exec: KylinFileSourceScanExec => - //(exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value) - (exec.metrics.apply("numOutputRows").value, exec.metrics.apply("numFiles").value, - exec.metrics.apply("metadataTime").value, exec.metrics.apply("scanTime").value, -1l) - case exec: FileSourceScanExec => - //(exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value) - (exec.metrics.apply("numOutputRows").value, exec.metrics.apply("numFiles").value, - exec.metrics.apply("metadataTime").value, exec.metrics.apply("scanTime").value, -1l) - case exec: HiveTableScanExec => - //(exec.metrics.apply("numOutputRows").value, exec.metrics.apply("readBytes").value) - // There is only 'numOutputRows' metric in HiveTableScanExec - (exec.metrics.apply("numOutputRows").value, -1l, -1l, -1l, -1l) + case exec: AdaptiveSparkPlanExec => metricLine(recursiveGetSparkPlan(exec.executedPlan)) + case exec: SparkPlan => metricLine(exec) } + val scanRows = metrics.map(metric => java.lang.Long.valueOf(metric._1)) .filter(_ >= 0L).toList.asJava val scanFiles = metrics.map(metrics => java.lang.Long.valueOf(metrics._2)) @@ -58,7 +51,36 @@ object QueryMetricUtils extends Logging { case throwable: Throwable => logWarning("Error occurred when collect query scan metrics.", throwable) (List.empty[java.lang.Long].asJava, List.empty[java.lang.Long].asJava, List.empty[java.lang.Long].asJava, - List.empty[java.lang.Long].asJava, List.empty[java.lang.Long].asJava) + List.empty[java.lang.Long].asJava, List.empty[java.lang.Long].asJava) + } + } + + private def metricLine(exec: SparkPlan) = { + ( + exec.metrics.get("numOutputRows").map(_.value).getOrElse(-1L), + exec.metrics.get("numFiles").map(_.value).getOrElse(-1L), + exec.metrics.get("metadataTime").map(_.value).getOrElse(-1L), + exec.metrics.get("scanTime").map(_.value).getOrElse(-1L), + exec.metrics.get("filesSize").map(_.value).getOrElse(-1L) + ) + } + + private def recursiveGetSparkPlan(sparkPlan: SparkPlan): SparkPlan = { + sparkPlan match { + case exec: ShuffleQueryStageExec => + recursiveGetSparkPlan(exec.plan) + case exec: KylinFileSourceScanExec => + exec + case exec: FileSourceScanExec => + exec + case exec: HiveTableScanExec => + exec + case _ => { + if (sparkPlan.children.isEmpty) { + return null + } + recursiveGetSparkPlan(sparkPlan.children.head) + } } } } diff --git a/kylin-spark-project/kylin-spark-engine/pom.xml b/kylin-spark-project/kylin-spark-engine/pom.xml index 9954afe..de582b8 100644 --- a/kylin-spark-project/kylin-spark-engine/pom.xml +++ b/kylin-spark-project/kylin-spark-engine/pom.xml @@ -63,6 +63,21 @@ <!-- Spark dependency --> <dependency> <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-client</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-app</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + 
<scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <scope>provided</scope> </dependency> @@ -79,16 +94,29 @@ </dependency> <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-core-common</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.11</artifactId> + <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${spark.version}</version> <type>test-jar</type> <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.scalamock</groupId> - <artifactId>scalamock_2.11</artifactId> + <artifactId>scalamock_${scala.binary.version}</artifactId> <version>4.1.0</version> <scope>test</scope> </dependency> diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java index b6f4bb8..3b353ed 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingEngineParquet.java @@ -26,7 +26,7 @@ import org.apache.kylin.engine.IBatchCubingEngine; import org.apache.kylin.engine.spark.job.NSparkMergingJob; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; -import org.spark_project.guava.collect.Sets; +import org.apache.kylin.shaded.com.google.common.collect.Sets; public class SparkBatchCubingEngineParquet implements IBatchCubingEngine { @Override diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java index da00f6d..886e476 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java @@ -37,9 +37,9 @@ import org.apache.kylin.engine.spark.metadata.cube.PathManager; import org.apache.kylin.engine.spark.utils.MetaDumpUtil; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.shaded.com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.spark_project.guava.base.Preconditions; /** * diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java index 146a1f2..2b10518 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingUtil.java @@ -21,8 +21,8 @@ package org.apache.kylin.engine.spark.job; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.spark.metadata.cube.PathManager; import 
org.apache.kylin.metadata.model.Segments; +import org.apache.kylin.shaded.com.google.common.collect.Sets; import org.apache.spark.sql.Column; -import org.spark_project.guava.collect.Sets; import java.util.Collection; import java.util.LinkedHashSet; diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/cluster/ClusterInfoFetcherFactory.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/cluster/ClusterInfoFetcherFactory.scala index 9f709f4..092eeef 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/cluster/ClusterInfoFetcherFactory.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/cluster/ClusterInfoFetcherFactory.scala @@ -19,7 +19,7 @@ package org.apache.kylin.cluster import org.apache.kylin.common.KylinConfig -import org.apache.spark.util.KylinReflectUtils +import org.apache.spark.utils.KylinReflectUtils object ClusterInfoFetcherFactory { diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala index dea6ede..f7ec55d 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeDictionaryBuilder.scala @@ -83,7 +83,7 @@ class CubeDictionaryBuilder(val dataset: Dataset[Row], .filter(dictCol.isNotNull) .repartition(bucketPartitionSize, dictCol) .foreachPartition { - iter => + iter: Iterator[Row] => DictHelper.genDict(columnName, broadcastDict, iter) } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala index 84beaf2..35ac1af 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CuboidAggregator.scala @@ -28,10 +28,11 @@ import org.apache.kylin.measure.bitmap.BitmapMeasureType import org.apache.kylin.measure.hllc.HLLCMeasureType import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction import org.apache.spark.sql.functions.{col, _} -import org.apache.spark.sql.types.{StringType, _} +import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DoubleType, FloatType, ShortType, StringType, _} import org.apache.spark.sql.udaf._ import org.apache.spark.sql.{Column, DataFrame, SparkSession} import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.unsafe.types.UTF8String import scala.collection.JavaConverters._ import scala.collection.mutable @@ -72,8 +73,19 @@ object CuboidAggregator { val colIndex = dataSet.schema.fieldNames.zipWithIndex.map(tp => (tp._2, tp._1)).toMap columns.appendAll(measure.pra.map(p =>col(p.id.toString))) } else { - val value = measure.pra.head.asInstanceOf[LiteralColumnDesc].value - columns.append(new Column(Literal.create(value, measure.pra.head.dataType))) + var value = measure.pra.head.asInstanceOf[LiteralColumnDesc].value + value = measure.pra.head.dataType match { + case BooleanType => value.asInstanceOf[String].toBoolean + case ByteType => value.asInstanceOf[String].toByte + case 
ShortType => value.asInstanceOf[String].toShort + case IntegerType | DateType => value.asInstanceOf[String].toInt + case LongType | TimestampType => value.asInstanceOf[String].toLong + case FloatType => value.asInstanceOf[String].toFloat + case DoubleType => value.asInstanceOf[String].toDouble + case BinaryType => value.asInstanceOf[String].toArray + case StringType => value.asInstanceOf[UTF8String] + } + columns.append(new Column(Literal.create(value, measure.pra.head.dataType))) } } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala index 49aa30f..a08affe 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala @@ -24,11 +24,11 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.sql.SqlKind import org.apache.calcite.sql.SqlKind._ import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils import org.apache.kylin.query.util.UnsupportedSparkFunctionException import org.apache.spark.sql.Column import org.apache.spark.sql.KylinFunctions._ import org.apache.spark.sql.catalyst.expressions.{If, IfNull, StringLocate} -import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.functions._ import org.apache.spark.sql.utils.SparkTypeUtil @@ -213,8 +213,7 @@ object ExpressionConverter { // time_funcs case "current_date" => k_lit( - DateTimeUtils.dateToString( - DateTimeUtils.millisToDays(System.currentTimeMillis()))) + CrossDateTimeUtils.dateToString()) case "current_timestamp" => current_timestamp() case "to_timestamp" => diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampAddImpl.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampAddImpl.scala index 7413e4b..3beddcf 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampAddImpl.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampAddImpl.scala @@ -19,9 +19,8 @@ package org.apache.spark.sql.udf import java.util.{Calendar, Locale, TimeZone} - -import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils.{MICROS_PER_MILLIS, MONTHS_PER_QUARTER} +import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils object TimestampAddImpl { private val localCalendar = new ThreadLocal[Calendar] { @@ -35,7 +34,7 @@ object TimestampAddImpl { calendar.clear() addTime("DAY", time, calendar) addTime(unit, increment, calendar) - DateTimeUtils.millisToDays(calendar.getTimeInMillis) + CrossDateTimeUtils.millisToDays(calendar.getTimeInMillis) } // add long on DateType diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampDiffImpl.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampDiffImpl.scala index f0d4d9e..f8453a2 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampDiffImpl.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/sql/udf/TimestampDiffImpl.scala @@ -19,17 +19,16 @@ package 
org.apache.spark.sql.udf import java.util.Locale - import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils import org.apache.kylin.engine.spark.common.util.KylinDateTimeUtils._ -import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils object TimestampDiffImpl { // TimestampType -> DateType def evaluate(unit: String, timestamp: Long, date: Int): Long = { val before = timestamp / MICROS_PER_MILLIS - val after = DateTimeUtils.daysToMillis(date) + val after = CrossDateTimeUtils.daysToMillis(date) convertDuration(unit, before, after) } @@ -42,14 +41,14 @@ object TimestampDiffImpl { // DateType -> DateType def evaluate(unit: String, date1: Int, date2: Int): Long = { - val before = DateTimeUtils.daysToMillis(date1) - val after = DateTimeUtils.daysToMillis(date2) + val before = CrossDateTimeUtils.daysToMillis(date1) + val after = CrossDateTimeUtils.daysToMillis(date2) convertDuration(unit, before, after) } // DateType -> TimestampType def evaluate(unit: String, date: Int, timestamp: Long): Long = { - val before = DateTimeUtils.daysToMillis(date) + val before = CrossDateTimeUtils.daysToMillis(date) val after = timestamp / MICROS_PER_MILLIS convertDuration(unit, before, after) } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/util/KylinReflectUtils.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/util/KylinReflectUtils.scala deleted file mode 100644 index eef75cb..0000000 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/util/KylinReflectUtils.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.spark.util - -import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION -import org.apache.spark.{SPARK_VERSION, SparkContext} - -import scala.reflect.runtime.universe - -object KylinReflectUtils { - private val rm = universe.runtimeMirror(getClass.getClassLoader) - - def getSessionState(sparkContext: SparkContext, kylinSession: Object): Any = { - if (SPARK_VERSION.startsWith("2.4")) { - var className: String = - "org.apache.spark.sql.hive.KylinHiveSessionStateBuilder" - if (!"hive".equals(sparkContext.getConf - .get(CATALOG_IMPLEMENTATION.key, "in-memory"))) { - className = "org.apache.spark.sql.hive.KylinSessionStateBuilder" - } - val tuple = createObject(className, kylinSession, None) - val method = tuple._2.getMethod("build") - method.invoke(tuple._1) - } else { - throw new UnsupportedOperationException("Spark version not supported") - } - } - - def createObject(className: String, conArgs: Object*): (Any, Class[_]) = { - val clazz = Utils.classForName(className) - val ctor = clazz.getConstructors.head - ctor.setAccessible(true) - (ctor.newInstance(conArgs: _*), clazz) - } - - def createObject(className: String): (Any, Class[_]) = { - val clazz = Utils.classForName(className) - val ctor = clazz.getConstructors.head - ctor.setAccessible(true) - (ctor.newInstance(), clazz) - } -} diff --git a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java index a0c49e8..cfd84aa 100644 --- a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java +++ b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java @@ -55,6 +55,7 @@ import org.apache.kylin.metadata.model.Segments; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.ProjectManager; +import org.apache.kylin.shaded.com.google.common.collect.Sets; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.KylinSparkEnv; @@ -70,7 +71,6 @@ import org.junit.Before; import org.junit.BeforeClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.spark_project.guava.collect.Sets; import java.io.File; import java.io.IOException; diff --git a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java index f788d15..8bf5fb8 100644 --- a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java +++ b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/JobStepFactoryTest.java @@ -30,9 +30,9 @@ import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.model.SegmentRange; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.Segments; +import org.apache.kylin.shaded.com.google.common.collect.Sets; import org.junit.Assert; import org.junit.Test; -import org.spark_project.guava.collect.Sets; import java.io.IOException; import java.util.Set; diff --git 
a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/SparkCubingJobTest.java b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/SparkCubingJobTest.java index 0cbe4c0..91768ac 100644 --- a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/SparkCubingJobTest.java +++ b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/job/SparkCubingJobTest.java @@ -33,6 +33,7 @@ import org.apache.kylin.job.execution.ExecutableManager; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.metadata.model.IStorageAware; import org.apache.kylin.metadata.model.SegmentRange; +import org.apache.kylin.shaded.com.google.common.collect.Sets; import org.apache.kylin.storage.StorageFactory; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; @@ -49,7 +50,6 @@ import org.junit.Test; import org.roaringbitmap.longlong.Roaring64NavigableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.spark_project.guava.collect.Sets; import java.util.HashSet; import java.util.Map; diff --git a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala index 3ecdfb5..f8dd610 100644 --- a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala +++ b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala @@ -20,9 +20,9 @@ package org.apache.kylin.engine.spark.builder import java.text.SimpleDateFormat import java.util.{Locale, TimeZone, UUID} - import org.apache.kylin.common.KylinConfig import org.apache.kylin.cube.{CubeInstance, CubeManager, CubeSegment} +import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils import org.apache.kylin.engine.spark.job.KylinBuildEnv import org.apache.kylin.engine.spark.metadata.MetadataConverter import org.apache.kylin.engine.spark.metadata.cube.model.ForestSpanningTree @@ -30,7 +30,7 @@ import org.apache.kylin.job.engine.JobEngineConfig import org.apache.kylin.job.impl.threadpool.DefaultScheduler import org.apache.kylin.job.lock.MockJobLock import org.apache.kylin.metadata.model.SegmentRange -import org.apache.spark.InfoHelper +import org.apache.spark.{InfoHelper, SPARK_VERSION} import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, SparderBaseFunSuite} import org.apache.spark.sql.{Dataset, Row} import org.junit.Assert @@ -96,17 +96,32 @@ class TestCreateFlatTable extends SparderBaseFunSuite with SharedSparkSession wi val afterJoin1 = generateFlatTable(seg1, cube, true) afterJoin1.collect() - val jobs = helper.getJobsByGroupId(groupId) - Assert.assertEquals(jobs.length, 15) + if (SPARK_VERSION.startsWith("2.4")) { + val jobs = helper.getJobsByGroupId(groupId) + Assert.assertEquals(jobs.length, 15) + } else if (SPARK_VERSION.startsWith("3.1")) { + // in Spark 3.x, BroadcastExchangeExec overwrites job group ID + val jobs = helper.getJobsByGroupId(null) + Assert.assertEquals(6, jobs.count(_.jobGroup.exists(_.endsWith(groupId)))) + Assert.assertEquals(9, jobs.count(_.description.exists(_.contains("broadcast exchange")))) + } DefaultScheduler.destroyInstance() } private def checkFilterCondition(ds: Dataset[Row], seg: CubeSegment) = { val queryExecution = 
ds.queryExecution.simpleString - val startTime = dateFormat.format(seg.getTSRange.start.v) - val endTime = dateFormat.format(seg.getTSRange.end.v) + var startTime = dateFormat.format(seg.getTSRange.start.v) + var endTime = dateFormat.format(seg.getTSRange.end.v) //Test Filter Condition + + // dates will not be converted to string by default since spark 3.0.0. + // see https://issues.apache.org/jira/browse/SPARK-27638 for details. + if (SPARK_VERSION.startsWith("3.") && conf.get("spark.sql.legacy.typeCoercion.datetimeToString.enabled", "false") == "false") { + startTime = CrossDateTimeUtils.stringToDate(startTime).get.toString + endTime = CrossDateTimeUtils.stringToDate(endTime).get.toString + } + Assert.assertTrue(queryExecution.contains(startTime)) Assert.assertTrue(queryExecution.contains(endTime)) } diff --git a/kylin-spark-project/kylin-spark-metadata/pom.xml b/kylin-spark-project/kylin-spark-metadata/pom.xml index 9cdd824..71b8b4e 100644 --- a/kylin-spark-project/kylin-spark-metadata/pom.xml +++ b/kylin-spark-project/kylin-spark-metadata/pom.xml @@ -39,21 +39,55 @@ <groupId>org.apache.kylin</groupId> <artifactId>kylin-core-cube</artifactId> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-api</artifactId> + </dependency> </dependencies> <build> <plugins> <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/${spark.version.dir}</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <executions> <execution> - <id>scala-compile-first</id> + <id>compile-version-dependent-source</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + <configuration> + <sourceDir>${spark.version.dir}</sourceDir> + </configuration> + </execution> + <execution> + <id>compile-common-scala-source</id> <phase>process-resources</phase> <goals> - <goal>add-source</goal> <goal>compile</goal> </goals> + <configuration> + <sourceDir>scala</sourceDir> + </configuration> </execution> <execution> <id>scala-test-compile</id> diff --git a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/spark/sql/utils/SparkTypeUtil.scala b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/spark/sql/utils/SparkTypeUtil.scala index 2e189ec..486e358 100644 --- a/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/spark/sql/utils/SparkTypeUtil.scala +++ b/kylin-spark-project/kylin-spark-metadata/src/main/scala/org/apache/spark/sql/utils/SparkTypeUtil.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.utils import org.apache.calcite.sql.`type`.SqlTypeName -import org.apache.spark.unsafe.types.UTF8String import org.apache.kylin.common.util.DateFormat import org.apache.spark.sql.Column import org.apache.spark.internal.Logging @@ -28,19 +27,23 @@ import java.math.BigDecimal import org.apache.calcite.util.NlsString import org.apache.calcite.rel.`type`.RelDataType import java.sql.{Date, Timestamp, Types} +import java.time.ZoneId import java.util.regex.Pattern import org.apache.spark.sql.functions.col import org.apache.calcite.avatica.util.TimeUnitRange import org.apache.calcite.rex.RexLiteral -import java.util.{GregorianCalendar, Locale, TimeZone} +import 
java.util.{GregorianCalendar, Locale} import org.apache.kylin.engine.spark.metadata.FunctionDesc -import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.kylin.metadata.datatype.DataType import org.apache.spark.sql.types._ +import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils object SparkTypeUtil extends Logging { + private def defaultZoneId = ZoneId.systemDefault() + private def UTC = ZoneId.of("UTC") + val DATETIME_FAMILY = List("time", "date", "timestamp", "datetime") def isDateTimeFamilyType(dataType: String): Boolean = { @@ -167,9 +170,9 @@ object SparkTypeUtil extends Logging { s.getValue case g: GregorianCalendar => if (literal.getTypeName.getName.equals("DATE")) { - new Date(DateTimeUtils.stringToTimestamp(UTF8String.fromString(literal.toString)).get / 1000) + new Date(CrossDateTimeUtils.stringToTimestamp(literal).get / 1000) } else { - new Timestamp(DateTimeUtils.stringToTimestamp(UTF8String.fromString(literal.toString)).get / 1000) + new Timestamp(CrossDateTimeUtils.stringToTimestamp(literal).get / 1000) } case range: TimeUnitRange => // Extract(x from y) in where clause @@ -259,7 +262,7 @@ object SparkTypeUtil extends Logging { val time = DateFormat.stringToDate(string).getTime if (toCalcite) { //current date is local timezone, org.apache.calcite.avatica.util.AbstractCursor.DateFromNumberAccessor need to utc - DateTimeUtils.stringToDate(UTF8String.fromString(string)).get + CrossDateTimeUtils.stringToDate(string).get } else { // ms to s time / 1000 @@ -277,7 +280,7 @@ object SparkTypeUtil extends Logging { var ts = s.asInstanceOf[Timestamp].toString if (toCalcite) { // current ts is local timezone ,org.apache.calcite.avatica.util.AbstractCursor.TimeFromNumberAccessor need to utc - DateTimeUtils.stringToTimestamp(UTF8String.fromString(ts), TimeZone.getTimeZone("UTC")).get / 1000 + CrossDateTimeUtils.stringToTimestamp(ts, UTC).get / 1000 } else { // ms to s s.asInstanceOf[Timestamp].getTime / 1000 diff --git a/kylin-spark-project/kylin-spark-metadata/src/main/spark24/org/apache/kylin/engine/spark/cross/CrossDateTimeUtils.scala b/kylin-spark-project/kylin-spark-metadata/src/main/spark24/org/apache/kylin/engine/spark/cross/CrossDateTimeUtils.scala new file mode 100644 index 0000000..54dd480 --- /dev/null +++ b/kylin-spark-project/kylin-spark-metadata/src/main/spark24/org/apache/kylin/engine/spark/cross/CrossDateTimeUtils.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.engine.spark.cross + +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils.{SQLDate, SQLTimestamp} +import org.apache.spark.unsafe.types.UTF8String + +import java.time.ZoneId +import java.util.TimeZone + +object CrossDateTimeUtils { + def stringToTimestamp(value: Any): Option[SQLTimestamp] = { + DateTimeUtils.stringToTimestamp(UTF8String.fromString(value.toString), TimeZone.getDefault) + } + + def stringToTimestamp(value: Any, zoneId: ZoneId): Option[SQLTimestamp] = { + DateTimeUtils.stringToTimestamp(UTF8String.fromString(value.toString), TimeZone.getTimeZone(zoneId)) + } + + def stringToDate(value: Any): Option[SQLDate] = { + DateTimeUtils.stringToDate(UTF8String.fromString(value.toString)) + } + + def millisToDays(millis: Long): Int = { + DateTimeUtils.millisToDays(millis) + } + + def daysToMillis(days: Int): Long = { + DateTimeUtils.daysToMillis(days) + } + + def dateToString(): String = { + DateTimeUtils.dateToString(DateTimeUtils.millisToDays(System.currentTimeMillis())) + } +} diff --git a/kylin-spark-project/kylin-spark-metadata/src/main/spark31/org/apache/kylin/engine/spark/cross/CrossDateTimeUtils.scala b/kylin-spark-project/kylin-spark-metadata/src/main/spark31/org/apache/kylin/engine/spark/cross/CrossDateTimeUtils.scala new file mode 100644 index 0000000..2ef6b18 --- /dev/null +++ b/kylin-spark-project/kylin-spark-metadata/src/main/spark31/org/apache/kylin/engine/spark/cross/CrossDateTimeUtils.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.engine.spark.cross + +import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter} +import org.apache.spark.unsafe.types.UTF8String + +import java.time.ZoneId +import java.util.TimeZone + +object CrossDateTimeUtils { + val DEFAULT_TZ_ID = TimeZone.getDefault.toZoneId + + def stringToTimestamp(value: Any): Option[Long] = { + DateTimeUtils.stringToTimestamp(UTF8String.fromString(value.toString), DEFAULT_TZ_ID) + } + + def stringToTimestamp(value: Any, zoneId: ZoneId): Option[Long] = { + DateTimeUtils.stringToTimestamp(UTF8String.fromString(value.toString), zoneId) + } + + def stringToDate(value: Any): Option[Int] = { + DateTimeUtils.stringToDate(UTF8String.fromString(value.toString), DEFAULT_TZ_ID) + } + + def millisToDays(millis: Long): Int = { + DateTimeUtils.microsToDays(millis * 1000, DEFAULT_TZ_ID) + } + + def daysToMillis(days: Int): Long = { + DateTimeUtils.daysToMicros(days, DEFAULT_TZ_ID) + } + + def dateToString(): String = { + TimestampFormatter + .apply("yyyy-MM-dd", DEFAULT_TZ_ID, isParsing = false) + .format(DateTimeUtils.currentTimestamp()) + } +} diff --git a/kylin-spark-project/kylin-spark-query/pom.xml b/kylin-spark-project/kylin-spark-query/pom.xml index 2e6f7e4..5a6cff0 100644 --- a/kylin-spark-project/kylin-spark-query/pom.xml +++ b/kylin-spark-project/kylin-spark-query/pom.xml @@ -71,7 +71,12 @@ <!-- Spark dependency --> <dependency> <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> + <artifactId>hadoop-hdfs</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-app</artifactId> <scope>provided</scope> </dependency> @@ -87,16 +92,28 @@ </dependency> <dependency> + <groupId>org.apache.kylin</groupId> + <artifactId>kylin-core-common</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.11</artifactId> + <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${spark.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + </exclusions> <type>test-jar</type> - <scope>provided</scope> + <scope>test</scope> </dependency> <dependency> <groupId>org.scalamock</groupId> - <artifactId>scalamock_2.11</artifactId> + <artifactId>scalamock_${scala.binary.version}</artifactId> <version>4.1.0</version> <scope>test</scope> </dependency> @@ -125,36 +142,20 @@ <build> <plugins> <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <configuration> - <source>8</source> - <target>8</target> - </configuration> - </plugin> - <!--<plugin> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - <version>3.2.2</version> - </plugin>--> - <plugin> - <groupId>org.scalatest</groupId> - <artifactId>scalatest-maven-plugin</artifactId> - <version>1.0</version> - <configuration> - <reportsDirectory> - ${project.build.directory}/surefire-reports - </reportsDirectory> - <junitxml>.</junitxml> - <filereports>SparkTestSuite.txt</filereports> - <stdout>I</stdout> - </configuration> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> <executions> <execution> - <id>test</id> + <id>add-source</id> + <phase>generate-sources</phase> <goals> - <goal>test</goal> + <goal>add-source</goal> </goals> + <configuration> + <sources> + 
<source>src/main/${spark.version.dir}</source> + </sources> + </configuration> </execution> </executions> </plugin> @@ -163,12 +164,24 @@ <artifactId>scala-maven-plugin</artifactId> <executions> <execution> - <id>scala-compile-first</id> + <id>compile-version-dependent-source</id> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + <configuration> + <sourceDir>${spark.version.dir}</sourceDir> + </configuration> + </execution> + <execution> + <id>compile-common-scala-source</id> <phase>process-resources</phase> <goals> - <goal>add-source</goal> <goal>compile</goal> </goals> + <configuration> + <sourceDir>scala</sourceDir> + </configuration> </execution> <execution> <id>scala-test-compile</id> diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala index bad566f..764807d 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala @@ -27,7 +27,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.kylin.query.exception.UnsupportedSparkFunctionException import org.apache.spark.sql.Column import org.apache.spark.sql.catalyst.expressions.{If, IfNull, StringLocate} -import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils import org.apache.spark.sql.functions._ import org.apache.spark.sql.KylinFunctions._ import org.apache.spark.sql.utils.SparkTypeUtil @@ -216,9 +216,7 @@ object ExpressionConverter { callUDF("split_part", args: _*) // time_funcs case "current_date" => - k_lit( - DateTimeUtils.dateToString( - DateTimeUtils.millisToDays(System.currentTimeMillis()))) + k_lit(CrossDateTimeUtils.dateToString()) case "current_timestamp" => current_timestamp() case "to_timestamp" => diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala index dcb4f14..a690f45 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala @@ -28,13 +28,13 @@ import org.apache.calcite.sql.SqlKind._ import org.apache.calcite.sql.`type`.{BasicSqlType, IntervalSqlType, SqlTypeFamily, SqlTypeName} import org.apache.calcite.sql.fun.SqlDatetimeSubtractionOperator import org.apache.kylin.common.util.DateFormat +import org.apache.kylin.engine.spark.cross.CrossDateTimeUtils import org.apache.spark.sql.KylinFunctions._ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DateType, LongType, TimestampType} import org.apache.spark.sql.{Column, DataFrame} import org.apache.spark.sql.utils.SparkTypeUtil -import org.apache.spark.unsafe.types.UTF8String import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer @@ -312,9 +312,9 @@ class SparderRexVisitor( case literalSql: BasicSqlType => { literalSql.getSqlTypeName match { case SqlTypeName.DATE => - return Some(DateTimeUtils.stringToTime(literal.toString)) + return 
Some(DateTimeUtils.toJavaDate(CrossDateTimeUtils.stringToDate(literal).get)) case SqlTypeName.TIMESTAMP => - return Some(DateTimeUtils.toJavaTimestamp(DateTimeUtils.stringToTimestamp(UTF8String.fromString(literal.toString)).head)) + return Some(DateTimeUtils.toJavaTimestamp(CrossDateTimeUtils.stringToTimestamp(literal).head)) case _ => } } diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala b/kylin-spark-project/kylin-spark-query/src/main/spark24/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala similarity index 100% copy from kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala copy to kylin-spark-project/kylin-spark-query/src/main/spark24/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala b/kylin-spark-project/kylin-spark-query/src/main/spark31/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala similarity index 79% rename from kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala rename to kylin-spark-project/kylin-spark-query/src/main/spark31/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala index 21c6372..5641723 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/spark31/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala @@ -28,14 +28,15 @@ import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionState} * @param parentState */ class KylinHiveSessionStateBuilder(sparkSession: SparkSession, - parentState: Option[SessionState] = None) - extends HiveSessionStateBuilder(sparkSession, parentState) { + parentState: Option[SessionState] = None, + options: Map[String, String] = Map.empty) + extends HiveSessionStateBuilder(sparkSession, parentState, options) { private def externalCatalog: HiveExternalCatalog = session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog] override protected def newBuilder: NewBuilder = - new KylinHiveSessionStateBuilder(_, _) + new KylinHiveSessionStateBuilder(_, _, options) } @@ -46,10 +47,11 @@ class KylinHiveSessionStateBuilder(sparkSession: SparkSession, * @param parentState */ class KylinSessionStateBuilder(sparkSession: SparkSession, - parentState: Option[SessionState] = None) - extends BaseSessionStateBuilder(sparkSession, parentState) { + parentState: Option[SessionState] = None, + options: Map[String, String] = Map.empty) + extends BaseSessionStateBuilder(sparkSession, parentState, options) { override protected def newBuilder: NewBuilder = - new KylinSessionStateBuilder(_, _) + new KylinSessionStateBuilder(_, _, options) } diff --git a/kylin-spark-project/kylin-spark-test/pom.xml b/kylin-spark-project/kylin-spark-test/pom.xml index 4221983..6f2f101 100644 --- a/kylin-spark-project/kylin-spark-test/pom.xml +++ b/kylin-spark-project/kylin-spark-test/pom.xml @@ -73,7 +73,7 @@ </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.11</artifactId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> <exclusions> <exclusion> <groupId>org.apache.calcite</groupId> @@ -84,7 +84,7 @@ </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-hive_2.11</artifactId> + 
<artifactId>spark-hive_${scala.binary.version}</artifactId> <exclusions> <exclusion> <groupId>org.apache.calcite</groupId> @@ -114,6 +114,12 @@ <!-- test --> <dependency> <groupId>org.apache.kylin</groupId> + <artifactId>kylin-core-common</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.kylin</groupId> <artifactId>kylin-spark-engine</artifactId> <version>${project.version}</version> <type>test-jar</type> diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java index b91a680..cb1b049 100644 --- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java +++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java @@ -20,6 +20,7 @@ package org.apache.kylin.engine.spark2; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.Quadruple; +import org.apache.kylin.shaded.com.google.common.collect.Lists; import org.apache.kylin.shaded.com.google.common.collect.Maps; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -37,7 +38,6 @@ import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.spark_project.guava.collect.Lists; import java.util.Map; import java.io.File; diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NManualBuildAndQueryCuboidTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NManualBuildAndQueryCuboidTest.java index a02672b..1bc6d11 100644 --- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NManualBuildAndQueryCuboidTest.java +++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NManualBuildAndQueryCuboidTest.java @@ -30,6 +30,7 @@ import org.apache.kylin.engine.spark.metadata.cube.PathManager; import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity; import org.apache.kylin.job.exception.SchedulerException; import org.apache.kylin.metadata.model.IStorageAware; +import org.apache.kylin.shaded.com.google.common.collect.Sets; import org.apache.kylin.storage.StorageFactory; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; @@ -47,7 +48,6 @@ import org.junit.Test; import org.roaringbitmap.longlong.Roaring64NavigableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.spark_project.guava.collect.Sets; import java.util.List; import java.util.Map; diff --git a/kylin-spark-project/pom.xml b/kylin-spark-project/pom.xml index 3b52556..3d4911f 100644 --- a/kylin-spark-project/pom.xml +++ b/kylin-spark-project/pom.xml @@ -63,7 +63,7 @@ </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.11</artifactId> + <artifactId>spark-core_${scala.binary.version}</artifactId> <exclusions> <exclusion> <artifactId>jetty-plus</artifactId> @@ -107,25 +107,25 @@ </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.11</artifactId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> - 
<artifactId>spark-hive_2.11</artifactId> + <artifactId>spark-hive_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-unsafe_2.11</artifactId> + <artifactId>spark-unsafe_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-yarn_2.11</artifactId> + <artifactId>spark-yarn_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> @@ -140,7 +140,7 @@ <!--Env & Test--> <dependency> <groupId>org.scalatest</groupId> - <artifactId>scalatest_2.11</artifactId> + <artifactId>scalatest_${scala.binary.version}</artifactId> <version>3.0.0</version> <scope>test</scope> </dependency> diff --git a/metastore-hbase/pom.xml b/metastore-hbase/pom.xml index 0945cca..209e99e 100644 --- a/metastore-hbase/pom.xml +++ b/metastore-hbase/pom.xml @@ -108,7 +108,7 @@ <!-- <!– Spark dependency –>--> <!-- <dependency>--> <!-- <groupId>org.apache.spark</groupId>--> -<!-- <artifactId>spark-core_2.11</artifactId>--> +<!-- <artifactId>spark-core_${scala.binary.version}</artifactId>--> <!-- <scope>provided</scope>--> <!-- </dependency>--> </dependencies> diff --git a/metrics-reporter-kafka/pom.xml b/metrics-reporter-kafka/pom.xml index 1c39f08..5c549e3 100644 --- a/metrics-reporter-kafka/pom.xml +++ b/metrics-reporter-kafka/pom.xml @@ -39,7 +39,7 @@ <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.11</artifactId> + <artifactId>kafka_${scala.binary.version}</artifactId> <scope>provided</scope> </dependency> </dependencies> diff --git a/parquet-assembly/pom.xml b/parquet-assembly/pom.xml index 6b34823..f2c795c 100644 --- a/parquet-assembly/pom.xml +++ b/parquet-assembly/pom.xml @@ -105,6 +105,14 @@ <pattern>org.roaringbitmap</pattern> <shadedPattern>${shadeBase}.org.roaringbitmap</shadedPattern> </relocation> + <relocation> + <pattern>com.fasterxml.jackson</pattern> + <shadedPattern>${shadeBase}.com.fasterxml.jackson</shadedPattern> + </relocation> + <relocation> + <pattern>com.codahale.metrics</pattern> + <shadedPattern>${shadeBase}.com.codahale.metrics</shadedPattern> + </relocation> </relocations> <filters> <filter> diff --git a/pom.xml b/pom.xml index d83b0fc..6885f86 100644 --- a/pom.xml +++ b/pom.xml @@ -80,6 +80,7 @@ <!-- Spark versions --> <spark.version>2.4.7</spark.version> + <spark.version.dir>spark24</spark.version.dir> <janino.version>3.0.16</janino.version> <kryo.version>4.0.0</kryo.version> @@ -92,6 +93,7 @@ <!-- Scala versions --> <scala.version>2.11.8</scala.version> + <scala.binary.version>2.11</scala.binary.version> <reflections.version>0.9.10</reflections.version> @@ -519,6 +521,11 @@ </dependency> <dependency> <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-client</artifactId> + <version>${hadoop2.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-api</artifactId> <version>${hadoop2.version}</version> </dependency> @@ -729,7 +736,8 @@ <artifactId>avatica</artifactId> <version>${avatica.version}</version> </dependency> - <!-- Workaround for hive 0.14 avatica dependency --> + <!-- Workaround for hive 0.14 avatica dependency + WARNING: fasterxml jackson library may conflict <dependency> <groupId>org.apache.calcite</groupId> <artifactId>calcite-avatica</artifactId> @@ -741,6 +749,7 @@ </exclusion> </exclusions> 
</dependency> + --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> @@ -775,9 +784,13 @@ </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.11</artifactId> + <artifactId>spark-core_${scala.binary.version}</artifactId> <exclusions> <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + <exclusion> <artifactId>jetty-plus</artifactId> <groupId>org.eclipse.jetty</groupId> </exclusion> @@ -819,20 +832,42 @@ </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.11</artifactId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-hive_2.11</artifactId> + <artifactId>spark-hive_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-yarn_2.11</artifactId> + <artifactId>spark-yarn_${scala.binary.version}</artifactId> <version>${spark.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-web-proxy</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>com.esotericsoftware</groupId> @@ -849,19 +884,19 @@ </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-scala_2.11</artifactId> + <artifactId>flink-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-hadoop-compatibility_2.11</artifactId> + <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!-- Kafka dependency --> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.11</artifactId> + <artifactId>kafka_${scala.binary.version}</artifactId> <version>${kafka.version}</version> </dependency> @@ -1159,19 +1194,19 @@ <!-- Spark dependency --> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.11</artifactId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-hive_2.11</artifactId> + <artifactId>spark-hive_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-unsafe_2.11</artifactId> + <artifactId>spark-unsafe_${scala.binary.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> @@ -2085,9 +2120,29 @@ </lifecycleMappingMetadata> </configuration> </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>3.2.0</version> + </plugin> </plugins> 
</pluginManagement> </build> </profile> + <profile> + <id>spark3</id> + <properties> + <scala.version>2.12.10</scala.version> + <scala.binary.version>2.12</scala.binary.version> + <spark.version>3.1.1</spark.version> + <spark.version.dir>spark31</spark.version.dir> + <jackson.version>2.10.0</jackson.version> + </properties> + <!-- + <activation> + <activeByDefault>true</activeByDefault> + </activation> + --> + </profile> </profiles> </project> diff --git a/server-base/pom.xml b/server-base/pom.xml index a5411c8..e9fc1b9 100644 --- a/server-base/pom.xml +++ b/server-base/pom.xml @@ -210,12 +210,12 @@ <!-- Spark dependency --> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.11</artifactId> + <artifactId>spark-core_${scala.binary.version}</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.11</artifactId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> <scope>provided</scope> </dependency> diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 8a039c0..6af1d53 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -483,7 +483,7 @@ public class QueryService extends BasicService { logger.warn("Write metric error.", th); } if (sqlResponse.getIsException()) - throw new InternalErrorException(sqlResponse.getExceptionMessage()); + throw new InternalErrorException(sqlResponse.getExceptionMessage(), sqlResponse.getThrowable()); return sqlResponse; diff --git a/server/pom.xml b/server/pom.xml index 82114fb..fc7a60c 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -277,7 +277,7 @@ <!-- Spark dependency --> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.11</artifactId> + <artifactId>spark-core_${scala.binary.version}</artifactId> <scope>provided</scope> </dependency> @@ -325,19 +325,19 @@ <!-- spark --> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.11</artifactId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-hive_2.11</artifactId> + <artifactId>spark-hive_${scala.binary.version}</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-yarn_2.11</artifactId> + <artifactId>spark-yarn_${scala.binary.version}</artifactId> <scope>provided</scope> </dependency> </dependencies> diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml index f54e5d7..2b61666 100644 --- a/source-kafka/pom.xml +++ b/source-kafka/pom.xml @@ -44,7 +44,7 @@ <!-- Provided --> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.11</artifactId> + <artifactId>kafka_${scala.binary.version}</artifactId> </dependency> <!-- Env & Test --> diff --git a/storage-hbase/pom.xml b/storage-hbase/pom.xml index 02d08d5..21c39c4 100644 --- a/storage-hbase/pom.xml +++ b/storage-hbase/pom.xml @@ -113,7 +113,7 @@ <!-- Spark dependency --> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.11</artifactId> + <artifactId>spark-core_${scala.binary.version}</artifactId> <scope>provided</scope> </dependency> </dependencies> diff --git a/stream-receiver/pom.xml b/stream-receiver/pom.xml index 0e2c258..48f0e13 100644 --- 
a/stream-receiver/pom.xml +++ b/stream-receiver/pom.xml @@ -53,7 +53,7 @@ </exclusion> <exclusion> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.11</artifactId> + <artifactId>kafka_${scala.binary.version}</artifactId> </exclusion> </exclusions> </dependency> diff --git a/stream-source-kafka/pom.xml b/stream-source-kafka/pom.xml index cdec1d4..6b7f1d2 100644 --- a/stream-source-kafka/pom.xml +++ b/stream-source-kafka/pom.xml @@ -59,7 +59,7 @@ <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.11</artifactId> + <artifactId>kafka_${scala.binary.version}</artifactId> </dependency> <dependency>