This is an automated email from the ASF dual-hosted git repository.
gengliangwang pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new b5d5e41b47f2 [SPARK-56680][SQL] DSv2 INSERT and Insert-Only MERGE
Metrics
b5d5e41b47f2 is described below
commit b5d5e41b47f2b13d8900141aafc809035a2d1bbe
Author: Ziya Mukhtarov <[email protected]>
AuthorDate: Wed May 13 10:39:09 2026 -0700
[SPARK-56680][SQL] DSv2 INSERT and Insert-Only MERGE Metrics
### What changes were proposed in this pull request?
Adds write metrics for DSv2 INSERT and insert-only MERGE operations.
This PR only handles pure inserts. Overwrites will be handled in a future
PR, as they require some values from the connector.
### Why are the changes needed?
For better visibility into what happened as a result of DML queries.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added metric verification to existing tests.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7
Closes #55586 from ZiyaZa/insert-only-merge-metrics.
Lead-authored-by: Ziya Mukhtarov <[email protected]>
Co-authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
(cherry picked from commit c4d16a5bf0bd1b9dcac831454f869744854da79e)
Signed-off-by: Gengliang Wang <[email protected]>
---
.../spark/sql/connector/write/InsertSummary.java | 34 +++++++
.../catalyst/analysis/RewriteMergeIntoTable.scala | 6 +-
.../sql/catalyst/plans/logical/v2Commands.scala | 20 ++++
.../sql/connector/write/InsertSummaryImpl.scala | 24 +++++
.../sql/connector/catalog/InMemoryBaseTable.scala | 27 ++++--
.../catalog/InMemoryRowLevelOperationTable.scala | 20 ++--
.../sql/connector/catalog/InMemoryTable.scala | 3 +-
.../catalog/InMemoryTableWithV2Filter.scala | 3 +-
.../datasources/v2/DataSourceV2Strategy.scala | 25 ++++-
.../datasources/v2/TableCapabilityCheck.scala | 5 +-
.../sql/execution/datasources/v2/V2Writes.scala | 9 +-
.../datasources/v2/WriteToDataSourceV2Exec.scala | 74 ++++++++++----
.../DataSourceV2DataFrameSessionCatalogSuite.scala | 7 +-
.../sql/connector/DataSourceV2DataFrameSuite.scala | 17 ++++
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 1 +
.../spark/sql/connector/InsertIntoTests.scala | 106 +++++++++++++++++++++
.../sql/connector/MergeIntoTableSuiteBase.scala | 30 ++++++
.../spark/sql/connector/V1WriteFallbackSuite.scala | 3 +
18 files changed, 364 insertions(+), 50 deletions(-)
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/InsertSummary.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/InsertSummary.java
new file mode 100644
index 000000000000..40f41bf23844
--- /dev/null
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/InsertSummary.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.write;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * Provides an informational summary of the INSERT operation producing write.
+ *
+ * @since 4.2.0
+ */
+@Evolving
+public interface InsertSummary extends WriteSummary {
+
+ /**
+ * Returns the number of inserted rows, or -1 if not found.
+ */
+ long numInsertedRows();
+}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
index 168f30623ee4..8281f89bd2e8 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, And,
Attribute, Attribu
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral,
TrueLiteral}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, JoinType,
LeftAnti, LeftOuter, RightOuter}
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, DeleteAction,
Filter, HintInfo, InsertAction, Join, JoinHint, LogicalPlan, MergeAction,
MergeIntoTable, MergeRows, NO_BROADCAST_AND_REPLICATION, Project, ReplaceData,
UpdateAction, WriteDelta}
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteAction, Filter,
HintInfo, InsertAction, InsertOnlyMerge, Join, JoinHint, LogicalPlan,
MergeAction, MergeIntoTable, MergeRows, NO_BROADCAST_AND_REPLICATION, Project,
ReplaceData, UpdateAction, WriteDelta}
import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Copy, Delete,
Discard, Insert, Instruction, Keep, ROW_ID, Split, Update}
import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{COPY_OPERATION,
INSERT_OPERATION, OPERATION_COLUMN, UPDATE_OPERATION}
import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
@@ -73,7 +73,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand
with PredicateHelper
}
val project = Project(projectList, joinPlan)
- AppendData.byPosition(r, project)
+ InsertOnlyMerge(r, project)
case _ =>
m
@@ -114,7 +114,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand
with PredicateHelper
output = generateExpandOutput(r.output, outputs),
joinPlan)
- AppendData.byPosition(r, mergeRows)
+ InsertOnlyMerge(r, mergeRows)
case _ =>
m
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 7b657ce34df4..b1ab46ee9481 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -194,6 +194,26 @@ object AppendData {
}
}
+/**
+ * Append data to an existing table as the result of an insert-only MERGE
rewrite.
+ *
+ * Functionally equivalent to [[AppendData]] but distinguishes the row-level
MERGE rewrite path.
+ */
+case class InsertOnlyMerge(
+ table: NamedRelation,
+ query: LogicalPlan,
+ write: Option[Write] = None,
+ analyzedQuery: Option[LogicalPlan] = None) extends V2WriteCommand with
TransactionalWrite {
+ override val isByName: Boolean = false
+ override val withSchemaEvolution: Boolean = false
+ override val writePrivileges: Set[TableWritePrivilege] =
Set(TableWritePrivilege.INSERT)
+ override def withNewQuery(newQuery: LogicalPlan): InsertOnlyMerge =
copy(query = newQuery)
+ override def withNewTable(newTable: NamedRelation): InsertOnlyMerge =
copy(table = newTable)
+ override def storeAnalyzedQuery(): Command = copy(analyzedQuery =
Some(query))
+ override protected def withNewChildInternal(newChild: LogicalPlan):
InsertOnlyMerge =
+ copy(query = newChild)
+}
+
/**
* Overwrite data matching a filter in an existing table.
*/
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/InsertSummaryImpl.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/InsertSummaryImpl.scala
new file mode 100644
index 000000000000..97c2e082c257
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/InsertSummaryImpl.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.write
+
+/**
+ * Implementation of [[InsertSummary]] that provides INSERT operation summary.
+ */
+private[sql] case class InsertSummaryImpl(numInsertedRows: Long) extends
InsertSummary {
+}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index 53e57153030b..f582f3e408cb 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -788,30 +788,43 @@ abstract class InMemoryBaseTable(
}
override def abort(messages: Array[WriterCommitMessage]): Unit = {}
+
+ protected def doCommit(messages: Array[WriterCommitMessage]): Unit
+
+ override final def commit(messages: Array[WriterCommitMessage]): Unit = {
+ doCommit(messages)
+ commits += Commit(Instant.now().toEpochMilli)
+ }
+
+ override final def commit(
+ messages: Array[WriterCommitMessage],
+ summary: WriteSummary): Unit = {
+ doCommit(messages)
+ commits += Commit(Instant.now().toEpochMilli, writeSummary =
Some(summary))
+ }
}
class Append(val info: LogicalWriteInfo) extends TestBatchWrite {
-
- override def commit(messages: Array[WriterCommitMessage]): Unit =
dataMap.synchronized {
+ override protected def doCommit(
+ messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
withData(messages.map(_.asInstanceOf[BufferedRows]))
- commits += Commit(Instant.now().toEpochMilli)
}
}
class DynamicOverwrite(val info: LogicalWriteInfo) extends TestBatchWrite {
- override def commit(messages: Array[WriterCommitMessage]): Unit =
dataMap.synchronized {
+ override protected def doCommit(
+ messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
val newData = messages.map(_.asInstanceOf[BufferedRows])
dataMap --= newData.flatMap(_.rows.map(getKey))
withData(newData)
- commits += Commit(Instant.now().toEpochMilli)
}
}
class TruncateAndAppend(val info: LogicalWriteInfo) extends TestBatchWrite {
- override def commit(messages: Array[WriterCommitMessage]): Unit =
dataMap.synchronized {
+ override protected def doCommit(
+ messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
dataMap.clear()
withData(messages.map(_.asInstanceOf[BufferedRows]))
- commits += Commit(Instant.now().toEpochMilli)
}
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
index 6778f4489e45..5c0bc0b143f3 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.connector.catalog
-import java.time.Instant
import java.util
import org.apache.spark.sql.catalyst.InternalRow
@@ -26,7 +25,7 @@ import
org.apache.spark.sql.connector.catalog.constraints.Constraint
import org.apache.spark.sql.connector.distributions.{Distribution,
Distributions}
import org.apache.spark.sql.connector.expressions.{FieldReference,
LogicalExpressions, NamedReference, SortDirection, SortOrder, Transform}
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
-import org.apache.spark.sql.connector.write.{BatchWrite, DeltaBatchWrite,
DeltaWrite, DeltaWriteBuilder, DeltaWriter, DeltaWriterFactory,
LogicalWriteInfo, PhysicalWriteInfo, RequiresDistributionAndOrdering,
RowLevelOperation, RowLevelOperationBuilder, RowLevelOperationInfo,
SupportsDelta, Write, WriteBuilder, WriterCommitMessage, WriteSummary}
+import org.apache.spark.sql.connector.write.{BatchWrite, DeltaBatchWrite,
DeltaWrite, DeltaWriteBuilder, DeltaWriter, DeltaWriterFactory,
LogicalWriteInfo, PhysicalWriteInfo, RequiresDistributionAndOrdering,
RowLevelOperation, RowLevelOperationBuilder, RowLevelOperationInfo,
SupportsDelta, Write, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -143,18 +142,11 @@ class InMemoryRowLevelOperationTable private (
override def description(): String = "InMemoryPartitionReplaceOperation"
}
- abstract class RowLevelOperationBatchWrite extends TestBatchWrite {
-
- override def commit(messages: Array[WriterCommitMessage], metrics:
WriteSummary): Unit = {
- commit(messages)
- commits += Commit(Instant.now().toEpochMilli, Some(metrics))
- }
- }
-
private case class PartitionBasedReplaceData(scan: InMemoryBatchScan)
- extends RowLevelOperationBatchWrite {
+ extends TestBatchWrite {
- override def commit(messages: Array[WriterCommitMessage]): Unit =
dataMap.synchronized {
+ override protected def doCommit(
+ messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
val newData = messages.map(_.asInstanceOf[BufferedRows])
val readRows = scan.data.flatMap(_.asInstanceOf[BufferedRows].rows)
val readPartitions = readRows.map(r => getKey(r, schema)).distinct
@@ -216,12 +208,12 @@ class InMemoryRowLevelOperationTable private (
}
}
- private object TestDeltaBatchWrite extends RowLevelOperationBatchWrite with
DeltaBatchWrite{
+ private object TestDeltaBatchWrite extends TestBatchWrite with
DeltaBatchWrite {
override def createBatchWriterFactory(info: PhysicalWriteInfo):
DeltaWriterFactory = {
new
DeltaBufferedRowsWriterFactory(CatalogV2Util.v2ColumnsToStructType(columns()))
}
- override def commit(messages: Array[WriterCommitMessage]): Unit = {
+ override protected def doCommit(messages: Array[WriterCommitMessage]):
Unit = {
val newData = messages.map(_.asInstanceOf[BufferedRows])
withDeletes(newData)
withData(newData, columns())
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
index 15ed4136dbda..66db9c18fa98 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
@@ -205,7 +205,8 @@ class InMemoryTable(
private class Overwrite(filters: Array[Filter]) extends TestBatchWrite {
import
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
- override def commit(messages: Array[WriterCommitMessage]): Unit =
dataMap.synchronized {
+ override protected def doCommit(
+ messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
val deleteKeys = InMemoryTable.filtersToKeys(
dataMap.keys, partCols.map(_.toSeq.quoted).toImmutableArraySeq,
filters)
dataMap --= deleteKeys
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2Filter.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2Filter.scala
index f2827faf5943..e9d73d0f9fe1 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2Filter.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableWithV2Filter.scala
@@ -140,7 +140,8 @@ class InMemoryTableWithV2Filter(
private class Overwrite(predicates: Array[Predicate]) extends TestBatchWrite
{
import
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
- override def commit(messages: Array[WriterCommitMessage]): Unit =
dataMap.synchronized {
+ override protected def doCommit(
+ messages: Array[WriterCommitMessage]): Unit = dataMap.synchronized {
val deleteKeys = InMemoryTableWithV2Filter.filtersToKeys(
dataMap.keys, partCols.map(_.toSeq.quoted).toImmutableArraySeq,
predicates)
dataMap --= deleteKeys
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index e11347581109..3c58298ec921 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkException, SparkIllegalArgumentException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.EXPR
-import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier,
ResolvedNamespace, ResolvedPartitionSpec, ResolvedPersistentView,
ResolvedTable, ResolvedTempView}
+import org.apache.spark.sql.catalyst.analysis.{NamedRelation,
ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec,
ResolvedPersistentView, ResolvedTable, ResolvedTempView}
import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{And, Attribute,
DynamicPruning, Expression, NamedExpression, Not, Or, PredicateHelper,
SubqueryExpression}
@@ -41,7 +41,7 @@ import
org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue}
import org.apache.spark.sql.connector.expressions.filter.{And => V2And, Not =>
V2Not, Or => V2Or, Predicate}
import org.apache.spark.sql.connector.read.LocalScan
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream,
MicroBatchStream, SupportsRealTimeMode}
-import org.apache.spark.sql.connector.write.V1Write
+import org.apache.spark.sql.connector.write.{V1Write, Write}
import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryExecutionErrors}
import org.apache.spark.sql.execution.{FilterExec, InSubqueryExec,
LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec,
ScalarSubquery => ExecScalarSubquery, SparkPlan, SparkStrategy => Strategy}
import org.apache.spark.sql.execution.command.{CommandUtils,
CreateMetricViewCommand, MetricViewHelper}
@@ -491,8 +491,8 @@ class DataSourceV2Strategy(session: SparkSession) extends
Strategy with Predicat
invalidateCache) :: Nil
}
- case AppendData(r @ ExtractV2Table(v1: SupportsWrite), _, _,
- _, _, Some(write), analyzedQuery) if
v1.supports(TableCapability.V1_BATCH_WRITE) =>
+ case AppendWrite(r @ ExtractV2Table(v1: SupportsWrite), Some(write),
analyzedQuery)
+ if v1.supports(TableCapability.V1_BATCH_WRITE) =>
write match {
case v1Write: V1Write =>
assert(analyzedQuery.isDefined)
@@ -505,6 +505,9 @@ class DataSourceV2Strategy(session: SparkSession) extends
Strategy with Predicat
case AppendData(r: DataSourceV2Relation, query, _, _, _, Some(write), _) =>
AppendDataExec(planLater(query), refreshCache(r), write, r.name) :: Nil
+ case InsertOnlyMerge(r: DataSourceV2Relation, query, Some(write), _) =>
+ InsertOnlyMergeExec(planLater(query), refreshCache(r), write, r.name) ::
Nil
+
case OverwriteByExpression(r @ ExtractV2Table(v1: SupportsWrite), _, _,
_, _, _, Some(write), analyzedQuery) if
v1.supports(TableCapability.V1_BATCH_WRITE) =>
write match {
@@ -843,6 +846,20 @@ class DataSourceV2Strategy(session: SparkSession) extends
Strategy with Predicat
}
}
+/**
+ * Pattern that matches either an [[AppendData]] or an [[InsertOnlyMerge]] and
exposes the
+ * fields needed to plan the v1 batch-write fallback path.
+ */
+private object AppendWrite {
+ def unapply(
+ plan: LogicalPlan
+ ): Option[(NamedRelation, Option[Write], Option[LogicalPlan])] = plan match {
+ case a: AppendData => Some((a.table, a.write, a.analyzedQuery))
+ case m: InsertOnlyMerge => Some((m.table, m.write, m.analyzedQuery))
+ case _ => None
+ }
+}
+
private[sql] object DataSourceV2Strategy extends Logging {
private def translateLeafNodeFilterV2(predicate: Expression):
Option[Predicate] = {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
index b25059bd7bac..988aa86db1d3 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.datasources.v2
import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan,
OverwriteByExpression, OverwritePartitionsDynamic}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData,
InsertOnlyMerge, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.catalog.TableCapability._
@@ -49,6 +49,9 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) {
case AppendData(r: DataSourceV2Relation, _, _, _, _, _, _) if
!supportsBatchWrite(r.table) =>
throw QueryCompilationErrors.unsupportedAppendInBatchModeError(r.name)
+ case InsertOnlyMerge(r: DataSourceV2Relation, _, _, _) if
!supportsBatchWrite(r.table) =>
+ throw QueryCompilationErrors.unsupportedAppendInBatchModeError(r.name)
+
case OverwritePartitionsDynamic(r: DataSourceV2Relation, _, _, _, _, _)
if !r.table.supports(BATCH_WRITE) ||
!r.table.supports(OVERWRITE_DYNAMIC) =>
throw
QueryCompilationErrors.unsupportedDynamicOverwriteInBatchModeError(r.table)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala
index 70a2bf6d5b8b..0cbf260457ff 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala
@@ -22,7 +22,7 @@ import java.util.UUID
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan,
OverwriteByExpression, OverwritePartitionsDynamic, ReplaceData, WriteDelta}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData,
InsertOnlyMerge, LogicalPlan, OverwriteByExpression,
OverwritePartitionsDynamic, ReplaceData, WriteDelta}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.catalyst.util.WriteDeltaProjections
@@ -51,6 +51,13 @@ object V2Writes extends Rule[LogicalPlan] with
PredicateHelper {
val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query,
r.funCatalog)
a.copy(write = Some(write), query = newQuery)
+ case m @ InsertOnlyMerge(r: DataSourceV2Relation, query, None, _) =>
+ val writeOptions = r.options.asCaseSensitiveMap.asScala.toMap
+ val writeBuilder = newWriteBuilder(r.table, writeOptions, query.schema)
+ val write = writeBuilder.build()
+ val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query,
r.funCatalog)
+ m.copy(write = Some(write), query = newQuery)
+
case o @ OverwriteByExpression(
r: DataSourceV2Relation, deleteExpr, query, options, _, _, None, _) =>
// fail if any filter cannot be converted. correctness depends on
removing all matching data.
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 3cbfed40d876..5d8b5a081c90 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -33,16 +33,16 @@ import
org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier
import org.apache.spark.sql.connector.catalog.transactions.Transaction
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.metric.CustomMetric
-import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter,
DataWriterFactory, DeleteSummaryImpl, DeltaWrite, DeltaWriter,
MergeSummaryImpl, PhysicalWriteInfoImpl, RowLevelOperation,
RowLevelOperationTable, UpdateSummaryImpl, Write, WriterCommitMessage,
WriteSummary}
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter,
DataWriterFactory, DeleteSummaryImpl, DeltaWrite, DeltaWriter,
InsertSummaryImpl, MergeSummaryImpl, PhysicalWriteInfoImpl, RowLevelOperation,
RowLevelOperationTable, UpdateSummaryImpl, Write, WriterCommitMessage,
WriteSummary}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command._
import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryExecutionErrors}
-import org.apache.spark.sql.execution.{QueryExecution, SparkPlan,
UnaryExecNode}
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlan,
SQLExecution, UnaryExecNode}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric,
SQLMetrics}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.SchemaValidationMode.PROHIBIT_CHANGES
-import org.apache.spark.util.{LongAccumulator, Utils}
import org.apache.spark.util.ArrayImplicits._
+import org.apache.spark.util.Utils
/**
* Deprecated logical plan for writing data into data source v2. This is being
replaced by more
@@ -294,6 +294,37 @@ case class AppendDataExec(
override def withTransaction(txn: Option[Transaction]): AppendDataExec =
copy(transaction = txn)
override protected def withNewChildInternal(newChild: SparkPlan):
AppendDataExec =
copy(query = newChild)
+
+ override protected def getWriteSummary(): Option[WriteSummary] =
+ Some(InsertSummaryImpl(numInsertedRows = numOutputRowsMetric.value))
+}
+
+/**
+ * Physical plan for an insert-only MERGE rewrite. Behaves like
[[AppendDataExec]] but emits a
+ * [[org.apache.spark.sql.connector.write.MergeSummary]] so commit metadata
reports the operation
+ * as a MERGE, with all output rows accounted for as inserts.
+ */
+case class InsertOnlyMergeExec(
+ query: SparkPlan,
+ refreshCache: () => Unit,
+ write: Write,
+ tableName: String,
+ transaction: Option[Transaction] = None) extends V2ExistingTableWriteExec {
+ override def withTransaction(txn: Option[Transaction]): InsertOnlyMergeExec =
+ copy(transaction = txn)
+ override protected def withNewChildInternal(newChild: SparkPlan):
InsertOnlyMergeExec =
+ copy(query = newChild)
+
+ override protected def getWriteSummary(): Option[WriteSummary] =
+ Some(MergeSummaryImpl(
+ numTargetRowsCopied = 0L,
+ numTargetRowsDeleted = 0L,
+ numTargetRowsUpdated = 0L,
+ numTargetRowsInserted = numOutputRowsMetric.value,
+ numTargetRowsMatchedUpdated = 0L,
+ numTargetRowsMatchedDeleted = 0L,
+ numTargetRowsNotMatchedBySourceUpdated = 0L,
+ numTargetRowsNotMatchedBySourceDeleted = 0L))
}
/**
@@ -477,17 +508,18 @@ trait V2ExistingTableWriteExec extends V2TableWriteExec
with TransactionalExec {
trait RowLevelWriteExec extends V2ExistingTableWriteExec {
def rowLevelCommand: RowLevelOperation.Command
- override protected lazy val sparkMetrics: Map[String, SQLMetric] =
rowLevelCommand match {
- case UPDATE =>
- Map(
- "numUpdatedRows" -> SQLMetrics.createMetric(sparkContext, "number of
updated rows"),
- "numCopiedRows" -> SQLMetrics.createMetric(sparkContext, "number of
copied rows"))
- case DELETE =>
- Map(
- "numDeletedRows" -> SQLMetrics.createMetric(sparkContext, "number of
deleted rows"),
- "numCopiedRows" -> SQLMetrics.createMetric(sparkContext, "number of
copied rows"))
- case _ => Map.empty
- }
+ override protected lazy val sparkMetrics: Map[String, SQLMetric] =
super.sparkMetrics ++ (
+ rowLevelCommand match {
+ case UPDATE =>
+ Map(
+ "numUpdatedRows" -> SQLMetrics.createMetric(sparkContext, "number of
updated rows"),
+ "numCopiedRows" -> SQLMetrics.createMetric(sparkContext, "number of
copied rows"))
+ case DELETE =>
+ Map(
+ "numDeletedRows" -> SQLMetrics.createMetric(sparkContext, "number of
deleted rows"),
+ "numCopiedRows" -> SQLMetrics.createMetric(sparkContext, "number of
copied rows"))
+ case _ => Map.empty
+ })
/**
* Returns the value of the named metric, or -1 if the metric is not found.
@@ -551,6 +583,12 @@ trait V2TableWriteExec
override def customMetrics: Map[String, SQLMetric] = Map.empty
+ protected lazy val numOutputRowsMetric: SQLMetric =
+ SQLMetrics.createMetric(sparkContext, "number of output rows")
+
+ override protected def sparkMetrics: Map[String, SQLMetric] = Map(
+ "numOutputRows" -> numOutputRowsMetric)
+
protected def writeWithV2(batchWrite: BatchWrite): Seq[InternalRow] = {
val rdd: RDD[InternalRow] = {
val tempRdd = query.execute()
@@ -568,7 +606,6 @@ trait V2TableWriteExec
PhysicalWriteInfoImpl(rdd.getNumPartitions))
val useCommitCoordinator = batchWrite.useCommitCoordinator
val messages = new Array[WriterCommitMessage](rdd.partitions.length)
- val totalNumRowsAccumulator = new LongAccumulator()
logInfo(log"Start processing data source write support: " +
log"${MDC(LogKeys.BATCH_WRITE, batchWrite)}. The input RDD has " +
@@ -586,11 +623,14 @@ trait V2TableWriteExec
(index, result: DataWritingSparkTaskResult) => {
val commitMessage = result.writerCommitMessage
messages(index) = commitMessage
- totalNumRowsAccumulator.add(result.numRows)
+ numOutputRowsMetric.add(result.numRows)
batchWrite.onDataWriterCommit(commitMessage)
}
)
+ val executionId =
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+ SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
Seq(numOutputRowsMetric))
+
val writeSummary = getWriteSummary()
logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE,
batchWrite)} is committing.")
writeSummary match {
@@ -598,7 +638,7 @@ trait V2TableWriteExec
case None => batchWrite.commit(messages)
}
logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE,
batchWrite)} committed.")
- commitProgress =
Some(StreamWriterCommitProgress(totalNumRowsAccumulator.value))
+ commitProgress =
Some(StreamWriterCommitProgress(numOutputRowsMetric.value))
} catch {
case cause: Throwable =>
logError(
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
index 4db59b36c1fe..97cdebe2d32d 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
@@ -181,7 +181,7 @@ object InMemoryTableSessionCatalog {
private [connector] trait SessionCatalogTest[T <: Table, Catalog <:
TestV2SessionCatalogBase[T]]
extends SharedSparkSession
- with BeforeAndAfter {
+ with BeforeAndAfter { self: InsertIntoSQLOnlyTests =>
protected def catalog(name: String): CatalogPlugin = {
spark.sessionState.catalogManager.catalog(name)
@@ -215,6 +215,7 @@ private [connector] trait SessionCatalogTest[T <: Table,
Catalog <: TestV2Sessio
val t1 = "tbl"
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
df.write.format(v2Format).saveAsTable(t1)
+ checkInsertMetrics(t1, numInsertedRows = 3)
verifyTable(t1, df)
}
@@ -222,6 +223,7 @@ private [connector] trait SessionCatalogTest[T <: Table,
Catalog <: TestV2Sessio
val t1 = "tbl"
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
df.write.format(v2Format).mode("append").saveAsTable(t1)
+ checkInsertMetrics(t1, numInsertedRows = 3)
verifyTable(t1, df)
}
@@ -245,10 +247,12 @@ private [connector] trait SessionCatalogTest[T <: Table,
Catalog <: TestV2Sessio
df.select("id", "data").write.format(v2Format).saveAsTable(t1)
}
df.write.format(v2Format).mode("append").saveAsTable(t1)
+ checkInsertMetrics(t1, numInsertedRows = 3)
verifyTable(t1, df)
// Check that appends are by name
df.select($"data",
$"id").write.format(v2Format).mode("append").saveAsTable(t1)
+ checkInsertMetrics(t1, numInsertedRows = 3)
verifyTable(t1, df.union(df))
}
@@ -284,6 +288,7 @@ private [connector] trait SessionCatalogTest[T <: Table,
Catalog <: TestV2Sessio
val t1 = "tbl"
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
df.write.format(v2Format).mode("ignore").saveAsTable(t1)
+ checkInsertMetrics(t1, numInsertedRows = 3)
verifyTable(t1, df)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index 5cda5169369e..c532ef359a7c 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -102,7 +102,9 @@ class DataSourceV2DataFrameSuite
sql(s"CREATE TABLE $t2 (id bigint, data string) USING foo")
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
df.write.insertInto(t1)
+ checkInsertMetrics(t1, numInsertedRows = 3)
spark.table(t1).write.insertInto(t2)
+ checkInsertMetrics(t2, numInsertedRows = 3)
checkAnswer(spark.table(t2), df)
}
}
@@ -112,6 +114,7 @@ class DataSourceV2DataFrameSuite
withTable(t1) {
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
df.write.saveAsTable(t1)
+ checkInsertMetrics(t1, numInsertedRows = 3)
checkAnswer(spark.table(t1), df)
}
}
@@ -129,6 +132,7 @@ class DataSourceV2DataFrameSuite
// appends are by name not by position
df.select($"data", $"id").write.mode("append").saveAsTable(t1)
+ checkInsertMetrics(t1, numInsertedRows = 3)
checkAnswer(spark.table(t1), df)
}
}
@@ -157,6 +161,7 @@ class DataSourceV2DataFrameSuite
withTable(t1) {
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
df.write.mode("ignore").saveAsTable(t1)
+ checkInsertMetrics(t1, numInsertedRows = 3)
checkAnswer(spark.table(t1), df)
}
}
@@ -190,6 +195,7 @@ class DataSourceV2DataFrameSuite
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
df.write.option("other", "20").mode("append").saveAsTable(t1)
+ checkInsertMetrics(t1, numInsertedRows = 3)
sparkContext.listenerBus.waitUntilEmpty()
plan match {
@@ -391,24 +397,29 @@ class DataSourceV2DataFrameSuite
val df1 = Seq((1, "hr")).toDF("id", "dep")
df1.writeTo(tableName).append()
+ checkInsertMetrics(tableName, numInsertedRows = 1)
sql(s"ALTER TABLE $tableName ADD COLUMN txt STRING DEFAULT
'initial-text'")
val df2 = Seq((2, "hr"), (3, "software")).toDF("id", "dep")
df2.writeTo(tableName).append()
+ checkInsertMetrics(tableName, numInsertedRows = 2)
sql(s"ALTER TABLE $tableName ALTER COLUMN txt SET DEFAULT 'new-text'")
val df3 = Seq((4, "hr"), (5, "hr")).toDF("id", "dep")
df3.writeTo(tableName).append()
+ checkInsertMetrics(tableName, numInsertedRows = 2)
val df4 = Seq((6, "hr", null), (7, "hr", "explicit-text")).toDF("id",
"dep", "txt")
df4.writeTo(tableName).append()
+ checkInsertMetrics(tableName, numInsertedRows = 2)
sql(s"ALTER TABLE $tableName ALTER COLUMN txt DROP DEFAULT")
val df5 = Seq((8, "hr"), (9, "hr")).toDF("id", "dep")
df5.writeTo(tableName).append()
+ checkInsertMetrics(tableName, numInsertedRows = 2)
checkAnswer(
sql(s"SELECT * FROM $tableName"),
@@ -432,11 +443,13 @@ class DataSourceV2DataFrameSuite
val df1 = Seq(1, 2).toDF("id")
df1.writeTo(tableName).append()
+ checkInsertMetrics(tableName, numInsertedRows = 2)
sql(s"ALTER TABLE $tableName ALTER COLUMN dep SET DEFAULT 'it'")
val df2 = Seq(3, 4).toDF("id")
df2.writeTo(tableName).append()
+ checkInsertMetrics(tableName, numInsertedRows = 2)
checkAnswer(
sql(s"SELECT * FROM $tableName"),
@@ -450,6 +463,7 @@ class DataSourceV2DataFrameSuite
val df3 = Seq(1, 2).toDF("id")
df3.writeTo(tableName).append()
+ checkInsertMetrics(tableName, numInsertedRows = 2)
checkAnswer(
sql(s"SELECT * FROM $tableName"),
@@ -493,11 +507,13 @@ class DataSourceV2DataFrameSuite
val df1 = Seq(1).toDF("id")
df1.writeTo(tableName).append()
+ checkInsertMetrics(tableName, numInsertedRows = 1)
sql(s"ALTER TABLE $tableName ALTER COLUMN dep SET DEFAULT ('i' || 't')")
val df2 = Seq(2).toDF("id")
df2.writeTo(tableName).append()
+ checkInsertMetrics(tableName, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT * FROM $tableName"),
@@ -536,6 +552,7 @@ class DataSourceV2DataFrameSuite
val df3 = Seq(1).toDF("id")
df3.writeTo(tableName).append()
+ checkInsertMetrics(tableName, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT * FROM $tableName"),
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index e49cb692e3b3..cb7531a0dbaf 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -439,6 +439,7 @@ class DataSourceV2SQLSuiteV1Filter
Seq((basicCatalog, basicIdentifier), (atomicCatalog,
atomicIdentifier)).foreach {
case (catalog, identifier) =>
spark.sql(s"CREATE TABLE $identifier USING foo AS SELECT id, data FROM
source")
+ checkInsertMetrics(identifier, numInsertedRows = 3)
val table = catalog.loadTable(Identifier.of(Array(), "table_name"))
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
index fa6edc96ec9f..4f023136a6fe 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
@@ -24,6 +24,9 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.TypeUtils._
import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.sql.connector.catalog.InMemoryBaseTable
+import org.apache.spark.sql.connector.write.InsertSummary
+import org.apache.spark.sql.execution.datasources.v2.ExtractV2Table
import org.apache.spark.sql.functions.{array, lit, map, struct}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE,
PartitionOverwriteMode}
@@ -60,6 +63,7 @@ abstract class InsertIntoTests(
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
doInsert(t1, df)
+ checkInsertMetrics(t1, numInsertedRows = 3)
verifyTable(t1, df)
}
@@ -70,6 +74,7 @@ abstract class InsertIntoTests(
val dfr = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("data", "id")
doInsert(t1, dfr)
+ checkInsertMetrics(t1, numInsertedRows = 3)
verifyTable(t1, df)
}
@@ -79,6 +84,7 @@ abstract class InsertIntoTests(
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
doInsert(t1, df)
+ checkInsertMetrics(t1, numInsertedRows = 3)
verifyTable(t1, df)
}
}
@@ -89,6 +95,7 @@ abstract class InsertIntoTests(
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
val df2 = Seq((4L, "d"), (5L, "e"), (6L, "f")).toDF("id", "data")
doInsert(t1, df)
+ checkInsertMetrics(t1, numInsertedRows = 3)
doInsert(t1, df2, SaveMode.Overwrite)
verifyTable(t1, df2)
}
@@ -99,6 +106,7 @@ abstract class InsertIntoTests(
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data")
doInsert(t1, init)
+ checkInsertMetrics(t1, numInsertedRows = 2)
val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
doInsert(t1, df, SaveMode.Overwrite)
@@ -114,6 +122,7 @@ abstract class InsertIntoTests(
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
val init = Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data")
doInsert(t1, init)
+ checkInsertMetrics(t1, numInsertedRows = 2)
val dfr = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("data", "id")
doInsert(t1, dfr, SaveMode.Overwrite)
@@ -210,6 +219,15 @@ trait InsertIntoSQLOnlyTests
/** Check that the results in `tableName` match the `expected` DataFrame. */
protected def verifyTable(tableName: String, expected: DataFrame): Unit
+  /**
+   * Asserts that the last commit recorded on the in-memory table behind `tableName` carries an
+   * [[InsertSummary]] reporting exactly `numInsertedRows` inserted rows.
+   *
+   * Every lookup step fails the test with a descriptive message (instead of surfacing a bare
+   * `NoSuchElementException` or `ClassCastException`) so that a missing relation, commit or
+   * summary can be told apart from a wrong row count.
+   *
+   * @param tableName name of the table whose analyzed plan is searched for a DSv2 relation
+   * @param numInsertedRows expected value of `InsertSummary.numInsertedRows()`
+   */
+  protected def checkInsertMetrics(tableName: String, numInsertedRows: Long): Unit = {
+    // First DSv2 table found in the analyzed plan of a plain read of the table.
+    val v2Table = spark.table(tableName).queryExecution.analyzed.collectFirst {
+      case ExtractV2Table(t) => t
+    }.getOrElse(fail(s"No DSv2 relation found in the analyzed plan of $tableName"))
+    val inMemoryTable = v2Table match {
+      case t: InMemoryBaseTable => t
+      case other =>
+        fail(s"Expected an InMemoryBaseTable for $tableName, got ${other.getClass.getName}")
+    }
+    val lastCommit = inMemoryTable.commits.lastOption
+      .getOrElse(fail(s"No commits recorded for $tableName"))
+    lastCommit.writeSummary match {
+      case Some(summary: InsertSummary) =>
+        assert(summary.numInsertedRows() === numInsertedRows,
+          s"Expected numInsertedRows=$numInsertedRows, got ${summary.numInsertedRows()}")
+      case other =>
+        fail(s"Expected an InsertSummary in the last commit of $tableName, got $other")
+    }
+  }
+
protected val v2Format: String
protected val catalogAndNamespace: String
@@ -293,6 +311,7 @@ trait InsertIntoSQLOnlyTests
withTableAndData(t1) { view =>
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
sql(s"INSERT INTO $t1 PARTITION (id = 23) SELECT data FROM $view")
+ checkInsertMetrics(t1, numInsertedRows = 3)
verifyTable(t1, sql(s"SELECT 23, data FROM $view"))
}
}
@@ -303,6 +322,7 @@ trait InsertIntoSQLOnlyTests
withTableAndData(t1) { view =>
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
sql(s"INSERT INTO $t1 VALUES (2L, 'dummy'), (4L, 'also-deleted')")
+ checkInsertMetrics(t1, numInsertedRows = 2)
sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id) SELECT * FROM $view")
verifyTable(t1, Seq(
(1, "a"),
@@ -317,6 +337,7 @@ trait InsertIntoSQLOnlyTests
withTableAndData(t1) { view =>
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
sql(s"INSERT INTO $t1 VALUES (2L, 'dummy'), (4L, 'keep')")
+ checkInsertMetrics(t1, numInsertedRows = 2)
sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id) SELECT * FROM $view")
verifyTable(t1, Seq(
(1, "a"),
@@ -332,6 +353,7 @@ trait InsertIntoSQLOnlyTests
withTableAndData(t1) { view =>
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
sql(s"INSERT INTO $t1 VALUES (2L, 'dummy'), (4L, 'also-deleted')")
+ checkInsertMetrics(t1, numInsertedRows = 2)
sql(s"INSERT OVERWRITE TABLE $t1 SELECT * FROM $view")
verifyTable(t1, Seq(
(1, "a"),
@@ -346,6 +368,7 @@ trait InsertIntoSQLOnlyTests
withTableAndData(t1) { view =>
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
sql(s"INSERT INTO $t1 VALUES (2L, 'dummy'), (4L, 'keep')")
+ checkInsertMetrics(t1, numInsertedRows = 2)
sql(s"INSERT OVERWRITE TABLE $t1 SELECT * FROM $view")
verifyTable(t1, Seq(
(1, "a"),
@@ -361,6 +384,7 @@ trait InsertIntoSQLOnlyTests
sql(s"CREATE TABLE $t1 (id bigint, data string, p1 int) " +
s"USING $v2Format PARTITIONED BY (p1)")
sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 23), (4L, 'keep', 2)")
+ checkInsertMetrics(t1, numInsertedRows = 2)
sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (p1 = 23) SELECT * FROM
$view")
verifyTable(t1, Seq(
(1, "a", 23),
@@ -377,6 +401,7 @@ trait InsertIntoSQLOnlyTests
sql(s"CREATE TABLE $t1 (id bigint, data string, p int) " +
s"USING $v2Format PARTITIONED BY (id, p)")
sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L, 'also-deleted',
2)")
+ checkInsertMetrics(t1, numInsertedRows = 2)
sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id, p = 2) SELECT * FROM
$view")
verifyTable(t1, Seq(
(1, "a", 2),
@@ -393,6 +418,7 @@ trait InsertIntoSQLOnlyTests
sql(s"CREATE TABLE $t1 (id bigint, data string, p int) " +
s"USING $v2Format PARTITIONED BY (id, p)")
sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L, 'also-deleted',
2)")
+ checkInsertMetrics(t1, numInsertedRows = 2)
sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (p = 2, id) SELECT * FROM
$view")
verifyTable(t1, Seq(
(1, "a", 2),
@@ -409,6 +435,7 @@ trait InsertIntoSQLOnlyTests
sql(s"CREATE TABLE $t1 (id bigint, data string, p int) " +
s"USING $v2Format PARTITIONED BY (id, p)")
sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L, 'also-deleted',
2)")
+ checkInsertMetrics(t1, numInsertedRows = 2)
sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (p = 2) SELECT * FROM
$view")
verifyTable(t1, Seq(
(1, "a", 2),
@@ -424,6 +451,7 @@ trait InsertIntoSQLOnlyTests
sql(s"CREATE TABLE $t1 (id bigint, data string, p int) " +
s"USING $v2Format PARTITIONED BY (id, p)")
sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L, 'keep', 2)")
+ checkInsertMetrics(t1, numInsertedRows = 2)
sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (p = 2, id) SELECT * FROM
$view")
verifyTable(t1, Seq(
(1, "a", 2),
@@ -439,6 +467,7 @@ trait InsertIntoSQLOnlyTests
sql(s"CREATE TABLE $t1 (id bigint, data string, p int) " +
s"USING $v2Format PARTITIONED BY (id, p)")
sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L, 'keep', 2)")
+ checkInsertMetrics(t1, numInsertedRows = 2)
sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id, p = 2) SELECT * FROM
$view")
verifyTable(t1, Seq(
(1, "a", 2),
@@ -454,6 +483,7 @@ trait InsertIntoSQLOnlyTests
sql(s"CREATE TABLE $t1 (id bigint, data string, p int) " +
s"USING $v2Format PARTITIONED BY (id, p)")
sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L, 'keep', 2)")
+ checkInsertMetrics(t1, numInsertedRows = 2)
sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (p = 2) SELECT * FROM
$view")
verifyTable(t1, Seq(
(1, "a", 2),
@@ -469,6 +499,7 @@ trait InsertIntoSQLOnlyTests
sql(s"CREATE TABLE $t1 (id bigint, data string, p int) " +
s"USING $v2Format PARTITIONED BY (id, p)")
sql(s"INSERT INTO $t1 VALUES (2L, 'dummy', 2), (4L, 'keep', 2)")
+ checkInsertMetrics(t1, numInsertedRows = 2)
sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id = 2, p = 2) SELECT data
FROM $view")
verifyTable(t1, Seq(
(2, "a", 2),
@@ -491,6 +522,7 @@ trait InsertIntoSQLOnlyTests
df.where("true").take(5)
df.where("true").tail(5)
+ checkInsertMetrics(t1, numInsertedRows = 3)
verifyTable(t1, spark.table(view))
}
}
@@ -510,9 +542,11 @@ trait InsertIntoSQLOnlyTests
withTable(t1) {
sql(s"CREATE TABLE $t1 (c1 INT DEFAULT 42, c2 STRING DEFAULT 'hello')
USING $v2Format")
sql(s"INSERT INTO $t1 VALUES (1, DEFAULT)")
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(sql(s"SELECT * FROM $t1"), Row(1, "hello"))
sql(s"INSERT INTO $t1 VALUES (DEFAULT, DEFAULT)")
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT * FROM $t1 ORDER BY c1"),
Seq(Row(1, "hello"), Row(42, "hello")))
@@ -565,8 +599,10 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
s"TBLPROPERTIES ('auto-schema-evolution' = 'false')")
doInsert(t1, Seq((1L, "a")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
// Same column count, no evolution needed: should succeed even without
capability.
doInsertWithSchemaEvolution(t1, Seq((2L, "b")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
verifyTable(t1, Seq((1L, "a"), (2L, "b")).toDF("id", "data"))
}
}
@@ -576,7 +612,9 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
doInsert(t1, Seq((1L, "a")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1, Seq((2L, "b")).toDF("x", "y"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
// No evolution
verifyTable(t1, Seq((1L, "a"), (2L, "b")).toDF("id", "data"))
}
@@ -587,8 +625,10 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
doInsert(t1, Seq((1L, "a")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq((2L, "b", true)).toDF("id", "data", "active"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
verifyTable(t1, Seq[(Long, String, java.lang.Boolean)](
(1L, "a", null),
(2L, "b", true)
@@ -601,8 +641,10 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
doInsert(t1, Seq((1L, "a")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq((2L, "b", true, 100L)).toDF("id", "data", "active", "score"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
verifyTable(t1, Seq[(Long, String, java.lang.Boolean, java.lang.Long)](
(1L, "a", null, null),
(2L, "b", true, 100L)
@@ -615,7 +657,9 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
doInsert(t1, Seq((1L, "a")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1, Seq((2L, "b", true)).toDF("x", "y", "z"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
verifyTable(t1, Seq[(Long, String, java.lang.Boolean)](
(1L, "a", null),
(2L, "b", true)
@@ -629,6 +673,7 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
doInsertWithSchemaEvolution(t1,
Seq((1L, "a", true)).toDF("id", "data", "active"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
verifyTable(t1, Seq(
(1L, "a", true)
).toDF("id", "data", "active"))
@@ -642,9 +687,11 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
doInsert(t1,
Seq((1, "Alice")).toDF("id", "name")
.select($"id", struct($"name").as("info")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq((2, "Bob", 30)).toDF("id", "name", "age")
.select($"id", struct($"name", $"age").as("info")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT * FROM $t1"),
Seq(Row(1, Row("Alice", null)), Row(2, Row("Bob", 30))))
@@ -658,9 +705,11 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
doInsert(t1,
Seq((1, "Alice")).toDF("id", "name")
.select($"id", struct($"name").as("info")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq((2, "Bob", 30)).toDF("id", "firstName", "age")
.select($"id", struct($"firstName", $"age").as("info")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT * FROM $t1"),
Seq(Row(1, Row("Alice", null)), Row(2, Row("Bob", 30))))
@@ -672,8 +721,10 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
doInsert(t1, Seq((1L, "a")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq(("b", true, 2L)).toDF("data", "active", "id"), byName = true)
+ checkInsertMetrics(t1, numInsertedRows = 1)
verifyTable(t1, Seq[(Long, String, java.lang.Boolean)](
(1L, "a", null),
(2L, "b", true)
@@ -688,10 +739,12 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
doInsert(t1,
Seq((1, "Alice")).toDF("id", "name")
.select($"id", struct($"name").as("info")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq((2, 30, "Bob")).toDF("id", "age", "name")
.select($"id", struct($"age", $"name").as("info")),
byName = true)
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT * FROM $t1"),
Seq(Row(1, Row("Alice", null)), Row(2, Row("Bob", 30))))
@@ -705,10 +758,12 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
doInsert(t1,
Seq((1, "Alice")).toDF("id", "name")
.select($"id", struct($"name").as("info")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq((2, 30, "Bob", "NYC")).toDF("id", "age", "name", "city")
.select($"id", struct($"age", $"name", $"city").as("info")),
byName = true)
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT * FROM $t1"),
Seq(Row(1, Row("Alice", null, null)), Row(2, Row("Bob", 30, "NYC"))))
@@ -720,8 +775,10 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
doInsert(t1, Seq((1L, "a")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq(("b", 2L)).toDF("data", "id"), byName = true)
+ checkInsertMetrics(t1, numInsertedRows = 1)
// No evolution
verifyTable(t1, Seq((1L, "a"), (2L, "b")).toDF("id", "data"))
}
@@ -732,8 +789,10 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
doInsert(t1, Seq((1L, "a")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq(("b", 2L)).toDF("x", "y"), byName = true)
+ checkInsertMetrics(t1, numInsertedRows = 1)
verifyTable(t1, Seq[(java.lang.Long, String, String, java.lang.Long)](
(1L, "a", null, null),
(null, null, "b", 2L)
@@ -748,6 +807,7 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
doInsert(t1,
Seq((1, "Alice")).toDF("id", "name")
.select($"id", struct($"name").as("info")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq((2, 30, "Bob")).toDF("id", "age", "name")
.select($"id", struct($"age", $"name").as("info")),
@@ -764,6 +824,7 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
doInsert(t1, Seq((1L, "a"), (2L, "b")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 2)
// REPLACE WHERE only deletes rows matching the predicate, then inserts
new data.
doInsertWithSchemaEvolution(t1,
Seq((2L, "x", true), (4L, "y", false)).toDF("id", "data", "active"),
@@ -781,6 +842,7 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
doInsert(t1, Seq((1L, "a"), (2L, "b")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 2)
doInsertWithSchemaEvolution(t1,
Seq((true, "x", 2L), (false, "y", 4L)).toDF("active", "data", "id"),
byName = true,
@@ -801,6 +863,7 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
val initDf = Seq((1L, "Alice"), (2L, "Bob")).toDF("id", "name")
.select($"id", struct($"name").as("info"))
doInsert(t1, initDf)
+ checkInsertMetrics(t1, numInsertedRows = 2)
doInsertWithSchemaEvolution(t1,
Seq((2L, "Bobby", 25)).toDF("id", "name", "age")
.select($"id", struct($"name", $"age").as("info")),
@@ -820,6 +883,7 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
val initDf = Seq((1L, "Alice"), (2L, "Bob")).toDF("id", "name")
.select($"id", struct($"name").as("info"))
doInsert(t1, initDf)
+ checkInsertMetrics(t1, numInsertedRows = 2)
doInsertWithSchemaEvolution(t1,
Seq((2L, "Bobby", 25)).toDF("id", "name", "age")
.select($"id", struct($"age", $"name").as("info")),
@@ -853,6 +917,7 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
doInsert(t1, Seq((1L, "a"), (2L, "b")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 2)
// Overwrite with schema evolution adding a new column, dynamic mode
should only replace
// partitions present in the inserted data.
doInsertWithSchemaEvolution(t1,
@@ -874,6 +939,7 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
doInsert(t1, Seq((1L, "a"), (2L, "b")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 2)
doInsertWithSchemaEvolution(t1,
Seq((true, "x", 2L), (false, "y", 3L)).toDF("active", "data", "id"),
mode = SaveMode.Overwrite,
@@ -894,6 +960,7 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format
PARTITIONED BY (id)")
doInsert(t1, Seq((1L, "a"), (2L, "b")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 2)
// Static mode overwrites the entire table.
doInsertWithSchemaEvolution(t1,
Seq((2L, "x", true), (3L, "y", false)).toDF("id", "data", "active"),
@@ -949,8 +1016,10 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
sql(s"CREATE TABLE $t1 (id bigint) USING $v2Format")
doInsertWithSchemaEvolution(t1,
Seq((1L, "a")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq((2L, "b", true)).toDF("id", "data", "active"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
verifyTable(t1, Seq[(Long, String, java.lang.Boolean)](
(1L, "a", null),
(2L, "b", true)
@@ -965,9 +1034,11 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
doInsert(t1,
Seq((1L, "Alice")).toDF("id", "name")
.select($"id", struct(struct($"name").as("nested")).as("info")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq((2L, "Bob", 30)).toDF("id", "name", "age")
.select($"id", struct(struct($"name",
$"age").as("nested")).as("info")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT * FROM $t1"),
Seq(Row(1L, Row(Row("Alice", null))), Row(2L, Row(Row("Bob", 30)))))
@@ -981,10 +1052,12 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
doInsert(t1,
Seq((1L, "Alice")).toDF("id", "name")
.select($"id", struct(struct($"name").as("nested")).as("info")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq((2L, "Bob", 30)).toDF("id", "name", "age")
.select($"id", struct(struct($"age",
$"name").as("nested")).as("info")),
byName = true)
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT * FROM $t1"),
Seq(Row(1L, Row(Row("Alice", null))), Row(2L, Row(Row("Bob", 30)))))
@@ -998,9 +1071,11 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
doInsert(t1,
Seq((1L, "Alice")).toDF("id", "name")
.select($"id", array(struct($"name")).as("info")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq((2L, "Bob", 30)).toDF("id", "name", "age")
.select($"id", array(struct($"name", $"age")).as("info")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT * FROM $t1"),
Seq(
@@ -1016,9 +1091,11 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
doInsert(t1,
Seq((1L, "A", "Alice")).toDF("id", "key", "name")
.select($"id", map($"key", struct($"name")).as("info")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq((2L, "B", "Bob", 30)).toDF("id", "key", "name", "age")
.select($"id", map($"key", struct($"name", $"age")).as("info")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT * FROM $t1"),
Seq(
@@ -1034,9 +1111,11 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
doInsert(t1,
Seq((1L, "Alice", "A")).toDF("id", "name", "value")
.select($"id", map(struct($"name"), $"value").as("info")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq((2L, "Bob", 30, "B")).toDF("id", "name", "age", "value")
.select($"id", map(struct($"name", $"age"), $"value").as("info")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT * FROM $t1"),
Seq(
@@ -1050,8 +1129,10 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
withTable(t1) {
sql(s"CREATE TABLE $t1 (id int, data string) USING $v2Format")
doInsert(t1, Seq((1, "a")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq((Long.MaxValue, "b")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT * FROM $t1"),
Seq(Row(1L, "a"), Row(Long.MaxValue, "b")))
@@ -1064,8 +1145,10 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
withTable(t1) {
sql(s"CREATE TABLE $t1 (id int, data string) USING $v2Format")
doInsert(t1, Seq((1, "a")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq(("b", Long.MaxValue)).toDF("data", "id"), byName = true)
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT * FROM $t1"),
Seq(Row(1L, "a"), Row(Long.MaxValue, "b")))
@@ -1078,8 +1161,10 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
withTable(t1) {
sql(s"CREATE TABLE $t1 (id int, data string) USING $v2Format")
doInsert(t1, Seq((1, "a")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq((Long.MaxValue, "b", true)).toDF("id", "data", "active"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT * FROM $t1"),
Seq(
@@ -1098,9 +1183,11 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
doInsert(t1,
Seq((1L, "Alice", 100)).toDF("id", "name", "value")
.select($"id", struct($"value", $"name").as("info")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq((2L, "Bob", Long.MaxValue)).toDF("id", "name", "value")
.select($"id", struct($"value", $"name").as("info")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT id, info.value, info.name FROM $t1"),
Seq(Row(1L, 100L, "Alice"), Row(2L, Long.MaxValue, "Bob")))
@@ -1116,10 +1203,12 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
doInsert(t1,
Seq((1L, "Alice", 100)).toDF("id", "name", "value")
.select($"id", struct($"value", $"name").as("info")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq((2L, "Bob", Long.MaxValue)).toDF("id", "name", "value")
.select($"id", struct($"name", $"value").as("info")),
byName = true)
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT id, info.value, info.name FROM $t1"),
Seq(Row(1L, 100L, "Alice"), Row(2L, Long.MaxValue, "Bob")))
@@ -1135,9 +1224,11 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
doInsert(t1,
Seq((1L, 100)).toDF("id", "value")
.select($"id", array(struct($"value")).as("arr")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq((2L, Long.MaxValue)).toDF("id", "value")
.select($"id", array(struct($"value")).as("arr")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT id, arr[0].value FROM $t1"),
Seq(Row(1L, 100L), Row(2L, Long.MaxValue)))
@@ -1154,9 +1245,11 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
doInsert(t1,
Seq((1L, "k1", 100)).toDF("id", "key", "value")
.select($"id", map($"key", struct($"value")).as("m")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq((2L, "k2", Long.MaxValue)).toDF("id", "key", "value")
.select($"id", map($"key", struct($"value")).as("m")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT id, m['k1'].value, m['k2'].value FROM $t1"),
Seq(Row(1L, 100L, null), Row(2L, null, Long.MaxValue)))
@@ -1171,6 +1264,7 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
withTable(t1) {
sql(s"CREATE TABLE $t1 (id int, data string) USING $v2Format")
doInsert(t1, Seq((1, "a")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq((Long.MaxValue, "b")).toDF("id", "data"), mode =
SaveMode.Overwrite)
checkAnswer(
@@ -1185,8 +1279,10 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
doInsert(t1, Seq((1L, "a")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
// Inserting an int into a long column should not narrow the schema.
doInsertWithSchemaEvolution(t1, Seq((2, "b")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT * FROM $t1"),
Seq(Row(1L, "a"), Row(2L, "b")))
@@ -1200,9 +1296,11 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
withTable(t1) {
sql(s"CREATE TABLE $t1 (id float, data string) USING $v2Format")
doInsert(t1, Seq((1f, "a")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
// Inserting a double into a float should widen the schema, inserting an int into a string
// should retain the string type.
doInsertWithSchemaEvolution(t1, Seq((2d, 3)).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT * FROM $t1"),
Seq(Row(1d, "a"), Row(2d, "3")))
@@ -1239,9 +1337,11 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
doInsert(t1, Seq((1L, "a")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
// Insert a null value with NullType - should not change the target column type.
doInsertWithSchemaEvolution(t1,
Seq(2L).toDF("id").withColumn("data", lit(null)))
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT * FROM $t1"),
Seq(Row(1L, "a"), Row(2L, null)))
@@ -1254,9 +1354,11 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
withTable(t1) {
sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
doInsert(t1, Seq((1L, "a")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
doInsertWithSchemaEvolution(t1,
Seq(2L).toDF("id").withColumn("data", lit(null)),
byName = true)
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT * FROM $t1"),
Seq(Row(1L, "a"), Row(2L, null)))
@@ -1271,10 +1373,12 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
doInsert(t1,
Seq((1L, "Alice", 100)).toDF("id", "name", "value")
.select($"id", struct($"value", $"name").as("info")))
+ checkInsertMetrics(t1, numInsertedRows = 1)
// Insert with NullType for nested field - should not change the struct field type.
doInsertWithSchemaEvolution(t1,
Seq(2L).toDF("id")
.withColumn("info", struct(lit(null).as("value"),
lit("Bob").as("name"))))
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT id, info.value, info.name FROM $t1"),
Seq(Row(1L, 100, "Alice"), Row(2L, null, "Bob")))
@@ -1288,8 +1392,10 @@ trait InsertIntoSchemaEvolutionTests { this:
InsertIntoTests =>
withTable(t1) {
sql(s"CREATE TABLE $t1 (id int, data string) USING $v2Format")
doInsert(t1, Seq((1, "a")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
// Insert without schema evolution - should cast to target type, not widen.
doInsert(t1, Seq((2L, "b")).toDF("id", "data"))
+ checkInsertMetrics(t1, numInsertedRows = 1)
checkAnswer(
sql(s"SELECT * FROM $t1"),
Seq(Row(1, "a"), Row(2, "b")))
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
index aaf45f0f5f7a..b902074b547c 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
@@ -484,6 +484,16 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
Row(1, 100, "hr"), // insert
Row(2, 200, "finance"), // insert
Row(3, 300, "hr"))) // insert
+
+ val mergeSummary = getMergeSummary()
+ assert(mergeSummary.numTargetRowsInserted === 3L)
+ assert(mergeSummary.numTargetRowsCopied === 0L)
+ assert(mergeSummary.numTargetRowsUpdated === 0L)
+ assert(mergeSummary.numTargetRowsDeleted === 0L)
+ assert(mergeSummary.numTargetRowsMatchedUpdated === 0L)
+ assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
}
}
@@ -510,6 +520,16 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
Seq(
Row(2, 200, "finance"), // insert
Row(3, 300, "hr"))) // insert
+
+ val mergeSummary = getMergeSummary()
+ assert(mergeSummary.numTargetRowsInserted === 2L)
+ assert(mergeSummary.numTargetRowsCopied === 0L)
+ assert(mergeSummary.numTargetRowsUpdated === 0L)
+ assert(mergeSummary.numTargetRowsDeleted === 0L)
+ assert(mergeSummary.numTargetRowsMatchedUpdated === 0L)
+ assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
}
}
@@ -539,6 +559,16 @@ abstract class MergeIntoTableSuiteBase extends
RowLevelOperationSuiteBase
Row(1, 100, "hr"), // insert
Row(2, 200, "finance"), // insert
Row(3, 300, "hr"))) // insert
+
+ val mergeSummary = getMergeSummary()
+ assert(mergeSummary.numTargetRowsInserted === 3L)
+ assert(mergeSummary.numTargetRowsCopied === 0L)
+ assert(mergeSummary.numTargetRowsUpdated === 0L)
+ assert(mergeSummary.numTargetRowsDeleted === 0L)
+ assert(mergeSummary.numTargetRowsMatchedUpdated === 0L)
+ assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
index 721b86593bac..3e48c5222e6f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/V1WriteFallbackSuite.scala
@@ -253,6 +253,9 @@ class V1WriteFallbackSessionCatalogSuite
extends InsertIntoTests(supportsDynamicOverwrite = false,
includeSQLOnlyTests = true)
with SessionCatalogTest[InMemoryTableWithV1Fallback, V1FallbackTableCatalog]
{
+ // V1 fallback writes do not flow through V2TableWriteExec, so no InsertSummary is emitted.
+ override protected def checkInsertMetrics(tableName: String,
numInsertedRows: Long): Unit = ()
+
override protected val v2Format = classOf[InMemoryV1Provider].getName
override protected val catalogClassName: String =
classOf[V1FallbackTableCatalog].getName
override protected val catalogAndNamespace: String = ""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]