This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6bae835ccbc8 [SPARK-53720][SQL] Simplify extracting Table from DataSourceV2Relation (#52460)
6bae835ccbc8 is described below
commit 6bae835ccbc8850ac5e2ab0225c6cd75921f06b4
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Oct 13 16:19:45 2025 -0700
[SPARK-53720][SQL] Simplify extracting Table from DataSourceV2Relation (#52460)
### What changes were proposed in this pull request?
This PR adds a new extractor for `Table` from `DataSourceV2Relation`.
### Why are the changes needed?
As we have seen over time, `DataSourceV2Relation` continues to evolve and has accumulated many constructor arguments. Frequently, we only need the table instance and are not interested in the other arguments, so it makes sense to add a dedicated extractor for this common use case. In particular, I plan to add `TimeTravelSpec` to the relation and don't want to update a large number of pattern matches for no good reason.
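For illustration, here is a minimal sketch of the pattern this change enables. The `inputFilesOf` helper and the `ExtractV2TableExample` object are hypothetical and exist only for this example; the match mirrors the rewritten case in `Dataset.scala`, and the `ExtractV2Table` object itself is the one added in `DataSourceV2Relation.scala` below.

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.v2.{ExtractV2Table, FileTable}

object ExtractV2TableExample {
  // Hypothetical helper, for illustration only: collect the input files of a v2 file-based relation.
  def inputFilesOf(plan: LogicalPlan): Array[String] = plan match {
    // Before this change, every constructor argument had to be matched explicitly:
    //   case DataSourceV2Relation(table: FileTable, _, _, _, _) => table.fileIndex.inputFiles
    // With the extractor, only the Table is pulled out, so a future argument such as the
    // planned TimeTravelSpec does not ripple into call sites like this one.
    case ExtractV2Table(table: FileTable) => table.fileIndex.inputFiles
    case _ => Array.empty[String]
  }
}
```

Because `unapply` simply returns `Some(relation.table)`, the match never fails for a `DataSourceV2Relation`; the type test on the bound table (here `FileTable`) does the actual filtering.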
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
This PR relies on existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
---
.../spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala | 8 ++++----
.../spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala | 4 ++--
.../spark/sql/catalyst/analysis/RewriteUpdateTable.scala | 4 ++--
.../org/apache/spark/sql/catalyst/planning/patterns.scala | 5 ++---
.../apache/spark/sql/catalyst/plans/logical/v2Commands.scala | 10 +++++-----
.../sql/execution/datasources/v2/DataSourceV2Relation.scala | 4 ++++
.../src/main/scala/org/apache/spark/sql/classic/Dataset.scala | 5 ++---
.../scala/org/apache/spark/sql/execution/CacheManager.scala | 4 ++--
.../spark/sql/execution/datasources/DataSourceStrategy.scala | 5 ++---
.../spark/sql/execution/datasources/FallBackFileSourceV2.scala | 4 ++--
.../sql/execution/datasources/v2/DataSourceV2Strategy.scala | 4 ++--
.../datasources/parquet/ParquetPartitionDiscoverySuite.scala | 4 ++--
.../org/apache/spark/sql/streaming/FileStreamSinkSuite.scala | 4 ++--
13 files changed, 33 insertions(+), 32 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala
index 9c63e091eaf5..0dc217788fd0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.util.RowDeltaUtils._
import org.apache.spark.sql.connector.catalog.{SupportsDeleteV2, SupportsRowLevelOperations, TruncatableTable}
import org.apache.spark.sql.connector.write.{RowLevelOperationTable, SupportsDelta}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
@@ -40,11 +40,11 @@ object RewriteDeleteFromTable extends RewriteRowLevelCommand {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case d @ DeleteFromTable(aliasedTable, cond) if d.resolved =>
EliminateSubqueryAliases(aliasedTable) match {
- case DataSourceV2Relation(_: TruncatableTable, _, _, _, _) if cond == TrueLiteral =>
+ case ExtractV2Table(_: TruncatableTable) if cond == TrueLiteral =>
// don't rewrite as the table supports truncation
d
- case r @ DataSourceV2Relation(t: SupportsRowLevelOperations, _, _, _, _) =>
+ case r @ ExtractV2Table(t: SupportsRowLevelOperations) =>
val table = buildOperationTable(t, DELETE, CaseInsensitiveStringMap.empty())
table.operation match {
case _: SupportsDelta =>
@@ -53,7 +53,7 @@ object RewriteDeleteFromTable extends RewriteRowLevelCommand {
buildReplaceDataPlan(r, table, cond)
}
- case DataSourceV2Relation(_: SupportsDeleteV2, _, _, _, _) =>
+ case ExtractV2Table(_: SupportsDeleteV2) =>
// don't rewrite as the table supports deletes only with filters
d
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
index 9e67aa156fa2..8b5b690aa740 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
import org.apache.spark.sql.connector.write.{RowLevelOperationTable, SupportsDelta}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE
import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table}
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -125,7 +125,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
if m.resolved && m.rewritable && m.aligned && !m.needSchemaEvolution =>
EliminateSubqueryAliases(aliasedTable) match {
- case r @ DataSourceV2Relation(tbl: SupportsRowLevelOperations, _, _, _, _) =>
+ case r @ ExtractV2Table(tbl: SupportsRowLevelOperations) =>
validateMergeIntoConditions(m)
val table = buildOperationTable(tbl, MERGE, CaseInsensitiveStringMap.empty())
table.operation match {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala
index b2955ca00687..a4453ae51734 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.util.RowDeltaUtils._
import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
import org.apache.spark.sql.connector.write.{RowLevelOperationTable, SupportsDelta}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table}
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -40,7 +40,7 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {
if u.resolved && u.rewritable && u.aligned =>
EliminateSubqueryAliases(aliasedTable) match {
- case r @ DataSourceV2Relation(tbl: SupportsRowLevelOperations, _, _, _, _) =>
+ case r @ ExtractV2Table(tbl: SupportsRowLevelOperations) =>
val table = buildOperationTable(tbl, UPDATE, CaseInsensitiveStringMap.empty())
val updateCond = cond.getOrElse(TrueLiteral)
table.operation match {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 54a4e75c90c9..b95c4624b8c5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE
import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation, ExtractV2Table}
import org.apache.spark.sql.internal.SQLConf
trait OperationHelper extends AliasHelper with PredicateHelper {
@@ -436,8 +436,7 @@ object GroupBasedRowLevelOperation {
type ReturnType = (ReplaceData, Expression, Option[Expression], LogicalPlan)
def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
- case rd @ ReplaceData(DataSourceV2Relation(table, _, _, _, _),
- cond, query, _, _, groupFilterCond, _) =>
+ case rd @ ReplaceData(ExtractV2Table(table), cond, query, _, _, groupFilterCond, _) =>
// group-based UPDATEs that are rewritten as UNION read the table twice
val allowMultipleReads = rd.operation.command == UPDATE
val readRelation = findReadRelation(table, query, allowMultipleReads)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index a5cba44aac6a..cd0c2742df3d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.connector.write.{DeltaWrite, RowLevelOperation, RowL
import org.apache.spark.sql.connector.write.RowLevelOperation.Command.{DELETE, MERGE, UPDATE}
import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, AtomicType, BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructField, StructType}
import org.apache.spark.util.ArrayImplicits._
@@ -263,7 +263,7 @@ case class ReplaceData(
lazy val operation: RowLevelOperation = {
EliminateSubqueryAliases(table) match {
- case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) =>
+ case ExtractV2Table(RowLevelOperationTable(_, operation)) =>
operation
case _ =>
throw new AnalysisException(
@@ -345,7 +345,7 @@ case class WriteDelta(
lazy val operation: SupportsDelta = {
EliminateSubqueryAliases(table) match {
- case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) =>
+ case ExtractV2Table(RowLevelOperationTable(_, operation)) =>
operation.asInstanceOf[SupportsDelta]
case _ =>
throw new AnalysisException(
@@ -834,7 +834,7 @@ case class UpdateTable(
lazy val rewritable: Boolean = {
EliminateSubqueryAliases(table) match {
- case DataSourceV2Relation(_: SupportsRowLevelOperations, _, _, _, _) => true
+ case ExtractV2Table(_: SupportsRowLevelOperations) => true
case _ => false
}
}
@@ -878,7 +878,7 @@ case class MergeIntoTable(
lazy val rewritable: Boolean = {
EliminateSubqueryAliases(targetTable) match {
- case DataSourceV2Relation(_: SupportsRowLevelOperations, _, _, _, _) => true
+ case ExtractV2Table(_: SupportsRowLevelOperations) => true
case _ => false
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 26f406999494..180a14df865b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -231,6 +231,10 @@ case class StreamingDataSourceV2ScanRelation(
override protected def stringArgs: Iterator[Any] = stringArgsVal.iterator
}
+object ExtractV2Table {
+ def unapply(relation: DataSourceV2Relation): Option[Table] = Some(relation.table)
+}
+
object DataSourceV2Relation {
def create(
table: Table,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala
index b8ffa09dfa05..3f377531f91f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala
@@ -60,7 +60,7 @@ import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.LogicalRelationWithTable
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, ExtractV2Table, FileTable}
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.execution.stat.StatFunctions
import org.apache.spark.sql.internal.SQLConf
@@ -1733,8 +1733,7 @@ class Dataset[T] private[sql](
fr.inputFiles
case r: HiveTableRelation =>
r.tableMeta.storage.locationUri.map(_.toString).toArray
- case DataSourceV2ScanRelation(DataSourceV2Relation(table: FileTable, _, _, _, _),
- _, _, _, _) =>
+ case DataSourceV2ScanRelation(ExtractV2Table(table: FileTable), _, _, _, _) =>
table.fileIndex.inputFiles
}.flatten
files.toSet.toArray
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index a8292a8dbaa3..a04486d36a64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table, FileTable}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
@@ -431,7 +431,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
case _ => false
}
- case DataSourceV2Relation(fileTable: FileTable, _, _, _, _) =>
+ case ExtractV2Table(fileTable: FileTable) =>
refreshFileIndexIfNecessary(fileTable.fileIndex, fs, qualifiedPath)
case _ => false
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 2e47f08ac115..b1fcf6f4b3a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -53,7 +53,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, PushedDownOperators}
+import org.apache.spark.sql.execution.datasources.v2.{ExtractV2Table, PushedDownOperators}
import org.apache.spark.sql.execution.streaming.runtime.StreamingRelation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources
@@ -319,8 +319,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
i.copy(table = DDLUtils.readHiveTable(tableMeta))
case append @ AppendData(
- DataSourceV2Relation(
- V1Table(table: CatalogTable), _, _, _, _), _, _, _, _, _) if !append.isByName =>
+ ExtractV2Table(V1Table(table: CatalogTable)), _, _, _, _, _) if !append.isByName =>
InsertIntoStatement(UnresolvedCatalogRelation(table),
table.partitionColumnNames.map(name => name -> None).toMap,
Seq.empty, append.query, false, append.isByName)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
index 8c7203bca625..60c459ecf540 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
@@ -22,7 +22,7 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.classic.SparkSession
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{ExtractV2Table, FileTable}
/**
* Replace the File source V2 table in [[InsertIntoStatement]] to V1 [[FileFormat]].
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, File
class FallBackFileSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoStatement(
- d @ DataSourceV2Relation(table: FileTable, _, _, _, _), _, _, _, _, _, _) =>
+ d @ ExtractV2Table(table: FileTable), _, _, _, _, _, _) =>
val v1FileFormat = table.fallbackFileFormat.getDeclaredConstructor().newInstance()
val relation = HadoopFsRelation(
table.fileIndex,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index b07e0442d4f0..98ea63862ac2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -264,7 +264,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
invalidateCache) :: Nil
}
- case AppendData(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), _, _,
+ case AppendData(r @ ExtractV2Table(v1: SupportsWrite), _, _,
_, Some(write), analyzedQuery) if v1.supports(TableCapability.V1_BATCH_WRITE) =>
write match {
case v1Write: V1Write =>
@@ -278,7 +278,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case AppendData(r: DataSourceV2Relation, query, _, _, Some(write), _) =>
AppendDataExec(planLater(query), refreshCache(r), write) :: Nil
- case OverwriteByExpression(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), _, _,
+ case OverwriteByExpression(r @ ExtractV2Table(v1: SupportsWrite), _, _,
_, _, Some(write), analyzedQuery) if v1.supports(TableCapability.V1_BATCH_WRITE) =>
write match {
case v1Write: V1Write =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 324fe148592a..796b14a08ec0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimeFor
import org.apache.spark.sql.catalyst.util.DateTimeUtils.localDateTimeToMicros
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{ExtractV2Table, FileTable}
import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -1498,7 +1498,7 @@ class ParquetV2PartitionDiscoverySuite extends ParquetPartitionDiscoverySuite {
(1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath)
val queryExecution = spark.read.parquet(dir.getCanonicalPath).queryExecution
queryExecution.analyzed.collectFirst {
- case DataSourceV2Relation(fileTable: FileTable, _, _, _, _) =>
+ case ExtractV2Table(fileTable: FileTable) =>
assert(fileTable.fileIndex.partitionSpec() === PartitionSpec.emptySpec)
}.getOrElse {
fail(s"Expecting a matching DataSourceV2Relation, but
got:\n$queryExecution")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 067b0ca285d5..b0aa71a7e1b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation, FileScan, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, ExtractV2Table, FileScan, FileTable}
import org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol
import org.apache.spark.sql.execution.streaming.runtime._
import org.apache.spark.sql.execution.streaming.sinks.{FileStreamSink, FileStreamSinkLog, SinkFileStatus}
@@ -776,7 +776,7 @@ class FileStreamSinkV2Suite extends FileStreamSinkSuite {
// Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
// been inferred
val table = df.queryExecution.analyzed.collect {
- case DataSourceV2Relation(table: FileTable, _, _, _, _) => table
+ case ExtractV2Table(table: FileTable) => table
}
assert(table.size === 1)
assert(table.head.fileIndex.isInstanceOf[MetadataLogFileIndex])
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]