This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c121bb557447 [SPARK-49111][SQL] Move withProjectAndFilter to the
companion object of DataSourceV2Strategy
c121bb557447 is described below
commit c121bb557447709c19fc01b68b3a34b8838abc6d
Author: Uros Stankovic <[email protected]>
AuthorDate: Fri Aug 9 09:25:28 2024 +0200
[SPARK-49111][SQL] Move withProjectAndFilter to the companion object of
DataSourceV2Strategy
### What changes were proposed in this pull request?
Move the helper method `withProjectAndFilter` from the `DataSourceV2Strategy` class to its companion object.
### Why are the changes needed?
It enables reuse, since an instance of the strategy is no longer needed to invoke the
function, and it also makes the code cleaner.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
This is a simple refactoring; there are no new changes that can be tested.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47606 from urosstan-db/data-source-v2-strategy-refactor.
Authored-by: Uros Stankovic <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
.../datasources/v2/DataSourceV2Strategy.scala | 51 +++++++++++++---------
1 file changed, 31 insertions(+), 20 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index bb138c0fcd0a..b0a89173060a 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -53,21 +53,6 @@ class DataSourceV2Strategy(session: SparkSession) extends
Strategy with Predicat
import DataSourceV2Implicits._
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
- private def withProjectAndFilter(
- project: Seq[NamedExpression],
- filters: Seq[Expression],
- scan: LeafExecNode,
- needsUnsafeConversion: Boolean): SparkPlan = {
- val filterCondition = filters.reduceLeftOption(And)
- val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
-
- if (withFilter.output != project || needsUnsafeConversion) {
- ProjectExec(project, withFilter)
- } else {
- withFilter
- }
- }
-
private def refreshCache(r: DataSourceV2Relation)(): Unit = {
session.sharedState.cacheManager.recacheByPlan(session, r)
}
@@ -128,12 +113,14 @@ class DataSourceV2Strategy(session: SparkSession) extends
Strategy with Predicat
unsafeRowRDD,
v1Relation,
tableIdentifier)
- withProjectAndFilter(project, filters, dsScan, needsUnsafeConversion =
false) :: Nil
+ DataSourceV2Strategy.withProjectAndFilter(
+ project, filters, dsScan, needsUnsafeConversion = false) :: Nil
case PhysicalOperation(project, filters,
DataSourceV2ScanRelation(_, scan: LocalScan, output, _, _)) =>
val localScanExec = LocalTableScanExec(output,
scan.rows().toImmutableArraySeq)
- withProjectAndFilter(project, filters, localScanExec,
needsUnsafeConversion = false) :: Nil
+ DataSourceV2Strategy.withProjectAndFilter(
+ project, filters, localScanExec, needsUnsafeConversion = false) :: Nil
case PhysicalOperation(project, filters, relation:
DataSourceV2ScanRelation) =>
// projection and filters were already pushed down in the optimizer.
@@ -146,7 +133,8 @@ class DataSourceV2Strategy(session: SparkSession) extends
Strategy with Predicat
val batchExec = BatchScanExec(relation.output, relation.scan,
runtimeFilters,
relation.ordering, relation.relation.table,
StoragePartitionJoinParams(relation.keyGroupedPartitioning))
- withProjectAndFilter(project, postScanFilters, batchExec,
!batchExec.supportsColumnar) :: Nil
+ DataSourceV2Strategy.withProjectAndFilter(
+ project, postScanFilters, batchExec, !batchExec.supportsColumnar) ::
Nil
case PhysicalOperation(p, f, r: StreamingDataSourceV2ScanRelation)
if r.startOffset.isDefined && r.endOffset.isDefined =>
@@ -156,7 +144,7 @@ class DataSourceV2Strategy(session: SparkSession) extends
Strategy with Predicat
r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)
// Add a Project here to make sure we produce unsafe rows.
- withProjectAndFilter(p, f, scanExec, !scanExec.supportsColumnar) :: Nil
+ DataSourceV2Strategy.withProjectAndFilter(p, f, scanExec,
!scanExec.supportsColumnar) :: Nil
case PhysicalOperation(p, f, r: StreamingDataSourceV2ScanRelation)
if r.startOffset.isDefined && r.endOffset.isEmpty =>
@@ -165,7 +153,7 @@ class DataSourceV2Strategy(session: SparkSession) extends
Strategy with Predicat
val scanExec = ContinuousScanExec(r.output, r.scan, continuousStream,
r.startOffset.get)
// Add a Project here to make sure we produce unsafe rows.
- withProjectAndFilter(p, f, scanExec, !scanExec.supportsColumnar) :: Nil
+ DataSourceV2Strategy.withProjectAndFilter(p, f, scanExec,
!scanExec.supportsColumnar) :: Nil
case WriteToDataSourceV2(relationOpt, writer, query, customMetrics) =>
val invalidateCacheFunc: () => Unit = () => relationOpt match {
@@ -645,6 +633,29 @@ private[sql] object DataSourceV2Strategy extends Logging {
logWarning(log"Can't translate ${MDC(EXPR, other)} to source filter,
unsupported expression")
None
}
+
+ /**
+ * Creates new spark plan that should apply given filters and projections to
given scan node
+ * @param project Projection list that should be output of returned spark
plan
+ * @param filters Filter list that should be applied to scan node
+ * @param scan Scan node
+ * @param needsUnsafeConversion Value that indicates whether unsafe
conversion is needed
+ * @return SparkPlan tree composed of scan node and eventually
filter/project nodes
+ */
+ protected[sql] def withProjectAndFilter(
+ project: Seq[NamedExpression],
+ filters: Seq[Expression],
+ scan: LeafExecNode,
+ needsUnsafeConversion: Boolean): SparkPlan = {
+ val filterCondition = filters.reduceLeftOption(And)
+ val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
+
+ if (withFilter.output != project || needsUnsafeConversion) {
+ ProjectExec(project, withFilter)
+ } else {
+ withFilter
+ }
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]