This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ae573f176cd [SPARK-53656][SS] Refactor MemoryStream to use 
SparkSession instead of SQLContext
5ae573f176cd is described below

commit 5ae573f176cd1615b8da973ab515f8063664d435
Author: Ganesha S <[email protected]>
AuthorDate: Mon Oct 20 15:44:07 2025 +0900

    [SPARK-53656][SS] Refactor MemoryStream to use SparkSession instead of 
SQLContext
    
    ### What changes were proposed in this pull request?
    
    Refactor MemoryStream to use SparkSession instead of SQLContext.
    
    ### Why are the changes needed?
    
    SQLContext is deprecated in newer versions of Spark.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Verified that the affected tests are passing successfully.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #52402 from ganeshashree/SPARK-53656.
    
    Authored-by: Ganesha S <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../sql/execution/streaming/runtime/memory.scala   | 47 +++++++++----
 .../streaming/sources/ContinuousMemoryStream.scala | 52 ++++++++++++---
 .../streaming/sources/LowLatencyMemoryStream.scala | 50 +++++++++++---
 .../scala/org/apache/spark/sql/DatasetSuite.scala  |  2 +-
 ...cProgressTrackingMicroBatchExecutionSuite.scala | 44 ++++++------
 .../sql/execution/streaming/MemorySinkSuite.scala  | 78 ++++++++++++++++++++++
 .../streaming/MicroBatchExecutionSuite.scala       |  4 +-
 .../state/StateStoreCoordinatorSuite.scala         | 13 ++--
 .../streaming/state/StateStoreSuite.scala          |  3 +-
 .../streaming/AcceptsLatestSeenOffsetSuite.scala   | 16 ++---
 .../apache/spark/sql/streaming/StreamSuite.scala   |  2 +-
 .../streaming/StreamingQueryListenerSuite.scala    | 12 ++--
 .../sql/streaming/StreamingQueryManagerSuite.scala | 12 ++--
 .../spark/sql/streaming/StreamingQuerySuite.scala  |  2 +-
 .../streaming/TransformWithStateClusterSuite.scala |  4 +-
 .../sql/streaming/TransformWithStateSuite.scala    |  3 +-
 .../sql/streaming/TriggerAvailableNowSuite.scala   |  2 +-
 .../test/DataStreamReaderWriterSuite.scala         |  4 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala    |  3 +-
 .../spark/sql/pipelines/graph/elements.scala       |  2 +-
 .../graph/ConnectInvalidPipelineSuite.scala        |  5 +-
 .../graph/ConnectValidPipelineSuite.scala          |  6 ++
 .../pipelines/graph/MaterializeTablesSuite.scala   |  7 +-
 .../sql/pipelines/graph/SystemMetadataSuite.scala  |  4 ++
 .../graph/TriggeredGraphExecutionSuite.scala       |  6 +-
 25 files changed, 283 insertions(+), 100 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/memory.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/memory.scala
index 4230797eca46..bf67ed670ec8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/memory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/memory.scala
@@ -24,7 +24,7 @@ import javax.annotation.concurrent.GuardedBy
 import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Encoder, SQLContext}
+import org.apache.spark.sql.{Encoder, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -43,32 +43,53 @@ import 
org.apache.spark.sql.internal.connector.SimpleTableProvider
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-object MemoryStream {
+object MemoryStream extends LowPriorityMemoryStreamImplicits {
   protected val currentBlockId = new AtomicInteger(0)
   protected val memoryStreamId = new AtomicInteger(0)
 
-  def apply[A : Encoder](implicit sqlContext: SQLContext): MemoryStream[A] =
-    new MemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext)
+  def apply[A : Encoder](implicit sparkSession: SparkSession): MemoryStream[A] 
=
+    new MemoryStream[A](memoryStreamId.getAndIncrement(), sparkSession)
 
-  def apply[A : Encoder](numPartitions: Int)(implicit sqlContext: SQLContext): 
MemoryStream[A] =
-    new MemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext, 
Some(numPartitions))
+  def apply[A : Encoder](numPartitions: Int)(implicit sparkSession: 
SparkSession): MemoryStream[A] =
+    new MemoryStream[A](memoryStreamId.getAndIncrement(), sparkSession, 
Some(numPartitions))
+}
+
+/**
+ * Provides lower-priority implicits for MemoryStream to prevent ambiguity 
when both
+ * SparkSession and SQLContext are in scope. The implicits in the companion 
object,
+ * which use SparkSession, take higher precedence.
+ */
+trait LowPriorityMemoryStreamImplicits {
+  this: MemoryStream.type =>
+
+  // Deprecated: Used when an implicit SQLContext is in scope
+  @deprecated("Use MemoryStream.apply with an implicit SparkSession instead of 
SQLContext", "4.1.0")
+  def apply[A: Encoder]()(implicit sqlContext: SQLContext): MemoryStream[A] =
+    new MemoryStream[A](memoryStreamId.getAndIncrement(), 
sqlContext.sparkSession)
+
+  @deprecated("Use MemoryStream.apply with an implicit SparkSession instead of 
SQLContext", "4.1.0")
+  def apply[A: Encoder](numPartitions: Int)(implicit sqlContext: SQLContext): 
MemoryStream[A] =
+    new MemoryStream[A](
+      memoryStreamId.getAndIncrement(),
+      sqlContext.sparkSession,
+      Some(numPartitions))
 }
 
 /**
  * A base class for memory stream implementations. Supports adding data and 
resetting.
  */
-abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends 
SparkDataStream {
+abstract class MemoryStreamBase[A : Encoder](sparkSession: SparkSession) 
extends SparkDataStream {
   val encoder = encoderFor[A]
   protected val attributes = toAttributes(encoder.schema)
 
   protected lazy val toRow: ExpressionEncoder.Serializer[A] = 
encoder.createSerializer()
 
   def toDS(): Dataset[A] = {
-    Dataset[A](sqlContext.sparkSession, logicalPlan)
+    Dataset[A](sparkSession, logicalPlan)
   }
 
   def toDF(): DataFrame = {
-    Dataset.ofRows(sqlContext.sparkSession, logicalPlan)
+    Dataset.ofRows(sparkSession, logicalPlan)
   }
 
   def addData(data: A*): OffsetV2 = {
@@ -156,16 +177,16 @@ class MemoryStreamScanBuilder(stream: 
MemoryStreamBase[_]) extends ScanBuilder w
  */
 case class MemoryStream[A : Encoder](
     id: Int,
-    sqlContext: SQLContext,
+    sparkSession: SparkSession,
     numPartitions: Option[Int] = None)
   extends MemoryStreamBaseClass[A](
-    id, sqlContext, numPartitions = numPartitions)
+    id, sparkSession, numPartitions = numPartitions)
 
 abstract class MemoryStreamBaseClass[A: Encoder](
     id: Int,
-    sqlContext: SQLContext,
+    sparkSession: SparkSession,
     numPartitions: Option[Int] = None)
-  extends MemoryStreamBase[A](sqlContext)
+  extends MemoryStreamBase[A](sparkSession)
   with MicroBatchStream
   with SupportsTriggerAvailableNow
   with Logging {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
index 03884d02faeb..8042cacf1374 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
@@ -27,7 +27,7 @@ import org.json4s.jackson.Serialization
 
 import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.sql.{Encoder, SQLContext}
+import org.apache.spark.sql.{Encoder, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.connector.read.InputPartition
@@ -44,8 +44,11 @@ import org.apache.spark.util.RpcUtils
  *    ContinuousMemoryStreamInputPartitionReader instances to poll. It returns 
the record at
  *    the specified offset within the list, or null if that offset doesn't yet 
have a record.
  */
-class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, 
numPartitions: Int = 2)
-  extends MemoryStreamBase[A](sqlContext) with ContinuousStream {
+class ContinuousMemoryStream[A : Encoder](
+    id: Int,
+    sparkSession: SparkSession,
+    numPartitions: Int = 2)
+  extends MemoryStreamBase[A](sparkSession) with ContinuousStream {
 
   private implicit val formats: Formats = Serialization.formats(NoTypeHints)
 
@@ -109,14 +112,47 @@ class ContinuousMemoryStream[A : Encoder](id: Int, 
sqlContext: SQLContext, numPa
   override def commit(end: Offset): Unit = {}
 }
 
-object ContinuousMemoryStream {
+object ContinuousMemoryStream extends 
LowPriorityContinuousMemoryStreamImplicits {
   protected val memoryStreamId = new AtomicInteger(0)
 
-  def apply[A : Encoder](implicit sqlContext: SQLContext): 
ContinuousMemoryStream[A] =
-    new ContinuousMemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext)
+  def apply[A : Encoder](implicit sparkSession: SparkSession): 
ContinuousMemoryStream[A] =
+    new ContinuousMemoryStream[A](memoryStreamId.getAndIncrement(), 
sparkSession)
 
-  def singlePartition[A : Encoder](implicit sqlContext: SQLContext): 
ContinuousMemoryStream[A] =
-    new ContinuousMemoryStream[A](memoryStreamId.getAndIncrement(), 
sqlContext, 1)
+  def apply[A : Encoder](numPartitions: Int)(implicit sparkSession: 
SparkSession):
+  ContinuousMemoryStream[A] =
+    new ContinuousMemoryStream[A](memoryStreamId.getAndIncrement(), 
sparkSession, numPartitions)
+
+  def singlePartition[A : Encoder](implicit sparkSession: SparkSession): 
ContinuousMemoryStream[A] =
+    new ContinuousMemoryStream[A](memoryStreamId.getAndIncrement(), 
sparkSession, 1)
+}
+
+/**
+ * Provides lower-priority implicits for ContinuousMemoryStream to prevent 
ambiguity when both
+ * SparkSession and SQLContext are in scope. The implicits in the companion 
object,
+ * which use SparkSession, take higher precedence.
+ */
+trait LowPriorityContinuousMemoryStreamImplicits {
+  this: ContinuousMemoryStream.type =>
+
+  // Deprecated: Used when an implicit SQLContext is in scope
+  @deprecated("Use ContinuousMemoryStream with an implicit SparkSession " +
+    "instead of SQLContext", "4.1.0")
+  def apply[A: Encoder]()(implicit sqlContext: SQLContext): 
ContinuousMemoryStream[A] =
+    new ContinuousMemoryStream[A](memoryStreamId.getAndIncrement(), 
sqlContext.sparkSession)
+
+  @deprecated("Use ContinuousMemoryStream with an implicit SparkSession " +
+    "instead of SQLContext", "4.1.0")
+  def apply[A: Encoder](numPartitions: Int)(implicit sqlContext: SQLContext):
+  ContinuousMemoryStream[A] =
+    new ContinuousMemoryStream[A](
+      memoryStreamId.getAndIncrement(),
+      sqlContext.sparkSession,
+      numPartitions)
+
+  @deprecated("Use ContinuousMemoryStream.singlePartition with an implicit 
SparkSession " +
+    "instead of SQLContext", "4.1.0")
+  def singlePartition[A: Encoder]()(implicit sqlContext: SQLContext): 
ContinuousMemoryStream[A] =
+    new ContinuousMemoryStream[A](memoryStreamId.getAndIncrement(), 
sqlContext.sparkSession, 1)
 }
 
 /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/LowLatencyMemoryStream.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/LowLatencyMemoryStream.scala
index ee7661442e41..d04f4b5d011c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/LowLatencyMemoryStream.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/LowLatencyMemoryStream.scala
@@ -27,7 +27,7 @@ import org.json4s.jackson.Serialization
 
 import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
-import org.apache.spark.sql.{Encoder, SQLContext}
+import org.apache.spark.sql.{Encoder, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.connector.read.InputPartition
@@ -60,10 +60,10 @@ import org.apache.spark.util.{Clock, RpcUtils}
  */
 class LowLatencyMemoryStream[A: Encoder](
     id: Int,
-    sqlContext: SQLContext,
+    sparkSession: SparkSession,
     numPartitions: Int = 2,
     clock: Clock = LowLatencyClock.getClock)
-    extends MemoryStreamBaseClass[A](0, sqlContext)
+    extends MemoryStreamBaseClass[A](0, sparkSession)
     with SupportsRealTimeMode {
   private implicit val formats: Formats = Serialization.formats(NoTypeHints)
 
@@ -172,23 +172,53 @@ class LowLatencyMemoryStream[A: Encoder](
   }
 }
 
-object LowLatencyMemoryStream {
+object LowLatencyMemoryStream extends 
LowPriorityLowLatencyMemoryStreamImplicits {
   protected val memoryStreamId = new AtomicInteger(0)
 
-  def apply[A: Encoder](implicit sqlContext: SQLContext): 
LowLatencyMemoryStream[A] =
-    new LowLatencyMemoryStream[A](memoryStreamId.getAndIncrement(), sqlContext)
+  def apply[A: Encoder](implicit sparkSession: SparkSession): 
LowLatencyMemoryStream[A] =
+    new LowLatencyMemoryStream[A](memoryStreamId.getAndIncrement(), 
sparkSession)
 
   def apply[A: Encoder](numPartitions: Int)(
       implicit
-      sqlContext: SQLContext): LowLatencyMemoryStream[A] =
+      sparkSession: SparkSession): LowLatencyMemoryStream[A] =
     new LowLatencyMemoryStream[A](
       memoryStreamId.getAndIncrement(),
-      sqlContext,
+      sparkSession,
       numPartitions = numPartitions
     )
 
-  def singlePartition[A: Encoder](implicit sqlContext: SQLContext): 
LowLatencyMemoryStream[A] =
-    new LowLatencyMemoryStream[A](memoryStreamId.getAndIncrement(), 
sqlContext, 1)
+  def singlePartition[A: Encoder](implicit sparkSession: SparkSession): 
LowLatencyMemoryStream[A] =
+    new LowLatencyMemoryStream[A](memoryStreamId.getAndIncrement(), 
sparkSession, 1)
+}
+
+/**
+ * Provides lower-priority implicits for LowLatencyMemoryStream to prevent 
ambiguity when both
+ * SparkSession and SQLContext are in scope. The implicits in the companion 
object,
+ * which use SparkSession, take higher precedence.
+ */
+trait LowPriorityLowLatencyMemoryStreamImplicits {
+  this: LowLatencyMemoryStream.type =>
+
+  // Deprecated: Used when an implicit SQLContext is in scope
+  @deprecated("Use LowLatencyMemoryStream with an implicit SparkSession " +
+    "instead of SQLContext", "4.1.0")
+  def apply[A: Encoder]()(implicit sqlContext: SQLContext): 
LowLatencyMemoryStream[A] =
+    new LowLatencyMemoryStream[A](memoryStreamId.getAndIncrement(), 
sqlContext.sparkSession)
+
+  @deprecated("Use LowLatencyMemoryStream with an implicit SparkSession " +
+    "instead of SQLContext", "4.1.0")
+  def apply[A: Encoder](numPartitions: Int)(implicit sqlContext: SQLContext):
+  LowLatencyMemoryStream[A] =
+    new LowLatencyMemoryStream[A](
+      memoryStreamId.getAndIncrement(),
+      sqlContext.sparkSession,
+      numPartitions = numPartitions
+    )
+
+  @deprecated("Use LowLatencyMemoryStream.singlePartition with an implicit 
SparkSession " +
+    "instead of SQLContext", "4.1.0")
+  def singlePartition[A: Encoder]()(implicit sqlContext: SQLContext): 
LowLatencyMemoryStream[A] =
+    new LowLatencyMemoryStream[A](memoryStreamId.getAndIncrement(), 
sqlContext.sparkSession, 1)
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 653ad7bc3433..941fd2205424 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1012,7 +1012,7 @@ class DatasetSuite extends QueryTest
     assert(err.getMessage.contains("An Observation can be used with a Dataset 
only once"))
 
     // streaming datasets are not supported
-    val streamDf = new MemoryStream[Int](0, sqlContext).toDF()
+    val streamDf = new MemoryStream[Int](0, spark).toDF()
     val streamObservation = Observation("stream")
     val streamErr = intercept[IllegalArgumentException] {
       streamDf.observe(streamObservation, 
avg($"value").cast("int").as("avg_val"))
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
index 218b66b77946..e31e0e70cf39 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/AsyncProgressTrackingMicroBatchExecutionSuite.scala
@@ -67,9 +67,9 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
 
   class MemoryStreamCapture[A: Encoder](
       id: Int,
-      sqlContext: SQLContext,
+      sparkSession: SparkSession,
       numPartitions: Option[Int] = None)
-    extends MemoryStream[A](id, sqlContext, numPartitions = numPartitions) {
+    extends MemoryStream[A](id, sparkSession, numPartitions = numPartitions) {
 
     val commits = new ListBuffer[streaming.Offset]()
     val commitThreads = new ListBuffer[Thread]()
@@ -136,7 +136,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   test("async WAL commits recovery") {
     val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
 
-    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStream[Int](id = 0, spark)
     val ds = inputData.toDF()
 
     var index = 0
@@ -204,7 +204,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   }
 
   test("async WAL commits turn on and off") {
-    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStream[Int](id = 0, spark)
     val ds = inputData.toDS()
 
     val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
@@ -308,7 +308,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   }
 
   test("Fail with once trigger") {
-    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStream[Int](id = 0, spark)
     val ds = inputData.toDF()
 
     val e = intercept[IllegalArgumentException] {
@@ -323,7 +323,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
 
   test("Fail with available now trigger") {
 
-    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStream[Int](id = 0, spark)
     val ds = inputData.toDF()
 
     val e = intercept[IllegalArgumentException] {
@@ -339,7 +339,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   test("switching between async wal commit enabled and trigger once") {
     val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
 
-    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStream[Int](id = 0, spark)
     val ds = inputData.toDF()
 
     var index = 0
@@ -500,7 +500,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   test("switching between async wal commit enabled and available now") {
     val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
 
-    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStream[Int](id = 0, spark)
     val ds = inputData.toDF()
 
     var index = 0
@@ -669,7 +669,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   }
 
   def testAsyncWriteErrorsAlreadyExists(path: String): Unit = {
-    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStream[Int](id = 0, spark)
     val ds = inputData.toDS()
     val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
 
@@ -720,7 +720,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   }
 
   def testAsyncWriteErrorsPermissionsIssue(path: String): Unit = {
-    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStream[Int](id = 0, spark)
     val ds = inputData.toDS()
     val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
     val commitDir = new File(checkpointLocation + path)
@@ -778,7 +778,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
 
     val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
 
-    val inputData = new MemoryStreamCapture[Int](id = 0, sqlContext = 
sqlContext)
+    val inputData = new MemoryStreamCapture[Int](id = 0, spark)
 
     val ds = inputData.toDF()
 
@@ -852,7 +852,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   }
 
   test("interval commits and recovery") {
-    val inputData = new MemoryStreamCapture[Int](id = 0, sqlContext = 
sqlContext)
+    val inputData = new MemoryStreamCapture[Int](id = 0, spark)
     val ds = inputData.toDS()
 
     val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
@@ -934,7 +934,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   }
 
   test("recovery when first offset is not zero and not commit log entries") {
-    val inputData = new MemoryStreamCapture[Int](id = 0, sqlContext = 
sqlContext)
+    val inputData = new MemoryStreamCapture[Int](id = 0, spark)
     val ds = inputData.toDS()
 
     val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
@@ -961,7 +961,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
     /**
      * start new stream
      */
-    val inputData2 = new MemoryStreamCapture[Int](id = 0, sqlContext = 
sqlContext)
+    val inputData2 = new MemoryStreamCapture[Int](id = 0, spark)
     val ds2 = inputData2.toDS()
     testStream(ds2, extraOptions = Map(
       ASYNC_PROGRESS_TRACKING_ENABLED -> "true",
@@ -995,7 +995,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   }
 
   test("recovery non-contiguous log") {
-    val inputData = new MemoryStreamCapture[Int](id = 0, sqlContext = 
sqlContext)
+    val inputData = new MemoryStreamCapture[Int](id = 0, spark)
     val ds = inputData.toDS()
 
     val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
@@ -1088,7 +1088,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   }
 
   test("Fail on pipelines using unsupported sinks") {
-    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStream[Int](id = 0, spark)
     val ds = inputData.toDF()
 
     val e = intercept[IllegalArgumentException] {
@@ -1109,7 +1109,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
 
     withSQLConf(SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2", 
SQLConf.ASYNC_LOG_PURGE.key -> "false") {
       withTempDir { checkpointLocation =>
-        val inputData = new MemoryStreamCapture[Int](id = 0, sqlContext = 
sqlContext)
+        val inputData = new MemoryStreamCapture[Int](id = 0, spark)
         val ds = inputData.toDS()
 
         val clock = new StreamManualClock
@@ -1243,7 +1243,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   test("with async log purging") {
     withSQLConf(SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2", 
SQLConf.ASYNC_LOG_PURGE.key -> "true") {
       withTempDir { checkpointLocation =>
-        val inputData = new MemoryStreamCapture[Int](id = 0, sqlContext = 
sqlContext)
+        val inputData = new MemoryStreamCapture[Int](id = 0, spark)
         val ds = inputData.toDS()
 
         val clock = new StreamManualClock
@@ -1381,7 +1381,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   }
 
   test("test multiple gaps in offset and commit logs") {
-    val inputData = new MemoryStreamCapture[Int](id = 0, sqlContext = 
sqlContext)
+    val inputData = new MemoryStreamCapture[Int](id = 0, spark)
     val ds = inputData.toDS()
 
     val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
@@ -1427,7 +1427,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
     /**
      * start new stream
      */
-    val inputData2 = new MemoryStreamCapture[Int](id = 0, sqlContext = 
sqlContext)
+    val inputData2 = new MemoryStreamCapture[Int](id = 0, spark)
     val ds2 = inputData2.toDS()
     testStream(ds2, extraOptions = Map(
       ASYNC_PROGRESS_TRACKING_ENABLED -> "true",
@@ -1460,7 +1460,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   }
 
   test("recovery when gaps exist in offset and commit log") {
-    val inputData = new MemoryStreamCapture[Int](id = 0, sqlContext = 
sqlContext)
+    val inputData = new MemoryStreamCapture[Int](id = 0, spark)
     val ds = inputData.toDS()
 
     val checkpointLocation = Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
@@ -1494,7 +1494,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
     /**
      * start new stream
      */
-    val inputData2 = new MemoryStreamCapture[Int](id = 0, sqlContext = 
sqlContext)
+    val inputData2 = new MemoryStreamCapture[Int](id = 0, spark)
     val ds2 = inputData2.toDS()
     testStream(ds2, extraOptions = Map(
       ASYNC_PROGRESS_TRACKING_ENABLED -> "true",
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala
index 4ec44eac22e3..e0ec3fd1b907 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala
@@ -343,6 +343,84 @@ class MemorySinkSuite extends StreamTest with 
BeforeAndAfter {
       intsToDF(expected)(schema))
   }
 
+  test("LowPriorityMemoryStreamImplicits works with implicit sqlContext") {
+    // Test that MemoryStream can be created using implicit sqlContext
+    implicit val sqlContext: SQLContext = spark.sqlContext
+
+    // Test MemoryStream[A]() with implicit sqlContext
+    val stream1 = MemoryStream[Int]()
+    assert(stream1 != null)
+
+    // Test MemoryStream[A](numPartitions) with implicit sqlContext
+    val stream2 = MemoryStream[String](3)
+    assert(stream2 != null)
+
+    // Verify the streams work correctly
+    stream1.addData(1, 2, 3)
+    val df1 = stream1.toDF()
+    assert(df1.schema.fieldNames.contains("value"))
+
+    stream2.addData("a", "b", "c")
+    val df2 = stream2.toDF()
+    assert(df2.schema.fieldNames.contains("value"))
+  }
+
+  test("LowPriorityContinuousMemoryStreamImplicits works with implicit 
sqlContext") {
+    import 
org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
+    // Test that ContinuousMemoryStream can be created using implicit 
sqlContext
+    implicit val sqlContext: SQLContext = spark.sqlContext
+
+    // Test ContinuousMemoryStream[A]() with implicit sqlContext
+    val stream1 = ContinuousMemoryStream[Int]()
+    assert(stream1 != null)
+
+    // Test ContinuousMemoryStream[A](numPartitions) with implicit sqlContext
+    val stream2 = ContinuousMemoryStream[String](3)
+    assert(stream2 != null)
+
+    // Test ContinuousMemoryStream.singlePartition with implicit sqlContext
+    val stream3 = ContinuousMemoryStream.singlePartition[Int]()
+    assert(stream3 != null)
+
+    // Verify the streams work correctly
+    stream1.addData(Seq(1, 2, 3))
+    stream2.addData(Seq("a", "b", "c"))
+    stream3.addData(Seq(10, 20))
+
+    // Basic verification that streams are functional
+    assert(stream1.initialOffset() != null)
+    assert(stream2.initialOffset() != null)
+    assert(stream3.initialOffset() != null)
+  }
+
+  test("LowPriorityLowLatencyMemoryStreamImplicits works with implicit 
sqlContext") {
+    import org.apache.spark.sql.execution.streaming.LowLatencyMemoryStream
+    // Test that LowLatencyMemoryStream can be created using implicit 
sqlContext
+    implicit val sqlContext: SQLContext = spark.sqlContext
+
+    // Test LowLatencyMemoryStream[A]() with implicit sqlContext
+    val stream1 = LowLatencyMemoryStream[Int]()
+    assert(stream1 != null)
+
+    // Test LowLatencyMemoryStream[A](numPartitions) with implicit sqlContext
+    val stream2 = LowLatencyMemoryStream[String](3)
+    assert(stream2 != null)
+
+    // Test LowLatencyMemoryStream.singlePartition with implicit sqlContext
+    val stream3 = LowLatencyMemoryStream.singlePartition[Int]()
+    assert(stream3 != null)
+
+    // Verify the streams work correctly
+    stream1.addData(Seq(1, 2, 3))
+    stream2.addData(Seq("a", "b", "c"))
+    stream3.addData(Seq(10, 20))
+
+    // Basic verification that streams are functional
+    assert(stream1.initialOffset() != null)
+    assert(stream2.initialOffset() != null)
+    assert(stream3.initialOffset() != null)
+  }
+
   private implicit def intsToDF(seq: Seq[Int])(implicit schema: StructType): 
DataFrame = {
     require(schema.fields.length === 1)
     sqlContext.createDataset(seq).toDF(schema.fieldNames.head)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
index 3fec6e816b83..bd5dc846fd58 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
@@ -54,7 +54,7 @@ class MicroBatchExecutionSuite extends StreamTest with 
BeforeAndAfter with Match
   test("async log purging") {
     withSQLConf(SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2", 
SQLConf.ASYNC_LOG_PURGE.key -> "true") {
       withTempDir { checkpointLocation =>
-        val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+        val inputData = new MemoryStream[Int](id = 0, spark)
         val ds = inputData.toDS()
         testStream(ds)(
           StartStream(checkpointLocation = 
checkpointLocation.getCanonicalPath),
@@ -99,7 +99,7 @@ class MicroBatchExecutionSuite extends StreamTest with 
BeforeAndAfter with Match
   test("error notifier test") {
     withSQLConf(SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2", 
SQLConf.ASYNC_LOG_PURGE.key -> "true") {
       withTempDir { checkpointLocation =>
-        val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+        val inputData = new MemoryStream[Int](id = 0, spark)
         val ds = inputData.toDS()
         val e = intercept[StreamingQueryException] {
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
index 723bb0a87623..79bcdbca9ec6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
@@ -123,11 +123,10 @@ class StateStoreCoordinatorSuite extends SparkFunSuite 
with SharedSparkContext {
   test("query stop deactivates related store providers") {
     var coordRef: StateStoreCoordinatorRef = null
     try {
-      val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
+      implicit val spark: SparkSession = 
SparkSession.builder().sparkContext(sc).getOrCreate()
       SparkSession.setActiveSession(spark)
       import spark.implicits._
       coordRef = spark.streams.stateStoreCoordinator
-      implicit val sqlContext = spark.sqlContext
       spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, "1")
 
       // Start a query and run a batch to load state stores
@@ -254,7 +253,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with 
SharedSparkContext {
       ) {
         case (coordRef, spark) =>
           import spark.implicits._
-          implicit val sqlContext = spark.sqlContext
+          implicit val sparkSession: SparkSession = spark
           val inputData = MemoryStream[Int]
           val query = setUpStatefulQuery(inputData, "query")
           // Add, commit, and wait multiple times to force snapshot versions 
and time difference
@@ -290,7 +289,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with 
SharedSparkContext {
       ) {
         case (coordRef, spark) =>
           import spark.implicits._
-          implicit val sqlContext = spark.sqlContext
+          implicit val sparkSession: SparkSession = spark
           // Start a join query and run some data to force snapshot uploads
           val input1 = MemoryStream[Int]
           val input2 = MemoryStream[Int]
@@ -333,7 +332,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with 
SharedSparkContext {
     ) {
       case (coordRef, spark) =>
         import spark.implicits._
-        implicit val sqlContext = spark.sqlContext
+        implicit val sparkSession: SparkSession = spark
         // Start and run two queries together with some data to force snapshot 
uploads
         val input1 = MemoryStream[Int]
         val input2 = MemoryStream[Int]
@@ -400,7 +399,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with 
SharedSparkContext {
     ) {
       case (coordRef, spark) =>
         import spark.implicits._
-        implicit val sqlContext = spark.sqlContext
+        implicit val sparkSession: SparkSession = spark
         // Start a query and run some data to force snapshot uploads
         val inputData = MemoryStream[Int]
         val query = setUpStatefulQuery(inputData, "query")
@@ -444,7 +443,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with 
SharedSparkContext {
     ) {
       case (coordRef, spark) =>
         import spark.implicits._
-        implicit val sqlContext = spark.sqlContext
+        implicit val sparkSession: SparkSession = spark
         // Start a query and run some data to force snapshot uploads
         val inputData = MemoryStream[Int]
         val query = setUpStatefulQuery(inputData, "query")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 0b1483241b92..1acf239df85b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -1206,9 +1206,8 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
   test("SPARK-21145: Restarted queries create new provider instances") {
     try {
       val checkpointLocation = Utils.createTempDir().getAbsoluteFile
-      val spark = SparkSession.builder().master("local[2]").getOrCreate()
+      implicit val spark: SparkSession = 
SparkSession.builder().master("local[2]").getOrCreate()
       SparkSession.setActiveSession(spark)
-      implicit val sqlContext = spark.sqlContext
       spark.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, "1")
       import spark.implicits._
       val inputData = MemoryStream[Int]
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/AcceptsLatestSeenOffsetSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/AcceptsLatestSeenOffsetSuite.scala
index 2a4abd99f6c1..6a89d39d1e27 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/AcceptsLatestSeenOffsetSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/AcceptsLatestSeenOffsetSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming
 
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.sql.{Encoder, SQLContext}
+import org.apache.spark.sql.{Encoder}
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.classic.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.connector.read.streaming
@@ -62,7 +62,7 @@ class AcceptsLatestSeenOffsetSuite extends StreamTest with 
BeforeAndAfter {
   }
 
   test("DataSource V2 source with micro-batch") {
-    val inputData = new TestMemoryStream[Long](0, spark.sqlContext)
+    val inputData = new TestMemoryStream[Long](0, spark)
     val df = inputData.toDF().select("value")
 
     /** Add data to this test source by incrementing its available offset */
@@ -110,7 +110,7 @@ class AcceptsLatestSeenOffsetSuite extends StreamTest with 
BeforeAndAfter {
     //  Test case: when the query is restarted, we expect the execution to 
call `latestSeenOffset`
     //  first. Later as part of the execution, execution may call 
`initialOffset` if the previous
     //  run of the query had no committed batches.
-    val inputData = new TestMemoryStream[Long](0, spark.sqlContext)
+    val inputData = new TestMemoryStream[Long](0, spark)
     val df = inputData.toDF().select("value")
 
     /** Add data to this test source by incrementing its available offset */
@@ -152,7 +152,7 @@ class AcceptsLatestSeenOffsetSuite extends StreamTest with 
BeforeAndAfter {
   }
 
   test("DataSource V2 source with continuous mode") {
-    val inputData = new TestContinuousMemoryStream[Long](0, spark.sqlContext, 
1)
+    val inputData = new TestContinuousMemoryStream[Long](0, spark, 1)
     val df = inputData.toDF().select("value")
 
     /** Add data to this test source by incrementing its available offset */
@@ -233,9 +233,9 @@ class AcceptsLatestSeenOffsetSuite extends StreamTest with 
BeforeAndAfter {
 
   class TestMemoryStream[A : Encoder](
       _id: Int,
-      _sqlContext: SQLContext,
+      _sparkSession: SparkSession,
       _numPartitions: Option[Int] = None)
-    extends MemoryStream[A](_id, _sqlContext, _numPartitions)
+    extends MemoryStream[A](_id, _sparkSession, _numPartitions)
     with AcceptsLatestSeenOffset {
 
     @volatile var latestSeenOffset: streaming.Offset = null
@@ -260,9 +260,9 @@ class AcceptsLatestSeenOffsetSuite extends StreamTest with 
BeforeAndAfter {
 
   class TestContinuousMemoryStream[A : Encoder](
       _id: Int,
-      _sqlContext: SQLContext,
+      _sparkSession: SparkSession,
       _numPartitions: Int = 2)
-    extends ContinuousMemoryStream[A](_id, _sqlContext, _numPartitions)
+    extends ContinuousMemoryStream[A](_id, _sparkSession, _numPartitions)
     with AcceptsLatestSeenOffset {
 
     @volatile var latestSeenOffset: streaming.Offset = _
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index cbb2eba7ecc8..2ae0de640aaf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -116,7 +116,7 @@ class StreamSuite extends StreamTest {
     val memoryStream = MemoryStream[Int]
     val executionRelation = StreamingExecutionRelation(
       memoryStream, toAttributes(memoryStream.encoder.schema), None)(
-      memoryStream.sqlContext.sparkSession)
+      memoryStream.sparkSession)
     assert(executionRelation.computeStats().sizeInBytes ==
       spark.sessionState.conf.defaultSizeInBytes)
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index e1d44efc172e..4eabc82281e1 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -63,7 +63,7 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
 
   private def testSingleListenerBasic(listener: EventCollector): Unit = {
     val clock = new StreamManualClock
-    val inputData = new MemoryStream[Int](0, sqlContext)
+    val inputData = new MemoryStream[Int](0, spark)
     val df = inputData.toDS().as[Long].map { 10 / _ }
 
     case class AssertStreamExecThreadToWaitForClock()
@@ -333,7 +333,7 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
       spark.streams.addListener(listener)
       try {
         var numTriggers = 0
-        val input = new MemoryStream[Int](0, sqlContext) {
+        val input = new MemoryStream[Int](0, spark) {
           override def latestOffset(startOffset: OffsetV2, limit: ReadLimit): 
OffsetV2 = {
             numTriggers += 1
             super.latestOffset(startOffset, limit)
@@ -375,7 +375,7 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
       collector.reset()
       session.sparkContext.addJobTag(jobTag1)
       session.sparkContext.addJobTag(jobTag2)
-      val mem = MemoryStream[Int](implicitly[Encoder[Int]], session.sqlContext)
+      val mem = MemoryStream[Int](implicitly[Encoder[Int]], session)
       testStream(mem.toDS())(
         AddData(mem, 1, 2, 3),
         CheckAnswer(1, 2, 3)
@@ -400,7 +400,7 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
     def runQuery(session: SparkSession): Unit = {
       collector1.reset()
       collector2.reset()
-      val mem = MemoryStream[Int](implicitly[Encoder[Int]], session.sqlContext)
+      val mem = MemoryStream[Int](implicitly[Encoder[Int]], session)
       testStream(mem.toDS())(
         AddData(mem, 1, 2, 3),
         CheckAnswer(1, 2, 3)
@@ -468,7 +468,7 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
   test("listener propagates observable metrics") {
     import org.apache.spark.sql.functions._
     val clock = new StreamManualClock
-    val inputData = new MemoryStream[Int](0, sqlContext)
+    val inputData = new MemoryStream[Int](0, spark)
     val df = inputData.toDF()
       .observe(
         name = "my_event",
@@ -564,7 +564,7 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
       }
 
       try {
-        val input = new MemoryStream[Int](0, sqlContext)
+        val input = new MemoryStream[Int](0, spark)
         val clock = new StreamManualClock()
         val result = input.toDF().select("value")
         testStream(result)(
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index c0a123a2895c..e42050e088a2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -273,8 +273,8 @@ class StreamingQueryManagerSuite extends StreamTest {
   testQuietly("can start a streaming query with the same name in a different 
session") {
     val session2 = spark.cloneSession()
 
-    val ds1 = MemoryStream(Encoders.INT, spark.sqlContext).toDS()
-    val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
+    val ds1 = MemoryStream(Encoders.INT, spark).toDS()
+    val ds2 = MemoryStream(Encoders.INT, session2).toDS()
     val queryName = "abc"
 
     val query1 = ds1.writeStream.format("noop").queryName(queryName).start()
@@ -347,8 +347,8 @@ class StreamingQueryManagerSuite extends StreamTest {
       withTempDir { dir =>
         val session2 = spark.cloneSession()
 
-        val ms1 = MemoryStream(Encoders.INT, spark.sqlContext)
-        val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
+        val ms1 = MemoryStream(Encoders.INT, spark)
+        val ds2 = MemoryStream(Encoders.INT, session2).toDS()
         val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
         val dataLocation = new File(dir, "data").getCanonicalPath
 
@@ -376,8 +376,8 @@ class StreamingQueryManagerSuite extends StreamTest {
         withTempDir { dir =>
           val session2 = spark.cloneSession()
 
-          val ms1 = MemoryStream(Encoders.INT, spark.sqlContext)
-          val ds2 = MemoryStream(Encoders.INT, session2.sqlContext).toDS()
+          val ms1 = MemoryStream(Encoders.INT, spark)
+          val ds2 = MemoryStream(Encoders.INT, session2).toDS()
           val chkLocation = new File(dir, "_checkpoint").getCanonicalPath
           val dataLocation = new File(dir, "data").getCanonicalPath
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 7ea53d41a150..82c6f18955af 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -230,7 +230,7 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
     clock = new StreamManualClock
 
     /** Custom MemoryStream that waits for manual clock to reach a time */
-    val inputData = new MemoryStream[Int](0, sqlContext) {
+    val inputData = new MemoryStream[Int](0, spark) {
 
       private def dataAdded: Boolean = currentOffset.offset != -1
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateClusterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateClusterSuite.scala
index f6f3b2bd8b79..414e8e418f95 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateClusterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateClusterSuite.scala
@@ -138,7 +138,7 @@ class TransformWithStateClusterSuite extends StreamTest 
with TransformWithStateC
   testWithAndWithoutImplicitEncoders("streaming with transformWithState - " +
    "without initial state") { (spark, useImplicits) =>
     import spark.implicits._
-    val input = MemoryStream(Encoders.STRING, spark.sqlContext)
+    val input = MemoryStream(Encoders.STRING, spark)
     val agg = input.toDS()
       .groupByKey(x => x)
       .transformWithState(new FruitCountStatefulProcessor(useImplicits),
@@ -180,7 +180,7 @@ class TransformWithStateClusterSuite extends StreamTest 
with TransformWithStateC
     val fruitCountInitial = fruitCountInitialDS
       .groupByKey(x => x)
 
-    val input = MemoryStream(Encoders.STRING, spark.sqlContext)
+    val input = MemoryStream(Encoders.STRING, spark)
     val agg = input.toDS()
       .groupByKey(x => x)
       .transformWithState(new 
FruitCountStatefulProcessorWithInitialState(useImplicits),
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
index 2a1ec4c7ab61..aac858719196 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.{SparkException, 
SparkRuntimeException, SparkUnsupported
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, Encoders, Row}
 import org.apache.spark.sql.catalyst.util.stringToFile
+import org.apache.spark.sql.classic.SparkSession
 import org.apache.spark.sql.execution.datasources.v2.state.StateSourceOptions
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import 
org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager
@@ -1008,7 +1009,7 @@ abstract class TransformWithStateSuite extends 
StateStoreMetricsTest
   }
 
   test("transformWithState - lazy iterators can properly get/set keyed state") 
{
-    val spark = this.spark
+    implicit val spark: SparkSession = this.spark
     import spark.implicits._
 
     class ProcessorWithLazyIterators
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala
index 3741ee8ab1fe..7b4338dff6b2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TriggerAvailableNowSuite.scala
@@ -98,7 +98,7 @@ class TriggerAvailableNowSuite extends FileStreamSourceTest {
   }
 
   class TestMicroBatchStream extends TestDataFrameProvider {
-    private lazy val memoryStream = MemoryStream[Long](0, spark.sqlContext)
+    private lazy val memoryStream = MemoryStream[Long](0, spark)
 
     override def toDF: DataFrame = memoryStream.toDF()
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 74db2a3843d7..2a186a9296f4 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -526,7 +526,7 @@ class DataStreamReaderWriterSuite extends StreamTest with 
BeforeAndAfter {
   }
 
   private def testMemorySinkCheckpointRecovery(chkLoc: String, 
provideInWriter: Boolean): Unit = {
-    val ms = new MemoryStream[Int](0, sqlContext)
+    val ms = new MemoryStream[Int](0, spark)
     val df = ms.toDF().toDF("a")
     val tableName = "test"
     def startQuery: StreamingQuery = {
@@ -585,7 +585,7 @@ class DataStreamReaderWriterSuite extends StreamTest with 
BeforeAndAfter {
 
   test("append mode memory sink's do not support checkpoint recovery") {
     import testImplicits._
-    val ms = new MemoryStream[Int](0, sqlContext)
+    val ms = new MemoryStream[Int](0, spark)
     val df = ms.toDF().toDF("a")
     val checkpointLoc = newMetadataDir
     val checkpointDir = new File(checkpointLoc, "offsets")
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 769b633a9c52..caa4ca4581b4 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
+import org.apache.spark.sql.classic.SparkSession
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, 
Identifier, TableChange, TableInfo}
 import 
org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
@@ -2654,7 +2655,7 @@ class HiveDDLSuite
     import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
     import testImplicits._
 
-    implicit val _sqlContext = spark.sqlContext
+    implicit val sparkSession: SparkSession = spark
 
     withTempView("t1") {
       Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", 
"word").createOrReplaceTempView("t1")
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/elements.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/elements.scala
index e2c3fdf7994e..87e01ed2021e 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/elements.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/elements.scala
@@ -203,7 +203,7 @@ case class VirtualTableInput(
     // create empty streaming/batch df based on input type.
     def createEmptyDF(schema: StructType): DataFrame = readOptions match {
       case _: StreamingReadOptions =>
-        MemoryStream[Row](ExpressionEncoder(schema, lenient = false), 
spark.sqlContext)
+        MemoryStream[Row](ExpressionEncoder(schema, lenient = false), spark)
           .toDF()
       case _ => spark.createDataFrame(new util.ArrayList[Row](), schema)
     }
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
index f37716b4a24d..7c8181b5b72a 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.pipelines.graph
 
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
 import org.apache.spark.sql.pipelines.utils.{PipelineTest, 
TestGraphRegistrationContext}
 import org.apache.spark.sql.test.SharedSparkSession
@@ -423,6 +423,7 @@ class ConnectInvalidPipelineSuite extends PipelineTest with 
SharedSparkSession {
     import session.implicits._
 
     val p = new TestGraphRegistrationContext(spark) {
+      implicit val sparkSession: SparkSession = spark
       val mem = MemoryStream[Int]
       mem.addData(1)
       registerPersistedView("a", query = dfFlowFunc(mem.toDF()))
@@ -466,6 +467,7 @@ class ConnectInvalidPipelineSuite extends PipelineTest with 
SharedSparkSession {
     import session.implicits._
 
     val graph = new TestGraphRegistrationContext(spark) {
+      implicit val sparkSession: SparkSession = spark
       registerMaterializedView("a", query = 
dfFlowFunc(MemoryStream[Int].toDF()))
     }.resolveToDataflowGraph()
 
@@ -489,6 +491,7 @@ class ConnectInvalidPipelineSuite extends PipelineTest with 
SharedSparkSession {
 
     val graph = new TestGraphRegistrationContext(spark) {
       registerTable("a")
+      implicit val sparkSession: SparkSession = spark
       registerFlow(
         destinationName = "a",
         name = "once_flow",
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala
index 3ac3c0901750..a4bb7c067d87 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.pipelines.graph
 
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.Union
@@ -158,6 +159,7 @@ class ConnectValidPipelineSuite extends PipelineTest with 
SharedSparkSession {
     import session.implicits._
 
     class P extends TestGraphRegistrationContext(spark) {
+      implicit val sparkSession: SparkSession = spark
       val ints = MemoryStream[Int]
       ints.addData(1, 2, 3, 4)
       registerPersistedView("a", query = dfFlowFunc(ints.toDF()))
@@ -199,6 +201,7 @@ class ConnectValidPipelineSuite extends PipelineTest with 
SharedSparkSession {
     import session.implicits._
 
     class P extends TestGraphRegistrationContext(spark) {
+      implicit val sparkSession: SparkSession = spark
       val ints1 = MemoryStream[Int]
       ints1.addData(1, 2, 3, 4)
       val ints2 = MemoryStream[Int]
@@ -359,6 +362,7 @@ class ConnectValidPipelineSuite extends PipelineTest with 
SharedSparkSession {
     import session.implicits._
 
     class P extends TestGraphRegistrationContext(spark) {
+      implicit val sparkSession: SparkSession = spark
       val mem = MemoryStream[Int]
       registerPersistedView("a", query = dfFlowFunc(mem.toDF()))
       registerTable("b")
@@ -402,6 +406,7 @@ class ConnectValidPipelineSuite extends PipelineTest with 
SharedSparkSession {
     import session.implicits._
 
     val graph = new TestGraphRegistrationContext(spark) {
+      implicit val sparkSession: SparkSession = spark
       val mem = MemoryStream[Int]
       mem.addData(1, 2)
       registerPersistedView("complete-view", query = dfFlowFunc(Seq(1, 
2).toDF("x")))
@@ -494,6 +499,7 @@ class ConnectValidPipelineSuite extends PipelineTest with 
SharedSparkSession {
     import session.implicits._
 
     val P = new TestGraphRegistrationContext(spark) {
+      implicit val sparkSession: SparkSession = spark
       val mem = MemoryStream[Int]
       mem.addData(1, 2)
       registerTemporaryView("a", query = dfFlowFunc(mem.toDF().select($"value" 
as "x")))
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala
index 17bddcd446b1..31afc5a27a54 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/MaterializeTablesSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.pipelines.graph
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.SparkThrowable
+import org.apache.spark.sql.classic.SparkSession
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, 
TableCatalog}
 import org.apache.spark.sql.connector.expressions.Expressions
 import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
@@ -267,8 +268,8 @@ abstract class MaterializeTablesSuite extends 
BaseCoreExecutionTest {
 
   test("invalid schema merge") {
     val session = spark
+    implicit val sparkSession: SparkSession = spark
     import session.implicits._
-    implicit def sqlContext: org.apache.spark.sql.classic.SQLContext = 
spark.sqlContext
 
     val streamInts = MemoryStream[Int]
     streamInts.addData(1, 2)
@@ -338,7 +339,6 @@ abstract class MaterializeTablesSuite extends 
BaseCoreExecutionTest {
   test("specified schema incompatible with existing table") {
     val session = spark
     import session.implicits._
-    implicit def sqlContext: org.apache.spark.sql.classic.SQLContext = 
spark.sqlContext
 
     sql(s"CREATE TABLE ${TestGraphRegistrationContext.DEFAULT_DATABASE}.t6(x 
BOOLEAN)")
     val catalog = 
spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog]
@@ -352,6 +352,7 @@ abstract class MaterializeTablesSuite extends 
BaseCoreExecutionTest {
 
     val ex = intercept[TableMaterializationException] {
       materializeGraph(new TestGraphRegistrationContext(spark) {
+        implicit val sparkSession: SparkSession = spark
         val source: MemoryStream[Int] = MemoryStream[Int]
         source.addData(1, 2)
         registerTable(
@@ -644,8 +645,8 @@ abstract class MaterializeTablesSuite extends 
BaseCoreExecutionTest {
       s"Streaming tables should evolve schema only if not full refresh = 
$isFullRefresh"
     ) {
       val session = spark
+      implicit val sparkSession: SparkSession = spark
       import session.implicits._
-      implicit def sqlContext: org.apache.spark.sql.classic.SQLContext = 
spark.sqlContext
 
       val streamInts = MemoryStream[Int]
       streamInts.addData(1 until 5: _*)
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/SystemMetadataSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/SystemMetadataSuite.scala
index a8db049b2b68..71a4b7f68404 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/SystemMetadataSuite.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/SystemMetadataSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.pipelines.graph
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream, 
StreamingQueryWrapper}
 import org.apache.spark.sql.pipelines.utils.{ExecutionTest, 
TestGraphRegistrationContext}
@@ -38,6 +39,7 @@ class SystemMetadataSuite
 
       // create a pipeline with only a single ST
       val graph = new TestGraphRegistrationContext(spark) {
+        implicit val sparkSession: SparkSession = spark
         val mem: MemoryStream[Int] = MemoryStream[Int]
         mem.addData(1, 2, 3)
         registerView("a", query = dfFlowFunc(mem.toDF()))
@@ -105,6 +107,7 @@ class SystemMetadataSuite
     import session.implicits._
 
     val graph = new TestGraphRegistrationContext(spark) {
+      implicit val sparkSession: SparkSession = spark
       val mem: MemoryStream[Int] = MemoryStream[Int]
       mem.addData(1, 2, 3)
       registerView("a", query = dfFlowFunc(mem.toDF()))
@@ -169,6 +172,7 @@ class SystemMetadataSuite
     import session.implicits._
 
     val graph = new TestGraphRegistrationContext(spark) {
+      implicit val sparkSession: SparkSession = spark
       val mem: MemoryStream[Int] = MemoryStream[Int]
       mem.addData(1, 2, 3)
       registerView("a", query = dfFlowFunc(mem.toDF()))
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala
index 57baf4c2d5b1..36b749cc84d9 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.pipelines.graph
 
 import org.scalatest.time.{Seconds, Span}
 
-import org.apache.spark.sql.{functions, Row}
+import org.apache.spark.sql.{functions, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.classic.{DataFrame, Dataset}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, 
TableCatalog}
@@ -183,6 +183,7 @@ class TriggeredGraphExecutionSuite extends ExecutionTest 
with SharedSparkSession
 
     // Construct pipeline
     val pipelineDef = new TestGraphRegistrationContext(spark) {
+      implicit val sparkSession: SparkSession = spark
       private val ints = MemoryStream[Int]
       ints.addData(1 until 10: _*)
       registerView("input", query = dfFlowFunc(ints.toDF()))
@@ -259,6 +260,7 @@ class TriggeredGraphExecutionSuite extends ExecutionTest 
with SharedSparkSession
 
     // Construct pipeline
     val pipelineDef = new TestGraphRegistrationContext(spark) {
+      implicit val sparkSession: SparkSession = spark
       private val ints = MemoryStream[Int]
       registerView("input", query = dfFlowFunc(ints.toDF()))
       registerTable(
@@ -309,6 +311,7 @@ class TriggeredGraphExecutionSuite extends ExecutionTest 
with SharedSparkSession
     })
 
     val pipelineDef = new TestGraphRegistrationContext(spark) {
+      implicit val sparkSession: SparkSession = spark
       private val memoryStream = MemoryStream[Int]
       memoryStream.addData(1, 2)
       registerView("input_view", query = dfFlowFunc(memoryStream.toDF()))
@@ -548,6 +551,7 @@ class TriggeredGraphExecutionSuite extends ExecutionTest 
with SharedSparkSession
 
     // Construct pipeline
     val pipelineDef = new TestGraphRegistrationContext(spark) {
+      implicit val sparkSession: SparkSession = spark
       private val memoryStream = MemoryStream[Int]
       memoryStream.addData(1, 2)
       registerView("input_view", query = dfFlowFunc(memoryStream.toDF()))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to