This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/main by this push:
     new d34f085  KYLIN-5057 CubeBuildJob in Kylin4.0 run failed when open Spark3.1 AQE
d34f085 is described below

commit d34f08594db97d0f175144f0190e782e11f2b9b5
Author: Zhichao Zhang <zhan...@apache.org>
AuthorDate: Wed Aug 11 22:28:28 2021 +0800

    KYLIN-5057 CubeBuildJob in Kylin4.0 run failed when open Spark3.1 AQE
    
    Root cause:
    With the KylinJoinSelection strategy injected, if AQE is enabled, this strategy is applied before LogicalQueryStageStrategy,
    and then the JoinSelection strategy is applied again, which can change the 'Build Type' of BroadcastHashJoin in some cases.
---
 .../spark/sql/execution/KylinJoinSelection.scala   | 282 ---------------------
 .../spark/sql/execution/KylinJoinSelection.scala   | 249 ------------------
 .../engine/spark/application/SparkApplication.java |  21 +-
 3 files changed, 4 insertions(+), 548 deletions(-)
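
For context, the removed KylinJoinSelection strategy had been registered on every build job's SparkSession through SparkSessionExtensions, which is why it also ran during AQE's re-planning of query stages. The Scala sketch below only illustrates that wiring; it mirrors the Java code deleted from SparkApplication.java further down, the KylinJoinSelection name comes from the removed files, and everything else is illustrative:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.KylinJoinSelection

    // Register a custom planner strategy. Per the root cause above, with AQE on
    // an injected strategy is consulted ahead of LogicalQueryStageStrategy when
    // query stages are re-planned, so the BroadcastHashJoin build side could be
    // re-chosen by the duplicated join selection.
    val ss = SparkSession.builder()
      .withExtensions(ext => ext.injectPlannerStrategy(session => KylinJoinSelection(session)))
      .enableHiveSupport()
      .getOrCreate()

The fix simply stops injecting the strategy, so join planning falls back to Spark's built-in strategies, which already handle AQE's LogicalQueryStage plans.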

diff --git a/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinJoinSelection.scala b/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinJoinSelection.scala
deleted file mode 100644
index 200dd24..0000000
--- a/kylin-spark-project/kylin-spark-common/src/main/spark24/org/apache/spark/sql/execution/KylinJoinSelection.scala
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.spark.sql.execution
-
-import javax.annotation.concurrent.GuardedBy
-import org.apache.kylin.common.KylinConfig
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.{PredicateHelper, RowOrdering}
-import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
-import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.{SparkSession, Strategy}
-
-/**
- * Select the proper physical plan for join based on joining keys and size of logical plan.
- *
- * At first, uses the [[ExtractEquiJoinKeys]] pattern to find joins where at least some of the
- * predicates can be evaluated by matching join keys. If found, join implementations are chosen
- * with the following precedence:
- *
- * - Broadcast hash join (BHJ):
- * BHJ is not supported for full outer join. For right outer join, we only can broadcast the
- * left side. For left outer, left semi, left anti and the internal join type ExistenceJoin,
- * we only can broadcast the right side. For inner like join, we can broadcast both sides.
- * Normally, BHJ can perform faster than the other join algorithms when the broadcast side is
- *     small. However, broadcasting tables is a network-intensive operation. It could cause OOM
- * or perform worse than the other join algorithms, especially when the build/broadcast side
- * is big.
- *
- * For the supported cases, users can specify the broadcast hint (e.g. the user applied the
- * [[org.apache.spark.sql.functions.broadcast()]] function to a DataFrame) and session-based
- * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold to adjust whether BHJ is used and
- * which join side is broadcast.
- *
- * 1) Broadcast the join side with the broadcast hint, even if the size is larger than
- * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]]. If both sides have the hint (only when the type
- * is inner like join), the side with a smaller estimated physical size will be broadcast.
- * 2) Respect the [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold and broadcast the side
- * whose estimated physical size is smaller than the threshold. If both sides are below the
- * threshold, broadcast the smaller side. If neither is smaller, BHJ is not used.
- *
- * - Shuffle hash join: if the average size of a single partition is small enough to build a hash
- * table.
- *
- * - Sort merge: if the matching join keys are sortable.
- *
- * If there is no joining keys, Join implementations are chosen with the following precedence:
- * - BroadcastNestedLoopJoin (BNLJ):
- * BNLJ supports all the join types but the impl is OPTIMIZED for the following scenarios:
- * For right outer join, the left side is broadcast. For left outer, left semi, left anti
- * and the internal join type ExistenceJoin, the right side is broadcast. For inner like
- * joins, either side is broadcast.
- *
- * Like BHJ, users still can specify the broadcast hint and session-based
- * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold to impact which side is broadcast.
- *
- * 1) Broadcast the join side with the broadcast hint, even if the size is larger than
- * [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]]. If both sides have the hint (i.e., just for
- * inner-like join), the side with a smaller estimated physical size will be broadcast.
- * 2) Respect the [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]] threshold and broadcast the side
- * whose estimated physical size is smaller than the threshold. If both sides are below the
- * threshold, broadcast the smaller side. If neither is smaller, BNLJ is not used.
- *
- * - CartesianProduct: for inner like join, CartesianProduct is the fallback option.
- *
- * - BroadcastNestedLoopJoin (BNLJ):
- * For the other join types, BNLJ is the fallback option. Here, we just pick the broadcast
- * side with the broadcast hint. If neither side has a hint, we broadcast the side with
- * the smaller estimated physical size.
- */
-case class KylinJoinSelection(session: SparkSession) extends Strategy with PredicateHelper with Logging {
-
-  val conf: SQLConf = session.sessionState.conf
-
-  /**
-   * Matches a plan whose output should be small enough to be used in broadcast join.
-   */
-  private def canBroadcast(plan: LogicalPlan): Boolean = {
-    val sizeInBytes = plan.stats.sizeInBytes
-    sizeInBytes >= 0 && sizeInBytes <= conf.autoBroadcastJoinThreshold && JoinMemoryManager.acquireMemory(sizeInBytes.toLong)
-  }
-
-  /**
-   * Matches a plan whose single partition should be small enough to build a hash table.
-   *
-   * Note: this assume that the number of partition is fixed, requires additional work if it's
-   * dynamic.
-   */
-  private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
-    plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
-  }
-
-  /**
-   * Returns whether plan a is much smaller (3X) than plan b.
-   *
-   * The cost to build hash map is higher than sorting, we should only build hash map on a table
-   * that is much smaller than other one. Since we does not have the statistic for number of rows,
-   * use the size of bytes here as estimation.
-   */
-  private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
-    a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
-  }
-
-  private def canBuildRight(joinType: JoinType): Boolean = joinType match {
-    case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
-    case _ => false
-  }
-
-  private def canBuildLeft(joinType: JoinType): Boolean = joinType match {
-    case _: InnerLike | RightOuter => true
-    case _ => false
-  }
-
-  private def broadcastSide(
-                             canBuildLeft: Boolean,
-                             canBuildRight: Boolean,
-                             left: LogicalPlan,
-                             right: LogicalPlan): BuildSide = {
-
-    def smallerSide =
-      if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft
-
-    if (canBuildRight && canBuildLeft) {
-      // Broadcast smaller side base on its estimated physical size
-      // if both sides have broadcast hint
-      smallerSide
-    } else if (canBuildRight) {
-      BuildRight
-    } else if (canBuildLeft) {
-      BuildLeft
-    } else {
-      // for the last default broadcast nested loop join
-      smallerSide
-    }
-  }
-
-  private def canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
-  : Boolean = {
-    val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
-    val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
-    buildLeft || buildRight
-  }
-
-  private def broadcastSideByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
-  : BuildSide = {
-    val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
-    val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
-    broadcastSide(buildLeft, buildRight, left, right)
-  }
-
-  private def canBroadcastBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
-  : Boolean = {
-    val buildLeft = canBuildLeft(joinType) && canBroadcast(left)
-    val buildRight = canBuildRight(joinType) && canBroadcast(right)
-    buildLeft || buildRight
-  }
-
-  private def broadcastSideBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
-  : BuildSide = {
-    val buildLeft = canBuildLeft(joinType) && canBroadcast(left)
-    val buildRight = canBuildRight(joinType) && canBroadcast(right)
-    broadcastSide(buildLeft, buildRight, left, right)
-  }
-
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-
-    // --- BroadcastHashJoin --------------------------------------------------------------------
-
-    // broadcast hints were specified
-    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
-      if canBroadcastByHints(joinType, left, right) =>
-      val buildSide = broadcastSideByHints(joinType, left, right)
-      Seq(joins.BroadcastHashJoinExec(
-        leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
-
-    // broadcast hints were not specified, so need to infer it from size and configuration.
-    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
-      if canBroadcastBySizes(joinType, left, right) =>
-      val buildSide = broadcastSideBySizes(joinType, left, right)
-      Seq(joins.BroadcastHashJoinExec(
-        leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
-
-    // --- ShuffledHashJoin ---------------------------------------------------------------------
-
-    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
-      if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
-        && muchSmaller(right, left) ||
-        !RowOrdering.isOrderable(leftKeys) =>
-      Seq(joins.ShuffledHashJoinExec(
-        leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))
-
-    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
-      if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left)
-        && muchSmaller(left, right) ||
-        !RowOrdering.isOrderable(leftKeys) =>
-      Seq(joins.ShuffledHashJoinExec(
-        leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))
-
-    // --- SortMergeJoin ------------------------------------------------------------
-
-    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
-      if RowOrdering.isOrderable(leftKeys) =>
-      joins.SortMergeJoinExec(
-        leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
-
-    // --- Without joining keys ------------------------------------------------------------
-
-    // Pick BroadcastNestedLoopJoin if one side could be broadcast
-    case j@logical.Join(left, right, joinType, condition)
-      if canBroadcastByHints(joinType, left, right) =>
-      val buildSide = broadcastSideByHints(joinType, left, right)
-      joins.BroadcastNestedLoopJoinExec(
-        planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
-
-    case j@logical.Join(left, right, joinType, condition)
-      if canBroadcastBySizes(joinType, left, right) =>
-      val buildSide = broadcastSideBySizes(joinType, left, right)
-      joins.BroadcastNestedLoopJoinExec(
-        planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
-
-    // Pick CartesianProduct for InnerJoin
-    case logical.Join(left, right, _: InnerLike, condition) =>
-      joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil
-
-    case logical.Join(left, right, joinType, condition) =>
-      val buildSide = broadcastSide(
-        left.stats.hints.broadcast, right.stats.hints.broadcast, left, right)
-      // This join could be very slow or OOM
-      joins.BroadcastNestedLoopJoinExec(
-        planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
-
-    // --- Cases where this strategy does not apply ---------------------------------------------
-
-    case _ => Nil
-  }
-}
-
-object JoinMemoryManager extends Logging {
-
-  @GuardedBy("this")
-  private[this] var memoryUsed: Long = 0
-
-  def acquireMemory(numBytesToAcquire: Long): Boolean = synchronized {
-    assert(numBytesToAcquire >= 0)
-    val enoughMemory = numBytesToAcquire <= (maxMemoryJoinCanUse - memoryUsed)
-    if (enoughMemory) {
-      memoryUsed += numBytesToAcquire
-      logInfo(s"Acquire $numBytesToAcquire bytes for BHJ, memory used 
$memoryUsed, max memory BHJ can use $maxMemoryJoinCanUse.")
-    } else {
-      logInfo("Driver memory is not enough for BHJ.")
-    }
-    enoughMemory
-  }
-
-  private def maxMemoryJoinCanUse: Long = {
-    val joinMemoryFraction = KylinConfig.getInstanceFromEnv.getJoinMemoryFraction
-    (Runtime.getRuntime.maxMemory() * joinMemoryFraction).toLong
-  }
-
-  def releaseAllMemory(): Unit = synchronized {
-    memoryUsed = 0
-  }
-
-}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinJoinSelection.scala b/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinJoinSelection.scala
deleted file mode 100644
index 243ffd6..0000000
--- a/kylin-spark-project/kylin-spark-common/src/main/spark31/org/apache/spark/sql/execution/KylinJoinSelection.scala
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.spark.sql.execution
-
-import org.apache.kylin.common.KylinConfig
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.{PredicateHelper, RowOrdering}
-import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, JoinSelectionHelper}
-import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractSingleColumnNullAwareAntiJoin}
-import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.{SparkSession, Strategy}
-
-import javax.annotation.concurrent.GuardedBy
-
-/**
- * .
- */
-case class KylinJoinSelection(session: SparkSession) extends Strategy
-  with JoinSelectionHelper
-  with PredicateHelper
-  with Logging {
-
-  val conf: SQLConf = session.sessionState.conf
-
-  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-
-    // If it is an equi-join, we first look at the join hints w.r.t. the following order:
-    //   1. broadcast hint: pick broadcast hash join if the join type is supported. If both sides
-    //      have the broadcast hints, choose the smaller side (based on stats) to broadcast.
-    //   2. sort merge hint: pick sort merge join if join keys are sortable.
-    //   3. shuffle hash hint: We pick shuffle hash join if the join type is supported. If both
-    //      sides have the shuffle hash hints, choose the smaller side (based on stats) as the
-    //      build side.
-    //   4. shuffle replicate NL hint: pick cartesian product if join type is inner like.
-    //
-    // If there is no hint or the hints are not applicable, we follow these rules one by one:
-    //   1. Pick broadcast hash join if one side is small enough to broadcast, and the join type
-    //      is supported. If both sides are small, choose the smaller side (based on stats)
-    //      to broadcast.
-    //   2. Pick shuffle hash join if one side is small enough to build local hash map, and is
-    //      much smaller than the other side, and `spark.sql.join.preferSortMergeJoin` is false.
-    //   3. Pick sort merge join if the join keys are sortable.
-    //   4. Pick cartesian product if join type is inner like.
-    //   5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
-    //      other choice.
-    case j@ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) =>
-      def createBroadcastHashJoin(onlyLookingAtHint: Boolean) = {
-        getBroadcastBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
-          buildSide =>
-            Seq(joins.BroadcastHashJoinExec(
-              leftKeys,
-              rightKeys,
-              joinType,
-              buildSide,
-              nonEquiCond,
-              planLater(left),
-              planLater(right)))
-        }
-      }
-
-      def createShuffleHashJoin(onlyLookingAtHint: Boolean) = {
-        getShuffleHashJoinBuildSide(left, right, joinType, hint, onlyLookingAtHint, conf).map {
-          buildSide =>
-            Seq(joins.ShuffledHashJoinExec(
-              leftKeys,
-              rightKeys,
-              joinType,
-              buildSide,
-              nonEquiCond,
-              planLater(left),
-              planLater(right)))
-        }
-      }
-
-      def createSortMergeJoin() = {
-        if (RowOrdering.isOrderable(leftKeys)) {
-          Some(Seq(joins.SortMergeJoinExec(
-            leftKeys, rightKeys, joinType, nonEquiCond, planLater(left), planLater(right))))
-        } else {
-          None
-        }
-      }
-
-      def createCartesianProduct() = {
-        if (joinType.isInstanceOf[InnerLike]) {
-          // `CartesianProductExec` can't implicitly evaluate equal join condition, here we should
-          // pass the original condition which includes both equal and non-equal conditions.
-          Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), j.condition)))
-        } else {
-          None
-        }
-      }
-
-      def createJoinWithoutHint() = {
-        createBroadcastHashJoin(false)
-          .orElse {
-            if (!conf.preferSortMergeJoin) {
-              createShuffleHashJoin(false)
-            } else {
-              None
-            }
-          }
-          .orElse(createSortMergeJoin())
-          .orElse(createCartesianProduct())
-          .getOrElse {
-            // This join could be very slow or OOM
-            val buildSide = getSmallerSide(left, right)
-            Seq(joins.BroadcastNestedLoopJoinExec(
-              planLater(left), planLater(right), buildSide, joinType, nonEquiCond))
-          }
-      }
-
-      createBroadcastHashJoin(true)
-        .orElse {
-          if (hintToSortMergeJoin(hint)) createSortMergeJoin() else None
-        }
-        .orElse(createShuffleHashJoin(true))
-        .orElse {
-          if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None
-        }
-        .getOrElse(createJoinWithoutHint())
-
-    case j@ExtractSingleColumnNullAwareAntiJoin(leftKeys, rightKeys) =>
-      Seq(joins.BroadcastHashJoinExec(leftKeys, rightKeys, LeftAnti, BuildRight,
-        None, planLater(j.left), planLater(j.right), isNullAwareAntiJoin = true))
-
-    // If it is not an equi-join, we first look at the join hints w.r.t. the following order:
-    //   1. broadcast hint: pick broadcast nested loop join. If both sides have the broadcast
-    //      hints, choose the smaller side (based on stats) to broadcast for inner and full joins,
-    //      choose the left side for right join, and choose right side for left join.
-    //   2. shuffle replicate NL hint: pick cartesian product if join type is inner like.
-    //
-    // If there is no hint or the hints are not applicable, we follow these rules one by one:
-    //   1. Pick broadcast nested loop join if one side is small enough to broadcast. If only left
-    //      side is broadcast-able and it's left join, or only right side is broadcast-able and
-    //      it's right join, we skip this rule. If both sides are small, broadcasts the smaller
-    //      side for inner and full joins, broadcasts the left side for right join, and broadcasts
-    //      right side for left join.
-    //   2. Pick cartesian product if join type is inner like.
-    //   3. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
-    //      other choice. It broadcasts the smaller side for inner and full joins, broadcasts the
-    //      left side for right join, and broadcasts right side for left join.
-    case logical.Join(left, right, joinType, condition, hint) =>
-      val desiredBuildSide = if (joinType.isInstanceOf[InnerLike] || joinType == FullOuter) {
-        getSmallerSide(left, right)
-      } else {
-        // For perf reasons, `BroadcastNestedLoopJoinExec` prefers to broadcast left side if
-        // it's a right join, and broadcast right side if it's a left join.
-        // TODO: revisit it. If left side is much smaller than the right side, it may be better
-        // to broadcast the left side even if it's a left join.
-        if (canBuildBroadcastLeft(joinType)) BuildLeft else BuildRight
-      }
-
-      def createBroadcastNLJoin(buildLeft: Boolean, buildRight: Boolean) = {
-        val maybeBuildSide = if (buildLeft && buildRight) {
-          Some(desiredBuildSide)
-        } else if (buildLeft) {
-          Some(BuildLeft)
-        } else if (buildRight) {
-          Some(BuildRight)
-        } else {
-          None
-        }
-
-        maybeBuildSide.map { buildSide =>
-          Seq(joins.BroadcastNestedLoopJoinExec(
-            planLater(left), planLater(right), buildSide, joinType, condition))
-        }
-      }
-
-      def createCartesianProduct() = {
-        if (joinType.isInstanceOf[InnerLike]) {
-          Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition)))
-        } else {
-          None
-        }
-      }
-
-      def createJoinWithoutHint() = {
-        createBroadcastNLJoin(canBroadcastBySize(left, conf), canBroadcastBySize(right, conf))
-          .orElse(createCartesianProduct())
-          .getOrElse {
-            // This join could be very slow or OOM
-            Seq(joins.BroadcastNestedLoopJoinExec(
-              planLater(left), planLater(right), desiredBuildSide, joinType, condition))
-          }
-      }
-
-      createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint))
-        .orElse {
-          if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None
-        }
-        .getOrElse(createJoinWithoutHint())
-
-    // --- Cases where this strategy does not apply ---------------------------------------------
-    case _ => Nil
-  }
-
-  override def canBroadcastBySize(plan: LogicalPlan, conf: SQLConf): Boolean = {
-    val size = plan.stats.sizeInBytes
-    size >= 0 && size <= conf.autoBroadcastJoinThreshold && JoinMemoryManager.acquireMemory(size.toLong)
-  }
-}
-
-object JoinMemoryManager extends Logging {
-
-  @GuardedBy("this")
-  private[this] var memoryUsed: Long = 0
-
-  def acquireMemory(numBytesToAcquire: Long): Boolean = synchronized {
-    assert(numBytesToAcquire >= 0)
-    val enoughMemory = numBytesToAcquire <= (maxMemoryJoinCanUse - memoryUsed)
-    if (enoughMemory) {
-      memoryUsed += numBytesToAcquire
-      logInfo(s"Acquire $numBytesToAcquire bytes for BHJ, memory used 
$memoryUsed, max memory BHJ can use $maxMemoryJoinCanUse.")
-    } else {
-      logInfo("Driver memory is not enough for BHJ.")
-    }
-    enoughMemory
-  }
-
-  private def maxMemoryJoinCanUse: Long = {
-    val joinMemoryFraction = KylinConfig.getInstanceFromEnv.getJoinMemoryFraction
-    (Runtime.getRuntime.maxMemory() * joinMemoryFraction).toLong
-  }
-
-  def releaseAllMemory(): Unit = synchronized {
-    memoryUsed = 0
-  }
-
-}
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 32f6219..0d4352b 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -55,10 +55,7 @@ import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.spark.SparkConf;
-import org.apache.spark.sql.execution.KylinJoinSelection;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.SparkSessionExtensions;
-import org.apache.spark.sql.execution.SparkStrategy;
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
 import org.apache.spark.util.Utils;
 import org.apache.spark.utils.ResourceUtils;
@@ -67,8 +64,6 @@ import org.apache.spark.utils.YarnInfoFetcherUtils;
 import org.apache.kylin.engine.spark.common.util.TimeZoneUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
 
 public abstract class SparkApplication {
     private static final Logger logger = LoggerFactory.getLogger(SparkApplication.class);
@@ -287,18 +282,10 @@ public abstract class SparkApplication {
                 }
             }
 
-            ss = SparkSession.builder().withExtensions(new AbstractFunction1<SparkSessionExtensions, BoxedUnit>() {
-                @Override
-                public BoxedUnit apply(SparkSessionExtensions v1) {
-                    v1.injectPlannerStrategy(new AbstractFunction1<SparkSession, SparkStrategy>() {
-                        @Override
-                        public SparkStrategy apply(SparkSession session) {
-                            return new KylinJoinSelection(session);
-                        }
-                    });
-                    return BoxedUnit.UNIT;
-                }
-            }).enableHiveSupport().config(sparkConf).config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
+            ss = SparkSession.builder()
+                    .enableHiveSupport()
+                    .config(sparkConf)
+                    .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
                     .getOrCreate();
 
             if (isJobOnCluster(sparkConf)) {
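
With the custom strategy gone, whether the build job uses AQE is governed only by the standard Spark configuration passed in through sparkConf. A minimal Scala sketch (the property keys are standard Spark; setting them here is illustrative and not part of this commit):

    // Let Spark 3.1's adaptive query execution re-plan joins at runtime; the
    // broadcast decision then follows spark.sql.autoBroadcastJoinThreshold.
    sparkConf.set("spark.sql.adaptive.enabled", "true")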
