This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new e915312f5ab5 [SPARK-53192][CONNECT] Always cache a DataSource in the Spark Connect Plan Cache
e915312f5ab5 is described below

commit e915312f5ab5538d02ea3ede129168193194b1d6
Author: Robert Dillitz <r.dill...@gmail.com>
AuthorDate: Mon Aug 11 09:54:13 2025 +0800

[SPARK-53192][CONNECT] Always cache a DataSource in the Spark Connect Plan Cache

### What changes were proposed in this pull request?

I believe we can dramatically improve the performance of the `SparkConnectPlanner` for plans that use the same `Read.DataSource` (`spark.read`) multiple times within the same session by actively caching them in the [Spark Connect Plan Cache](https://github.com/apache/spark/commit/a1fc6d57b27d24b832b2f2580e6acd64c4488c62).

At the moment, every occurrence of a `Read.DataSource` triggers a separate analysis of the `DataSource`, and each analysis kicks off a new Spark job if no explicit schema is provided. This makes plan translation very slow, because we need to fetch the (meta)data every time.

For example, the following code, which unions the same CSV file N times, kicks off N+1 Spark jobs for the analysis of the final DataFrame in Spark Connect (compared to exactly 1 for Spark Classic):

```scala
val df = spark.read.csv("abc.csv")
(0 until N).foldLeft(df)((a, _) => a.union(df)).schema
```

I propose to adjust the Spark Connect Plan Cache to always cache a `Read.DataSource`, even when it is not the root of a relation. This reduces the number of Spark jobs required for analysis to at most 1 per **unique** `DataSource`.
This has the same effect as explicitly analyzing the base `DataSource` today to populate the Spark Connect Plan Cache with its base plan, which greatly improves the performance of subsequent queries using this `DataSource`:

```scala
val df = spark.read.csv("abc.csv")
df.schema
(0 until N).foldLeft(df)((a, _) => a.union(df)).schema
```

To compensate for the increase in cached plans, I increased the Plan Cache size from 16 to 32. The `DataSource` plans are always leaf nodes, so I do not think they will add much memory pressure. This increases the cache size for all plans, though, so it is up for debate whether we really want to do this.

I also added a flag that lets one turn off this aggressive caching: `spark.connect.session.planCache.alwaysCacheDataSourceReadsEnabled`.

Side note: This fix works so well because the `SparkConnectPlanner` [actively analyzes](https://github.com/apache/spark/blob/8b80ea04d0b14d19b819cd4648b5ddd3e1c42650/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L1518) the `LogicalPlan` when translating the `Read.DataSource`. I am wondering whether this makes sense conceptually, and why we chose to use `queryExecution.analyzed` instead of `queryExecution.logical` here.

### Why are the changes needed?

Translating `DataSource`s with the `SparkConnectPlanner` is currently very expensive and inefficient. There are a ton of Spark Connect Plan Cache misses because we only cache top-level plans. This change greatly improves the performance of Spark Connect plan translation involving `Read.DataSource`s, which are very common.

### Does this PR introduce _any_ user-facing change?

We now always cache the analyzed `Read.DataSource` in the Spark Connect Plan Cache instead of only a top-level plan containing it. This is equivalent to accessing the `DataSource`'s schema after its creation, so I would argue it only makes the caching behavior more consistent, while greatly improving performance.
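The caching rule described above can be illustrated with a small, self-contained sketch. It is a toy model, not Spark code: `Rel`, `DataSourceRead`, `Union`, `ToyPlanner`, and `Demo.analysesFor` are hypothetical names introduced here, a counter stands in for the Spark job that a data source analysis would trigger, and the plan-ID check performed by the real `SessionHolder` is omitted.

```scala
import scala.collection.mutable

// Toy stand-ins for proto.Relation; these are NOT the real Spark Connect types.
sealed trait Rel
final case class DataSourceRead(path: String) extends Rel
final case class Union(left: Rel, right: Rel) extends Rel

// Hypothetical planner that mimics the cache-or-transform rule from the PR.
final class ToyPlanner(alwaysCacheReads: Boolean) {
  private val cache = mutable.Map.empty[Rel, String]

  // Counts simulated data source analyses (one Spark job each in the real system).
  var analyses = 0

  def transform(rel: Rel, cachePlan: Boolean = false): String =
    cache.getOrElse(rel, {
      val plan = rel match {
        case DataSourceRead(p) => analyses += 1; s"Scan($p)"
        case Union(l, r) => s"Union(${transform(l)}, ${transform(r)})"
      }
      val isDataSourceRead = rel.isInstanceOf[DataSourceRead]
      // Same shape as the new condition: cache the root plan, or any data source read.
      if (cachePlan || (alwaysCacheReads && isDataSourceRead)) cache.update(rel, plan)
      plan
    })
}

object Demo {
  // Unions the same read n times and returns how many analyses were simulated.
  def analysesFor(n: Int, alwaysCacheReads: Boolean): Int = {
    val df: Rel = DataSourceRead("abc.csv")
    val unioned = (0 until n).foldLeft(df)((a, _) => Union(a, df))
    val planner = new ToyPlanner(alwaysCacheReads)
    planner.transform(unioned, cachePlan = true)
    planner.analyses
  }
}
```

In this model, `Demo.analysesFor(n, alwaysCacheReads = false)` counts one analysis per leaf (n + 1 in total), while enabling the flag counts a single analysis, which mirrors the N+1 versus 1 job counts discussed in the description.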
### How was this patch tested?

Added a test case to `SparkConnectSessionHolderSuite` that checks the improved caching and also verifies that it can be turned off with the newly added flag.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51921 from dillitz/datasource-caching.

Authored-by: Robert Dillitz <r.dill...@gmail.com>
Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 .../apache/spark/sql/connect/config/Connect.scala  | 13 ++++-
 .../spark/sql/connect/service/SessionHolder.scala  |  7 ++-
 .../service/SparkConnectSessionHolderSuite.scala   | 60 ++++++++++++++++++----
 3 files changed, 68 insertions(+), 12 deletions(-)

diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index 5fe62295d1a5..1887e4ede04d 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -304,7 +304,7 @@ object Connect {
       .version("4.0.0")
       .internal()
       .intConf
-      .createWithDefault(16)
+      .createWithDefault(32)

   val CONNECT_SESSION_PLAN_CACHE_ENABLED =
     buildConf("spark.connect.session.planCache.enabled")
@@ -317,6 +317,17 @@ object Connect {
       .booleanConf
       .createWithDefault(true)

+  val CONNECT_ALWAYS_CACHE_DATA_SOURCE_READS_ENABLED =
+    buildConf("spark.connect.session.planCache.alwaysCacheDataSourceReadsEnabled")
+      .doc("When true, always cache the translation of Read.DataSource plans" +
+        " in the plan cache. This massively improves the performance of queries that reuse the" +
+        " same Read.DataSource within the same session, since these translations/analyses" +
+        " are usually quite costly.")
+      .version("4.1.0")
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
+
   val CONNECT_AUTHENTICATE_TOKEN =
     buildStaticConf("spark.connect.authenticate.token")
       .doc("A pre-shared token that will be used to authenticate clients. This secret must be" +
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
index b7d8a6566cf9..be59439daefb 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
@@ -521,6 +521,11 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
     // We only cache plans that have a plan ID.
     val hasPlanId = rel.hasCommon && rel.getCommon.hasPlanId

+    // Always cache a `Read.DataSource` to avoid re-analyzing the same `DataSource` twice.
+    lazy val alwaysCacheDataSourceReadsEnabled = Option(session)
+      .forall(_.conf.get(Connect.CONNECT_ALWAYS_CACHE_DATA_SOURCE_READS_ENABLED, true))
+    lazy val isDataSourceRead = rel.hasRead && rel.getRead.hasDataSource
+
     def getPlanCache(rel: proto.Relation): Option[LogicalPlan] =
       planCache match {
         case Some(cache) if planCacheEnabled && hasPlanId =>
@@ -542,7 +547,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
     getPlanCache(rel)
       .getOrElse({
         val plan = transform(rel)
-        if (cachePlan) {
+        if (cachePlan || (alwaysCacheDataSourceReadsEnabled && isDataSourceRead)) {
           putPlanCache(rel, plan)
         }
         plan
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala
index 159b29ce6b8e..a110b0164f19 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala
@@ -401,19 +401,23 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
   test("Test session plan cache - disabled") {
     val sessionHolder = SparkConnectTestUtils.createDummySessionHolder(spark)
     // Disable plan cache of the session
-    sessionHolder.session.conf.set(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED.key, false)
-    val planner = new SparkConnectPlanner(sessionHolder)
+    try {
+      sessionHolder.session.conf.set(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED.key, false)
+      val planner = new SparkConnectPlanner(sessionHolder)

-    val query = buildRelation("select 1")
+      val query = buildRelation("select 1")

-    // If cachePlan is false, the cache is still empty.
-    // Although the cache is created as cache size is greater than zero, it won't be used.
-    planner.transformRelation(query, cachePlan = false)
-    assertPlanCache(sessionHolder, Some(Set()))
+      // If cachePlan is false, the cache is still empty.
+      // Although the cache is created as cache size is greater than zero, it won't be used.
+      planner.transformRelation(query, cachePlan = false)
+      assertPlanCache(sessionHolder, Some(Set()))

-    // Even if we specify "cachePlan = true", the cache is still empty.
-    planner.transformRelation(query, cachePlan = true)
-    assertPlanCache(sessionHolder, Some(Set()))
+      // Even if we specify "cachePlan = true", the cache is still empty.
+      planner.transformRelation(query, cachePlan = true)
+      assertPlanCache(sessionHolder, Some(Set()))
+    } finally {
+      sessionHolder.session.conf.set(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED, true)
+    }
   }

   test("Test duplicate operation IDs") {
@@ -440,4 +444,40 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
       sessionHolder.getPipelineExecution(graphId).isEmpty,
       "pipeline execution was not removed")
   }
+
+  gridTest("Actively cache data source reads")(Seq(true, false)) { enabled =>
+    val sessionHolder = SparkConnectTestUtils.createDummySessionHolder(spark)
+    val planner = new SparkConnectPlanner(sessionHolder)
+
+    val dataSourceRead = proto.Relation
+      .newBuilder()
+      .setRead(
+        proto.Read
+          .newBuilder()
+          .setDataSource(proto.Read.DataSource
+            .newBuilder()
+            .setSchema("col int")))
+      .setCommon(proto.RelationCommon.newBuilder().setPlanId(Random.nextLong()).build())
+      .build()
+    val dataSourceReadJoin = proto.Relation
+      .newBuilder()
+      .setJoin(
+        proto.Join
+          .newBuilder()
+          .setLeft(dataSourceRead)
+          .setRight(dataSourceRead)
+          .setJoinType(proto.Join.JoinType.JOIN_TYPE_CROSS))
+      .setCommon(proto.RelationCommon.newBuilder().setPlanId(Random.nextLong()).build())
+      .build()
+
+    sessionHolder.session.conf
+      .set(Connect.CONNECT_ALWAYS_CACHE_DATA_SOURCE_READS_ENABLED, enabled)
+    planner.transformRelation(dataSourceReadJoin, cachePlan = true)
+    val expected = if (enabled) {
+      Set(dataSourceReadJoin, dataSourceRead)
+    } else {
+      Set(dataSourceReadJoin)
+    }
+    assertPlanCache(sessionHolder, Some(expected))
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org