This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 877c3f2bea92 [SPARK-48974][SQL][SS][ML][MLLIB] Use `SparkSession.implicits` instead of `SQLContext.implicits`
877c3f2bea92 is described below
commit 877c3f2bea924ca9f3fd5b7e9c6cbfb0fc3be958
Author: yangjie01 <[email protected]>
AuthorDate: Wed Jul 24 10:41:07 2024 +0800
[SPARK-48974][SQL][SS][ML][MLLIB] Use `SparkSession.implicits` instead of `SQLContext.implicits`
### What changes were proposed in this pull request?
This PR replaces `SQLContext.implicits` with `SparkSession.implicits` in
the Spark codebase.
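The pattern is the same at every call site: import the implicit conversions from the owning `SparkSession` (either a session value directly, or a Dataset's `sparkSession`) rather than going through `SQLContext`. A minimal sketch of the idea, modeled on the `MLUtils` change below (the helper name `trimmedLines` is illustrative and not part of this patch):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{length, not, trim}

// Illustrative helper (not part of this patch): keep non-empty, trimmed lines
// from a single-column "value" DataFrame.
def trimmedLines(lines: DataFrame): DataFrame = {
  // Before this patch: import lines.sqlContext.implicits._
  import lines.sparkSession.implicits._ // same implicits, sourced from SparkSession
  lines.select(trim($"value").as("line"))
    .filter(not(length($"line") === 0))
}
```

In test suites where a `SparkSession` value such as `spark` is already in scope, the import simply becomes `import spark.implicits._`, as in the `CollectTopKSuite` and `SparkSessionExtensionSuite` changes below.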
### Why are the changes needed?
Reduce the use of `SQLContext` APIs in Spark's internal code; `SparkSession` is the preferred entry point.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass GitHub Actions
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47457 from LuciferYang/use-sparksession-implicits.
Lead-authored-by: yangjie01 <[email protected]>
Co-authored-by: YangJie <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
.../src/main/scala/org/apache/spark/mllib/util/MLUtils.scala | 2 +-
.../apache/spark/ml/classification/FMClassifierSuite.scala | 4 ++--
.../spark/ml/classification/LogisticRegressionSuite.scala | 12 ++++++------
.../apache/spark/ml/recommendation/CollectTopKSuite.scala | 4 ++--
.../apache/spark/ml/regression/LinearRegressionSuite.scala | 4 ++--
.../test/scala/org/apache/spark/ml/util/MLTestingUtils.scala | 2 +-
.../spark/sql/execution/datasources/csv/CSVUtils.scala | 2 +-
.../org/apache/spark/sql/SparkSessionExtensionSuite.scala | 8 ++++----
.../org/apache/spark/sql/streaming/util/BlockingSource.scala | 2 +-
.../spark/sql/hive/HiveContextCompatibilitySuite.scala | 4 ++--
.../org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala | 2 +-
.../scala/org/apache/spark/sql/hive/ListTablesSuite.scala | 2 +-
.../org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 2 +-
.../spark/sql/hive/execution/HiveResolutionSuite.scala | 2 +-
.../apache/spark/sql/hive/execution/HiveTableScanSuite.scala | 2 +-
.../sql/sources/BucketedWriteWithHiveSupportSuite.scala | 2 +-
16 files changed, 28 insertions(+), 28 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index e23423e4c004..1257d2ccfbfb 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -119,7 +119,7 @@ object MLUtils extends Logging {
).resolveRelation(checkFilesExist = false))
.select("value")
- import lines.sqlContext.implicits._
+ import lines.sparkSession.implicits._
lines.select(trim($"value").as("line"))
.filter(not((length($"line") === 0).or($"line".startsWith("#"))))
diff --git
a/mllib/src/test/scala/org/apache/spark/ml/classification/FMClassifierSuite.scala
b/mllib/src/test/scala/org/apache/spark/ml/classification/FMClassifierSuite.scala
index 68e83fccf3d1..ff9ce1ca7b9f 100644
---
a/mllib/src/test/scala/org/apache/spark/ml/classification/FMClassifierSuite.scala
+++
b/mllib/src/test/scala/org/apache/spark/ml/classification/FMClassifierSuite.scala
@@ -52,8 +52,8 @@ class FMClassifierSuite extends MLTest with
DefaultReadWriteTest {
}
test("FMClassifier: Predictor, Classifier methods") {
- val sqlContext = smallBinaryDataset.sqlContext
- import sqlContext.implicits._
+ val session = smallBinaryDataset.sparkSession
+ import session.implicits._
val fm = new FMClassifier()
val model = fm.fit(smallBinaryDataset)
diff --git
a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
index 8e54262e2f61..b0e275f5e193 100644
---
a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
+++
b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
@@ -550,8 +550,8 @@ class LogisticRegressionSuite extends MLTest with
DefaultReadWriteTest {
}
test("multinomial logistic regression: Predictor, Classifier methods") {
- val sqlContext = smallMultinomialDataset.sqlContext
- import sqlContext.implicits._
+ val session = smallMultinomialDataset.sparkSession
+ import session.implicits._
val mlr = new LogisticRegression().setFamily("multinomial")
val model = mlr.fit(smallMultinomialDataset)
@@ -590,8 +590,8 @@ class LogisticRegressionSuite extends MLTest with
DefaultReadWriteTest {
}
test("binary logistic regression: Predictor, Classifier methods") {
- val sqlContext = smallBinaryDataset.sqlContext
- import sqlContext.implicits._
+ val session = smallBinaryDataset.sparkSession
+ import session.implicits._
val lr = new LogisticRegression().setFamily("binomial")
val model = lr.fit(smallBinaryDataset)
@@ -1427,8 +1427,8 @@ class LogisticRegressionSuite extends MLTest with
DefaultReadWriteTest {
val trainer2 = (new
LogisticRegression).setFitIntercept(true).setWeightCol("weight")
.setElasticNetParam(1.0).setRegParam(6.0).setStandardization(false)
- val sqlContext = multinomialDataset.sqlContext
- import sqlContext.implicits._
+ val session = multinomialDataset.sparkSession
+ import session.implicits._
val model1 = trainer1.fit(multinomialDataset)
val model2 = trainer2.fit(multinomialDataset)
diff --git
a/mllib/src/test/scala/org/apache/spark/ml/recommendation/CollectTopKSuite.scala
b/mllib/src/test/scala/org/apache/spark/ml/recommendation/CollectTopKSuite.scala
index 5f532593d512..b79e10d0d267 100644
---
a/mllib/src/test/scala/org/apache/spark/ml/recommendation/CollectTopKSuite.scala
+++
b/mllib/src/test/scala/org/apache/spark/ml/recommendation/CollectTopKSuite.scala
@@ -29,8 +29,8 @@ class CollectTopKSuite extends MLTest {
override def beforeAll(): Unit = {
super.beforeAll()
- val sqlContext = spark.sqlContext
- import sqlContext.implicits._
+ val session = spark
+ import session.implicits._
dataFrame = Seq(
(0, 3, 54f),
(0, 4, 44f),
diff --git
a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
index d70af42b1389..7ccd3494bd32 100644
---
a/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
+++
b/mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
@@ -962,8 +962,8 @@ class LinearRegressionSuite extends MLTest with
DefaultReadWriteTest with PMMLRe
}
test("linear regression with weighted samples") {
- val sqlContext = spark.sqlContext
- import sqlContext.implicits._
+ val session = spark
+ import session.implicits._
val numClasses = 0
def modelEquals(m1: LinearRegressionModel, m2: LinearRegressionModel):
Unit = {
assert(m1.coefficients ~== m2.coefficients relTol 0.01)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
b/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
index ad4ac7259165..9f6d4d242289 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
@@ -220,7 +220,7 @@ object MLTestingUtils extends SparkFunSuite {
numClasses: Int,
modelEquals: (M, M) => Unit,
outlierRatio: Int): Unit = {
- import data.sqlContext.implicits._
+ import data.sparkSession.implicits._
val outlierDS = data.withColumn("weight", lit(1.0)).as[Instance].flatMap {
case Instance(l, w, f) =>
val outlierLabel = if (numClasses == 0) -l else numClasses - l - 1
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
index d8b52c503ad3..1a48b81fd7e6 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
@@ -32,7 +32,7 @@ object CSVUtils {
// Note that this was separately made by SPARK-18362. Logically, this
should be the same
// with the one below, `filterCommentAndEmpty` but execution path is
different. One of them
// might have to be removed in the near future if possible.
- import lines.sqlContext.implicits._
+ import lines.sparkSession.implicits._
val aliased = lines.toDF("value")
val nonEmptyLines = aliased.filter(length(trim($"value")) > 0)
if (options.isCommentSet) {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 1f0033a0efcd..6779a9d521c0 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -183,7 +183,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite with
SQLHelper with Adapt
.contains(MyQueryStagePrepRule()))
assert(session.sessionState.columnarRules.contains(
MyColumnarRule(MyNewQueryStageRule(), MyNewQueryStageRule())))
- import session.sqlContext.implicits._
+ import session.implicits._
val data = Seq((100L), (200L), (300L)).toDF("vals").repartition(1)
val df = data.selectExpr("vals + 1")
df.collect()
@@ -225,7 +225,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite with
SQLHelper with Adapt
session.conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
assert(session.sessionState.columnarRules.contains(
MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
- import session.sqlContext.implicits._
+ import session.implicits._
// perform a join to inject a shuffle exchange
val left = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("l1", "l2")
val right = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("r1", "r2")
@@ -283,7 +283,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite with
SQLHelper with Adapt
session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, enableAQE)
assert(session.sessionState.columnarRules.contains(
MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
- import session.sqlContext.implicits._
+ import session.implicits._
// perform a join to inject a broadcast exchange
val left = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("l1", "l2")
val right = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("r1", "r2")
@@ -327,7 +327,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite with
SQLHelper with Adapt
try {
assert(session.sessionState.columnarRules.contains(
MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
- import session.sqlContext.implicits._
+ import session.implicits._
val input = Seq((100L), (200L), (300L))
val data = input.toDF("vals").repartition(1)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
index c1b29b5130e8..b083d180d991 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
@@ -49,7 +49,7 @@ class BlockingSource extends StreamSourceProvider with
StreamSinkProvider {
override def schema: StructType = fakeSchema
override def getOffset: Option[Offset] = Some(new LongOffset(0))
override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
- import spark.implicits._
+ import spark.sparkSession.implicits._
Seq[Int]().toDS().toDF()
}
override def stop(): Unit = {}
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
index 935ee90731aa..86555358905d 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
@@ -54,7 +54,7 @@ class HiveContextCompatibilitySuite extends SparkFunSuite {
test("basic operations") {
val _hc = hc
- import _hc.implicits._
+ import _hc.sparkSession.implicits._
val df1 = (1 to 20).map { i => (i, i) }.toDF("a", "x")
val df2 = (1 to 100).map { i => (i, i % 10, i % 2 == 0) }.toDF("a", "b",
"c")
.select($"a", $"b")
@@ -71,7 +71,7 @@ class HiveContextCompatibilitySuite extends SparkFunSuite {
test("basic DDLs") {
val _hc = hc
- import _hc.implicits._
+ import _hc.sparkSession.implicits._
val databases = hc.sql("SHOW DATABASES").collect().map(_.getString(0))
assert(databases.toSeq == Seq("default"))
hc.sql("CREATE DATABASE mee_db")
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index e88a37f019b7..69abb1d1673e 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -733,7 +733,7 @@ object SPARK_9757 extends QueryTest {
val hiveContext = new TestHiveContext(sparkContext)
spark = hiveContext.sparkSession
- import hiveContext.implicits._
+ import hiveContext.sparkSession.implicits._
val dir = Utils.createTempDir()
dir.delete()
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
index 49cd0885e722..9de5c6aab9cc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
@@ -30,7 +30,7 @@ class ListTablesSuite extends QueryTest
with TestHiveSingleton
with BeforeAndAfterAll {
import hiveContext._
- import hiveContext.implicits._
+ import hiveContext.sparkSession.implicits._
val df = sparkContext.parallelize((1 to 10).map(i => (i,
s"str$i"))).toDF("key", "value")
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 24d1e24b30c8..e5e180e7c135 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -47,7 +47,7 @@ case class TestData(a: Int, b: String)
*/
@SlowHiveTest
class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with
BeforeAndAfter {
- import org.apache.spark.sql.hive.test.TestHive.implicits._
+ import org.apache.spark.sql.hive.test.TestHive.sparkSession.implicits._
private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
index 8e2a5a66172b..d7d859f57e5b 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.hive.test.TestHive.{read, sparkContext, sql}
-import org.apache.spark.sql.hive.test.TestHive.implicits._
+import org.apache.spark.sql.hive.test.TestHive.sparkSession.implicits._
import org.apache.spark.tags.SlowHiveTest
case class Nested(a: Int, B: Int)
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
index 8280b9624fa2..033d86194bda 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
@@ -24,7 +24,7 @@ import
org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAM
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton}
import org.apache.spark.sql.hive.test.TestHive._
-import org.apache.spark.sql.hive.test.TestHive.implicits._
+import org.apache.spark.sql.hive.test.TestHive.sparkSession.implicits._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.tags.SlowHiveTest
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
index dcd00d3011ff..d80caca9cd0e 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteWithHiveSupportSuite.scala
@@ -22,7 +22,7 @@ import java.io.File
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.expressions.{BitwiseAnd, Expression,
HiveHash, Literal, Pmod}
import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.hive.test.TestHive.implicits._
+import org.apache.spark.sql.hive.test.TestHive.sparkSession.implicits._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION