Copilot commented on code in PR #7368:
URL: https://github.com/apache/kyuubi/pull/7368#discussion_r2998691843


##########
docs/deployment/settings.md:
##########
@@ -0,0 +1,34 @@
+<!--
+- Licensed to the Apache Software Foundation (ASF) under one or more
+- contributor license agreements.  See the NOTICE file distributed with
+- this work for additional information regarding copyright ownership.
+- The ASF licenses this file to You under the Apache License, Version 2.0
+- (the "License"); you may not use this file except in compliance with
+- the License.  You may obtain a copy of the License at
+-
+-   http://www.apache.org/licenses/LICENSE-2.0
+-
+- Unless required by applicable law or agreed to in writing, software
+- distributed under the License is distributed on an "AS IS" BASIS,
+- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- See the License for the specific language governing permissions and
+- limitations under the License.
+-->
+
+# Deployment Settings for Dangerous Join Watchdog
+
+## Spark SQL Extensions
+
+```properties
+spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension,org.apache.kyuubi.sql.watchdog.KyuubiDangerousJoinExtension
+```
+
+## Dangerous Join Configurations
+
+|                      Name                      | Default |                        Description                        |
+|------------------------------------------------|---------|-----------------------------------------------------------|
+| `kyuubi.watchdog.dangerousJoin.enabled`        | `true`  | Enable dangerous join watchdog                            |

Review Comment:
   `docs/deployment/settings.md` lists the `kyuubi.watchdog.dangerousJoin.enabled` default as `true`, but this PR sets it to `false` in several Spark extension modules. Please update either the code default or this documentation so operators see the correct default behavior.
   ```suggestion
   | `kyuubi.watchdog.dangerousJoin.enabled`        | `false` | Enable dangerous join watchdog                            |
   ```



##########
extensions/spark/kyuubi-extension-spark-4-0/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.watchdog
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.slf4j.LoggerFactory
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
+case class DangerousJoinInterceptor(session: SparkSession) extends SparkStrategy {
+  import DangerousJoinInterceptor._
+
+  private val logger = LoggerFactory.getLogger(classOf[DangerousJoinInterceptor])
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    val conf = session.sessionState.conf
+    if (!conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ENABLED)) {
+      return Nil
+    }
+    val ratio = conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO)
+    val threshold = conf.autoBroadcastJoinThreshold
+    plan.foreach {
+      case join: Join =>
+        detect(join, threshold, ratio).foreach { reason =>
+          val entry = DangerousJoinCounter.Entry(
+            sqlText = plan.toString(),
+            joinType = join.joinType.sql,
+            reason = reason,
+            leftSize = join.left.stats.sizeInBytes,
+            rightSize = join.right.stats.sizeInBytes,
+            broadcastThreshold = threshold,
+            broadcastRatio = ratio)
+          DangerousJoinCounter.add(entry)
+          logger.warn(s"$KYUUBI_LOG_KEY=${entry.toJson}")
+          if (conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ACTION) == REJECT) {
+            throw new KyuubiDangerousJoinException(entry.toJson)
+          }
+        }
+      case _ =>
+    }
+    Nil
+  }
+
+  private def detect(join: Join, threshold: Long, ratio: Double): Option[String] = {
+    if (threshold <= 0) {
+      return None
+    }

Review Comment:
   The early return when `autoBroadcastJoinThreshold <= 0` disables all 
detection, including Cartesian/BNLJ risks that are not dependent on broadcast 
thresholds. Detection should still run for non-broadcast-related rules even 
when broadcast is disabled.
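   One way to restructure the guard so that only the broadcast-dependent rules are skipped is sketched below. This is a sketch against simplified inputs, not the PR's exact helpers: `isEqui` and `isCartesian` stand in for the PR's `isEquiJoin` / `isCartesianCondition` (assumptions).

   ```scala
   // Sketch: the size-independent Cartesian rule runs before any threshold
   // guard; only the broadcast-ratio rule is skipped when threshold <= 0.
   object DetectSketch {
     def detect(
         isEqui: Boolean,
         isCartesian: Boolean,
         leftSize: BigInt,
         rightSize: BigInt,
         threshold: Long,
         ratio: Double): Option[String] = {
       if (isEqui) {
         if (isCartesian) {
           Some("Cartesian") // still fires when autoBroadcastJoinThreshold <= 0
         } else if (threshold > 0 &&
             leftSize.min(rightSize) > BigInt((threshold * ratio).toLong)) {
           Some("OversizedBroadcastFallback")
         } else {
           None
         }
       } else if (threshold > 0 && leftSize > threshold && rightSize > threshold) {
         Some("Cartesian")
       } else {
         None
       }
     }
   }
   ```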



##########
extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/DangerousJoinInterceptorSuite.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+import org.apache.kyuubi.sql.watchdog.{DangerousJoinCounter, KyuubiDangerousJoinException}
+
+class DangerousJoinInterceptorSuite extends KyuubiSparkSQLExtensionTest {
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    setupData()
+  }
+
+  test("equi join oversized broadcast fallback should be counted") {
+    DangerousJoinCounter.reset()
+    withSQLConf(
+      KyuubiSQLConf.DANGEROUS_JOIN_ENABLED.key -> "true",
+      KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO.key -> "0.8",
+      KyuubiSQLConf.DANGEROUS_JOIN_ACTION.key -> "WARN",
+      "spark.sql.autoBroadcastJoinThreshold" -> "1") {
+      sql("SELECT * FROM t1 a JOIN t2 b ON a.c1 = b.c1").queryExecution.sparkPlan
+      assert(DangerousJoinCounter.count >= 1)
+    }
+  }
+
+  test("non equi join cartesian should include Cartesian marker") {
+    DangerousJoinCounter.reset()
+    withSQLConf(
+      KyuubiSQLConf.DANGEROUS_JOIN_ENABLED.key -> "true",
+      KyuubiSQLConf.DANGEROUS_JOIN_ACTION.key -> "WARN",
+      "spark.sql.autoBroadcastJoinThreshold" -> "1") {
+      sql("SELECT * FROM t1 a JOIN t2 b ON a.c1 > b.c1").queryExecution.sparkPlan
+      assert(DangerousJoinCounter.count >= 1)
+      assert(DangerousJoinCounter.latest.exists(_.toJson.contains("Cartesian")))
+    }

Review Comment:
   The assertions only check `toJson.contains(...)`, which can pass even if the 
JSON payload is malformed. Consider parsing/validating the JSON so the test 
actually verifies the structured diagnostics format.
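   For example, the assertion could parse the payload with Jackson, which Spark already bundles (an assumption about the test classpath); `json` stands in for `DangerousJoinCounter.latest.get.toJson`:

   ```scala
   import com.fasterxml.jackson.databind.ObjectMapper

   // Sketch: validate the payload structurally instead of substring matching.
   object JsonPayloadCheck {
     def validate(json: String): Unit = {
       val node = new ObjectMapper().readTree(json) // throws on malformed JSON
       assert(node.get("reason").asText() == "Cartesian")
       assert(node.get("leftSize").isNumber)
       assert(node.get("broadcastRatio").asDouble() == 0.8)
     }
   }
   ```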



##########
docs/watchdog/dangerous-join.md:
##########
@@ -0,0 +1,105 @@
+<!--
+- Licensed to the Apache Software Foundation (ASF) under one or more
+- contributor license agreements.  See the NOTICE file distributed with
+- this work for additional information regarding copyright ownership.
+- The ASF licenses this file to You under the Apache License, Version 2.0
+- (the "License"); you may not use this file except in compliance with
+- the License.  You may obtain a copy of the License at
+-
+-   http://www.apache.org/licenses/LICENSE-2.0
+-
+- Unless required by applicable law or agreed to in writing, software
+- distributed under the License is distributed on an "AS IS" BASIS,
+- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- See the License for the specific language governing permissions and
+- limitations under the License.
+-->
+
+# Dangerous Join Watchdog
+
+Kyuubi Dangerous Join Watchdog detects risky join planning patterns before query execution.
+It helps reduce accidental Cartesian products, oversized broadcast attempts, and long-running nested loop joins.
+
+## Background
+
+In shared SQL gateway environments, a single risky join can consume excessive driver memory or create very slow jobs.
+The Dangerous Join Watchdog adds planning-time checks for these high-risk patterns.
+
+## Risk Rules
+
+### Equi-Join
+
+- Rule 1: Equi-join is marked dangerous when it degrades to a Cartesian pattern.
+- Rule 2: Equi-join is marked dangerous when the estimated build side exceeds the configured broadcast ratio threshold.
+
+### Non-Equi Join
+
+- Rule 1: Non-equi join is marked dangerous when both sides exceed the broadcast threshold and effectively become a Cartesian risk.
+- Rule 2: Non-equi join is marked dangerous when a build side cannot be selected and the plan falls back to a second BNLJ pattern.
+
+## Configurations
+
+|                      Name                      | Default |                                Meaning                                    |
+|------------------------------------------------|---------|---------------------------------------------------------------------------|
+| `kyuubi.watchdog.dangerousJoin.enabled`        | `true`  | Enable or disable dangerous join detection                                |
+| `kyuubi.watchdog.dangerousJoin.broadcastRatio` | `0.8`   | Ratio against Spark broadcast threshold for warning/reject decision       |
+| `kyuubi.watchdog.dangerousJoin.action`         | `WARN`  | `WARN` logs diagnostics; `REJECT` throws exception and rejects submission |

Review Comment:
   The configuration table lists `kyuubi.watchdog.dangerousJoin.enabled` 
default as `true`, but in this PR most Spark extension modules set the default 
to `false` (e.g., spark-3-3/3-4/3-5/4-0). Please reconcile the documented 
defaults with the actual defaults (and keep them consistent across Spark 
versions).



##########
docs/watchdog/dangerous-join.md:
##########
@@ -0,0 +1,105 @@
+<!--
+- Licensed to the Apache Software Foundation (ASF) under one or more
+- contributor license agreements.  See the NOTICE file distributed with
+- this work for additional information regarding copyright ownership.
+- The ASF licenses this file to You under the Apache License, Version 2.0
+- (the "License"); you may not use this file except in compliance with
+- the License.  You may obtain a copy of the License at
+-
+-   http://www.apache.org/licenses/LICENSE-2.0
+-
+- Unless required by applicable law or agreed to in writing, software
+- distributed under the License is distributed on an "AS IS" BASIS,
+- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- See the License for the specific language governing permissions and
+- limitations under the License.
+-->
+
+# Dangerous Join Watchdog
+
+Kyuubi Dangerous Join Watchdog detects risky join planning patterns before query execution.
+It helps reduce accidental Cartesian products, oversized broadcast attempts, and long-running nested loop joins.
+
+## Background
+
+In shared SQL gateway environments, a single risky join can consume excessive driver memory or create very slow jobs.
+The Dangerous Join Watchdog adds planning-time checks for these high-risk patterns.
+
+## Risk Rules
+
+### Equi-Join
+
+- Rule 1: Equi-join is marked dangerous when it degrades to a Cartesian pattern.
+- Rule 2: Equi-join is marked dangerous when the estimated build side exceeds the configured broadcast ratio threshold.
+
+### Non-Equi Join
+
+- Rule 1: Non-equi join is marked dangerous when both sides exceed the broadcast threshold and effectively become a Cartesian risk.
+- Rule 2: Non-equi join is marked dangerous when a build side cannot be selected and the plan falls back to a second BNLJ pattern.
+
+## Configurations
+
+|                      Name                      | Default |                                Meaning                                    |
+|------------------------------------------------|---------|---------------------------------------------------------------------------|
+| `kyuubi.watchdog.dangerousJoin.enabled`        | `true`  | Enable or disable dangerous join detection                                |
+| `kyuubi.watchdog.dangerousJoin.broadcastRatio` | `0.8`   | Ratio against Spark broadcast threshold for warning/reject decision       |
+| `kyuubi.watchdog.dangerousJoin.action`         | `WARN`  | `WARN` logs diagnostics; `REJECT` throws exception and rejects submission |
+
+## Usage
+
+1. Put Kyuubi Spark extension jar into Spark classpath.
+2. Configure SQL extensions:
+
+```properties
+spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension,org.apache.kyuubi.sql.watchdog.KyuubiDangerousJoinExtension
+```
+
+3. Configure action:
+
+```properties
+kyuubi.watchdog.dangerousJoin.action=WARN
+```
+
+or
+
+```properties
+kyuubi.watchdog.dangerousJoin.action=REJECT
+```
+
+## Sample WARN Log
+
+When action is `WARN`, Kyuubi writes a structured JSON payload:
+
+```text
+KYUUBI_LOG_KEY={"sql":"SELECT ...","joinType":"INNER","reason":"Cartesian","leftSize":10485760,"rightSize":15728640,"broadcastThreshold":10485760,"broadcastRatio":0.8}

Review Comment:
   The emitted `sql` field in the JSON payload is built from `plan.toString()` 
in the interceptor (logical plan tree string), not the original SQL text. The 
sample log currently shows `"sql":"SELECT ..."`, which is misleading; please 
either adjust the payload to include real SQL text (if available) or update the 
docs/sample to reflect what is actually logged.
   ````suggestion
   When action is `WARN`, Kyuubi writes a structured JSON payload (the `sql` field contains the logical plan string from `plan.toString()`):
   
   ```text
   KYUUBI_LOG_KEY={"sql":"Project [*]\n+- LogicalRDD [id#0, value#1], false","joinType":"INNER","reason":"Cartesian","leftSize":10485760,"rightSize":15728640,"broadcastThreshold":10485760,"broadcastRatio":0.8}
   ```
   ````



##########
extensions/spark/kyuubi-extension-spark-4-1/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.watchdog
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.slf4j.LoggerFactory
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
+case class DangerousJoinInterceptor(session: SparkSession) extends SparkStrategy {
+  import DangerousJoinInterceptor._
+
+  private val logger = LoggerFactory.getLogger(classOf[DangerousJoinInterceptor])
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    val conf = session.sessionState.conf
+    if (!conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ENABLED)) {
+      return Nil
+    }
+    val ratio = conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO)
+    val threshold = conf.autoBroadcastJoinThreshold
+    plan.foreach {
+      case join: Join =>
+        detect(join, threshold, ratio).foreach { reason =>
+          val entry = DangerousJoinCounter.Entry(
+            sqlText = plan.toString(),
+            joinType = join.joinType.sql,
+            reason = reason,
+            leftSize = join.left.stats.sizeInBytes,
+            rightSize = join.right.stats.sizeInBytes,
+            broadcastThreshold = threshold,
+            broadcastRatio = ratio)
+          DangerousJoinCounter.add(entry)
+          logger.warn(s"$KYUUBI_LOG_KEY=${entry.toJson}")
+          if (conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ACTION) == REJECT) {
+            throw new KyuubiDangerousJoinException(entry.toJson)
+          }
+        }
+      case _ =>
+    }
+    Nil
+  }
+
+  private def detect(join: Join, threshold: Long, ratio: Double): Option[String] = {
+    if (threshold <= 0) {
+      return None
+    }
+    val leftSize = join.left.stats.sizeInBytes
+    val rightSize = join.right.stats.sizeInBytes
+    val hasEquiJoin = isEquiJoin(join)
+    if (hasEquiJoin) {
+      if (isCartesianCondition(join.condition)) {
+        Some("Cartesian")
+      } else if (minSize(leftSize, rightSize) > BigInt((threshold * ratio).toLong)) {
+        Some("OversizedBroadcastFallback")
+      } else {
+        None
+      }
+    } else {
+      if (leftSize > threshold && rightSize > threshold) {
+        Some("Cartesian")
+      } else if (cannotSelectBuildSide(leftSize, rightSize, threshold)) {
+        Some("SecondBNLJ")
+      } else {

Review Comment:
   `cannotSelectBuildSide` is identical to the preceding Cartesian check 
(`leftSize > threshold && rightSize > threshold`), making the `SecondBNLJ` 
branch unreachable. This prevents the interceptor from ever reporting 
`SecondBNLJ` as intended; please adjust the logic so these conditions are 
distinct (or remove the dead branch).
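   To make the two predicates distinct, one possible reading of the intent (an assumption, not the PR's definition) is: `Cartesian` when neither side can be broadcast at all, `SecondBNLJ` when a build side exists but still exceeds `ratio * threshold`, i.e. an uncomfortably large BNLJ build:

   ```scala
   // Sketch with assumed semantics for the non-equi branch; names are illustrative.
   object NonEquiClassifier {
     def classify(
         leftSize: BigInt,
         rightSize: BigInt,
         threshold: Long,
         ratio: Double): Option[String] = {
       if (leftSize > threshold && rightSize > threshold) {
         Some("Cartesian") // neither side fits under the broadcast threshold
       } else if (leftSize.min(rightSize) > BigInt((threshold * ratio).toLong)) {
         Some("SecondBNLJ") // build side fits, but is near the threshold
       } else {
         None
       }
     }
   }
   ```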



##########
extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.watchdog
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.slf4j.LoggerFactory
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
+case class DangerousJoinInterceptor(session: SparkSession) extends SparkStrategy {
+  import DangerousJoinInterceptor._
+
+  private val logger = LoggerFactory.getLogger(classOf[DangerousJoinInterceptor])
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    val conf = session.sessionState.conf
+    if (!conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ENABLED)) {
+      return Nil
+    }
+    val ratio = conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO)
+    val threshold = conf.autoBroadcastJoinThreshold
+    plan.foreach {
+      case join: Join =>
+        detect(join, threshold, ratio).foreach { reason =>
+          val entry = DangerousJoinCounter.Entry(
+            sqlText = plan.toString(),
+            joinType = join.joinType.sql,
+            reason = reason,
+            leftSize = join.left.stats.sizeInBytes,
+            rightSize = join.right.stats.sizeInBytes,
+            broadcastThreshold = threshold,
+            broadcastRatio = ratio)
+          DangerousJoinCounter.add(entry)
+          logger.warn(s"$KYUUBI_LOG_KEY=${entry.toJson}")
+          if (conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ACTION) == REJECT) {
+            throw new KyuubiDangerousJoinException(entry.toJson)
+          }
+        }
+      case _ =>
+    }
+    Nil
+  }
+
+  private def detect(join: Join, threshold: Long, ratio: Double): Option[String] = {
+    if (threshold <= 0) {
+      return None
+    }
+    val leftSize = join.left.stats.sizeInBytes
+    val rightSize = join.right.stats.sizeInBytes
+    val hasEquiJoin = isEquiJoin(join)
+    if (hasEquiJoin) {
+      if (isCartesianCondition(join.condition)) {
+        Some("Cartesian")
+      } else if (minSize(leftSize, rightSize) > BigInt((threshold * ratio).toLong)) {
+        Some("OversizedBroadcastFallback")
+      } else {
+        None
+      }
+    } else {
+      if (leftSize > threshold && rightSize > threshold) {
+        Some("Cartesian")
+      } else if (cannotSelectBuildSide(leftSize, rightSize, threshold)) {
+        Some("SecondBNLJ")
+      } else {

Review Comment:
   `cannotSelectBuildSide` is the same predicate as the prior Cartesian check, 
so the `SecondBNLJ` branch can never be reached. Please fix the condition(s) so 
`SecondBNLJ` can be detected distinctly or remove the dead code path.



##########
extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinCounter.scala:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.watchdog
+
+import scala.collection.mutable.ArrayBuffer
+
+object DangerousJoinCounter {
+  case class Entry(
+      sqlText: String,
+      joinType: String,
+      reason: String,
+      leftSize: BigInt,
+      rightSize: BigInt,
+      broadcastThreshold: Long,
+      broadcastRatio: Double) {
+    def toJson: String = {
+      val pairs = Seq(
+        "sql" -> escape(sqlText),
+        "joinType" -> escape(joinType),
+        "reason" -> escape(reason),
+        "leftSize" -> leftSize.toString,
+        "rightSize" -> rightSize.toString,
+        "broadcastThreshold" -> broadcastThreshold.toString,
+        "broadcastRatio" -> broadcastRatio.toString)
+      pairs.map { case (k, v) =>
+        if (k == "leftSize" || k == "rightSize" || k == "broadcastThreshold" || k == "broadcastRatio") {
+          s""""$k":$v"""
+        } else {
+          s""""$k":"$v""""
+        }
+      }.mkString("{", ",", "}")
+    }
+  }
+
+  private val entries = ArrayBuffer.empty[Entry]
+
+  def add(entry: Entry): Unit = synchronized {

Review Comment:
   `DangerousJoinCounter` stores all entries in a global mutable buffer without 
bounds/eviction. In a long-running driver this can grow indefinitely and cause 
memory pressure. Consider bounding the buffer or removing storage in production 
and relying on metrics/logging instead.
   ```suggestion
   
     private val MaxEntries = 1000
     private val entries = ArrayBuffer.empty[Entry]
   
     def add(entry: Entry): Unit = synchronized {
       if (entries.size >= MaxEntries) {
         entries.remove(0)
       }
   ```



##########
extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.watchdog
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.slf4j.LoggerFactory
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
+case class DangerousJoinInterceptor(session: SparkSession) extends SparkStrategy {
+  import DangerousJoinInterceptor._
+
+  private val logger = LoggerFactory.getLogger(classOf[DangerousJoinInterceptor])
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    val conf = session.sessionState.conf
+    if (!conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ENABLED)) {
+      return Nil
+    }
+    val ratio = conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO)
+    val threshold = conf.autoBroadcastJoinThreshold
+    plan.foreach {
+      case join: Join =>
+        detect(join, threshold, ratio).foreach { reason =>
+          val entry = DangerousJoinCounter.Entry(
+            sqlText = plan.toString(),
+            joinType = join.joinType.sql,
+            reason = reason,
+            leftSize = join.left.stats.sizeInBytes,
+            rightSize = join.right.stats.sizeInBytes,
+            broadcastThreshold = threshold,
+            broadcastRatio = ratio)
+          DangerousJoinCounter.add(entry)
+          logger.warn(s"$KYUUBI_LOG_KEY=${entry.toJson}")
+          if (conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ACTION) == REJECT) {
+            throw new KyuubiDangerousJoinException(entry.toJson)
+          }
+        }
+      case _ =>
+    }
+    Nil
+  }
+
+  private def detect(join: Join, threshold: Long, ratio: Double): Option[String] = {
+    if (threshold <= 0) {
+      return None
+    }
+    val leftSize = join.left.stats.sizeInBytes
+    val rightSize = join.right.stats.sizeInBytes
+    val hasEquiJoin = isEquiJoin(join)
+    if (hasEquiJoin) {
+      if (isCartesianCondition(join.condition)) {
+        Some("Cartesian")
+      } else if (minSize(leftSize, rightSize) > BigInt((threshold * ratio).toLong)) {
+        Some("OversizedBroadcastFallback")
+      } else {
+        None
+      }
+    } else {
+      if (leftSize > threshold && rightSize > threshold) {

Review Comment:
   The early return when `autoBroadcastJoinThreshold <= 0` disables all 
detection, including Cartesian/BNLJ risks that are not dependent on broadcast 
thresholds. Detection should still run for non-broadcast-related rules even 
when broadcast is disabled.
   ```suggestion
       val leftSize = join.left.stats.sizeInBytes
       val rightSize = join.right.stats.sizeInBytes
       val hasEquiJoin = isEquiJoin(join)
       if (hasEquiJoin) {
         if (isCartesianCondition(join.condition)) {
           Some("Cartesian")
         } else if (threshold > 0 &&
             minSize(leftSize, rightSize) > BigInt((threshold * ratio).toLong)) {
           Some("OversizedBroadcastFallback")
         } else {
           None
         }
       } else {
         if (threshold <= 0) {
           None
         } else if (leftSize > threshold && rightSize > threshold) {
   ```



##########
extensions/spark/kyuubi-extension-spark-4-1/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinCounter.scala:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.watchdog
+
+import scala.collection.mutable.ArrayBuffer
+
+object DangerousJoinCounter {
+  case class Entry(
+      sqlText: String,
+      joinType: String,
+      reason: String,
+      leftSize: BigInt,
+      rightSize: BigInt,
+      broadcastThreshold: Long,
+      broadcastRatio: Double) {
+    def toJson: String = {
+      val pairs = Seq(
+        "sql" -> escape(sqlText),
+        "joinType" -> escape(joinType),
+        "reason" -> escape(reason),
+        "leftSize" -> leftSize.toString,
+        "rightSize" -> rightSize.toString,
+        "broadcastThreshold" -> broadcastThreshold.toString,
+        "broadcastRatio" -> broadcastRatio.toString)
+      pairs.map { case (k, v) =>
+        if (k == "leftSize" || k == "rightSize" || k == "broadcastThreshold" || k == "broadcastRatio") {
+          s""""$k":$v"""
+        } else {
+          s""""$k":"$v""""
+        }
+      }.mkString("{", ",", "}")
+    }
+  }
+
+  private val entries = ArrayBuffer.empty[Entry]
+
+  def add(entry: Entry): Unit = synchronized {
+    entries += entry
+  }

Review Comment:
   `DangerousJoinCounter` stores every detected entry in a global `ArrayBuffer` 
with no bounds or eviction. In a long-running Spark driver (Kyuubi gateway), 
this can grow unbounded and eventually cause memory pressure/OOM. Consider 
removing storage entirely for production, or keep only a bounded ring buffer / 
latest entry and expose counters via metrics instead.
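   The bounded-buffer-plus-metrics idea could look like the following sketch; `MaxEntries` and the names here are illustrative, not part of the PR:

   ```scala
   import java.util.concurrent.atomic.AtomicLong
   import scala.collection.mutable

   // Sketch: keep only the most recent diagnostics in memory, while a
   // monotonic counter (suitable for metrics) survives eviction.
   object BoundedJoinDiagnostics {
     private val MaxEntries = 1000
     private val total = new AtomicLong(0L)
     private val entries = mutable.Queue.empty[String]

     def add(entry: String): Unit = synchronized {
       total.incrementAndGet()
       if (entries.size >= MaxEntries) entries.dequeue() // evict oldest
       entries += entry
     }

     def count: Long = total.get()
     def latest: Option[String] = synchronized(entries.lastOption)
   }
   ```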



##########
docs/watchdog/dangerous-join.md:
##########
@@ -0,0 +1,105 @@
+<!--
+- Licensed to the Apache Software Foundation (ASF) under one or more
+- contributor license agreements.  See the NOTICE file distributed with
+- this work for additional information regarding copyright ownership.
+- The ASF licenses this file to You under the Apache License, Version 2.0
+- (the "License"); you may not use this file except in compliance with
+- the License.  You may obtain a copy of the License at
+-
+-   http://www.apache.org/licenses/LICENSE-2.0
+-
+- Unless required by applicable law or agreed to in writing, software
+- distributed under the License is distributed on an "AS IS" BASIS,
+- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- See the License for the specific language governing permissions and
+- limitations under the License.
+-->
+
+# Dangerous Join Watchdog
+
+Kyuubi Dangerous Join Watchdog detects risky join planning patterns before query execution.
+It helps reduce accidental Cartesian products, oversized broadcast attempts, and long-running nested loop joins.
+
+## Background
+
+In shared SQL gateway environments, a single risky join can consume excessive driver memory or create very slow jobs.
+The Dangerous Join Watchdog adds planning-time checks for these high-risk patterns.
+
+## Risk Rules
+
+### Equi-Join
+
+- Rule 1: Equi-join is marked dangerous when it degrades to a Cartesian pattern.
+- Rule 2: Equi-join is marked dangerous when the estimated build side exceeds the configured broadcast ratio threshold.
+
+### Non-Equi Join
+
+- Rule 1: Non-equi join is marked dangerous when both sides exceed the broadcast threshold and effectively become a Cartesian risk.
+- Rule 2: Non-equi join is marked dangerous when the build side is not selectable and the plan falls back to a second BNLJ pattern.
+
+## Configurations
+
+|                      Name                       | Default |                                Meaning                                    |
+|-------------------------------------------------|---------|---------------------------------------------------------------------------|
+| `kyuubi.watchdog.dangerousJoin.enabled`         | `true`  | Enable or disable dangerous join detection                                |
+| `kyuubi.watchdog.dangerousJoin.broadcastRatio`  | `0.8`   | Ratio against Spark broadcast threshold for warning/reject decision       |
+| `kyuubi.watchdog.dangerousJoin.action`          | `WARN`  | `WARN` logs diagnostics; `REJECT` throws exception and rejects submission |
+
+## Usage
+
+1. Put Kyuubi Spark extension jar into Spark classpath.
+2. Configure SQL extensions:
+
+```properties
+spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension,org.apache.kyuubi.sql.watchdog.KyuubiDangerousJoinExtension
+```

Review Comment:
   The usage example configures `spark.sql.extensions` with both 
`org.apache.kyuubi.sql.KyuubiSparkSQLExtension` and 
`org.apache.kyuubi.sql.watchdog.KyuubiDangerousJoinExtension`. In this PR, 
`KyuubiSparkSQLExtension` already injects `DangerousJoinInterceptor`, so adding 
`KyuubiDangerousJoinExtension` would inject it twice and can lead to duplicated 
warnings/counter increments (and potentially double rejection paths). Please 
update the docs to recommend only one mechanism (either the main extension or 
the dedicated watchdog extension).
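   For example, assuming `KyuubiSparkSQLExtension` keeps injecting `DangerousJoinInterceptor` as described, the docs could recommend exactly one of the two:
   ```properties
   # Option A: the main extension only (already injects DangerousJoinInterceptor)
   spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension

   # Option B: the dedicated watchdog extension only
   # spark.sql.extensions=org.apache.kyuubi.sql.watchdog.KyuubiDangerousJoinExtension
   ```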



##########
extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.watchdog
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.slf4j.LoggerFactory
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
+case class DangerousJoinInterceptor(session: SparkSession) extends SparkStrategy {
+  import DangerousJoinInterceptor._
+
+  private val logger = LoggerFactory.getLogger(classOf[DangerousJoinInterceptor])
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    val conf = session.sessionState.conf
+    if (!conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ENABLED)) {
+      return Nil
+    }
+    val ratio = conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO)
+    val threshold = conf.autoBroadcastJoinThreshold
+    plan.foreach {
+      case join: Join =>
+        detect(join, threshold, ratio).foreach { reason =>
+          val entry = DangerousJoinCounter.Entry(
+            sqlText = plan.toString(),
+            joinType = join.joinType.sql,
+            reason = reason,
+            leftSize = join.left.stats.sizeInBytes,
+            rightSize = join.right.stats.sizeInBytes,
+            broadcastThreshold = threshold,
+            broadcastRatio = ratio)
+          DangerousJoinCounter.add(entry)
+          logger.warn(s"$KYUUBI_LOG_KEY=${entry.toJson}")
+          if (conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ACTION) == REJECT) {
+            throw new KyuubiDangerousJoinException(entry.toJson)
+          }
+        }
+      case _ =>
+    }
+    Nil
+  }
+
+  private def detect(join: Join, threshold: Long, ratio: Double): Option[String] = {
+    if (threshold <= 0) {
+      return None
+    }
+    val leftSize = join.left.stats.sizeInBytes
+    val rightSize = join.right.stats.sizeInBytes
+    val hasEquiJoin = isEquiJoin(join)
+    if (hasEquiJoin) {
+      if (isCartesianCondition(join.condition)) {
+        Some("Cartesian")
+      } else if (minSize(leftSize, rightSize) > BigInt((threshold * ratio).toLong)) {
+        Some("OversizedBroadcastFallback")
+      } else {
+        None
+      }
+    } else {
+      if (leftSize > threshold && rightSize > threshold) {
+        Some("Cartesian")
+      } else if (cannotSelectBuildSide(leftSize, rightSize, threshold)) {
+        Some("SecondBNLJ")
+      } else {

Review Comment:
   `cannotSelectBuildSide` is the same predicate as the prior Cartesian check, 
so the `SecondBNLJ` branch can never be reached. Please fix the condition(s) so 
`SecondBNLJ` can be detected distinctly or remove the dead code path.
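   One way to make the second branch reachable (a sketch; whether `threshold * ratio` is the right boundary for `SecondBNLJ` is for the author to decide):
   ```scala
   // Both sides over the hard threshold -> Cartesian risk. Otherwise flag the
   // case where even the smaller side exceeds threshold * ratio, i.e. no
   // comfortably broadcastable build side exists although the hard threshold
   // is not crossed. With ratio < 1 this band is non-empty and distinct.
   if (leftSize > threshold && rightSize > threshold) {
     Some("Cartesian")
   } else if (minSize(leftSize, rightSize) > BigInt((threshold * ratio).toLong)) {
     Some("SecondBNLJ")
   } else {
     None
   }
   ```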



##########
extensions/spark/kyuubi-extension-spark-4-0/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinCounter.scala:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.watchdog
+
+import scala.collection.mutable.ArrayBuffer
+
+object DangerousJoinCounter {
+  case class Entry(
+      sqlText: String,
+      joinType: String,
+      reason: String,
+      leftSize: BigInt,
+      rightSize: BigInt,
+      broadcastThreshold: Long,
+      broadcastRatio: Double) {
+    def toJson: String = {
+      val pairs = Seq(
+        "sql" -> escape(sqlText),
+        "joinType" -> escape(joinType),
+        "reason" -> escape(reason),
+        "leftSize" -> leftSize.toString,
+        "rightSize" -> rightSize.toString,
+        "broadcastThreshold" -> broadcastThreshold.toString,
+        "broadcastRatio" -> broadcastRatio.toString)
+      pairs.map { case (k, v) =>
+        if (k == "leftSize" || k == "rightSize" || k == "broadcastThreshold" || k == "broadcastRatio") {
+          s""""$k":$v"""
+        } else {
+          s""""$k":"$v""""
+        }
+      }.mkString("{", ",", "}")
+    }
+  }
+
+  private val entries = ArrayBuffer.empty[Entry]
+
+  def add(entry: Entry): Unit = synchronized {
+    entries += entry
+  }

Review Comment:
   `DangerousJoinCounter` stores all entries in a global mutable buffer without 
bounds/eviction. In a long-running driver this can grow indefinitely and cause 
memory pressure. Consider bounding the buffer or removing storage in production 
and relying on metrics/logging instead.



##########
extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/DangerousJoinInterceptorSuite.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+import org.apache.kyuubi.sql.watchdog.{DangerousJoinCounter, KyuubiDangerousJoinException}
+
+class DangerousJoinInterceptorSuite extends KyuubiSparkSQLExtensionTest {
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    setupData()
+  }
+
+  test("equi join oversized broadcast fallback should be counted") {
+    DangerousJoinCounter.reset()
+    withSQLConf(
+      KyuubiSQLConf.DANGEROUS_JOIN_ENABLED.key -> "true",
+      KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO.key -> "0.8",
+      KyuubiSQLConf.DANGEROUS_JOIN_ACTION.key -> "WARN",
+      "spark.sql.autoBroadcastJoinThreshold" -> "1") {
+      sql("SELECT * FROM t1 a JOIN t2 b ON a.c1 = b.c1").queryExecution.sparkPlan
+      assert(DangerousJoinCounter.count >= 1)
+    }
+  }
+
+  test("non equi join cartesian should include Cartesian marker") {
+    DangerousJoinCounter.reset()
+    withSQLConf(
+      KyuubiSQLConf.DANGEROUS_JOIN_ENABLED.key -> "true",
+      KyuubiSQLConf.DANGEROUS_JOIN_ACTION.key -> "WARN",
+      "spark.sql.autoBroadcastJoinThreshold" -> "1") {
+      sql("SELECT * FROM t1 a JOIN t2 b ON a.c1 > b.c1").queryExecution.sparkPlan
+      assert(DangerousJoinCounter.count >= 1)
+      assert(DangerousJoinCounter.latest.exists(_.toJson.contains("Cartesian")))
+    }

Review Comment:
   The assertions only check `toJson.contains(...)`, which can pass even if the 
JSON payload is malformed. Consider parsing/validating the JSON so the test 
actually verifies the structured diagnostics format.
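   For instance, json4s (which Spark already ships, so it is likely on the test classpath) could validate the structure rather than substrings:
   ```scala
   import org.json4s._
   import org.json4s.jackson.JsonMethods.parse

   // Parsing fails fast on a malformed payload, unlike a substring check.
   val json = parse(DangerousJoinCounter.latest.get.toJson)
   assert((json \ "reason") == JString("Cartesian"))
   assert((json \ "joinType") != JNothing)
   // Numeric fields must be emitted unquoted to parse as numbers.
   assert((json \ "leftSize").isInstanceOf[JInt])
   ```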



##########
extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.watchdog
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.slf4j.LoggerFactory
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
+case class DangerousJoinInterceptor(session: SparkSession) extends SparkStrategy {
+  import DangerousJoinInterceptor._
+
+  private val logger = LoggerFactory.getLogger(classOf[DangerousJoinInterceptor])
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    val conf = session.sessionState.conf
+    if (!conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ENABLED)) {
+      return Nil
+    }
+    val ratio = conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO)
+    val threshold = conf.autoBroadcastJoinThreshold
+    plan.foreach {
+      case join: Join =>
+        detect(join, threshold, ratio).foreach { reason =>
+          val entry = DangerousJoinCounter.Entry(
+            sqlText = plan.toString(),
+            joinType = join.joinType.sql,
+            reason = reason,
+            leftSize = join.left.stats.sizeInBytes,
+            rightSize = join.right.stats.sizeInBytes,
+            broadcastThreshold = threshold,
+            broadcastRatio = ratio)
+          DangerousJoinCounter.add(entry)
+          logger.warn(s"$KYUUBI_LOG_KEY=${entry.toJson}")
+          if (conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ACTION) == REJECT) {
+            throw new KyuubiDangerousJoinException(entry.toJson)
+          }
+        }
+      case _ =>
+    }
+    Nil
+  }
+
+  private def detect(join: Join, threshold: Long, ratio: Double): Option[String] = {
+    if (threshold <= 0) {
+      return None
+    }
+    val leftSize = join.left.stats.sizeInBytes
+    val rightSize = join.right.stats.sizeInBytes
+    val hasEquiJoin = isEquiJoin(join)
+    if (hasEquiJoin) {
+      if (isCartesianCondition(join.condition)) {
+        Some("Cartesian")
+      } else if (minSize(leftSize, rightSize) > BigInt((threshold * ratio).toLong)) {
+        Some("OversizedBroadcastFallback")
+      } else {
+        None
+      }
+    } else {
+      if (leftSize > threshold && rightSize > threshold) {
+        Some("Cartesian")
+      } else if (cannotSelectBuildSide(leftSize, rightSize, threshold)) {

Review Comment:
   The early return when `autoBroadcastJoinThreshold <= 0` disables all 
detection, including Cartesian/BNLJ risks that are not dependent on broadcast 
thresholds. Detection should still run for non-broadcast-related rules even 
when broadcast is disabled.
   ```suggestion
       val leftSize = join.left.stats.sizeInBytes
       val rightSize = join.right.stats.sizeInBytes
       val hasEquiJoin = isEquiJoin(join)
       if (hasEquiJoin) {
         if (isCartesianCondition(join.condition)) {
           Some("Cartesian")
         } else if (threshold > 0 &&
            minSize(leftSize, rightSize) > BigInt((threshold * ratio).toLong)) {
           Some("OversizedBroadcastFallback")
         } else {
           None
         }
       } else {
         if (threshold > 0 && leftSize > threshold && rightSize > threshold) {
           Some("Cartesian")
         } else if (threshold > 0 &&
             cannotSelectBuildSide(leftSize, rightSize, threshold)) {
   ```



##########
extensions/spark/kyuubi-extension-spark-4-1/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.watchdog
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.slf4j.LoggerFactory
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
+case class DangerousJoinInterceptor(session: SparkSession) extends SparkStrategy {
+  import DangerousJoinInterceptor._
+
+  private val logger = LoggerFactory.getLogger(classOf[DangerousJoinInterceptor])
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    val conf = session.sessionState.conf
+    if (!conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ENABLED)) {
+      return Nil
+    }
+    val ratio = conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO)
+    val threshold = conf.autoBroadcastJoinThreshold
+    plan.foreach {
+      case join: Join =>
+        detect(join, threshold, ratio).foreach { reason =>
+          val entry = DangerousJoinCounter.Entry(
+            sqlText = plan.toString(),
+            joinType = join.joinType.sql,
+            reason = reason,
+            leftSize = join.left.stats.sizeInBytes,
+            rightSize = join.right.stats.sizeInBytes,
+            broadcastThreshold = threshold,
+            broadcastRatio = ratio)
+          DangerousJoinCounter.add(entry)
+          logger.warn(s"$KYUUBI_LOG_KEY=${entry.toJson}")
+          if (conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ACTION) == REJECT) {
+            throw new KyuubiDangerousJoinException(entry.toJson)
+          }
+        }
+      case _ =>
+    }
+    Nil
+  }
+
+  private def detect(join: Join, threshold: Long, ratio: Double): Option[String] = {
+    if (threshold <= 0) {
+      return None
+    }

Review Comment:
   The early return when `autoBroadcastJoinThreshold <= 0` disables *all* 
dangerous-join detection, including Cartesian / nested-loop cases that are 
unrelated to broadcast thresholds. If the goal is to guard against Cartesian 
and BNLJ, detection should still run when broadcast is disabled; only the 
broadcast-size based rule should depend on this threshold.



##########
docs/configuration/settings.md:
##########
@@ -580,6 +580,18 @@ jdbc:hive2://localhost:10009/default;#spark.sql.shuffle.partitions=2;spark.execu
 
 Please refer to the Spark official online documentation for [SET Command](https://spark.apache.org/docs/latest/sql-ref-syntax-aux-conf-mgmt-set.html)
 
+### Dangerous Join Watchdog
+
+You can enable dangerous join detection for Spark SQL extension with:
+
+|                      Name                       | Default |             Description             |
+|-------------------------------------------------|---------|-------------------------------------|
+| `kyuubi.watchdog.dangerousJoin.enabled`         | `true`  | Enable dangerous join detection     |

Review Comment:
   This table documents `kyuubi.watchdog.dangerousJoin.enabled` default as 
`true`, but the Spark extension modules introduced in this PR mostly default it 
to `false`. Please align the defaults here with the code (and keep consistent 
across Spark versions) to avoid misleading operators.
   ```suggestion
   | `kyuubi.watchdog.dangerousJoin.enabled`        | `false` | Enable dangerous join detection                                                    |
   ```



##########
docs/extensions/engines/spark/rules.md:
##########
@@ -65,31 +65,34 @@ Now, you can enjoy the Kyuubi SQL Extension.
 
 Kyuubi provides some configs to make these feature easy to use.
 
-|                                Name                                 |             Default Value              |                                Description                                | Since |
-|---------------------------------------------------------------------|----------------------------------------|---------------------------------------------------------------------------|-------|
-| spark.sql.optimizer.insertRepartitionBeforeWrite.enabled            | true                                   | Add repartition node at the top of query plan. An approach of merging small files.                                                                                                                                                                                                                                                            | 1.2.0 |
-| spark.sql.optimizer.forceShuffleBeforeJoin.enabled                  | false                                  | Ensure shuffle node exists before shuffled join (shj and smj) to make AQE `OptimizeSkewedJoin` works (complex scenario join, multi table join).                                                                                                                                                                                               | 1.2.0 |
-| spark.sql.optimizer.finalStageConfigIsolation.enabled               | false                                  | If true, the final stage support use different config with previous stage. The prefix of final stage config key should be `spark.sql.finalStage.`. For example, the raw spark config: `spark.sql.adaptive.advisoryPartitionSizeInBytes`, then the final stage config should be: `spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes`. | 1.2.0 |
-| spark.sql.optimizer.insertZorderBeforeWriting.enabled               | true                                   | When true, we will follow target table properties to insert zorder or not. The key properties are: 1) `kyuubi.zorder.enabled`: if this property is true, we will insert zorder before writing data. 2) `kyuubi.zorder.cols`: string split by comma, we will zorder by these cols.                                                              | 1.4.0 |
-| spark.sql.optimizer.zorderGlobalSort.enabled                        | true                                   | When true, we do a global sort using zorder. Note that, it can cause data skew issue if the zorder columns have less cardinality. When false, we only do local sort using zorder.                                                                                                                                                             | 1.4.0 |
-| spark.sql.watchdog.maxPartitions                                    | none                                   | Set the max partition number when spark scans a data source. Enable maxPartition Strategy by specifying this configuration. Add maxPartitions Strategy to avoid scan excessive partitions on partitioned table, it's optional that works with defined                                                                                          | 1.4.0 |
-| spark.sql.watchdog.maxFileSize                                      | none                                   | Set the maximum size in bytes of files when spark scans a data source. Enable maxFileSize Strategy by specifying this configuration. Add maxFileSize Strategy to avoid scan excessive size of files, it's optional that works with defined                                                                                                     | 1.8.0 |
-| spark.sql.optimizer.dropIgnoreNonExistent                           | false                                  | When true, do not report an error if DROP DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies a non-existent database/table/view/function/partition                                                                                                                                                                                              | 1.5.0 |
-| spark.sql.optimizer.rebalanceBeforeZorder.enabled                   | false                                  | When true, we do a rebalance before zorder in case data skew. Note that, if the insertion is dynamic partition we will use the partition columns to rebalance.                                                                                                                                                                                | 1.6.0 |
-| spark.sql.optimizer.rebalanceZorderColumns.enabled                  | false                                  | When true and `spark.sql.optimizer.rebalanceBeforeZorder.enabled` is true, we do rebalance before Z-Order. If it's dynamic partition insert, the rebalance expression will include both partition columns and Z-Order columns.                                                                                                                 | 1.6.0 |
-| spark.sql.optimizer.twoPhaseRebalanceBeforeZorder.enabled           | false                                  | When true and `spark.sql.optimizer.rebalanceBeforeZorder.enabled` is true, we do two phase rebalance before Z-Order for the dynamic partition write. The first phase rebalance using dynamic partition column; The second phase rebalance using dynamic partition column Z-Order columns.                                                      | 1.6.0 |
-| spark.sql.optimizer.zorderUsingOriginalOrdering.enabled             | false                                  | When true and `spark.sql.optimizer.rebalanceBeforeZorder.enabled` is true, we do sort by the original ordering i.e. lexicographical order.                                                                                                                                                                                                    | 1.6.0 |
-| spark.sql.optimizer.inferRebalanceAndSortOrders.enabled             | false                                  | When ture, infer columns for rebalance and sort orders from original query, e.g. the join keys from join. It can avoid compression ratio regression.                                                                                                                                                                                          | 1.7.0 |
-| spark.sql.optimizer.inferRebalanceAndSortOrdersMaxColumns           | 3                                      | The max columns of inferred columns.                                                                                                                                                                                                                                                                                                          | 1.7.0 |
-| spark.sql.optimizer.insertRepartitionBeforeWriteIfNoShuffle.enabled | false                                  | When true, add repartition even if the original plan does not have shuffle.                                                                                                                                                                                                                                                                   | 1.7.0 |
-| spark.sql.optimizer.finalStageConfigIsolationWriteOnly.enabled      | true                                   | When true, only enable final stage isolation for writing.                                                                                                                                                                                                                                                                                     | 1.7.0 |
-| spark.sql.finalWriteStage.eagerlyKillExecutors.enabled              | false                                  | When true, eagerly kill redundant executors before running final write stage.                                                                                                                                                                                                                                                                 | 1.8.0 |
-| spark.sql.finalWriteStage.skipKillingExecutorsForTableCache         | true                                   | When true, skip killing executors if the plan has table caches.                                                                                                                                                                                                                                                                               | 1.8.0 |
-| spark.sql.finalWriteStage.retainExecutorsFactor                     | 1.2                                    | If the target executors * factor < active executors, and target executors * factor > min executors, then inject kill executors or inject custom resource profile.                                                                                                                                                                             | 1.8.0 |
-| spark.sql.finalWriteStage.resourceIsolation.enabled                 | false                                  | When true, make final write stage resource isolation using custom RDD resource profile.                                                                                                                                                                                                                                                       | 1.8.0 |
-| spark.sql.finalWriteStageExecutorCores                              | fallback spark.executor.cores          | Specify the executor core request for final write stage. It would be passed to the RDD resource profile.                                                                                                                                                                                                                                      | 1.8.0 |
-| spark.sql.finalWriteStageExecutorMemory                             | fallback spark.executor.memory         | Specify the executor on heap memory request for final write stage. It would be passed to the RDD resource profile.                                                                                                                                                                                                                            | 1.8.0 |
-| spark.sql.finalWriteStageExecutorMemoryOverhead                     | fallback spark.executor.memoryOverhead | Specify the executor memory overhead request for final write stage. It would be passed to the RDD resource profile.                                                                                                                                                                                                                           | 1.8.0 |
-| spark.sql.finalWriteStageExecutorOffHeapMemory                      | NONE                                   | Specify the executor off heap memory request for final write stage. It would be passed to the RDD resource profile.                                                                                                                                                                                                                           | 1.8.0 |
-| spark.sql.execution.scriptTransformation.enabled                    | true                                   | When false, script transformation is not allowed.                                                                                                                                                                                                                                                                                             | 1.9.0 |
+|                                Name                                 |             Default Value              |                                Description                                | Since  |
+|---------------------------------------------------------------------|----------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|
+| spark.sql.optimizer.insertRepartitionBeforeWrite.enabled            | true   
                                | Add repartition node at the top of query 
plan. An approach of merging small files.                                       
                                                                                
                                                                                
                                                     | 1.2.0  |
+| spark.sql.optimizer.forceShuffleBeforeJoin.enabled                  | false  
                                | Ensure shuffle node exists before shuffled 
join (shj and smj) to make AQE `OptimizeSkewedJoin` works (complex scenario 
join, multi table join).                                                        
                                                                                
                                                       | 1.2.0  |
+| spark.sql.optimizer.finalStageConfigIsolation.enabled               | false  
                                | If true, the final stage support use 
different config with previous stage. The prefix of final stage config key 
should be `spark.sql.finalStage.`. For example, the raw spark config: 
`spark.sql.adaptive.advisoryPartitionSizeInBytes`, then the final stage config 
should be: `spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes`. | 
1.2.0  |
+| spark.sql.optimizer.insertZorderBeforeWriting.enabled               | true   
                                | When true, we will follow target table 
properties to insert zorder or not. The key properties are: 1) 
`kyuubi.zorder.enabled`: if this property is true, we will insert zorder before 
writing data. 2) `kyuubi.zorder.cols`: string split by comma, we will zorder by 
these cols.                                                             | 1.4.0 
 |
+| spark.sql.optimizer.zorderGlobalSort.enabled                        | true   
                                | When true, we do a global sort using zorder. 
Note that, it can cause data skew issue if the zorder columns have less 
cardinality. When false, we only do local sort using zorder.                    
                                                                                
                                                         | 1.4.0  |
+| spark.sql.watchdog.maxPartitions                                    | none   
                                | Set the max partition number when spark scans 
a data source. Enable maxPartition Strategy by specifying this configuration. 
Add maxPartitions Strategy to avoid scan excessive partitions on partitioned 
table, it's optional that works with defined                                    
                                                     | 1.4.0  |
+| spark.sql.watchdog.maxFileSize                                      | none   
                                | Set the maximum size in bytes of files when 
spark scans a data source. Enable maxFileSize Strategy by specifying this 
configuration. Add maxFileSize Strategy to avoid scan excessive size of files, 
it's optional that works with defined                                           
                                                         | 1.8.0  |
+| kyuubi.watchdog.dangerousJoin.enabled                               | false  
                                | Enable dangerous join condition detection in 
planner stage.                                                                  
                                                                                
                                                                                
                                                 | 1.10.0 |
+| kyuubi.watchdog.dangerousJoin.broadcastRatio                        | 0.8    
                                | Broadcast threshold coefficient used to 
identify oversized broadcast fallback.                                          
                                                                                
                                                                                
                                                      | 1.10.0 |
+| kyuubi.watchdog.dangerousJoin.action                                | WARN   
                                | Action when dangerous join is detected, one 
of `WARN` and `REJECT`.                                                         
                                                                                
                                                                                
                                                  | 1.10.0 |

Review Comment:
   The table marks the dangerous-join configs as `Since 1.10.0`, but in this PR 
the corresponding `KyuubiSQLConf` entries are versioned `1.11.0` in most Spark 
extension modules (and `1.10.0` only in spark-4-1). Please align the documented 
`Since` version (and the default value shown here) with the actual conf 
definitions for each supported Spark module.



##########
extensions/spark/kyuubi-extension-spark-4-1/src/test/scala/org/apache/spark/sql/DangerousJoinInterceptorSuite.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+import org.apache.kyuubi.sql.watchdog.{DangerousJoinCounter, KyuubiDangerousJoinException}
+
+class DangerousJoinInterceptorSuite extends KyuubiSparkSQLExtensionTest {
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    setupData()
+  }
+
+  test("equi join oversized broadcast fallback should be counted") {
+    DangerousJoinCounter.reset()
+    withSQLConf(
+      KyuubiSQLConf.DANGEROUS_JOIN_ENABLED.key -> "true",
+      KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO.key -> "0.8",
+      KyuubiSQLConf.DANGEROUS_JOIN_ACTION.key -> "WARN",
+      "spark.sql.autoBroadcastJoinThreshold" -> "1") {
+      sql("SELECT * FROM t1 a JOIN t2 b ON a.c1 = b.c1").queryExecution.sparkPlan
+      assert(DangerousJoinCounter.count >= 1)
+    }
+  }
+
+  test("non equi join cartesian should include Cartesian marker") {
+    DangerousJoinCounter.reset()
+    withSQLConf(
+      KyuubiSQLConf.DANGEROUS_JOIN_ENABLED.key -> "true",
+      KyuubiSQLConf.DANGEROUS_JOIN_ACTION.key -> "WARN",
+      "spark.sql.autoBroadcastJoinThreshold" -> "1") {
+      sql("SELECT * FROM t1 a JOIN t2 b ON a.c1 > b.c1").queryExecution.sparkPlan
+      assert(DangerousJoinCounter.count >= 1)
+      assert(DangerousJoinCounter.latest.exists(_.toJson.contains("Cartesian")))
+    }

Review Comment:
   The dangerous-join tests only assert `toJson.contains(...)`, which will 
still pass even if the emitted JSON is malformed (and currently 
`DangerousJoinCounter.Entry.toJson` is malformed). To make this test effective, 
please parse the JSON (or otherwise validate it is well-formed and contains 
expected keys/values).
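   To make the difference concrete, here is a sketch in Python for brevity (a Scala test could do the equivalent with json4s, which Spark already bundles). The payload literals are hypothetical stand-ins for `Entry.toJson` output, reusing the field names from the `Entry` case class; a substring check accepts truncated output that a parser rejects:
   ```python
   import json

   # Hypothetical stand-ins for Entry.toJson output: one well-formed payload
   # and one truncated (malformed) payload.
   good = ('{"sql":"SELECT ...","joinType":"INNER","reason":"Cartesian",'
           '"leftSize":10,"rightSize":20,"broadcastThreshold":1,"broadcastRatio":0.8}')
   bad = '{"sql":"SELECT ...","reason":"Cartesian"'  # missing closing brace

   # The substring check used in the suites accepts both payloads...
   assert "Cartesian" in good
   assert "Cartesian" in bad

   # ...whereas parsing rejects the malformed one and lets the test assert
   # on structure rather than on raw text.
   parsed = json.loads(good)
   assert parsed["reason"] == "Cartesian"
   assert isinstance(parsed["leftSize"], int)

   try:
       json.loads(bad)
       raise AssertionError("malformed JSON should not parse")
   except json.JSONDecodeError:
       pass
   ```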



##########
extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/DangerousJoinInterceptorSuite.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+import org.apache.kyuubi.sql.watchdog.{DangerousJoinCounter, KyuubiDangerousJoinException}
+
+class DangerousJoinInterceptorSuite extends KyuubiSparkSQLExtensionTest {
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    setupData()
+  }
+
+  test("equi join oversized broadcast fallback should be counted") {
+    DangerousJoinCounter.reset()
+    withSQLConf(
+      KyuubiSQLConf.DANGEROUS_JOIN_ENABLED.key -> "true",
+      KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO.key -> "0.8",
+      KyuubiSQLConf.DANGEROUS_JOIN_ACTION.key -> "WARN",
+      "spark.sql.autoBroadcastJoinThreshold" -> "1") {
+      sql("SELECT * FROM t1 a JOIN t2 b ON a.c1 = b.c1").queryExecution.sparkPlan
+      assert(DangerousJoinCounter.count >= 1)
+    }
+  }
+
+  test("non equi join cartesian should include Cartesian marker") {
+    DangerousJoinCounter.reset()
+    withSQLConf(
+      KyuubiSQLConf.DANGEROUS_JOIN_ENABLED.key -> "true",
+      KyuubiSQLConf.DANGEROUS_JOIN_ACTION.key -> "WARN",
+      "spark.sql.autoBroadcastJoinThreshold" -> "1") {
+      sql("SELECT * FROM t1 a JOIN t2 b ON a.c1 > b.c1").queryExecution.sparkPlan
+      assert(DangerousJoinCounter.count >= 1)
+      assert(DangerousJoinCounter.latest.exists(_.toJson.contains("Cartesian")))
+    }

Review Comment:
   The assertions only check `toJson.contains(...)`, which can pass even if the 
JSON payload is malformed. Consider parsing/validating the JSON so the test 
actually verifies the structured diagnostics format.



##########
extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinCounter.scala:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.watchdog
+
+import scala.collection.mutable.ArrayBuffer
+
+object DangerousJoinCounter {
+  case class Entry(
+      sqlText: String,
+      joinType: String,
+      reason: String,
+      leftSize: BigInt,
+      rightSize: BigInt,
+      broadcastThreshold: Long,
+      broadcastRatio: Double) {
+    def toJson: String = {
+      val pairs = Seq(
+        "sql" -> escape(sqlText),
+        "joinType" -> escape(joinType),
+        "reason" -> escape(reason),
+        "leftSize" -> leftSize.toString,
+        "rightSize" -> rightSize.toString,
+        "broadcastThreshold" -> broadcastThreshold.toString,
+        "broadcastRatio" -> broadcastRatio.toString)
+      pairs.map { case (k, v) =>
+        if (k == "leftSize" || k == "rightSize" || k == "broadcastThreshold"
+          || k == "broadcastRatio") {
+          s""""$k":$v"""
+        } else {
+          s""""$k":"$v""""
+        }
+      }.mkString("{", ",", "}")
+    }
+  }
+
+  private val entries = ArrayBuffer.empty[Entry]
+
+  def add(entry: Entry): Unit = synchronized {
+    entries += entry

Review Comment:
   `DangerousJoinCounter` stores all entries in a global mutable buffer without 
bounds/eviction. In a long-running driver this can grow indefinitely and cause 
memory pressure. Consider bounding the buffer or removing storage in production 
and relying on metrics/logging instead.
   ```suggestion
   
     // Limit the number of in-memory entries to avoid unbounded growth in long-running drivers.
     private val MaxEntries = 1000
     private val entries = ArrayBuffer.empty[Entry]
   
     def add(entry: Entry): Unit = synchronized {
       entries += entry
       if (entries.size > MaxEntries) {
         // Remove oldest entries to keep only the most recent MaxEntries
         val toRemove = entries.size - MaxEntries
         if (toRemove > 0) {
           entries.remove(0, toRemove)
         }
       }
   ```
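   The same eviction behavior can be sketched with a fixed-capacity deque; the Python snippet below is only an analogy for the bound being proposed (a Scala implementation might instead use a size-checked `java.util.ArrayDeque`), not the actual Kyuubi code:
   ```python
   from collections import deque

   MAX_ENTRIES = 1000
   # maxlen makes the deque evict its oldest element automatically on overflow,
   # so the buffer stays bounded no matter how many entries are recorded.
   entries = deque(maxlen=MAX_ENTRIES)

   for i in range(1500):
       entries.append(i)

   assert len(entries) == MAX_ENTRIES
   assert entries[0] == 500  # the 500 oldest entries were evicted
   ```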



##########
extensions/spark/kyuubi-extension-spark-4-0/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.watchdog
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.slf4j.LoggerFactory
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
+case class DangerousJoinInterceptor(session: SparkSession) extends SparkStrategy {
+  import DangerousJoinInterceptor._
+
+  private val logger = LoggerFactory.getLogger(classOf[DangerousJoinInterceptor])
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    val conf = session.sessionState.conf
+    if (!conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ENABLED)) {
+      return Nil
+    }
+    val ratio = conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO)
+    val threshold = conf.autoBroadcastJoinThreshold
+    plan.foreach {
+      case join: Join =>
+        detect(join, threshold, ratio).foreach { reason =>
+          val entry = DangerousJoinCounter.Entry(
+            sqlText = plan.toString(),
+            joinType = join.joinType.sql,
+            reason = reason,
+            leftSize = join.left.stats.sizeInBytes,
+            rightSize = join.right.stats.sizeInBytes,
+            broadcastThreshold = threshold,
+            broadcastRatio = ratio)
+          DangerousJoinCounter.add(entry)
+          logger.warn(s"$KYUUBI_LOG_KEY=${entry.toJson}")
+          if (conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ACTION) == REJECT) {
+            throw new KyuubiDangerousJoinException(entry.toJson)
+          }
+        }
+      case _ =>
+    }
+    Nil
+  }
+
+  private def detect(join: Join, threshold: Long, ratio: Double): Option[String] = {
+    if (threshold <= 0) {
+      return None
+    }
+    val leftSize = join.left.stats.sizeInBytes
+    val rightSize = join.right.stats.sizeInBytes
+    val hasEquiJoin = isEquiJoin(join)
+    if (hasEquiJoin) {
+      if (isCartesianCondition(join.condition)) {
+        Some("Cartesian")
+      } else if (minSize(leftSize, rightSize) > BigInt((threshold * ratio).toLong)) {
+        Some("OversizedBroadcastFallback")
+      } else {
+        None
+      }
+    } else {
+      if (leftSize > threshold && rightSize > threshold) {
+        Some("Cartesian")
+      } else if (cannotSelectBuildSide(leftSize, rightSize, threshold)) {
+        Some("SecondBNLJ")
+      } else {

Review Comment:
   `cannotSelectBuildSide` is the same predicate as the prior Cartesian check, 
so the `SecondBNLJ` branch can never be reached. Please fix the condition(s) so 
`SecondBNLJ` can be detected distinctly or remove the dead code path.
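   The unreachability can be checked mechanically. The Python sketch below models the two predicates (threshold value and predicate bodies are assumptions inferred from this review, not copied from the Scala code) and verifies over a grid of sizes that the `SecondBNLJ` elif can never fire:
   ```python
   THRESHOLD = 100  # assumed stand-in for autoBroadcastJoinThreshold

   def cartesian(left: int, right: int) -> bool:
       # First branch of detect() for non-equi joins: both sides exceed the threshold.
       return left > THRESHOLD and right > THRESHOLD

   def cannot_select_build_side(left: int, right: int) -> bool:
       # Per this review, the current predicate is identical to the one above.
       return left > THRESHOLD and right > THRESHOLD

   # The SecondBNLJ branch is the elif: it fires only when cartesian() is false
   # but cannot_select_build_side() is true -- which never happens.
   for left in range(0, 301, 25):
       for right in range(0, 301, 25):
           second_bnlj = (not cartesian(left, right)) and cannot_select_build_side(left, right)
           assert not second_bnlj
   ```
   Making the predicates genuinely different (for example, keying the build-side check on join-type build-side rules rather than on sizes alone) is what would make the branch reachable.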



##########
extensions/spark/kyuubi-extension-spark-4-0/src/test/scala/org/apache/spark/sql/DangerousJoinInterceptorSuite.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+import org.apache.kyuubi.sql.watchdog.{DangerousJoinCounter, KyuubiDangerousJoinException}
+
+class DangerousJoinInterceptorSuite extends KyuubiSparkSQLExtensionTest {
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    setupData()
+  }
+
+  test("equi join oversized broadcast fallback should be counted") {
+    DangerousJoinCounter.reset()
+    withSQLConf(
+      KyuubiSQLConf.DANGEROUS_JOIN_ENABLED.key -> "true",
+      KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO.key -> "0.8",
+      KyuubiSQLConf.DANGEROUS_JOIN_ACTION.key -> "WARN",
+      "spark.sql.autoBroadcastJoinThreshold" -> "1") {
+      sql("SELECT * FROM t1 a JOIN t2 b ON a.c1 = b.c1").queryExecution.sparkPlan
+      assert(DangerousJoinCounter.count >= 1)
+    }
+  }
+
+  test("non equi join cartesian should include Cartesian marker") {
+    DangerousJoinCounter.reset()
+    withSQLConf(
+      KyuubiSQLConf.DANGEROUS_JOIN_ENABLED.key -> "true",
+      KyuubiSQLConf.DANGEROUS_JOIN_ACTION.key -> "WARN",
+      "spark.sql.autoBroadcastJoinThreshold" -> "1") {
+      sql("SELECT * FROM t1 a JOIN t2 b ON a.c1 > b.c1").queryExecution.sparkPlan
+      assert(DangerousJoinCounter.count >= 1)
+      assert(DangerousJoinCounter.latest.exists(_.toJson.contains("Cartesian")))
+    }

Review Comment:
   The assertions only check `toJson.contains(...)`, which can pass even if the 
JSON payload is malformed. Consider parsing/validating the JSON so the test 
actually verifies the structured diagnostics format.



##########
extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinCounter.scala:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.watchdog
+
+import scala.collection.mutable.ArrayBuffer
+
+object DangerousJoinCounter {
+  case class Entry(
+      sqlText: String,
+      joinType: String,
+      reason: String,
+      leftSize: BigInt,
+      rightSize: BigInt,
+      broadcastThreshold: Long,
+      broadcastRatio: Double) {
+    def toJson: String = {
+      val pairs = Seq(
+        "sql" -> escape(sqlText),
+        "joinType" -> escape(joinType),
+        "reason" -> escape(reason),
+        "leftSize" -> leftSize.toString,
+        "rightSize" -> rightSize.toString,
+        "broadcastThreshold" -> broadcastThreshold.toString,
+        "broadcastRatio" -> broadcastRatio.toString)
+      pairs.map { case (k, v) =>
+        if (k == "leftSize" || k == "rightSize" || k == "broadcastThreshold"
+          || k == "broadcastRatio") {
+          s""""$k":$v"""
+        } else {
+          s""""$k":"$v""""
+        }
+      }.mkString("{", ",", "}")
+    }
+  }
+
+  private val entries = ArrayBuffer.empty[Entry]
+
+  def add(entry: Entry): Unit = synchronized {

Review Comment:
   `DangerousJoinCounter` stores all entries in a global mutable buffer without 
bounds/eviction. In a long-running driver this can grow indefinitely and cause 
memory pressure. Consider bounding the buffer or removing storage in production 
and relying on metrics/logging instead.
   ```suggestion
   
     // Limit the number of stored entries to avoid unbounded memory growth.
     private val MaxEntries = 1000
     private val entries = ArrayBuffer.empty[Entry]
   
     def add(entry: Entry): Unit = synchronized {
       if (entries.size >= MaxEntries) {
         // Evict the oldest entry to keep the buffer bounded.
         entries.remove(0)
       }
   ```



##########
extensions/spark/kyuubi-extension-spark-4-1/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala:
##########
@@ -123,6 +123,30 @@ object KyuubiSQLConf {
       .bytesConf(ByteUnit.BYTE)
       .createOptional
 
+  val DANGEROUS_JOIN_ENABLED =
+    buildConf("kyuubi.watchdog.dangerousJoin.enabled")
+      .doc("Enable dangerous join condition detection.")
+      .version("1.10.0")
+      .booleanConf
+      .createWithDefault(true)
+

Review Comment:
   In this Spark 4.1 extension, `kyuubi.watchdog.dangerousJoin.enabled` 
defaults to `true`, while other Spark-extension modules in this PR default it 
to `false`. Enabling this by default changes behavior (extra WARN logs / 
possible REJECT) and makes defaults inconsistent across Spark versions; please 
align the default across modules and update docs accordingly.



##########
extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.watchdog
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.slf4j.LoggerFactory
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
+case class DangerousJoinInterceptor(session: SparkSession) extends SparkStrategy {
+  import DangerousJoinInterceptor._
+
+  private val logger = LoggerFactory.getLogger(classOf[DangerousJoinInterceptor])
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    val conf = session.sessionState.conf
+    if (!conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ENABLED)) {
+      return Nil
+    }
+    val ratio = conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO)
+    val threshold = conf.autoBroadcastJoinThreshold
+    plan.foreach {
+      case join: Join =>
+        detect(join, threshold, ratio).foreach { reason =>
+          val entry = DangerousJoinCounter.Entry(
+            sqlText = plan.toString(),
+            joinType = join.joinType.sql,
+            reason = reason,
+            leftSize = join.left.stats.sizeInBytes,
+            rightSize = join.right.stats.sizeInBytes,
+            broadcastThreshold = threshold,
+            broadcastRatio = ratio)
+          DangerousJoinCounter.add(entry)
+          logger.warn(s"$KYUUBI_LOG_KEY=${entry.toJson}")
+          if (conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ACTION) == REJECT) {
+            throw new KyuubiDangerousJoinException(entry.toJson)
+          }
+        }
+      case _ =>
+    }
+    Nil
+  }
+
+  private def detect(join: Join, threshold: Long, ratio: Double): Option[String] = {
+    if (threshold <= 0) {
+      return None
+    }
+    val leftSize = join.left.stats.sizeInBytes
+    val rightSize = join.right.stats.sizeInBytes
+    val hasEquiJoin = isEquiJoin(join)
+    if (hasEquiJoin) {
+      if (isCartesianCondition(join.condition)) {
+        Some("Cartesian")
+      } else if (minSize(leftSize, rightSize) > BigInt((threshold * ratio).toLong)) {
+        Some("OversizedBroadcastFallback")
+      } else {
+        None
+      }
+    } else {
+      if (leftSize > threshold && rightSize > threshold) {

Review Comment:
   The early return when `autoBroadcastJoinThreshold <= 0` disables all 
detection, including Cartesian/BNLJ risks that are not dependent on broadcast 
thresholds. Detection should still run for non-broadcast-related rules even 
when broadcast is disabled.
   ```suggestion
       val leftSize = join.left.stats.sizeInBytes
       val rightSize = join.right.stats.sizeInBytes
       val hasEquiJoin = isEquiJoin(join)
       if (hasEquiJoin) {
         if (isCartesianCondition(join.condition)) {
           Some("Cartesian")
         } else if (threshold > 0 &&
          minSize(leftSize, rightSize) > BigInt((threshold * ratio).toLong)) {
           Some("OversizedBroadcastFallback")
         } else {
           None
         }
       } else {
         if (threshold <= 0) {
           None
         } else if (leftSize > threshold && rightSize > threshold) {
   ```



##########
extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/watchdog/DangerousJoinInterceptor.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql.watchdog
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
+import org.slf4j.LoggerFactory
+
+import org.apache.kyuubi.sql.KyuubiSQLConf
+
+case class DangerousJoinInterceptor(session: SparkSession) extends SparkStrategy {
+  import DangerousJoinInterceptor._
+
+  private val logger = LoggerFactory.getLogger(classOf[DangerousJoinInterceptor])
+
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+    val conf = session.sessionState.conf
+    if (!conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ENABLED)) {
+      return Nil
+    }
+    val ratio = conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_BROADCAST_RATIO)
+    val threshold = conf.autoBroadcastJoinThreshold
+    plan.foreach {
+      case join: Join =>
+        detect(join, threshold, ratio).foreach { reason =>
+          val entry = DangerousJoinCounter.Entry(
+            sqlText = plan.toString(),
+            joinType = join.joinType.sql,
+            reason = reason,
+            leftSize = join.left.stats.sizeInBytes,
+            rightSize = join.right.stats.sizeInBytes,
+            broadcastThreshold = threshold,
+            broadcastRatio = ratio)
+          DangerousJoinCounter.add(entry)
+          logger.warn(s"$KYUUBI_LOG_KEY=${entry.toJson}")
+          if (conf.getConf(KyuubiSQLConf.DANGEROUS_JOIN_ACTION) == REJECT) {
+            throw new KyuubiDangerousJoinException(entry.toJson)
+          }
+        }
+      case _ =>
+    }
+    Nil
+  }
+
+  private def detect(join: Join, threshold: Long, ratio: Double): Option[String] = {
+    if (threshold <= 0) {
+      return None
+    }
+    val leftSize = join.left.stats.sizeInBytes
+    val rightSize = join.right.stats.sizeInBytes
+    val hasEquiJoin = isEquiJoin(join)
+    if (hasEquiJoin) {
+      if (isCartesianCondition(join.condition)) {
+        Some("Cartesian")
+      } else if (minSize(leftSize, rightSize) > BigInt((threshold * ratio).toLong)) {
+        Some("OversizedBroadcastFallback")
+      } else {
+        None
+      }
+    } else {
+      if (leftSize > threshold && rightSize > threshold) {
+        Some("Cartesian")
+      } else if (cannotSelectBuildSide(leftSize, rightSize, threshold)) {
+        Some("SecondBNLJ")
+      } else {

Review Comment:
   `cannotSelectBuildSide` evaluates the same predicate as the preceding Cartesian check (both sides larger than the broadcast threshold), so the `SecondBNLJ` branch is unreachable. Please distinguish the conditions so `SecondBNLJ` can actually be detected, or remove the dead code path.
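   One way the two branches could be kept distinct is to flag `SecondBNLJ` only when the join type forces a particular BroadcastNestedLoopJoin build side and that side alone exceeds the threshold. The sketch below is illustrative, not Kyuubi code: `detectNonEquiReason` and `forcedBuildRight` are hypothetical names, and the plan/statistics plumbing from the PR is replaced with plain sizes.

   ```scala
   // Hypothetical rework of the non-equi-join branch of detect().
   object DangerousJoinDetectSketch {
     // forcedBuildRight models join types (e.g. LEFT OUTER) where only the
     // right side can serve as the BNLJ broadcast/build side.
     def detectNonEquiReason(
         leftSize: BigInt,
         rightSize: BigInt,
         threshold: Long,
         forcedBuildRight: Boolean): Option[String] = {
       if (leftSize > threshold && rightSize > threshold) {
         // Neither side fits under the broadcast threshold: the planner
         // falls back to a Cartesian-style execution.
         Some("Cartesian")
       } else if (forcedBuildRight && rightSize > threshold) {
         // The forced build side is too large to broadcast, even though the
         // other side fits: a distinct, reachable SecondBNLJ condition.
         Some("SecondBNLJ")
       } else {
         None
       }
     }
   }
   ```

   With this split, `SecondBNLJ` fires for a small left side joined against an oversized forced build side, which the original predicate could never report.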



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

