This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push: new d34f085 KYLIN-5057 CubeBuildJob in Kylin4.0 run failed when open Spark3.1 AQE d34f085 is described below commit d34f08594db97d0f175144f0190e782e11f2b9b5 Author: Zhichao Zhang <zhan...@apache.org> AuthorDate: Wed Aug 11 22:28:28 2021 +0800 KYLIN-5057 CubeBuildJob in Kylin4.0 run failed when open Spark3.1 AQE Root cause: With KylinJoinSelection strategy, if AQE is true, this strategy will be applied before LogicalQueryStageStrategy, and then it will be applied JoinSelection strategy again, leading to change the 'Build Type' of BroadcastHashJoin in some cases. --- .../spark/sql/execution/KylinJoinSelection.scala | 282 --------------------- .../spark/sql/execution/KylinJoinSelection.scala | 249 ------------------ .../engine/spark/application/SparkApplication.java | 21 +- 3 files changed, 4 insertions(+), 548 deletions(-) diff --git a/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinJoinSelection.scala b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinJoinSelection.scala deleted file mode 100644 index 200dd24..0000000 --- a/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinJoinSelection.scala +++ /dev/null @@ -1,282 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.spark.sql.execution - -import javax.annotation.concurrent.GuardedBy -import org.apache.kylin.common.KylinConfig -import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.expressions.{PredicateHelper, RowOrdering} -import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys -import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide} -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.{SparkSession, Strategy} - -/** - * Select the proper physical plan for join based on joining keys and size of logical plan. - * - * At first, uses the [[ExtractEquiJoinKeys]] pattern to find joins where at least some of the - * predicates can be evaluated by matching join keys. If found, join implementations are chosen - * with the following precedence: - * - * - Broadcast hash join (BHJ): - * BHJ is not supported for full outer join. For right outer join, we only can broadcast the - * left side. For left outer, left semi, left anti and the internal join type ExistenceJoin, - * we only can broadcast the right side. For inner like join, we can broadcast both sides. - * Normally, BHJ can perform faster than the other join algorithms when the broadcast side is - * small. However, broadcasting tables is a network-intensive operation. It could cause OOM - * or perform worse than the other join algorithms, especially when the build/broadcast side - * is big. - * - * For the supported cases, users can specify the broadcast hint (e.g. the user applied the - * [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame) and session-based - * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold to adjust whether BHJ is used and - * which join side is broadcast. - * - * 1) Broadcast the join side with the broadcast hint, even if the size is larger than - * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]]. If both sides have the hint (only when the type - * is inner like join), the side with a smaller estimated physical size will be broadcast. - * 2) Respect the [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold and broadcast the side - * whose estimated physical size is smaller than the threshold. If both sides are below the - * threshold, broadcast the smaller side. If neither is smaller, BHJ is not used. - * - * - Shuffle hash join: if the average size of a single partition is small enough to build a hash - * table. - * - * - Sort merge: if the matching join keys are sortable. - * - * If there is no joining keys, Join implementations are chosen with the following precedence: - * - BroadcastNestedLoopJoin (BNLJ): - * BNLJ supports all the join types but the impl is OPTIMIZED for the following scenarios: - * For right outer join, the left side is broadcast. For left outer, left semi, left anti - * and the internal join type ExistenceJoin, the right side is broadcast. For inner like - * joins, either side is broadcast. - * - * Like BHJ, users still can specify the broadcast hint and session-based - * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold to impact which side is broadcast. - * - * 1) Broadcast the join side with the broadcast hint, even if the size is larger than - * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]]. If both sides have the hint (i.e., just for - * inner-like join), the side with a smaller estimated physical size will be broadcast. - * 2) Respect the [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold and broadcast the side - * whose estimated physical size is smaller than the threshold. If both sides are below the - * threshold, broadcast the smaller side. If neither is smaller, BNLJ is not used. - * - * - CartesianProduct: for inner like join, CartesianProduct is the fallback option. - * - * - BroadcastNestedLoopJoin (BNLJ): - * For the other join types, BNLJ is the fallback option. Here, we just pick the broadcast - * side with the broadcast hint. If neither side has a hint, we broadcast the side with - * the smaller estimated physical size. - */ -case class KylinJoinSelection(session: SparkSession) extends Strategy with PredicateHelper with Logging { - - val conf: SQLConf = session.sessionState.conf - - /** - * Matches a plan whose output should be small enough to be used in broadcast join. - */ - private def canBroadcast(plan: LogicalPlan): Boolean = { - val sizeInBytes = plan.stats.sizeInBytes - sizeInBytes >= 0 && sizeInBytes <= conf.autoBroadcastJoinThreshold && JoinMemoryManager.acquireMemory(sizeInBytes.toLong) - } - - /** - * Matches a plan whose single partition should be small enough to build a hash table. - * - * Note: this assume that the number of partition is fixed, requires additional work if it's - * dynamic. - */ - private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = { - plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions - } - - /** - * Returns whether plan a is much smaller (3X) than plan b. - * - * The cost to build hash map is higher than sorting, we should only build hash map on a table - * that is much smaller than other one. Since we does not have the statistic for number of rows, - * use the size of bytes here as estimation. - */ - private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = { - a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes - } - - private def canBuildRight(joinType: JoinType): Boolean = joinType match { - case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true - case _ => false - } - - private def canBuildLeft(joinType: JoinType): Boolean = joinType match { - case _: InnerLike | RightOuter => true - case _ => false - } - - private def broadcastSide( - canBuildLeft: Boolean, - canBuildRight: Boolean, - left: LogicalPlan, - right: LogicalPlan): BuildSide = { - - def smallerSide = - if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft - - if (canBuildRight && canBuildLeft) { - // Broadcast smaller side base on its estimated physical size - // if both sides have broadcast hint - smallerSide - } else if (canBuildRight) { - BuildRight - } else if (canBuildLeft) { - BuildLeft - } else { - // for the last default broadcast nested loop join - smallerSide - } - } - - private def canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan) - : Boolean = { - val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast - val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast - buildLeft || buildRight - } - - private def broadcastSideByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan) - : BuildSide = { - val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast - val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast - broadcastSide(buildLeft, buildRight, left, right) - } - - private def canBroadcastBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan) - : Boolean = { - val buildLeft = canBuildLeft(joinType) && canBroadcast(left) - val buildRight = canBuildRight(joinType) && canBroadcast(right) - buildLeft || buildRight - } - - private def broadcastSideBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan) - : BuildSide = { - val buildLeft = canBuildLeft(joinType) && canBroadcast(left) - val buildRight = canBuildRight(joinType) && canBroadcast(right) - broadcastSide(buildLeft, buildRight, left, right) - } - - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - - // --- BroadcastHashJoin -------------------------------------------------------------------- - - // broadcast hints were specified - case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) - if canBroadcastByHints(joinType, left, right) => - val buildSide = broadcastSideByHints(joinType, left, right) - Seq(joins.BroadcastHashJoinExec( - leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right))) - - // broadcast hints were not specified, so need to infer it from size and configuration. - case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) - if canBroadcastBySizes(joinType, left, right) => - val buildSide = broadcastSideBySizes(joinType, left, right) - Seq(joins.BroadcastHashJoinExec( - leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right))) - - // --- ShuffledHashJoin --------------------------------------------------------------------- - - case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) - if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right) - && muchSmaller(right, left) || - !RowOrdering.isOrderable(leftKeys) => - Seq(joins.ShuffledHashJoinExec( - leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right))) - - case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) - if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left) - && muchSmaller(left, right) || - !RowOrdering.isOrderable(leftKeys) => - Seq(joins.ShuffledHashJoinExec( - leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right))) - - // --- SortMergeJoin ------------------------------------------------------------ - - case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) - if RowOrdering.isOrderable(leftKeys) => - joins.SortMergeJoinExec( - leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil - - // --- Without joining keys ------------------------------------------------------------ - - // Pick BroadcastNestedLoopJoin if one side could be broadcast - case j...@logical.join(left, right, joinType, condition) - if canBroadcastByHints(joinType, left, right) => - val buildSide = broadcastSideByHints(joinType, left, right) - joins.BroadcastNestedLoopJoinExec( - planLater(left), planLater(right), buildSide, joinType, condition) :: Nil - - case j...@logical.join(left, right, joinType, condition) - if canBroadcastBySizes(joinType, left, right) => - val buildSide = broadcastSideBySizes(joinType, left, right) - joins.BroadcastNestedLoopJoinExec( - planLater(left), planLater(right), buildSide, joinType, condition) :: Nil - - // Pick CartesianProduct for InnerJoin - case logical.Join(left, right, _: InnerLike, condition) => - joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil - - case logical.Join(left, right, joinType, condition) => - val buildSide = broadcastSide( - left.stats.hints.broadcast, right.stats.hints.broadcast, left, right) - // This join could be very slow or OOM - joins.BroadcastNestedLoopJoinExec( - planLater(left), planLater(right), buildSide, joinType, condition) :: Nil - - // --- Cases where this strategy does not apply --------------------------------------------- - - case _ => Nil - } -} - -object JoinMemoryManager extends Logging { - - @GuardedBy("this") - private[this] var memoryUsed: Long = 0 - - def acquireMemory(numBytesToAcquire: Long): Boolean = synchronized { - assert(numBytesToAcquire >= 0) - val enoughMemory = numBytesToAcquire <= (maxMemoryJoinCanUse - memoryUsed) - if (enoughMemory) { - memoryUsed += numBytesToAcquire - logInfo(s"Acquire $numBytesToAcquire bytes for BHJ, memory used $memoryUsed, max memory BHJ can use $maxMemoryJoinCanUse.") - } else { - logInfo("Driver memory is not enough for BHJ.") - } - enoughMemory - } - - private def maxMemoryJoinCanUse: Long = { - val joinMemoryFraction = KylinConfig.getInstanceFromEnv.getJoinMemoryFraction - (Runtime.getRuntime.maxMemory() * joinMemoryFraction).toLong - } - - def releaseAllMemory(): Unit = synchronized { - memoryUsed = 0 - } - -} \ No newline at end of file diff --git a/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinJoinSelection.scala b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinJoinSelection.scala deleted file mode 100644 index 243ffd6..0000000 --- a/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinJoinSelection.scala +++ /dev/null @@ -1,249 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.spark.sql.execution - -import org.apache.kylin.common.KylinConfig -import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.expressions.{PredicateHelper, RowOrdering} -import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, JoinSelectionHelper} -import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractSingleColumnNullAwareAntiJoin} -import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.{SparkSession, Strategy} - -import javax.annotation.concurrent.GuardedBy - -/** - * . - */ -case class KylinJoinSelection(session: SparkSession) extends Strategy - with JoinSelectionHelper - with PredicateHelper - with Logging { - - val conf: SQLConf = session.sessionState.conf - - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - - // If it is an equi-join, we first look at the join hints w.r.t. the following order: - // 1. broadcast hint: pick broadcast hash join if the join type is supported. If both sides - // have the broadcast hints, choose the smaller side (based on stats) to broadcast. - // 2. sort merge hint: pick sort merge join if join keys are sortable. - // 3. shuffle hash hint: We pick shuffle hash join if the join type is supported. If both - // sides have the shuffle hash hints, choose the smaller side (based on stats) as the - // build side. - // 4. shuffle replicate NL hint: pick cartesian product if join type is inner like. - // - // If there is no hint or the hints are not applicable, we follow these rules one by one: - // 1. Pick broadcast hash join if one side is small enough to broadcast, and the join type - // is supported. If both sides are small, choose the smaller side (based on stats) - // to broadcast. - // 2. Pick shuffle hash join if one side is small enough to build local hash map, and is - // much smaller than the other side, and `spark.sql.join.preferSortMergeJoin` is false. - // 3. Pick sort merge join if the join keys are sortable. - // 4. Pick cartesian product if join type is inner like. - // 5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have - // other choice. - case j@ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) => - def createBroadcastHashJoin(onlyLookingAtHint: Boolean) = { - getBroadcastBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map { - buildSide => - Seq(joins.BroadcastHashJoinExec( - leftKeys, - rightKeys, - joinType, - buildSide, - nonEquiCond, - planLater(left), - planLater(right))) - } - } - - def createShuffleHashJoin(onlyLookingAtHint: Boolean) = { - getShuffleHashJoinBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map { - buildSide => - Seq(joins.ShuffledHashJoinExec( - leftKeys, - rightKeys, - joinType, - buildSide, - nonEquiCond, - planLater(left), - planLater(right))) - } - } - - def createSortMergeJoin() = { - if (RowOrdering.isOrderable(leftKeys)) { - Some(Seq(joins.SortMergeJoinExec( - leftKeys, rightKeys, joinType, nonEquiCond, planLater(left), planLater(right)))) - } else { - None - } - } - - def createCartesianProduct() = { - if (joinType.isInstanceOf[InnerLike]) { - // `CartesianProductExec` can't implicitly evaluate equal join condition, here we should - // pass the original condition which includes both equal and non-equal conditions. - Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), j.condition))) - } else { - None - } - } - - def createJoinWithoutHint() = { - createBroadcastHashJoin(false) - .orElse { - if (!conf.preferSortMergeJoin) { - createShuffleHashJoin(false) - } else { - None - } - } - .orElse(createSortMergeJoin()) - .orElse(createCartesianProduct()) - .getOrElse { - // This join could be very slow or OOM - val buildSide = getSmallerSide(left, right) - Seq(joins.BroadcastNestedLoopJoinExec( - planLater(left), planLater(right), buildSide, joinType, nonEquiCond)) - } - } - - createBroadcastHashJoin(true) - .orElse { - if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None - } - .orElse(createShuffleHashJoin(true)) - .orElse { - if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None - } - .getOrElse(createJoinWithoutHint()) - - case j@ExtractSingleColumnNullAwareAntiJoin(leftKeys, rightKeys) => - Seq(joins.BroadcastHashJoinExec(leftKeys, rightKeys, LeftAnti, BuildRight, - None, planLater(j.left), planLater(j.right), isNullAwareAntiJoin = true)) - - // If it is not an equi-join, we first look at the join hints w.r.t. the following order: - // 1. broadcast hint: pick broadcast nested loop join. If both sides have the broadcast - // hints, choose the smaller side (based on stats) to broadcast for inner and full joins, - // choose the left side for right join, and choose right side for left join. - // 2. shuffle replicate NL hint: pick cartesian product if join type is inner like. - // - // If there is no hint or the hints are not applicable, we follow these rules one by one: - // 1. Pick broadcast nested loop join if one side is small enough to broadcast. If only left - // side is broadcast-able and it's left join, or only right side is broadcast-able and - // it's right join, we skip this rule. If both sides are small, broadcasts the smaller - // side for inner and full joins, broadcasts the left side for right join, and broadcasts - // right side for left join. - // 2. Pick cartesian product if join type is inner like. - // 3. Pick broadcast nested loop join as the final solution. It may OOM but we don't have - // other choice. It broadcasts the smaller side for inner and full joins, broadcasts the - // left side for right join, and broadcasts right side for left join. - case logical.Join(left, right, joinType, condition, hint) => - val desiredBuildSide = if (joinType.isInstanceOf[InnerLike] || joinType == FullOuter) { - getSmallerSide(left, right) - } else { - // For perf reasons, `BroadcastNestedLoopJoinExec` prefers to broadcast left side if - // it's a right join, and broadcast right side if it's a left join. - // TODO: revisit it. If left side is much smaller than the right side, it may be better - // to broadcast the left side even if it's a left join. - if (canBuildBroadcastLeft(joinType)) BuildLeft else BuildRight - } - - def createBroadcastNLJoin(buildLeft: Boolean, buildRight: Boolean) = { - val maybeBuildSide = if (buildLeft && buildRight) { - Some(desiredBuildSide) - } else if (buildLeft) { - Some(BuildLeft) - } else if (buildRight) { - Some(BuildRight) - } else { - None - } - - maybeBuildSide.map { buildSide => - Seq(joins.BroadcastNestedLoopJoinExec( - planLater(left), planLater(right), buildSide, joinType, condition)) - } - } - - def createCartesianProduct() = { - if (joinType.isInstanceOf[InnerLike]) { - Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition))) - } else { - None - } - } - - def createJoinWithoutHint() = { - createBroadcastNLJoin(canBroadcastBySize(left, conf), canBroadcastBySize(right, conf)) - .orElse(createCartesianProduct()) - .getOrElse { - // This join could be very slow or OOM - Seq(joins.BroadcastNestedLoopJoinExec( - planLater(left), planLater(right), desiredBuildSide, joinType, condition)) - } - } - - createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint)) - .orElse { - if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None - } - .getOrElse(createJoinWithoutHint()) - - // --- Cases where this strategy does not apply --------------------------------------------- - case _ => Nil - } - - override def canBroadcastBySize(plan: LogicalPlan, conf: SQLConf): Boolean = { - val size = plan.stats.sizeInBytes - size >= 0 && size <= conf.autoBroadcastJoinThreshold && JoinMemoryManager.acquireMemory(size.toLong) - } -} - -object JoinMemoryManager extends Logging { - - @GuardedBy("this") - private[this] var memoryUsed: Long = 0 - - def acquireMemory(numBytesToAcquire: Long): Boolean = synchronized { - assert(numBytesToAcquire >= 0) - val enoughMemory = numBytesToAcquire <= (maxMemoryJoinCanUse - memoryUsed) - if (enoughMemory) { - memoryUsed += numBytesToAcquire - logInfo(s"Acquire $numBytesToAcquire bytes for BHJ, memory used $memoryUsed, max memory BHJ can use $maxMemoryJoinCanUse.") - } else { - logInfo("Driver memory is not enough for BHJ.") - } - enoughMemory - } - - private def maxMemoryJoinCanUse: Long = { - val joinMemoryFraction = KylinConfig.getInstanceFromEnv.getJoinMemoryFraction - (Runtime.getRuntime.maxMemory() * joinMemoryFraction).toLong - } - - def releaseAllMemory(): Unit = synchronized { - memoryUsed = 0 - } - -} \ No newline at end of file diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java index 32f6219..0d4352b 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java @@ -55,10 +55,7 @@ import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.metadata.MetadataConstants; import org.apache.spark.SparkConf; -import org.apache.spark.sql.execution.KylinJoinSelection; import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.SparkSessionExtensions; -import org.apache.spark.sql.execution.SparkStrategy; import org.apache.spark.sql.hive.utils.ResourceDetectUtils; import org.apache.spark.util.Utils; import org.apache.spark.utils.ResourceUtils; @@ -67,8 +64,6 @@ import org.apache.spark.utils.YarnInfoFetcherUtils; import org.apache.kylin.engine.spark.common.util.TimeZoneUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; public abstract class SparkApplication { private static final Logger logger = LoggerFactory.getLogger(SparkApplication.class); @@ -287,18 +282,10 @@ public abstract class SparkApplication { } } - ss = SparkSession.builder().withExtensions(new AbstractFunction1<SparkSessionExtensions, BoxedUnit>() { - @Override - public BoxedUnit apply(SparkSessionExtensions v1) { - v1.injectPlannerStrategy(new AbstractFunction1<SparkSession, SparkStrategy>() { - @Override - public SparkStrategy apply(SparkSession session) { - return new KylinJoinSelection(session); - } - }); - return BoxedUnit.UNIT; - } - }).enableHiveSupport().config(sparkConf).config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") + ss = SparkSession.builder() + .enableHiveSupport() + .config(sparkConf) + .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") .getOrCreate(); if (isJobOnCluster(sparkConf)) {