This is an automated email from the ASF dual-hosted git repository.

szehon-ho pushed a commit to branch spark-57113-follow-up
in repository https://gitbox.apache.org/repos/asf/spark.git

commit a261c0de940bd449871249afed81d3dad46ed890
Author: Szehon Ho <[email protected]>
AuthorDate: Mon Jun 1 15:21:28 2026 -0700

    [SPARK-55347][SDP][FOLLOW-UP] Hoist implicits and imports in AutoCdcScd1KeyDriftSuite
    
    Replace the repeated per-test `val session = spark; import session.implicits._`
    boilerplate with a single class-level `import testImplicits._`, and import
    `classic.DataFrame` and `types.MetadataBuilder` instead of referring to them
    by their fully-qualified names.
---
 .../pipelines/graph/AutoCdcScd1KeyDriftSuite.scala | 43 +++-------------------
 1 file changed, 6 insertions(+), 37 deletions(-)

diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala
index cfc90ebc9b2d..828c09b01db5 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala
@@ -17,10 +17,12 @@
 
 package org.apache.spark.sql.pipelines.graph
 
+import org.apache.spark.sql.classic.DataFrame
 import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.pipelines.utils.{ExecutionTest, 
TestGraphRegistrationContext}
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.MetadataBuilder
 
 /**
  * End-to-end tests covering AutoCDC SCD1 key-drift validation: the AutoCDC 
flow's declared
@@ -38,11 +40,10 @@ class AutoCdcScd1KeyDriftSuite
     with SharedSparkSession
     with AutoCdcGraphExecutionTestMixin {
 
+  import testImplicits._
+
   test("a pipeline execution that adds a key column to an existing AutoCDC 
flow triggers " +
     "KEY_SCHEMA_DRIFT") {
-    val session = spark
-    import session.implicits._
-
     // Target table carries both candidate key columns up-front so only the 
AutoCDC `keys`
     // declaration differs between the two pipelines.
     spark.sql(
@@ -81,9 +82,6 @@ class AutoCdcScd1KeyDriftSuite
 
   test("a pipeline execution that drops a key column from an existing AutoCDC 
flow triggers " +
     "KEY_SCHEMA_DRIFT") {
-    val session = spark
-    import session.implicits._
-
     spark.sql(
       s"CREATE TABLE $catalog.$namespace.target " +
       s"(region STRING NOT NULL, id INT NOT NULL, version BIGINT NOT NULL, 
$cdcMetadataDdl)"
@@ -120,9 +118,6 @@ class AutoCdcScd1KeyDriftSuite
 
   test("a pipeline execution that swaps a key in an existing AutoCDC flow for 
a different name " +
     "(same arity) triggers KEY_SCHEMA_DRIFT") {
-    val session = spark
-    import session.implicits._
-
     spark.sql(
       s"CREATE TABLE $catalog.$namespace.target " +
       s"(id INT NOT NULL, region STRING NOT NULL, country STRING NOT NULL, " +
@@ -171,8 +166,6 @@ class AutoCdcScd1KeyDriftSuite
       s"""TBLPROPERTIES ('${AutoCdcAuxiliaryTable.keyColumnNamesProperty}' = 
'["id"]')"""
     )
 
-    val session = spark
-    import session.implicits._
     val stream = MemoryStream[(Int, Long)]
     stream.addData((1, 1L))
     val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), 
Seq("id"))
@@ -193,9 +186,6 @@ class AutoCdcScd1KeyDriftSuite
   }
 
   test("a composite key reorder ([a,b] -> [b,a]) does NOT trigger drift 
validation") {
-    val session = spark
-    import session.implicits._
-
     spark.sql(
       s"CREATE TABLE $catalog.$namespace.target " +
       s"(a INT NOT NULL, b STRING NOT NULL, version BIGINT NOT NULL, 
$cdcMetadataDdl)"
@@ -217,9 +207,6 @@ class AutoCdcScd1KeyDriftSuite
 
   test("a pipeline execution that changes a key column's nullability or 
metadata in an " +
     "existing AutoCDC flow does NOT trigger drift") {
-    val session = spark
-    import session.implicits._
-
     // Drift validation compares (name, dataType) pairs as a set. Nullability 
and column
     // metadata are part of [[StructField]] but not part of [[DataType]], so 
they do not gate
     // semantic equivalence: only the wire-format data type matters for merge 
correctness.
@@ -240,7 +227,7 @@ class AutoCdcScd1KeyDriftSuite
     val stream2 = MemoryStream[(Option[Int], Long)]
     stream2.addData((Some(2), 2L))
     val baseDf = stream2.toDF().toDF("id", "version")
-    val md = new org.apache.spark.sql.types.MetadataBuilder()
+    val md = new MetadataBuilder()
       .putString("description", "primary key")
       .build()
     val sourceDfWithMetadata = baseDf.select(baseDf("id").as("id", md), 
baseDf("version"))
@@ -249,9 +236,6 @@ class AutoCdcScd1KeyDriftSuite
 
   test("a pipeline execution that wraps an existing AutoCDC flow's key in 
backticks does NOT " +
     "trigger drift") {
-    val session = spark
-    import session.implicits._
-
     // Backticks are a SQL-parse syntactic device, not part of the identifier 
itself. A user
     // adding or removing backticks around the same logical column must NOT be 
detected as drift.
     spark.sql(
@@ -270,9 +254,6 @@ class AutoCdcScd1KeyDriftSuite
 
   test("a pipeline execution that drops backticks around an existing AutoCDC 
flow's " +
     "previously-backtick-quoted key does NOT trigger drift") {
-    val session = spark
-    import session.implicits._
-
     // The reverse direction of the previous test: drift validation must be 
backtick-invariant
     // on both the WRITE side (recorded property strips backticks when 
serializing the key
     // names in pipeline #1) and the READ side (resolver-aware lookup strips 
backticks when
@@ -293,9 +274,6 @@ class AutoCdcScd1KeyDriftSuite
 
   test("under spark.sql.caseSensitive = true, an AutoCDC flow whose key 
differs only in case " +
     "from the recorded key triggers KEY_SCHEMA_DRIFT") {
-    val session = spark
-    import session.implicits._
-
     // validateNoAutoCdcKeyDrift uses spark.sessionState.conf.resolver, so its 
behavior on
     // `Id` vs `id` flips with the session conf. Pin the case-sensitive 
direction: pipeline #1
     // seeds the aux table under the default resolver with recorded key 
`["id"]`, then
@@ -336,9 +314,6 @@ class AutoCdcScd1KeyDriftSuite
 
   test("under the default (case-insensitive) resolver, an AutoCDC flow whose 
key differs only " +
     "in case from the recorded key does NOT trigger drift") {
-    val session = spark
-    import session.implicits._
-
     // Pairs with the case-sensitive test above: same recorded key, but under 
the default
     // resolver the two identifiers are equivalent so drift validation must 
accept pipeline
     // #2. This pins the negative direction so a regression that accidentally 
hard-codes a
@@ -377,8 +352,6 @@ class AutoCdcScd1KeyDriftSuite
       s"CREATE TABLE ${auxTableNameFor("target")} (id INT NOT NULL, 
$cdcMetadataDdl)"
     )
 
-    val session = spark
-    import session.implicits._
     val stream = MemoryStream[(Int, Long)]
     stream.addData((1, 1L))
     val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), 
Seq("id"))
@@ -412,8 +385,6 @@ class AutoCdcScd1KeyDriftSuite
       s"TBLPROPERTIES ('${AutoCdcAuxiliaryTable.keyColumnNamesProperty}' = 
'$malformedKeysArray')"
     )
 
-    val session = spark
-    import session.implicits._
     val stream = MemoryStream[(Int, Long)]
     stream.addData((1, 1L))
     val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), 
Seq("id"))
@@ -449,8 +420,6 @@ class AutoCdcScd1KeyDriftSuite
       s"""TBLPROPERTIES ('${AutoCdcAuxiliaryTable.keyColumnNamesProperty}' = 
'["region"]')"""
     )
 
-    val session = spark
-    import session.implicits._
     val stream = MemoryStream[(Int, Long)]
     stream.addData((1, 1L))
     val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), 
Seq("id"))
@@ -477,7 +446,7 @@ class AutoCdcScd1KeyDriftSuite
    */
   private def buildPipeline(
       flowName: String,
-      sourceDf: org.apache.spark.sql.classic.DataFrame,
+      sourceDf: DataFrame,
       keys: Seq[String]): TestGraphRegistrationContext =
     singleAutoCdcFlowPipeline(flowName, "target", sourceDf, keys)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to