Copilot commented on code in PR #7368: URL: https://github.com/apache/kyuubi/pull/7368#discussion_r2998691843
########## docs/deployment/settings.md: ########## @@ -0,0 +1,34 @@ +<!-- +- Licensed to the Apache Software Foundation (ASF) under one or more +- contributor license agreements. See the NOTICE file distributed with +- this work for additional information regarding copyright ownership. +- The ASF licenses this file to You under the Apache License, Version 2.0 +- (the "License"); you may not use this file except in compliance with +- the License. You may obtain a copy of the License at +- +- http://www.apache.org/licenses/LICENSE-2.0 +- +- Unless required by applicable law or agreed to in writing, software +- distributed under the License is distributed on an "AS IS" BASIS, +- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- See the License for the specific language governing permissions and +- limitations under the License. +--> + +# Deployment Settings for Dangerous Join Watchdog + +## Spark SQL Extensions + +```properties +spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension,org.apache.kyuubi.sql.watchdog.KyuubiDangerousJoinExtension +``` + +## Dangerous Join Configurations + +| Name | Default | Description | +|------------------------------------------------|---------|-----------------------------------------------------------| +| `kyuubi.watchdog.dangerousJoin.enabled` | `true` | Enable dangerous join watchdog | Review Comment: `docs/deployment/settings.md` lists `kyuubi.watchdog.dangerousJoin.enabled` default as `true`, but this PR sets it to `false` in several Spark extension modules. Please update either the code default or this documentation so operators see the correct default behavior. 
```suggestion | `kyuubi.watchdog.dangerousJoin.enabled` | `false` | Enable dangerous join watchdog | ``` ########## extensions/spark/kyuubi-extension-spark-4-0/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.sql.watchdog + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression} +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys +import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan} +import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} +import org.slf4j.LoggerFactory + +import org.apache.kyuubi.sql.KyuubiSQLConf + +case class DangerousJoinInterceptor(session: SparkSession) extends SparkStrategy { + import DangerousJoinInterceptor._ + + private val logger = LoggerFactory.getLogger(classOf[DangerousJoinInterceptor]) + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = { + val conf = session.sessionState.conf + if (!conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ENABLED)) { + return Nil + } + val ratio = conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO) + val threshold = conf.autoBroadcastJoinThreshold + plan.foreach { + case join: Join => + detect(join, threshold, ratio).foreach { reason => + val entry = DangerousJoinCounter.Entry( + sqlText = plan.toString(), + joinType = join.joinType.sql, + reason = reason, + leftSize = join.left.stats.sizeInBytes, + rightSize = join.right.stats.sizeInBytes, + broadcastThreshold = threshold, + broadcastRatio = ratio) + DangerousJoinCounter.add(entry) + logger.warn(s"$KYUUBI_LOG_KEY=${entry.toJson}") + if (conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ACTION) == REJECT) { + throw new KyuubiDangerousJoinException(entry.toJson) + } + } + case _ => + } + Nil + } + + private def detect(join: Join, threshold: Long, ratio: Double): Option[String] = { + if (threshold <= 0) { + return None + } Review Comment: The early return when `autoBroadcastJoinThreshold <= 0` disables all detection, including the equi-join Cartesian-condition check (`isCartesianCondition`), which does not depend on the broadcast threshold. That check should still run when broadcast is disabled. Only the size-based rules need to be skipped in that case: `OversizedBroadcastFallback` and the non-equi `leftSize > threshold && rightSize > threshold` comparisons. Those comparisons would be true for every join when the threshold is `-1`.
########## extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/DangerousJoinInterceptorSuite.scala: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.kyuubi.sql.KyuubiSQLConf +import org.apache.kyuubi.sql.watchdog.{DangerousJoinCounter, KyuubiDangerousJoinException} + +class DangerousJoinInterceptorSuite extends KyuubiSparkSQLExtensionTest { + override protected def beforeAll(): Unit = { + super.beforeAll() + setupData() + } + + test("equi join oversized broadcast fallback should be counted") { + DangerousJoinCounter.reset() + withSQLConf( + KyuubiSQLConf.DANGEROUS_JOIN_ENABLED.key -> "true", + KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO.key -> "0.8", + KyuubiSQLConf.DANGEROUS_JOIN_ACTION.key -> "WARN", + "spark.sql.autoBroadcastJoinThreshold" -> "1") { + sql("SELECT * FROM t1 a JOIN t2 b ON a.c1 = b.c1").queryExecution.sparkPlan + assert(DangerousJoinCounter.count >= 1) + } + } + + test("non equi join cartesian should include Cartesian marker") { + DangerousJoinCounter.reset() + withSQLConf( + KyuubiSQLConf.DANGEROUS_JOIN_ENABLED.key -> "true", + KyuubiSQLConf.DANGEROUS_JOIN_ACTION.key -> "WARN", + 
"spark.sql.autoBroadcastJoinThreshold" -> "1") { + sql("SELECT * FROM t1 a JOIN t2 b ON a.c1 > b.c1").queryExecution.sparkPlan + assert(DangerousJoinCounter.count >= 1) + assert(DangerousJoinCounter.latest.exists(_.toJson.contains("Cartesian"))) + } Review Comment: The assertions only check `toJson.contains(...)`, which can pass even if the JSON payload is malformed. Consider parsing/validating the JSON so the test actually verifies the structured diagnostics format. ########## docs/watchdog/dangerous-join.md: ########## @@ -0,0 +1,105 @@ +<!-- +- Licensed to the Apache Software Foundation (ASF) under one or more +- contributor license agreements. See the NOTICE file distributed with +- this work for additional information regarding copyright ownership. +- The ASF licenses this file to You under the Apache License, Version 2.0 +- (the "License"); you may not use this file except in compliance with +- the License. You may obtain a copy of the License at +- +- http://www.apache.org/licenses/LICENSE-2.0 +- +- Unless required by applicable law or agreed to in writing, software +- distributed under the License is distributed on an "AS IS" BASIS, +- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- See the License for the specific language governing permissions and +- limitations under the License. +--> + +# Dangerous Join Watchdog + +Kyuubi Dangerous Join Watchdog detects risky join planning patterns before query execution. +It helps reduce accidental Cartesian products, oversized broadcast attempts, and long-running nested loop joins. + +## Background + +In shared SQL gateway environments, a single risky join can consume excessive driver memory or create very slow jobs. +The Dangerous Join Watchdog adds planning-time checks for these high-risk patterns. + +## Risk Rules + +### Equi-Join + +- Rule 1: Equi-join is marked dangerous when it degrades to a Cartesian pattern. 
+- Rule 2: Equi-join is marked dangerous when the estimated build side exceeds the configured broadcast ratio threshold. + +### Non-Equi Join + +- Rule 1: Non-equi join is marked dangerous when both sides exceed broadcast threshold and effectively become Cartesian risk. +- Rule 2: Non-equi join is marked dangerous when build side is not selectable and the plan falls back to a second BNLJ pattern. + +## Configurations + +| Name | Default | Meaning | +|------------------------------------------------|---------|---------------------------------------------------------------------------| +| `kyuubi.watchdog.dangerousJoin.enabled` | `true` | Enable or disable dangerous join detection | +| `kyuubi.watchdog.dangerousJoin.broadcastRatio` | `0.8` | Ratio against Spark broadcast threshold for warning/reject decision | +| `kyuubi.watchdog.dangerousJoin.action` | `WARN` | `WARN` logs diagnostics; `REJECT` throws exception and rejects submission | Review Comment: The configuration table lists `kyuubi.watchdog.dangerousJoin.enabled` default as `true`, but in this PR most Spark extension modules set the default to `false` (e.g., spark-3-3/3-4/3-5/4-0). Please reconcile the documented defaults with the actual defaults (and keep them consistent across Spark versions). ########## docs/watchdog/dangerous-join.md: ########## @@ -0,0 +1,105 @@ +<!-- +- Licensed to the Apache Software Foundation (ASF) under one or more +- contributor license agreements. See the NOTICE file distributed with +- this work for additional information regarding copyright ownership. +- The ASF licenses this file to You under the Apache License, Version 2.0 +- (the "License"); you may not use this file except in compliance with +- the License. 
You may obtain a copy of the License at +- +- http://www.apache.org/licenses/LICENSE-2.0 +- +- Unless required by applicable law or agreed to in writing, software +- distributed under the License is distributed on an "AS IS" BASIS, +- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- See the License for the specific language governing permissions and +- limitations under the License. +--> + +# Dangerous Join Watchdog + +Kyuubi Dangerous Join Watchdog detects risky join planning patterns before query execution. +It helps reduce accidental Cartesian products, oversized broadcast attempts, and long-running nested loop joins. + +## Background + +In shared SQL gateway environments, a single risky join can consume excessive driver memory or create very slow jobs. +The Dangerous Join Watchdog adds planning-time checks for these high-risk patterns. + +## Risk Rules + +### Equi-Join + +- Rule 1: Equi-join is marked dangerous when it degrades to a Cartesian pattern. +- Rule 2: Equi-join is marked dangerous when the estimated build side exceeds the configured broadcast ratio threshold. + +### Non-Equi Join + +- Rule 1: Non-equi join is marked dangerous when both sides exceed broadcast threshold and effectively become Cartesian risk. +- Rule 2: Non-equi join is marked dangerous when build side is not selectable and the plan falls back to a second BNLJ pattern. + +## Configurations + +| Name | Default | Meaning | +|------------------------------------------------|---------|---------------------------------------------------------------------------| +| `kyuubi.watchdog.dangerousJoin.enabled` | `true` | Enable or disable dangerous join detection | +| `kyuubi.watchdog.dangerousJoin.broadcastRatio` | `0.8` | Ratio against Spark broadcast threshold for warning/reject decision | +| `kyuubi.watchdog.dangerousJoin.action` | `WARN` | `WARN` logs diagnostics; `REJECT` throws exception and rejects submission | + +## Usage + +1. 
Put Kyuubi Spark extension jar into Spark classpath. +2. Configure SQL extensions: + +```properties +spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension,org.apache.kyuubi.sql.watchdog.KyuubiDangerousJoinExtension +``` + +3. Configure action: + +```properties +kyuubi.watchdog.dangerousJoin.action=WARN +``` + +or + +```properties +kyuubi.watchdog.dangerousJoin.action=REJECT +``` + +## Sample WARN Log + +When action is `WARN`, Kyuubi writes a structured JSON payload: + +```text +KYUUBI_LOG_KEY={"sql":"SELECT ...","joinType":"INNER","reason":"Cartesian","leftSize":10485760,"rightSize":15728640,"broadcastThreshold":10485760,"broadcastRatio":0.8} Review Comment: The emitted `sql` field in the JSON payload is built from `plan.toString()` in the interceptor (logical plan tree string), not the original SQL text. The sample log currently shows `"sql":"SELECT ..."`, which is misleading; please either adjust the payload to include real SQL text (if available) or update the docs/sample to reflect what is actually logged. ```suggestion When action is `WARN`, Kyuubi writes a structured JSON payload (the `sql` field contains the logical plan string from `plan.toString()`): ```text KYUUBI_LOG_KEY={"sql":"Project [*]\n+- LogicalRDD [id#0, value#1], false","joinType":"INNER","reason":"Cartesian","leftSize":10485760,"rightSize":15728640,"broadcastThreshold":10485760,"broadcastRatio":0.8} ``` ########## extensions/spark/kyuubi-extension-spark-4-1/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.sql.watchdog + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression} +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys +import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan} +import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} +import org.slf4j.LoggerFactory + +import org.apache.kyuubi.sql.KyuubiSQLConf + +case class DangerousJoinInterceptor(session: SparkSession) extends SparkStrategy { + import DangerousJoinInterceptor._ + + private val logger = LoggerFactory.getLogger(classOf[DangerousJoinInterceptor]) + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = { + val conf = session.sessionState.conf + if (!conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ENABLED)) { + return Nil + } + val ratio = conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO) + val threshold = conf.autoBroadcastJoinThreshold + plan.foreach { + case join: Join => + detect(join, threshold, ratio).foreach { reason => + val entry = DangerousJoinCounter.Entry( + sqlText = plan.toString(), + joinType = join.joinType.sql, + reason = reason, + leftSize = join.left.stats.sizeInBytes, + rightSize = join.right.stats.sizeInBytes, + broadcastThreshold = threshold, + broadcastRatio = ratio) + DangerousJoinCounter.add(entry) + logger.warn(s"$KYUUBI_LOG_KEY=${entry.toJson}") + if (conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ACTION) == REJECT) { + throw new KyuubiDangerousJoinException(entry.toJson) + } + } + case _ => + } + Nil + 
} + + private def detect(join: Join, threshold: Long, ratio: Double): Option[String] = { + if (threshold <= 0) { + return None + } + val leftSize = join.left.stats.sizeInBytes + val rightSize = join.right.stats.sizeInBytes + val hasEquiJoin = isEquiJoin(join) + if (hasEquiJoin) { + if (isCartesianCondition(join.condition)) { + Some("Cartesian") + } else if (minSize(leftSize, rightSize) > BigInt((threshold * ratio).toLong)) { + Some("OversizedBroadcastFallback") + } else { + None + } + } else { + if (leftSize > threshold && rightSize > threshold) { + Some("Cartesian") + } else if (cannotSelectBuildSide(leftSize, rightSize, threshold)) { + Some("SecondBNLJ") + } else { Review Comment: `cannotSelectBuildSide` is identical to the preceding Cartesian check (`leftSize > threshold && rightSize > threshold`), making the `SecondBNLJ` branch unreachable. This prevents the interceptor from ever reporting `SecondBNLJ` as intended; please adjust the logic so these conditions are distinct (or remove the dead branch). ########## extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.sql.watchdog + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression} +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys +import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan} +import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} +import org.slf4j.LoggerFactory + +import org.apache.kyuubi.sql.KyuubiSQLConf + +case class DangerousJoinInterceptor(session: SparkSession) extends SparkStrategy { + import DangerousJoinInterceptor._ + + private val logger = LoggerFactory.getLogger(classOf[DangerousJoinInterceptor]) + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = { + val conf = session.sessionState.conf + if (!conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ENABLED)) { + return Nil + } + val ratio = conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO) + val threshold = conf.autoBroadcastJoinThreshold + plan.foreach { + case join: Join => + detect(join, threshold, ratio).foreach { reason => + val entry = DangerousJoinCounter.Entry( + sqlText = plan.toString(), + joinType = join.joinType.sql, + reason = reason, + leftSize = join.left.stats.sizeInBytes, + rightSize = join.right.stats.sizeInBytes, + broadcastThreshold = threshold, + broadcastRatio = ratio) + DangerousJoinCounter.add(entry) + logger.warn(s"$KYUUBI_LOG_KEY=${entry.toJson}") + if (conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ACTION) == REJECT) { + throw new KyuubiDangerousJoinException(entry.toJson) + } + } + case _ => + } + Nil + } + + private def detect(join: Join, threshold: Long, ratio: Double): Option[String] = { + if (threshold <= 0) { + return None + } + val leftSize = join.left.stats.sizeInBytes + val rightSize = join.right.stats.sizeInBytes + val hasEquiJoin = isEquiJoin(join) + if (hasEquiJoin) { + if (isCartesianCondition(join.condition)) { + Some("Cartesian") + } else if (minSize(leftSize, rightSize) > BigInt((threshold * 
ratio).toLong)) { + Some("OversizedBroadcastFallback") + } else { + None + } + } else { + if (leftSize > threshold && rightSize > threshold) { + Some("Cartesian") + } else if (cannotSelectBuildSide(leftSize, rightSize, threshold)) { + Some("SecondBNLJ") + } else { Review Comment: `cannotSelectBuildSide` is the same predicate as the prior Cartesian check, so the `SecondBNLJ` branch can never be reached. Please fix the condition(s) so `SecondBNLJ` can be detected distinctly or remove the dead code path. ########## extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinCounter.scala: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.sql.watchdog + +import scala.collection.mutable.ArrayBuffer + +object DangerousJoinCounter { + case class Entry( + sqlText: String, + joinType: String, + reason: String, + leftSize: BigInt, + rightSize: BigInt, + broadcastThreshold: Long, + broadcastRatio: Double) { + def toJson: String = { + val pairs = Seq( + "sql" -> escape(sqlText), + "joinType" -> escape(joinType), + "reason" -> escape(reason), + "leftSize" -> leftSize.toString, + "rightSize" -> rightSize.toString, + "broadcastThreshold" -> broadcastThreshold.toString, + "broadcastRatio" -> broadcastRatio.toString) + pairs.map { case (k, v) => + if (k == "leftSize" || k == "rightSize" || k == "broadcastThreshold" || k == "broadcastRatio") { + s""""$k":$v""" + } else { + s""""$k":"$v"""" + } + }.mkString("{", ",", "}") + } + } + + private val entries = ArrayBuffer.empty[Entry] + + def add(entry: Entry): Unit = synchronized { Review Comment: `DangerousJoinCounter` stores all entries in a global mutable buffer without bounds/eviction. In a long-running driver this can grow indefinitely and cause memory pressure. Consider bounding the buffer or removing storage in production and relying on metrics/logging instead. ```suggestion private val MaxEntries = 1000 private val entries = ArrayBuffer.empty[Entry] def add(entry: Entry): Unit = synchronized { if (entries.size >= MaxEntries) { entries.remove(0) } ``` ########## extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.sql.watchdog + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression} +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys +import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan} +import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} +import org.slf4j.LoggerFactory + +import org.apache.kyuubi.sql.KyuubiSQLConf + +case class DangerousJoinInterceptor(session: SparkSession) extends SparkStrategy { + import DangerousJoinInterceptor._ + + private val logger = LoggerFactory.getLogger(classOf[DangerousJoinInterceptor]) + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = { + val conf = session.sessionState.conf + if (!conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ENABLED)) { + return Nil + } + val ratio = conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO) + val threshold = conf.autoBroadcastJoinThreshold + plan.foreach { + case join: Join => + detect(join, threshold, ratio).foreach { reason => + val entry = DangerousJoinCounter.Entry( + sqlText = plan.toString(), + joinType = join.joinType.sql, + reason = reason, + leftSize = join.left.stats.sizeInBytes, + rightSize = join.right.stats.sizeInBytes, + broadcastThreshold = threshold, + broadcastRatio = ratio) + DangerousJoinCounter.add(entry) + logger.warn(s"$KYUUBI_LOG_KEY=${entry.toJson}") + if (conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ACTION) == REJECT) { + throw new KyuubiDangerousJoinException(entry.toJson) + } + } + case _ => + } + Nil + 
} + + private def detect(join: Join, threshold: Long, ratio: Double): Option[String] = { + if (threshold <= 0) { + return None + } + val leftSize = join.left.stats.sizeInBytes + val rightSize = join.right.stats.sizeInBytes + val hasEquiJoin = isEquiJoin(join) + if (hasEquiJoin) { + if (isCartesianCondition(join.condition)) { + Some("Cartesian") + } else if (minSize(leftSize, rightSize) > BigInt((threshold * ratio).toLong)) { + Some("OversizedBroadcastFallback") + } else { + None + } + } else { + if (leftSize > threshold && rightSize > threshold) { Review Comment: The early return when `autoBroadcastJoinThreshold <= 0` disables all detection, including Cartesian/BNLJ risks that are not dependent on broadcast thresholds. Detection should still run for non-broadcast-related rules even when broadcast is disabled. ```suggestion val leftSize = join.left.stats.sizeInBytes val rightSize = join.right.stats.sizeInBytes val hasEquiJoin = isEquiJoin(join) if (hasEquiJoin) { if (isCartesianCondition(join.condition)) { Some("Cartesian") } else if (threshold > 0 && minSize(leftSize, rightSize) > BigInt((threshold * ratio).toLong)) { Some("OversizedBroadcastFallback") } else { None } } else { if (threshold <= 0) { None } else if (leftSize > threshold && rightSize > threshold) { ``` ########## extensions/spark/kyuubi-extension-spark-4-1/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinCounter.scala: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.sql.watchdog + +import scala.collection.mutable.ArrayBuffer + +object DangerousJoinCounter { + case class Entry( + sqlText: String, + joinType: String, + reason: String, + leftSize: BigInt, + rightSize: BigInt, + broadcastThreshold: Long, + broadcastRatio: Double) { + def toJson: String = { + val pairs = Seq( + "sql" -> escape(sqlText), + "joinType" -> escape(joinType), + "reason" -> escape(reason), + "leftSize" -> leftSize.toString, + "rightSize" -> rightSize.toString, + "broadcastThreshold" -> broadcastThreshold.toString, + "broadcastRatio" -> broadcastRatio.toString) + pairs.map { case (k, v) => + if (k == "leftSize" || k == "rightSize" || k == "broadcastThreshold" || k == "broadcastRatio") { + s""""$k":$v""" + } else { + s""""$k":"$v"""" + } + }.mkString("{", ",", "}") + } + } + + private val entries = ArrayBuffer.empty[Entry] + + def add(entry: Entry): Unit = synchronized { + entries += entry + } Review Comment: `DangerousJoinCounter` stores every detected entry in a global `ArrayBuffer` with no bounds or eviction. In a long-running Spark driver (Kyuubi gateway), this can grow unbounded and eventually cause memory pressure/OOM. Consider removing storage entirely for production, or keep only a bounded ring buffer / latest entry and expose counters via metrics instead. ########## docs/watchdog/dangerous-join.md: ########## @@ -0,0 +1,105 @@ +<!-- +- Licensed to the Apache Software Foundation (ASF) under one or more +- contributor license agreements. 
See the NOTICE file distributed with +- this work for additional information regarding copyright ownership. +- The ASF licenses this file to You under the Apache License, Version 2.0 +- (the "License"); you may not use this file except in compliance with +- the License. You may obtain a copy of the License at +- +- http://www.apache.org/licenses/LICENSE-2.0 +- +- Unless required by applicable law or agreed to in writing, software +- distributed under the License is distributed on an "AS IS" BASIS, +- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +- See the License for the specific language governing permissions and +- limitations under the License. +--> + +# Dangerous Join Watchdog + +Kyuubi Dangerous Join Watchdog detects risky join planning patterns before query execution. +It helps reduce accidental Cartesian products, oversized broadcast attempts, and long-running nested loop joins. + +## Background + +In shared SQL gateway environments, a single risky join can consume excessive driver memory or create very slow jobs. +The Dangerous Join Watchdog adds planning-time checks for these high-risk patterns. + +## Risk Rules + +### Equi-Join + +- Rule 1: Equi-join is marked dangerous when it degrades to a Cartesian pattern. +- Rule 2: Equi-join is marked dangerous when the estimated build side exceeds the configured broadcast ratio threshold. + +### Non-Equi Join + +- Rule 1: Non-equi join is marked dangerous when both sides exceed broadcast threshold and effectively become Cartesian risk. +- Rule 2: Non-equi join is marked dangerous when build side is not selectable and the plan falls back to a second BNLJ pattern. 
+ +## Configurations + +| Name | Default | Meaning | +|------------------------------------------------|---------|---------------------------------------------------------------------------| +| `kyuubi.watchdog.dangerousJoin.enabled` | `true` | Enable or disable dangerous join detection | +| `kyuubi.watchdog.dangerousJoin.broadcastRatio` | `0.8` | Ratio against Spark broadcast threshold for warning/reject decision | +| `kyuubi.watchdog.dangerousJoin.action` | `WARN` | `WARN` logs diagnostics; `REJECT` throws exception and rejects submission | + +## Usage + +1. Put Kyuubi Spark extension jar into Spark classpath. +2. Configure SQL extensions: + +```properties +spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension,org.apache.kyuubi.sql.watchdog.KyuubiDangerousJoinExtension +``` Review Comment: The usage example configures `spark.sql.extensions` with both `org.apache.kyuubi.sql.KyuubiSparkSQLExtension` and `org.apache.kyuubi.sql.watchdog.KyuubiDangerousJoinExtension`. In this PR, `KyuubiSparkSQLExtension` already injects `DangerousJoinInterceptor`, so adding `KyuubiDangerousJoinExtension` would inject it twice and can lead to duplicated warnings/counter increments (and potentially double rejection paths). Please update the docs to recommend only one mechanism (either the main extension or the dedicated watchdog extension). ########## extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.sql.watchdog + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression} +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys +import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan} +import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} +import org.slf4j.LoggerFactory + +import org.apache.kyuubi.sql.KyuubiSQLConf + +case class DangerousJoinInterceptor(session: SparkSession) extends SparkStrategy { + import DangerousJoinInterceptor._ + + private val logger = LoggerFactory.getLogger(classOf[DangerousJoinInterceptor]) + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = { + val conf = session.sessionState.conf + if (!conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ENABLED)) { + return Nil + } + val ratio = conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO) + val threshold = conf.autoBroadcastJoinThreshold + plan.foreach { + case join: Join => + detect(join, threshold, ratio).foreach { reason => + val entry = DangerousJoinCounter.Entry( + sqlText = plan.toString(), + joinType = join.joinType.sql, + reason = reason, + leftSize = join.left.stats.sizeInBytes, + rightSize = join.right.stats.sizeInBytes, + broadcastThreshold = threshold, + broadcastRatio = ratio) + DangerousJoinCounter.add(entry) + logger.warn(s"$KYUUBI_LOG_KEY=${entry.toJson}") + if (conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ACTION) == REJECT) { + throw new KyuubiDangerousJoinException(entry.toJson) + } + } + case _ => + } + Nil + 
} + + private def detect(join: Join, threshold: Long, ratio: Double): Option[String] = { + if (threshold <= 0) { + return None + } + val leftSize = join.left.stats.sizeInBytes + val rightSize = join.right.stats.sizeInBytes + val hasEquiJoin = isEquiJoin(join) + if (hasEquiJoin) { + if (isCartesianCondition(join.condition)) { + Some("Cartesian") + } else if (minSize(leftSize, rightSize) > BigInt((threshold * ratio).toLong)) { + Some("OversizedBroadcastFallback") + } else { + None + } + } else { + if (leftSize > threshold && rightSize > threshold) { + Some("Cartesian") + } else if (cannotSelectBuildSide(leftSize, rightSize, threshold)) { + Some("SecondBNLJ") + } else { Review Comment: `cannotSelectBuildSide` evaluates the same predicate as the preceding Cartesian check (both sides larger than the broadcast threshold). Any join that satisfies it has already returned `Some("Cartesian")`, so the `SecondBNLJ` branch can never be reached. Please fix the condition(s) so that `SecondBNLJ` is detected as a distinct case, or remove the dead code path. ########## extensions/spark/kyuubi-extension-spark-4-0/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinCounter.scala: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.kyuubi.sql.watchdog + +import scala.collection.mutable.ArrayBuffer + +object DangerousJoinCounter { + case class Entry( + sqlText: String, + joinType: String, + reason: String, + leftSize: BigInt, + rightSize: BigInt, + broadcastThreshold: Long, + broadcastRatio: Double) { + def toJson: String = { + val pairs = Seq( + "sql" -> escape(sqlText), + "joinType" -> escape(joinType), + "reason" -> escape(reason), + "leftSize" -> leftSize.toString, + "rightSize" -> rightSize.toString, + "broadcastThreshold" -> broadcastThreshold.toString, + "broadcastRatio" -> broadcastRatio.toString) + pairs.map { case (k, v) => + if (k == "leftSize" || k == "rightSize" || k == "broadcastThreshold" || k == "broadcastRatio") { + s""""$k":$v""" + } else { + s""""$k":"$v"""" + } + }.mkString("{", ",", "}") + } + } + + private val entries = ArrayBuffer.empty[Entry] + + def add(entry: Entry): Unit = synchronized { + entries += entry + } Review Comment: `DangerousJoinCounter` stores all entries in a global mutable buffer without bounds/eviction. In a long-running driver this can grow indefinitely and cause memory pressure. Consider bounding the buffer or removing storage in production and relying on metrics/logging instead. ########## extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/DangerousJoinInterceptorSuite.scala: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.kyuubi.sql.KyuubiSQLConf +import org.apache.kyuubi.sql.watchdog.{DangerousJoinCounter, KyuubiDangerousJoinException} + +class DangerousJoinInterceptorSuite extends KyuubiSparkSQLExtensionTest { + override protected def beforeAll(): Unit = { + super.beforeAll() + setupData() + } + + test("equi join oversized broadcast fallback should be counted") { + DangerousJoinCounter.reset() + withSQLConf( + KyuubiSQLConf.DANGEROUS_JOIN_ENABLED.key -> "true", + KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO.key -> "0.8", + KyuubiSQLConf.DANGEROUS_JOIN_ACTION.key -> "WARN", + "spark.sql.autoBroadcastJoinThreshold" -> "1") { + sql("SELECT * FROM t1 a JOIN t2 b ON a.c1 = b.c1").queryExecution.sparkPlan + assert(DangerousJoinCounter.count >= 1) + } + } + + test("non equi join cartesian should include Cartesian marker") { + DangerousJoinCounter.reset() + withSQLConf( + KyuubiSQLConf.DANGEROUS_JOIN_ENABLED.key -> "true", + KyuubiSQLConf.DANGEROUS_JOIN_ACTION.key -> "WARN", + "spark.sql.autoBroadcastJoinThreshold" -> "1") { + sql("SELECT * FROM t1 a JOIN t2 b ON a.c1 > b.c1").queryExecution.sparkPlan + assert(DangerousJoinCounter.count >= 1) + assert(DangerousJoinCounter.latest.exists(_.toJson.contains("Cartesian"))) + } Review Comment: The assertions only check `toJson.contains(...)`, which can pass even if the JSON payload is malformed. Consider parsing/validating the JSON so the test actually verifies the structured diagnostics format. 
########## extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.sql.watchdog + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression} +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys +import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan} +import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} +import org.slf4j.LoggerFactory + +import org.apache.kyuubi.sql.KyuubiSQLConf + +case class DangerousJoinInterceptor(session: SparkSession) extends SparkStrategy { + import DangerousJoinInterceptor._ + + private val logger = LoggerFactory.getLogger(classOf[DangerousJoinInterceptor]) + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = { + val conf = session.sessionState.conf + if (!conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ENABLED)) { + return Nil + } + val ratio = conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO) + val threshold = conf.autoBroadcastJoinThreshold + plan.foreach { + case join: Join => + 
detect(join, threshold, ratio).foreach { reason => + val entry = DangerousJoinCounter.Entry( + sqlText = plan.toString(), + joinType = join.joinType.sql, + reason = reason, + leftSize = join.left.stats.sizeInBytes, + rightSize = join.right.stats.sizeInBytes, + broadcastThreshold = threshold, + broadcastRatio = ratio) + DangerousJoinCounter.add(entry) + logger.warn(s"$KYUUBI_LOG_KEY=${entry.toJson}") + if (conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ACTION) == REJECT) { + throw new KyuubiDangerousJoinException(entry.toJson) + } + } + case _ => + } + Nil + } + + private def detect(join: Join, threshold: Long, ratio: Double): Option[String] = { + if (threshold <= 0) { + return None + } + val leftSize = join.left.stats.sizeInBytes + val rightSize = join.right.stats.sizeInBytes + val hasEquiJoin = isEquiJoin(join) + if (hasEquiJoin) { + if (isCartesianCondition(join.condition)) { + Some("Cartesian") + } else if (minSize(leftSize, rightSize) > BigInt((threshold * ratio).toLong)) { + Some("OversizedBroadcastFallback") + } else { + None + } + } else { + if (leftSize > threshold && rightSize > threshold) { + Some("Cartesian") + } else if (cannotSelectBuildSide(leftSize, rightSize, threshold)) { Review Comment: The early return when `autoBroadcastJoinThreshold <= 0` disables all detection, including Cartesian/BNLJ risks that are not dependent on broadcast thresholds. Detection should still run for non-broadcast-related rules even when broadcast is disabled. 
```suggestion val leftSize = join.left.stats.sizeInBytes val rightSize = join.right.stats.sizeInBytes val hasEquiJoin = isEquiJoin(join) if (hasEquiJoin) { if (isCartesianCondition(join.condition)) { Some("Cartesian") } else if (threshold > 0 && minSize(leftSize, rightSize) > BigInt((threshold * ratio).toLong)) { Some("OversizedBroadcastFallback") } else { None } } else { if (threshold > 0 && leftSize > threshold && rightSize > threshold) { Some("Cartesian") } else if (threshold > 0 && cannotSelectBuildSide(leftSize, rightSize, threshold)) { ``` ########## extensions/spark/kyuubi-extension-spark-4-1/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.sql.watchdog + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression} +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys +import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan} +import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} +import org.slf4j.LoggerFactory + +import org.apache.kyuubi.sql.KyuubiSQLConf + +case class DangerousJoinInterceptor(session: SparkSession) extends SparkStrategy { + import DangerousJoinInterceptor._ + + private val logger = LoggerFactory.getLogger(classOf[DangerousJoinInterceptor]) + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = { + val conf = session.sessionState.conf + if (!conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ENABLED)) { + return Nil + } + val ratio = conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO) + val threshold = conf.autoBroadcastJoinThreshold + plan.foreach { + case join: Join => + detect(join, threshold, ratio).foreach { reason => + val entry = DangerousJoinCounter.Entry( + sqlText = plan.toString(), + joinType = join.joinType.sql, + reason = reason, + leftSize = join.left.stats.sizeInBytes, + rightSize = join.right.stats.sizeInBytes, + broadcastThreshold = threshold, + broadcastRatio = ratio) + DangerousJoinCounter.add(entry) + logger.warn(s"$KYUUBI_LOG_KEY=${entry.toJson}") + if (conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ACTION) == REJECT) { + throw new KyuubiDangerousJoinException(entry.toJson) + } + } + case _ => + } + Nil + } + + private def detect(join: Join, threshold: Long, ratio: Double): Option[String] = { + if (threshold <= 0) { + return None + } Review Comment: The early return when `autoBroadcastJoinThreshold <= 0` disables *all* dangerous-join detection, including Cartesian / nested-loop cases that are unrelated to broadcast thresholds. 
If the goal is to guard against Cartesian products and broadcast nested loop join (BNLJ) fallbacks, detection should still run when broadcast is disabled; only the broadcast-size-based rule should depend on this threshold. ########## docs/configuration/settings.md: ########## @@ -580,6 +580,18 @@ jdbc:hive2://localhost:10009/default;#spark.sql.shuffle.partitions=2;spark.execu Please refer to the Spark official online documentation for [SET Command](https://spark.apache.org/docs/latest/sql-ref-syntax-aux-conf-mgmt-set.html) +### Dangerous Join Watchdog + +You can enable dangerous join detection for Spark SQL extension with: + +| Name | Default | Description | +|------------------------------------------------|---------|------------------------------------------------------------------------------------| +| `kyuubi.watchdog.dangerousJoin.enabled` | `true` | Enable dangerous join detection | Review Comment: This table documents `kyuubi.watchdog.dangerousJoin.enabled` default as `true`, but the Spark extension modules introduced in this PR mostly default it to `false`. Please align the defaults here with the code (and keep consistent across Spark versions) to avoid misleading operators. ```suggestion | `kyuubi.watchdog.dangerousJoin.enabled` | `false` | Enable dangerous join detection | ``` ########## docs/extensions/engines/spark/rules.md: ########## @@ -65,31 +65,34 @@ Now, you can enjoy the Kyuubi SQL Extension. Kyuubi provides some configs to make these feature easy to use.
-| Name | Default Value | Description | Since | -|---------------------------------------------------------------------|----------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------| -| spark.sql.optimizer.insertRepartitionBeforeWrite.enabled | true | Add repartition node at the top of query plan. An approach of merging small files. | 1.2.0 | -| spark.sql.optimizer.forceShuffleBeforeJoin.enabled | false | Ensure shuffle node exists before shuffled join (shj and smj) to make AQE `OptimizeSkewedJoin` works (complex scenario join, multi table join). | 1.2.0 | -| spark.sql.optimizer.finalStageConfigIsolation.enabled | false | If true, the final stage support use different config with previous stage. The prefix of final stage config key should be `spark.sql.finalStage.`. For example, the raw spark config: `spark.sql.adaptive.advisoryPartitionSizeInBytes`, then the final stage config should be: `spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes`. | 1.2.0 | -| spark.sql.optimizer.insertZorderBeforeWriting.enabled | true | When true, we will follow target table properties to insert zorder or not. The key properties are: 1) `kyuubi.zorder.enabled`: if this property is true, we will insert zorder before writing data. 2) `kyuubi.zorder.cols`: string split by comma, we will zorder by these cols. | 1.4.0 | -| spark.sql.optimizer.zorderGlobalSort.enabled | true | When true, we do a global sort using zorder. Note that, it can cause data skew issue if the zorder columns have less cardinality. When false, we only do local sort using zorder. | 1.4.0 | -| spark.sql.watchdog.maxPartitions | none | Set the max partition number when spark scans a data source. 
Enable maxPartition Strategy by specifying this configuration. Add maxPartitions Strategy to avoid scan excessive partitions on partitioned table, it's optional that works with defined | 1.4.0 | -| spark.sql.watchdog.maxFileSize | none | Set the maximum size in bytes of files when spark scans a data source. Enable maxFileSize Strategy by specifying this configuration. Add maxFileSize Strategy to avoid scan excessive size of files, it's optional that works with defined | 1.8.0 | -| spark.sql.optimizer.dropIgnoreNonExistent | false | When true, do not report an error if DROP DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies a non-existent database/table/view/function/partition | 1.5.0 | -| spark.sql.optimizer.rebalanceBeforeZorder.enabled | false | When true, we do a rebalance before zorder in case data skew. Note that, if the insertion is dynamic partition we will use the partition columns to rebalance. | 1.6.0 | -| spark.sql.optimizer.rebalanceZorderColumns.enabled | false | When true and `spark.sql.optimizer.rebalanceBeforeZorder.enabled` is true, we do rebalance before Z-Order. If it's dynamic partition insert, the rebalance expression will include both partition columns and Z-Order columns. | 1.6.0 | -| spark.sql.optimizer.twoPhaseRebalanceBeforeZorder.enabled | false | When true and `spark.sql.optimizer.rebalanceBeforeZorder.enabled` is true, we do two phase rebalance before Z-Order for the dynamic partition write. The first phase rebalance using dynamic partition column; The second phase rebalance using dynamic partition column Z-Order columns. | 1.6.0 | -| spark.sql.optimizer.zorderUsingOriginalOrdering.enabled | false | When true and `spark.sql.optimizer.rebalanceBeforeZorder.enabled` is true, we do sort by the original ordering i.e. lexicographical order. | 1.6.0 | -| spark.sql.optimizer.inferRebalanceAndSortOrders.enabled | false | When ture, infer columns for rebalance and sort orders from original query, e.g. the join keys from join. 
It can avoid compression ratio regression. | 1.7.0 | -| spark.sql.optimizer.inferRebalanceAndSortOrdersMaxColumns | 3 | The max columns of inferred columns. | 1.7.0 | -| spark.sql.optimizer.insertRepartitionBeforeWriteIfNoShuffle.enabled | false | When true, add repartition even if the original plan does not have shuffle. | 1.7.0 | -| spark.sql.optimizer.finalStageConfigIsolationWriteOnly.enabled | true | When true, only enable final stage isolation for writing. | 1.7.0 | -| spark.sql.finalWriteStage.eagerlyKillExecutors.enabled | false | When true, eagerly kill redundant executors before running final write stage. | 1.8.0 | -| spark.sql.finalWriteStage.skipKillingExecutorsForTableCache | true | When true, skip killing executors if the plan has table caches. | 1.8.0 | -| spark.sql.finalWriteStage.retainExecutorsFactor | 1.2 | If the target executors * factor < active executors, and target executors * factor > min executors, then inject kill executors or inject custom resource profile. | 1.8.0 | -| spark.sql.finalWriteStage.resourceIsolation.enabled | false | When true, make final write stage resource isolation using custom RDD resource profile. | 1.8.0 | -| spark.sql.finalWriteStageExecutorCores | fallback spark.executor.cores | Specify the executor core request for final write stage. It would be passed to the RDD resource profile. | 1.8.0 | -| spark.sql.finalWriteStageExecutorMemory | fallback spark.executor.memory | Specify the executor on heap memory request for final write stage. It would be passed to the RDD resource profile. | 1.8.0 | -| spark.sql.finalWriteStageExecutorMemoryOverhead | fallback spark.executor.memoryOverhead | Specify the executor memory overhead request for final write stage. It would be passed to the RDD resource profile. | 1.8.0 | -| spark.sql.finalWriteStageExecutorOffHeapMemory | NONE | Specify the executor off heap memory request for final write stage. It would be passed to the RDD resource profile. 
| 1.8.0 | -| spark.sql.execution.scriptTransformation.enabled | true | When false, script transformation is not allowed. | 1.9.0 | +| Name | Default Value | Description | Since | +|---------------------------------------------------------------------|----------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------| +| spark.sql.optimizer.insertRepartitionBeforeWrite.enabled | true | Add repartition node at the top of query plan. An approach of merging small files. | 1.2.0 | +| spark.sql.optimizer.forceShuffleBeforeJoin.enabled | false | Ensure shuffle node exists before shuffled join (shj and smj) to make AQE `OptimizeSkewedJoin` works (complex scenario join, multi table join). | 1.2.0 | +| spark.sql.optimizer.finalStageConfigIsolation.enabled | false | If true, the final stage support use different config with previous stage. The prefix of final stage config key should be `spark.sql.finalStage.`. For example, the raw spark config: `spark.sql.adaptive.advisoryPartitionSizeInBytes`, then the final stage config should be: `spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes`. | 1.2.0 | +| spark.sql.optimizer.insertZorderBeforeWriting.enabled | true | When true, we will follow target table properties to insert zorder or not. The key properties are: 1) `kyuubi.zorder.enabled`: if this property is true, we will insert zorder before writing data. 2) `kyuubi.zorder.cols`: string split by comma, we will zorder by these cols. | 1.4.0 | +| spark.sql.optimizer.zorderGlobalSort.enabled | true | When true, we do a global sort using zorder. Note that, it can cause data skew issue if the zorder columns have less cardinality. 
When false, we only do local sort using zorder. | 1.4.0 | +| spark.sql.watchdog.maxPartitions | none | Set the max partition number when spark scans a data source. Enable maxPartition Strategy by specifying this configuration. Add maxPartitions Strategy to avoid scan excessive partitions on partitioned table, it's optional that works with defined | 1.4.0 | +| spark.sql.watchdog.maxFileSize | none | Set the maximum size in bytes of files when spark scans a data source. Enable maxFileSize Strategy by specifying this configuration. Add maxFileSize Strategy to avoid scan excessive size of files, it's optional that works with defined | 1.8.0 | +| kyuubi.watchdog.dangerousJoin.enabled | false | Enable dangerous join condition detection in planner stage. | 1.10.0 | +| kyuubi.watchdog.dangerousJoin.broadcastRatio | 0.8 | Broadcast threshold coefficient used to identify oversized broadcast fallback. | 1.10.0 | +| kyuubi.watchdog.dangerousJoin.action | WARN | Action when dangerous join is detected, one of `WARN` and `REJECT`. | 1.10.0 | Review Comment: The table marks the dangerous-join configs as `Since 1.10.0`, but in this PR the corresponding `KyuubiSQLConf` entries are versioned `1.11.0` in most Spark extension modules (and `1.10.0` only in spark-4-1). Please align the documented `Since` version (and the default value shown here) with the actual conf definitions for each supported Spark module. ########## extensions/spark/kyuubi-extension-spark-4-1/src/test/scala/org/apache/spark/sql/DangerousJoinInterceptorSuite.scala: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.kyuubi.sql.KyuubiSQLConf +import org.apache.kyuubi.sql.watchdog.{DangerousJoinCounter, KyuubiDangerousJoinException} + +class DangerousJoinInterceptorSuite extends KyuubiSparkSQLExtensionTest { + override protected def beforeAll(): Unit = { + super.beforeAll() + setupData() + } + + test("equi join oversized broadcast fallback should be counted") { + DangerousJoinCounter.reset() + withSQLConf( + KyuubiSQLConf.DANGEROUS_JOIN_ENABLED.key -> "true", + KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO.key -> "0.8", + KyuubiSQLConf.DANGEROUS_JOIN_ACTION.key -> "WARN", + "spark.sql.autoBroadcastJoinThreshold" -> "1") { + sql("SELECT * FROM t1 a JOIN t2 b ON a.c1 = b.c1").queryExecution.sparkPlan + assert(DangerousJoinCounter.count >= 1) + } + } + + test("non equi join cartesian should include Cartesian marker") { + DangerousJoinCounter.reset() + withSQLConf( + KyuubiSQLConf.DANGEROUS_JOIN_ENABLED.key -> "true", + KyuubiSQLConf.DANGEROUS_JOIN_ACTION.key -> "WARN", + "spark.sql.autoBroadcastJoinThreshold" -> "1") { + sql("SELECT * FROM t1 a JOIN t2 b ON a.c1 > b.c1").queryExecution.sparkPlan + assert(DangerousJoinCounter.count >= 1) + assert(DangerousJoinCounter.latest.exists(_.toJson.contains("Cartesian"))) + } Review Comment: The dangerous-join tests only assert `toJson.contains(...)`, which will still pass even if the emitted JSON is malformed (and currently `DangerousJoinCounter.toJson` is malformed). 
To make this test effective, please parse the JSON (or otherwise validate it is well-formed and contains expected keys/values). ########## extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/DangerousJoinInterceptorSuite.scala: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import org.apache.kyuubi.sql.KyuubiSQLConf +import org.apache.kyuubi.sql.watchdog.{DangerousJoinCounter, KyuubiDangerousJoinException} + +class DangerousJoinInterceptorSuite extends KyuubiSparkSQLExtensionTest { + override protected def beforeAll(): Unit = { + super.beforeAll() + setupData() + } + + test("equi join oversized broadcast fallback should be counted") { + DangerousJoinCounter.reset() + withSQLConf( + KyuubiSQLConf.DANGEROUS_JOIN_ENABLED.key -> "true", + KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO.key -> "0.8", + KyuubiSQLConf.DANGEROUS_JOIN_ACTION.key -> "WARN", + "spark.sql.autoBroadcastJoinThreshold" -> "1") { + sql("SELECT * FROM t1 a JOIN t2 b ON a.c1 = b.c1").queryExecution.sparkPlan + assert(DangerousJoinCounter.count >= 1) + } + } + + test("non equi join cartesian should include Cartesian marker") { + DangerousJoinCounter.reset() + withSQLConf( + KyuubiSQLConf.DANGEROUS_JOIN_ENABLED.key -> "true", + KyuubiSQLConf.DANGEROUS_JOIN_ACTION.key -> "WARN", + "spark.sql.autoBroadcastJoinThreshold" -> "1") { + sql("SELECT * FROM t1 a JOIN t2 b ON a.c1 > b.c1").queryExecution.sparkPlan + assert(DangerousJoinCounter.count >= 1) + assert(DangerousJoinCounter.latest.exists(_.toJson.contains("Cartesian"))) + } Review Comment: The assertions only check `toJson.contains(...)`, which can pass even if the JSON payload is malformed. Consider parsing/validating the JSON so the test actually verifies the structured diagnostics format. ########## extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinCounter.scala: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.sql.watchdog + +import scala.collection.mutable.ArrayBuffer + +object DangerousJoinCounter { + case class Entry( + sqlText: String, + joinType: String, + reason: String, + leftSize: BigInt, + rightSize: BigInt, + broadcastThreshold: Long, + broadcastRatio: Double) { + def toJson: String = { + val pairs = Seq( + "sql" -> escape(sqlText), + "joinType" -> escape(joinType), + "reason" -> escape(reason), + "leftSize" -> leftSize.toString, + "rightSize" -> rightSize.toString, + "broadcastThreshold" -> broadcastThreshold.toString, + "broadcastRatio" -> broadcastRatio.toString) + pairs.map { case (k, v) => + if (k == "leftSize" || k == "rightSize" || k == "broadcastThreshold" || k == "broadcastRatio") { + s""""$k":$v""" + } else { + s""""$k":"$v"""" + } + }.mkString("{", ",", "}") + } + } + + private val entries = ArrayBuffer.empty[Entry] + + def add(entry: Entry): Unit = synchronized { + entries += entry Review Comment: `DangerousJoinCounter` stores all entries in a global mutable buffer without bounds/eviction. In a long-running driver this can grow indefinitely and cause memory pressure. Consider bounding the buffer or removing storage in production and relying on metrics/logging instead. ```suggestion // Limit the number of in-memory entries to avoid unbounded growth in long-running drivers. 
private val MaxEntries = 1000 private val entries = ArrayBuffer.empty[Entry] def add(entry: Entry): Unit = synchronized { entries += entry if (entries.size > MaxEntries) { // Remove oldest entries to keep only the most recent MaxEntries val toRemove = entries.size - MaxEntries if (toRemove > 0) { entries.remove(0, toRemove) } } ``` ########## extensions/spark/kyuubi-extension-spark-4-0/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.sql.watchdog + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression} +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys +import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan} +import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} +import org.slf4j.LoggerFactory + +import org.apache.kyuubi.sql.KyuubiSQLConf + +case class DangerousJoinInterceptor(session: SparkSession) extends SparkStrategy { + import DangerousJoinInterceptor._ + + private val logger = LoggerFactory.getLogger(classOf[DangerousJoinInterceptor]) + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = { + val conf = session.sessionState.conf + if (!conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ENABLED)) { + return Nil + } + val ratio = conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO) + val threshold = conf.autoBroadcastJoinThreshold + plan.foreach { + case join: Join => + detect(join, threshold, ratio).foreach { reason => + val entry = DangerousJoinCounter.Entry( + sqlText = plan.toString(), + joinType = join.joinType.sql, + reason = reason, + leftSize = join.left.stats.sizeInBytes, + rightSize = join.right.stats.sizeInBytes, + broadcastThreshold = threshold, + broadcastRatio = ratio) + DangerousJoinCounter.add(entry) + logger.warn(s"$KYUUBI_LOG_KEY=${entry.toJson}") + if (conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ACTION) == REJECT) { + throw new KyuubiDangerousJoinException(entry.toJson) + } + } + case _ => + } + Nil + } + + private def detect(join: Join, threshold: Long, ratio: Double): Option[String] = { + if (threshold <= 0) { + return None + } + val leftSize = join.left.stats.sizeInBytes + val rightSize = join.right.stats.sizeInBytes + val hasEquiJoin = isEquiJoin(join) + if (hasEquiJoin) { + if (isCartesianCondition(join.condition)) { + Some("Cartesian") + } else if (minSize(leftSize, rightSize) > BigInt((threshold * 
ratio).toLong)) { + Some("OversizedBroadcastFallback") + } else { + None + } + } else { + if (leftSize > threshold && rightSize > threshold) { + Some("Cartesian") + } else if (cannotSelectBuildSide(leftSize, rightSize, threshold)) { + Some("SecondBNLJ") + } else { Review Comment: `cannotSelectBuildSide` is the same predicate as the prior Cartesian check, so the `SecondBNLJ` branch can never be reached. Please fix the condition(s) so `SecondBNLJ` can be detected distinctly or remove the dead code path. ########## extensions/spark/kyuubi-extension-spark-4-0/src/test/scala/org/apache/spark/sql/DangerousJoinInterceptorSuite.scala: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import org.apache.kyuubi.sql.KyuubiSQLConf +import org.apache.kyuubi.sql.watchdog.{DangerousJoinCounter, KyuubiDangerousJoinException} + +class DangerousJoinInterceptorSuite extends KyuubiSparkSQLExtensionTest { + override protected def beforeAll(): Unit = { + super.beforeAll() + setupData() + } + + test("equi join oversized broadcast fallback should be counted") { + DangerousJoinCounter.reset() + withSQLConf( + KyuubiSQLConf.DANGEROUS_JOIN_ENABLED.key -> "true", + KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO.key -> "0.8", + KyuubiSQLConf.DANGEROUS_JOIN_ACTION.key -> "WARN", + "spark.sql.autoBroadcastJoinThreshold" -> "1") { + sql("SELECT * FROM t1 a JOIN t2 b ON a.c1 = b.c1").queryExecution.sparkPlan + assert(DangerousJoinCounter.count >= 1) + } + } + + test("non equi join cartesian should include Cartesian marker") { + DangerousJoinCounter.reset() + withSQLConf( + KyuubiSQLConf.DANGEROUS_JOIN_ENABLED.key -> "true", + KyuubiSQLConf.DANGEROUS_JOIN_ACTION.key -> "WARN", + "spark.sql.autoBroadcastJoinThreshold" -> "1") { + sql("SELECT * FROM t1 a JOIN t2 b ON a.c1 > b.c1").queryExecution.sparkPlan + assert(DangerousJoinCounter.count >= 1) + assert(DangerousJoinCounter.latest.exists(_.toJson.contains("Cartesian"))) + } Review Comment: The assertions only check `toJson.contains(...)`, which can pass even if the JSON payload is malformed. Consider parsing/validating the JSON so the test actually verifies the structured diagnostics format. ########## extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinCounter.scala: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.sql.watchdog + +import scala.collection.mutable.ArrayBuffer + +object DangerousJoinCounter { + case class Entry( + sqlText: String, + joinType: String, + reason: String, + leftSize: BigInt, + rightSize: BigInt, + broadcastThreshold: Long, + broadcastRatio: Double) { + def toJson: String = { + val pairs = Seq( + "sql" -> escape(sqlText), + "joinType" -> escape(joinType), + "reason" -> escape(reason), + "leftSize" -> leftSize.toString, + "rightSize" -> rightSize.toString, + "broadcastThreshold" -> broadcastThreshold.toString, + "broadcastRatio" -> broadcastRatio.toString) + pairs.map { case (k, v) => + if (k == "leftSize" || k == "rightSize" || k == "broadcastThreshold" || k == "broadcastRatio") { + s""""$k":$v""" + } else { + s""""$k":"$v"""" + } + }.mkString("{", ",", "}") + } + } + + private val entries = ArrayBuffer.empty[Entry] + + def add(entry: Entry): Unit = synchronized { Review Comment: `DangerousJoinCounter` stores all entries in a global mutable buffer without bounds/eviction. In a long-running driver this can grow indefinitely and cause memory pressure. Consider bounding the buffer or removing storage in production and relying on metrics/logging instead. ```suggestion // Limit the number of stored entries to avoid unbounded memory growth. 
private val MaxEntries = 1000 private val entries = ArrayBuffer.empty[Entry] def add(entry: Entry): Unit = synchronized { if (entries.size >= MaxEntries) { // Evict the oldest entry to keep the buffer bounded. entries.remove(0) } ``` ########## extensions/spark/kyuubi-extension-spark-4-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala: ########## @@ -123,6 +123,30 @@ object KyuubiSQLConf { .bytesConf(ByteUnit.BYTE) .createOptional + val DANGEROUS_JOIN_ENABLED = + buildConf("kyuubi.watchdog.dangerousJoin.enabled") + .doc("Enable dangerous join condition detection.") + .version("1.10.0") + .booleanConf + .createWithDefault(true) + Review Comment: In this Spark 4.1 extension, `kyuubi.watchdog.dangerousJoin.enabled` defaults to `true`, while other Spark-extension modules in this PR default it to `false`. Enabling this by default changes behavior (extra WARN logs / possible REJECT) and makes defaults inconsistent across Spark versions; please align the default across modules and update docs accordingly. ########## extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.sql.watchdog + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression} +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys +import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan} +import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} +import org.slf4j.LoggerFactory + +import org.apache.kyuubi.sql.KyuubiSQLConf + +case class DangerousJoinInterceptor(session: SparkSession) extends SparkStrategy { + import DangerousJoinInterceptor._ + + private val logger = LoggerFactory.getLogger(classOf[DangerousJoinInterceptor]) + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = { + val conf = session.sessionState.conf + if (!conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ENABLED)) { + return Nil + } + val ratio = conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO) + val threshold = conf.autoBroadcastJoinThreshold + plan.foreach { + case join: Join => + detect(join, threshold, ratio).foreach { reason => + val entry = DangerousJoinCounter.Entry( + sqlText = plan.toString(), + joinType = join.joinType.sql, + reason = reason, + leftSize = join.left.stats.sizeInBytes, + rightSize = join.right.stats.sizeInBytes, + broadcastThreshold = threshold, + broadcastRatio = ratio) + DangerousJoinCounter.add(entry) + logger.warn(s"$KYUUBI_LOG_KEY=${entry.toJson}") + if (conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ACTION) == REJECT) { + throw new KyuubiDangerousJoinException(entry.toJson) + } + } + case _ => + } + Nil + } + + private def detect(join: Join, threshold: Long, ratio: Double): Option[String] = { + if (threshold <= 0) { + return None + } + val leftSize = join.left.stats.sizeInBytes + val rightSize = join.right.stats.sizeInBytes + val hasEquiJoin = isEquiJoin(join) + if (hasEquiJoin) { + if (isCartesianCondition(join.condition)) { + Some("Cartesian") + } else if (minSize(leftSize, rightSize) > BigInt((threshold * 
ratio).toLong)) { + Some("OversizedBroadcastFallback") + } else { + None + } + } else { + if (leftSize > threshold && rightSize > threshold) { Review Comment: The early return when `autoBroadcastJoinThreshold <= 0` disables all detection, including Cartesian/BNLJ risks that are not dependent on broadcast thresholds. Detection should still run for non-broadcast-related rules even when broadcast is disabled. ```suggestion val leftSize = join.left.stats.sizeInBytes val rightSize = join.right.stats.sizeInBytes val hasEquiJoin = isEquiJoin(join) if (hasEquiJoin) { if (isCartesianCondition(join.condition)) { Some("Cartesian") } else if (threshold > 0 && minSize(leftSize, rightSize) > BigInt((threshold * ratio).toLong)) { Some("OversizedBroadcastFallback") } else { None } } else { if (threshold <= 0) { None } else if (leftSize > threshold && rightSize > threshold) { ``` ########## extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.sql.watchdog + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression} +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys +import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan} +import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} +import org.slf4j.LoggerFactory + +import org.apache.kyuubi.sql.KyuubiSQLConf + +case class DangerousJoinInterceptor(session: SparkSession) extends SparkStrategy { + import DangerousJoinInterceptor._ + + private val logger = LoggerFactory.getLogger(classOf[DangerousJoinInterceptor]) + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = { + val conf = session.sessionState.conf + if (!conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ENABLED)) { + return Nil + } + val ratio = conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO) + val threshold = conf.autoBroadcastJoinThreshold + plan.foreach { + case join: Join => + detect(join, threshold, ratio).foreach { reason => + val entry = DangerousJoinCounter.Entry( + sqlText = plan.toString(), + joinType = join.joinType.sql, + reason = reason, + leftSize = join.left.stats.sizeInBytes, + rightSize = join.right.stats.sizeInBytes, + broadcastThreshold = threshold, + broadcastRatio = ratio) + DangerousJoinCounter.add(entry) + logger.warn(s"$KYUUBI_LOG_KEY=${entry.toJson}") + if (conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ACTION) == REJECT) { + throw new KyuubiDangerousJoinException(entry.toJson) + } + } + case _ => + } + Nil + } + + private def detect(join: Join, threshold: Long, ratio: Double): Option[String] = { + if (threshold <= 0) { + return None + } + val leftSize = join.left.stats.sizeInBytes + val rightSize = join.right.stats.sizeInBytes + val hasEquiJoin = isEquiJoin(join) + if (hasEquiJoin) { + if (isCartesianCondition(join.condition)) { + Some("Cartesian") + } else if (minSize(leftSize, rightSize) > BigInt((threshold * 
ratio).toLong)) { + Some("OversizedBroadcastFallback") + } else { + None + } + } else { + if (leftSize > threshold && rightSize > threshold) { + Some("Cartesian") + } else if (cannotSelectBuildSide(leftSize, rightSize, threshold)) { + Some("SecondBNLJ") + } else { Review Comment: `cannotSelectBuildSide` is the same predicate as the prior Cartesian check, so the `SecondBNLJ` branch can never be reached. Please fix the condition(s) so `SecondBNLJ` can be detected distinctly or remove the dead code path. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
