This is an automated email from the ASF dual-hosted git repository. szehon-ho pushed a commit to branch spark-57113-follow-up in repository https://gitbox.apache.org/repos/asf/spark.git
commit a261c0de940bd449871249afed81d3dad46ed890 Author: Szehon Ho <[email protected]> AuthorDate: Mon Jun 1 15:21:28 2026 -0700 [SPARK-55347][SDP][FOLLOW-UP] Hoist implicits and imports in AutoCdcScd1KeyDriftSuite Replace the repeated per-test `val session = spark; import session.implicits._` with a single class-level `import testImplicits._`, and import `classic.DataFrame` and `types.MetadataBuilder` instead of using fully-qualified names. --- .../pipelines/graph/AutoCdcScd1KeyDriftSuite.scala | 43 +++------------------- 1 file changed, 6 insertions(+), 37 deletions(-) diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala index cfc90ebc9b2d..828c09b01db5 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala @@ -17,10 +17,12 @@ package org.apache.spark.sql.pipelines.graph +import org.apache.spark.sql.classic.DataFrame import org.apache.spark.sql.execution.streaming.runtime.MemoryStream import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext} import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.MetadataBuilder /** * End-to-end tests covering AutoCDC SCD1 key-drift validation: the AutoCDC flow's declared @@ -38,11 +40,10 @@ class AutoCdcScd1KeyDriftSuite with SharedSparkSession with AutoCdcGraphExecutionTestMixin { + import testImplicits._ + test("a pipeline execution that adds a key column to an existing AutoCDC flow triggers " + "KEY_SCHEMA_DRIFT") { - val session = spark - import session.implicits._ - // Target table carries both candidate key columns up-front so only the AutoCDC `keys` // declaration differs between the two pipelines. 
spark.sql( @@ -81,9 +82,6 @@ class AutoCdcScd1KeyDriftSuite test("a pipeline execution that drops a key column from an existing AutoCDC flow triggers " + "KEY_SCHEMA_DRIFT") { - val session = spark - import session.implicits._ - spark.sql( s"CREATE TABLE $catalog.$namespace.target " + s"(region STRING NOT NULL, id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)" @@ -120,9 +118,6 @@ class AutoCdcScd1KeyDriftSuite test("a pipeline execution that swaps a key in an existing AutoCDC flow for a different name " + "(same arity) triggers KEY_SCHEMA_DRIFT") { - val session = spark - import session.implicits._ - spark.sql( s"CREATE TABLE $catalog.$namespace.target " + s"(id INT NOT NULL, region STRING NOT NULL, country STRING NOT NULL, " + @@ -171,8 +166,6 @@ class AutoCdcScd1KeyDriftSuite s"""TBLPROPERTIES ('${AutoCdcAuxiliaryTable.keyColumnNamesProperty}' = '["id"]')""" ) - val session = spark - import session.implicits._ val stream = MemoryStream[(Int, Long)] stream.addData((1, 1L)) val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), Seq("id")) @@ -193,9 +186,6 @@ class AutoCdcScd1KeyDriftSuite } test("a composite key reorder ([a,b] -> [b,a]) does NOT trigger drift validation") { - val session = spark - import session.implicits._ - spark.sql( s"CREATE TABLE $catalog.$namespace.target " + s"(a INT NOT NULL, b STRING NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)" @@ -217,9 +207,6 @@ class AutoCdcScd1KeyDriftSuite test("a pipeline execution that changes a key column's nullability or metadata in an " + "existing AutoCDC flow does NOT trigger drift") { - val session = spark - import session.implicits._ - // Drift validation compares (name, dataType) pairs as a set. Nullability and column // metadata are part of [[StructField]] but not part of [[DataType]], so they do not gate // semantic equivalence: only the wire-format data type matters for merge correctness. 
@@ -240,7 +227,7 @@ class AutoCdcScd1KeyDriftSuite val stream2 = MemoryStream[(Option[Int], Long)] stream2.addData((Some(2), 2L)) val baseDf = stream2.toDF().toDF("id", "version") - val md = new org.apache.spark.sql.types.MetadataBuilder() + val md = new MetadataBuilder() .putString("description", "primary key") .build() val sourceDfWithMetadata = baseDf.select(baseDf("id").as("id", md), baseDf("version")) @@ -249,9 +236,6 @@ class AutoCdcScd1KeyDriftSuite test("a pipeline execution that wraps an existing AutoCDC flow's key in backticks does NOT " + "trigger drift") { - val session = spark - import session.implicits._ - // Backticks are a SQL-parse syntactic device, not part of the identifier itself. A user // adding or removing backticks around the same logical column must NOT be detected as drift. spark.sql( @@ -270,9 +254,6 @@ class AutoCdcScd1KeyDriftSuite test("a pipeline execution that drops backticks around an existing AutoCDC flow's " + "previously-backtick-quoted key does NOT trigger drift") { - val session = spark - import session.implicits._ - // The reverse direction of the previous test: drift validation must be backtick-invariant // on both the WRITE side (recorded property strips backticks when serializing the key // names in pipeline #1) and the READ side (resolver-aware lookup strips backticks when @@ -293,9 +274,6 @@ class AutoCdcScd1KeyDriftSuite test("under spark.sql.caseSensitive = true, an AutoCDC flow whose key differs only in case " + "from the recorded key triggers KEY_SCHEMA_DRIFT") { - val session = spark - import session.implicits._ - // validateNoAutoCdcKeyDrift uses spark.sessionState.conf.resolver, so its behavior on // `Id` vs `id` flips with the session conf. 
Pin the case-sensitive direction: pipeline #1 // seeds the aux table under the default resolver with recorded key `["id"]`, then @@ -336,9 +314,6 @@ class AutoCdcScd1KeyDriftSuite test("under the default (case-insensitive) resolver, an AutoCDC flow whose key differs only " + "in case from the recorded key does NOT trigger drift") { - val session = spark - import session.implicits._ - // Pairs with the case-sensitive test above: same recorded key, but under the default // resolver the two identifiers are equivalent so drift validation must accept pipeline // #2. This pins the negative direction so a regression that accidentally hard-codes a @@ -377,8 +352,6 @@ class AutoCdcScd1KeyDriftSuite s"CREATE TABLE ${auxTableNameFor("target")} (id INT NOT NULL, $cdcMetadataDdl)" ) - val session = spark - import session.implicits._ val stream = MemoryStream[(Int, Long)] stream.addData((1, 1L)) val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), Seq("id")) @@ -412,8 +385,6 @@ class AutoCdcScd1KeyDriftSuite s"TBLPROPERTIES ('${AutoCdcAuxiliaryTable.keyColumnNamesProperty}' = '$malformedKeysArray')" ) - val session = spark - import session.implicits._ val stream = MemoryStream[(Int, Long)] stream.addData((1, 1L)) val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), Seq("id")) @@ -449,8 +420,6 @@ class AutoCdcScd1KeyDriftSuite s"""TBLPROPERTIES ('${AutoCdcAuxiliaryTable.keyColumnNamesProperty}' = '["region"]')""" ) - val session = spark - import session.implicits._ val stream = MemoryStream[(Int, Long)] stream.addData((1, 1L)) val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), Seq("id")) @@ -477,7 +446,7 @@ class AutoCdcScd1KeyDriftSuite */ private def buildPipeline( flowName: String, - sourceDf: org.apache.spark.sql.classic.DataFrame, + sourceDf: DataFrame, keys: Seq[String]): TestGraphRegistrationContext = singleAutoCdcFlowPipeline(flowName, "target", sourceDf, keys) } 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org
