This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e915312f5ab5 [SPARK-53192][CONNECT] Always cache a DataSource in the 
Spark Connect Plan Cache
e915312f5ab5 is described below

commit e915312f5ab5538d02ea3ede129168193194b1d6
Author: Robert Dillitz <r.dill...@gmail.com>
AuthorDate: Mon Aug 11 09:54:13 2025 +0800

    [SPARK-53192][CONNECT] Always cache a DataSource in the Spark Connect Plan 
Cache
    
    ### What changes were proposed in this pull request?
    I believe we can dramatically improve the performance of the 
`SparkConnectPlanner` for plans using the same `Read.DataSource` (`spark.read`) 
multiple times (within the same session) by actively caching them in the [Spark 
Connect Plan 
Cache](https://github.com/apache/spark/commit/a1fc6d57b27d24b832b2f2580e6acd64c4488c62).
    
    At the moment, every occurrence of a `Read.DataSource` issues a separate 
analysis of the `DataSource`, which kicks off a new Spark Job per analysis if 
no explicit schema is provided. Plan translation becomes very slow, because 
the (meta)data has to be fetched every time.
    
    For example, the following code, which unions the same CSV file N times, 
kicks off N+1 Spark Jobs for the analysis of the final DataFrame in Spark 
Connect (compared to exactly 1 for Spark Classic):
    ```scala
    val df = spark.read.csv("abc.csv")
    (0 until N).foldLeft(df)((a, _) => a.union(df)).schema
    ```
    
    I propose to adjust the Spark Connect Plan Cache to always cache a 
`Read.DataSource`, even when it is not the root of a relation. This always 
reduces the required Spark Jobs for analysis to at most 1 per **unique** 
`DataSource`. This has the same effect as when one explicitly analyzes the base 
DataSource today to populate the Spark Connect Plan Cache with its base plan, 
greatly improving the performance for subsequent queries using this 
`DataSource`:
    ```scala
    val df = spark.read.csv("abc.csv")
    df.schema
    (0 until N).foldLeft(df)((a, _) => a.union(df)).schema
    ```
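    
    As a rough illustration only (not the actual server code), the caching rule can be modeled without Spark. `Rel`, `DataSourceRead`, `Union`, and `PlanCacheModel` are hypothetical stand-ins for `proto.Relation` and the session plan cache, and the `analyses` counter stands in for the Spark Jobs triggered by schema inference:
    ```scala
    import scala.collection.mutable

    // Hypothetical, simplified stand-ins for proto.Relation (assumption, not Spark types).
    sealed trait Rel
    final case class DataSourceRead(path: String) extends Rel
    final case class Union(left: Rel, right: Rel) extends Rel

    // Toy model of the plan cache; plans are represented as plain strings.
    final class PlanCacheModel(alwaysCacheDataSourceReads: Boolean) {
      private val cache = mutable.Map.empty[Rel, String]
      var analyses = 0 // counts simulated analyses of a data source

      def transform(rel: Rel, cachePlan: Boolean = false): String =
        cache.get(rel) match {
          case Some(plan) => plan // cache hit: no new analysis
          case None =>
            val plan = rel match {
              case DataSourceRead(p) =>
                analyses += 1 // every uncached read is analyzed again
                s"read($p)"
              case Union(l, r) =>
                s"union(${transform(l)}, ${transform(r)})"
            }
            val isDataSourceRead = rel.isInstanceOf[DataSourceRead]
            // Same shape as the condition added in this change.
            if (cachePlan || (alwaysCacheDataSourceReads && isDataSourceRead)) {
              cache(rel) = plan
            }
            plan
        }
    }

    object PlanCacheDemo extends App {
      val n = 5
      val df: Rel = DataSourceRead("abc.csv")
      val query = (0 until n).foldLeft(df)((a, _) => Union(a, df))

      for (enabled <- Seq(false, true)) {
        val model = new PlanCacheModel(enabled)
        model.transform(query, cachePlan = true) // only the root asks for caching
        println(s"alwaysCache=$enabled analyses=${model.analyses}")
      }
      // prints "alwaysCache=false analyses=6" and then "alwaysCache=true analyses=1"
    }
    ```
    With `n = 5` the model analyzes the read N+1 = 6 times when only the root plan is cached, and once when leaf reads are always cached.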
    
    To compensate for the increase in cached plans, I increased the Plan Cache 
size from 16 to 32. The DataSource plans are always leaf nodes, so I do not 
think they will add much memory pressure - this increases the cache size for 
all plans, though, so it is up for debate if we really want to do this.
    
    I also added a flag that lets one turn off this aggressive caching: 
`spark.connect.session.planCache.alwaysCacheDataSourceReadsEnabled`.
    
    Side note:
    This fix works so well because the `SparkConnectPlanner` [actively 
analyses](https://github.com/apache/spark/blob/8b80ea04d0b14d19b819cd4648b5ddd3e1c42650/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L1518)
 the `LogicalPlan` when translating the `Read.DataSource`. I am wondering 
whether this makes sense, conceptually, and why we chose to use 
`queryExecution.analyzed` instead of `queryExecution.logical` here.
    
    ### Why are the changes needed?
    Translating `DataSource`s today using the `SparkConnectPlanner` is very 
expensive and inefficient. There are many Spark Connect Plan Cache misses 
because we only cache top-level plans. This change greatly improves the 
performance of Spark Connect plan translation involving `Read.DataSource`s, 
which are very common.
    
    ### Does this PR introduce _any_ user-facing change?
    We now always cache the analyzed `Read.DataSource` in the Spark Connect 
Plan Cache instead of only a top-level plan containing it. This is equivalent 
to accessing the `DataSource`'s schema after its creation, so I would argue it 
only makes the caching behavior more consistent, while greatly improving the 
performance.
    
    ### How was this patch tested?
    Added a test case to `SparkConnectSessionHolderSuite` that checks the 
improved caching and also verifies that it can be turned off with the newly 
added flag.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #51921 from dillitz/datasource-caching.
    
    Authored-by: Robert Dillitz <r.dill...@gmail.com>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 .../apache/spark/sql/connect/config/Connect.scala  | 13 ++++-
 .../spark/sql/connect/service/SessionHolder.scala  |  7 ++-
 .../service/SparkConnectSessionHolderSuite.scala   | 60 ++++++++++++++++++----
 3 files changed, 68 insertions(+), 12 deletions(-)

diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index 5fe62295d1a5..1887e4ede04d 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -304,7 +304,7 @@ object Connect {
       .version("4.0.0")
       .internal()
       .intConf
-      .createWithDefault(16)
+      .createWithDefault(32)
 
   val CONNECT_SESSION_PLAN_CACHE_ENABLED =
     buildConf("spark.connect.session.planCache.enabled")
@@ -317,6 +317,17 @@ object Connect {
       .booleanConf
       .createWithDefault(true)
 
+  val CONNECT_ALWAYS_CACHE_DATA_SOURCE_READS_ENABLED =
+    buildConf("spark.connect.session.planCache.alwaysCacheDataSourceReadsEnabled")
+      .doc("When true, always cache the translation of Read.DataSource plans" +
+        " in the plan cache. This massively improves the performance of queries that reuse the" +
+        " same Read.DataSource within the same session, since these translations/analyses" +
+        " are usually quite costly.")
+      .version("4.1.0")
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
+
   val CONNECT_AUTHENTICATE_TOKEN =
     buildStaticConf("spark.connect.authenticate.token")
       .doc("A pre-shared token that will be used to authenticate clients. This secret must be" +
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
index b7d8a6566cf9..be59439daefb 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
@@ -521,6 +521,11 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
     // We only cache plans that have a plan ID.
     val hasPlanId = rel.hasCommon && rel.getCommon.hasPlanId
 
+    // Always cache a `Read.DataSource` to avoid re-analyzing the same `DataSource` twice.
+    lazy val alwaysCacheDataSourceReadsEnabled = Option(session)
+      .forall(_.conf.get(Connect.CONNECT_ALWAYS_CACHE_DATA_SOURCE_READS_ENABLED, true))
+    lazy val isDataSourceRead = rel.hasRead && rel.getRead.hasDataSource
+
     def getPlanCache(rel: proto.Relation): Option[LogicalPlan] =
       planCache match {
         case Some(cache) if planCacheEnabled && hasPlanId =>
@@ -542,7 +547,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
     getPlanCache(rel)
       .getOrElse({
         val plan = transform(rel)
-        if (cachePlan) {
+        if (cachePlan || (alwaysCacheDataSourceReadsEnabled && isDataSourceRead)) {
           putPlanCache(rel, plan)
         }
         plan
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala
index 159b29ce6b8e..a110b0164f19 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala
@@ -401,19 +401,23 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
   test("Test session plan cache - disabled") {
     val sessionHolder = SparkConnectTestUtils.createDummySessionHolder(spark)
     // Disable plan cache of the session
-    sessionHolder.session.conf.set(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED.key, false)
-    val planner = new SparkConnectPlanner(sessionHolder)
+    try {
+      sessionHolder.session.conf.set(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED.key, false)
+      val planner = new SparkConnectPlanner(sessionHolder)
 
-    val query = buildRelation("select 1")
+      val query = buildRelation("select 1")
 
-    // If cachePlan is false, the cache is still empty.
-    // Although the cache is created as cache size is greater than zero, it won't be used.
-    planner.transformRelation(query, cachePlan = false)
-    assertPlanCache(sessionHolder, Some(Set()))
+      // If cachePlan is false, the cache is still empty.
+      // Although the cache is created as cache size is greater than zero, it won't be used.
+      planner.transformRelation(query, cachePlan = false)
+      assertPlanCache(sessionHolder, Some(Set()))
 
-    // Even if we specify "cachePlan = true", the cache is still empty.
-    planner.transformRelation(query, cachePlan = true)
-    assertPlanCache(sessionHolder, Some(Set()))
+      // Even if we specify "cachePlan = true", the cache is still empty.
+      planner.transformRelation(query, cachePlan = true)
+      assertPlanCache(sessionHolder, Some(Set()))
+    } finally {
+      sessionHolder.session.conf.set(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED, true)
+    }
   }
 
   test("Test duplicate operation IDs") {
@@ -440,4 +444,40 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
       sessionHolder.getPipelineExecution(graphId).isEmpty,
       "pipeline execution was not removed")
   }
+
+  gridTest("Actively cache data source reads")(Seq(true, false)) { enabled =>
+    val sessionHolder = SparkConnectTestUtils.createDummySessionHolder(spark)
+    val planner = new SparkConnectPlanner(sessionHolder)
+
+    val dataSourceRead = proto.Relation
+      .newBuilder()
+      .setRead(
+        proto.Read
+          .newBuilder()
+          .setDataSource(proto.Read.DataSource
+            .newBuilder()
+            .setSchema("col int")))
+      .setCommon(proto.RelationCommon.newBuilder().setPlanId(Random.nextLong()).build())
+      .build()
+    val dataSourceReadJoin = proto.Relation
+      .newBuilder()
+      .setJoin(
+        proto.Join
+          .newBuilder()
+          .setLeft(dataSourceRead)
+          .setRight(dataSourceRead)
+          .setJoinType(proto.Join.JoinType.JOIN_TYPE_CROSS))
+      .setCommon(proto.RelationCommon.newBuilder().setPlanId(Random.nextLong()).build())
+      .build()
+
+    sessionHolder.session.conf
+      .set(Connect.CONNECT_ALWAYS_CACHE_DATA_SOURCE_READS_ENABLED, enabled)
+    planner.transformRelation(dataSourceReadJoin, cachePlan = true)
+    val expected = if (enabled) {
+      Set(dataSourceReadJoin, dataSourceRead)
+    } else {
+      Set(dataSourceReadJoin)
+    }
+    assertPlanCache(sessionHolder, Some(expected))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
