This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1996434b646 [SPARK-40550][SQL] DataSource V2: Handle DELETE commands
for delta-based sources
1996434b646 is described below
commit 1996434b646040c65c09d9efb7888548900f00fc
Author: aokolnychyi <[email protected]>
AuthorDate: Thu Jan 19 15:20:12 2023 -0800
[SPARK-40550][SQL] DataSource V2: Handle DELETE commands for delta-based
sources
### What changes were proposed in this pull request?
This PR adds support for DELETE commands for delta-based sources and
implements the API added in PR #38004.
Suppose there is a data source that can encode deletes using primary keys (`pk`). To encode deletes efficiently, this data source also needs to know the source file, which can be projected via a metadata column (`_file`).
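In DSv2 terms, such a source would express this through the `SupportsDelta` mix-in added in PR #38004, similar to the `DeltaBasedOperation` used by the in-memory test table in this PR. A minimal sketch (the class name is illustrative and the scan/write builders are omitted, hence `abstract`):
```
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
import org.apache.spark.sql.connector.write.{RowLevelOperation, SupportsDelta}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command

// Illustrative sketch only: declares how this hypothetical source encodes deletes.
abstract class PkFileDeleteOperation extends RowLevelOperation with SupportsDelta {
  override def command(): Command = Command.DELETE

  // rows are identified by their primary key
  override def rowId(): Array[NamedReference] = Array(FieldReference("pk"))

  // the source file metadata column is also required to encode deletes efficiently
  override def requiredMetadataAttributes(): Array[NamedReference] =
    Array(FieldReference("_file"))
}
```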
As an example, consider a table with one file that contains three records.
```
pk | salary | department
------------------------
1, 100, hr
2, 50, software
3, 150, hardware
```
This PR would rewrite `DELETE FROM t WHERE salary <= 100` to perform the
following steps:
- find records that need to be removed by scanning the table with the
delete condition;
- project required columns to encode deletes (`pk` + `_file` in our case);
- form a set of changes by adding a new `__row_operation` column with value `delete`;
- write the set of changes to the table using `WriteDeltaExec` and `DeltaWriter`.
The set of changes to encode for the DELETE statement above will look like
this:
```
__row_operation | pk | _file
----------------------------
delete, 1, file_a.parquet
delete, 2, file_a.parquet
```
As opposed to the group-based deletes that Spark already supports, which rewrite entire groups of data, the new logic can simply leave out records that did not change in a file that had matches (e.g. the record with `pk = 3` did not match the condition, so it is not included in the set of changes).
Then `WriteDeltaExec` will handle this set of changes and translate them
into `delete()` calls on `DeltaWriter`. In the future, this logic will be
extended to also cover UPDATEs and MERGEs by adding `update` and `insert` row
operations to the set of changes supported by `WriteDeltaExec`.
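For illustration, a connector-side `DeltaWriter` for the hypothetical `pk`/`_file` source above could record these `delete()` calls as tombstones instead of rewriting files. The sketch below is an assumption for this example (the class names and commit message are made up, not code from this PR):
```
import scala.collection.mutable

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{DeltaWriter, WriterCommitMessage}

// Illustrative sketch only. The metadata row carries the projected metadata
// columns (_file) and the id row carries the row ID columns (pk).
class PkFileDeleteWriter extends DeltaWriter[InternalRow] {
  private val deletedKeys = mutable.ArrayBuffer.empty[(String, Int)]

  override def delete(metadata: InternalRow, id: InternalRow): Unit = {
    // record a (file, pk) tombstone instead of rewriting the whole file
    deletedKeys += ((metadata.getString(0), id.getInt(0)))
  }

  // DELETE commands only produce `delete` operations; update/insert would be
  // exercised once UPDATE and MERGE are rewritten the same way
  override def update(metadata: InternalRow, id: InternalRow, row: InternalRow): Unit =
    throw new UnsupportedOperationException("not produced for DELETE")

  override def insert(row: InternalRow): Unit =
    throw new UnsupportedOperationException("not produced for DELETE")

  override def write(row: InternalRow): Unit = insert(row)

  override def commit(): WriterCommitMessage = PkFileDeleteMessage(deletedKeys.toSeq)

  override def abort(): Unit = ()

  override def close(): Unit = ()
}

// hypothetical commit message carrying the collected tombstones
case class PkFileDeleteMessage(deletedKeys: Seq[(String, Int)]) extends WriterCommitMessage
```
The in-memory `DeltaBufferWriter` added in this PR plays the same role for tests.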
### Why are the changes needed?
These changes are needed as per SPIP SPARK-35801.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
This PR comes with tests.
Closes #38005 from aokolnychyi/spark-40550-proto.
Authored-by: aokolnychyi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
core/src/main/resources/error/error-classes.json | 6 +
.../spark/sql/catalyst/ProjectingInternalRow.scala | 117 +++++++++++
.../catalyst/analysis/RewriteDeleteFromTable.scala | 40 +++-
.../catalyst/analysis/RewriteRowLevelCommand.scala | 61 +++++-
.../ReplaceNullWithFalseInPredicate.scala | 3 +-
.../sql/catalyst/plans/logical/v2Commands.scala | 123 ++++++++++-
.../spark/sql/catalyst/util/RowDeltaUtils.scala | 28 +++
.../sql/catalyst/util/WriteDeltaProjections.scala | 25 +++
.../spark/sql/errors/QueryCompilationErrors.scala | 6 +
.../sql/connector/catalog/InMemoryBaseTable.scala | 14 +-
.../catalog/InMemoryRowLevelOperationTable.scala | 84 +++++++-
.../datasources/v2/DataSourceV2Strategy.scala | 5 +
.../v2/OptimizeMetadataOnlyDeleteFromTable.scala | 6 +-
.../sql/execution/datasources/v2/V2Writes.scala | 34 ++-
.../datasources/v2/WriteToDataSourceV2Exec.scala | 113 +++++++++-
.../sql/connector/DeleteFromTableSuiteBase.scala | 234 +++++++++++----------
.../connector/DeltaBasedDeleteFromTableSuite.scala | 42 ++++
17 files changed, 798 insertions(+), 143 deletions(-)
diff --git a/core/src/main/resources/error/error-classes.json
b/core/src/main/resources/error/error-classes.json
index 78b1bf0743d..993d1d084e7 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -1048,6 +1048,12 @@
],
"sqlState" : "42000"
},
+ "NULLABLE_ROW_ID_ATTRIBUTES" : {
+ "message" : [
+ "Row ID attributes cannot be nullable: <nullableRowIdAttrs>."
+ ],
+ "sqlState" : "42000"
+ },
"NULL_MAP_KEY" : {
"message" : [
"Cannot use null as map key."
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ProjectingInternalRow.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ProjectingInternalRow.scala
new file mode 100644
index 00000000000..429ce805bf2
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ProjectingInternalRow.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types.{DataType, Decimal, StructType}
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
+/**
+ * An [[InternalRow]] that projects particular columns from another
[[InternalRow]] without copying
+ * the underlying data.
+ */
+case class ProjectingInternalRow(schema: StructType, colOrdinals: Seq[Int])
extends InternalRow {
+ assert(schema.size == colOrdinals.size)
+
+ private var row: InternalRow = _
+
+ override def numFields: Int = colOrdinals.size
+
+ def project(row: InternalRow): Unit = {
+ this.row = row
+ }
+
+ override def setNullAt(i: Int): Unit = {
+ throw new UnsupportedOperationException(s"Cannot modify
${getClass.getName}")
+ }
+
+ override def update(i: Int, value: Any): Unit = {
+ throw new UnsupportedOperationException(s"Cannot modify
${getClass.getName}")
+ }
+
+ override def copy(): InternalRow = {
+ val newRow = if (row != null) row.copy() else null
+ val newProjection = ProjectingInternalRow(schema, colOrdinals)
+ newProjection.project(newRow)
+ newProjection
+ }
+
+ override def isNullAt(ordinal: Int): Boolean = {
+ row.isNullAt(colOrdinals(ordinal))
+ }
+
+ override def getBoolean(ordinal: Int): Boolean = {
+ row.getBoolean(colOrdinals(ordinal))
+ }
+
+ override def getByte(ordinal: Int): Byte = {
+ row.getByte(colOrdinals(ordinal))
+ }
+
+ override def getShort(ordinal: Int): Short = {
+ row.getShort(colOrdinals(ordinal))
+ }
+
+ override def getInt(ordinal: Int): Int = {
+ row.getInt(colOrdinals(ordinal))
+ }
+
+ override def getLong(ordinal: Int): Long = {
+ row.getLong(colOrdinals(ordinal))
+ }
+
+ override def getFloat(ordinal: Int): Float = {
+ row.getFloat(colOrdinals(ordinal))
+ }
+
+ override def getDouble(ordinal: Int): Double = {
+ row.getDouble(colOrdinals(ordinal))
+ }
+
+ override def getDecimal(ordinal: Int, precision: Int, scale: Int): Decimal =
{
+ row.getDecimal(colOrdinals(ordinal), precision, scale)
+ }
+
+ override def getUTF8String(ordinal: Int): UTF8String = {
+ row.getUTF8String(colOrdinals(ordinal))
+ }
+
+ override def getBinary(ordinal: Int): Array[Byte] = {
+ row.getBinary(colOrdinals(ordinal))
+ }
+
+ override def getInterval(ordinal: Int): CalendarInterval = {
+ row.getInterval(colOrdinals(ordinal))
+ }
+
+ override def getStruct(ordinal: Int, numFields: Int): InternalRow = {
+ row.getStruct(colOrdinals(ordinal), numFields)
+ }
+
+ override def getArray(ordinal: Int): ArrayData = {
+ row.getArray(colOrdinals(ordinal))
+ }
+
+ override def getMap(ordinal: Int): MapData = {
+ row.getMap(colOrdinals(ordinal))
+ }
+
+ override def get(ordinal: Int, dataType: DataType): AnyRef = {
+ row.get(colOrdinals(ordinal), dataType)
+ }
+}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala
index 394e43dce72..b4e077671d4 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala
@@ -17,12 +17,13 @@
package org.apache.spark.sql.catalyst.analysis
-import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, Expression,
Not}
+import org.apache.spark.sql.catalyst.expressions.{Alias, EqualNullSafe,
Expression, Literal, Not}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter,
LogicalPlan, ReplaceData}
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter,
LogicalPlan, Project, ReplaceData, WriteDelta}
+import org.apache.spark.sql.catalyst.util.RowDeltaUtils._
import org.apache.spark.sql.connector.catalog.{SupportsDeleteV2,
SupportsRowLevelOperations, TruncatableTable}
+import org.apache.spark.sql.connector.write.{RowLevelOperationTable,
SupportsDelta}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE
-import org.apache.spark.sql.connector.write.RowLevelOperationTable
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -45,7 +46,12 @@ object RewriteDeleteFromTable extends RewriteRowLevelCommand
{
case r @ DataSourceV2Relation(t: SupportsRowLevelOperations, _, _, _,
_) =>
val table = buildOperationTable(t, DELETE,
CaseInsensitiveStringMap.empty())
- buildReplaceDataPlan(r, table, cond)
+ table.operation match {
+ case _: SupportsDelta =>
+ buildWriteDeltaPlan(r, table, cond)
+ case _ =>
+ buildReplaceDataPlan(r, table, cond)
+ }
case DataSourceV2Relation(_: SupportsDeleteV2, _, _, _, _) =>
// don't rewrite as the table supports deletes only with filters
@@ -82,4 +88,30 @@ object RewriteDeleteFromTable extends RewriteRowLevelCommand
{
val writeRelation = relation.copy(table = operationTable)
ReplaceData(writeRelation, cond, remainingRowsPlan, relation)
}
+
+ // build a rewrite plan for sources that support row deltas
+ private def buildWriteDeltaPlan(
+ relation: DataSourceV2Relation,
+ operationTable: RowLevelOperationTable,
+ cond: Expression): WriteDelta = {
+
+ // resolve all needed attrs (e.g. row ID and any required metadata attrs)
+ val operation = operationTable.operation.asInstanceOf[SupportsDelta]
+ val rowIdAttrs = resolveRowIdAttrs(relation, operation)
+ val metadataAttrs = resolveRequiredMetadataAttrs(relation, operation)
+
+ // construct a read relation and include all required metadata columns
+ val readRelation = buildRelationWithAttrs(relation, operationTable,
metadataAttrs, rowIdAttrs)
+
+ // construct a plan that only contains records to delete
+ val deletedRowsPlan = Filter(cond, readRelation)
+ val operationType = Alias(Literal(DELETE_OPERATION), OPERATION_COLUMN)()
+ val requiredWriteAttrs = dedupAttrs(rowIdAttrs ++ metadataAttrs)
+ val project = Project(operationType +: requiredWriteAttrs, deletedRowsPlan)
+
+ // build a plan to write deletes to the table
+ val writeRelation = relation.copy(table = operationTable)
+ val projections = buildWriteDeltaProjections(project, Nil, rowIdAttrs,
metadataAttrs)
+ WriteDelta(writeRelation, cond, project, relation, projections)
+ }
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala
index bf8c3e27f4d..1181d85e8be 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommand.scala
@@ -19,13 +19,17 @@ package org.apache.spark.sql.catalyst.analysis
import scala.collection.mutable
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId,
V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.ProjectingInternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, ExprId, V2ExpressionUtils}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.WriteDeltaProjections
import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
-import org.apache.spark.sql.connector.write.{RowLevelOperation,
RowLevelOperationInfoImpl, RowLevelOperationTable}
+import org.apache.spark.sql.connector.write.{RowLevelOperation,
RowLevelOperationInfoImpl, RowLevelOperationTable, SupportsDelta}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command
+import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
@@ -42,9 +46,10 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
protected def buildRelationWithAttrs(
relation: DataSourceV2Relation,
table: RowLevelOperationTable,
- metadataAttrs: Seq[AttributeReference]): DataSourceV2Relation = {
+ metadataAttrs: Seq[AttributeReference],
+ rowIdAttrs: Seq[AttributeReference] = Nil): DataSourceV2Relation = {
- val attrs = dedupAttrs(relation.output ++ metadataAttrs)
+ val attrs = dedupAttrs(relation.output ++ rowIdAttrs ++ metadataAttrs)
relation.copy(table = table, output = attrs)
}
@@ -68,4 +73,52 @@ trait RewriteRowLevelCommand extends Rule[LogicalPlan] {
operation.requiredMetadataAttributes,
relation)
}
+
+ protected def resolveRowIdAttrs(
+ relation: DataSourceV2Relation,
+ operation: SupportsDelta): Seq[AttributeReference] = {
+
+ val rowIdAttrs = V2ExpressionUtils.resolveRefs[AttributeReference](
+ operation.rowId,
+ relation)
+
+ val nullableRowIdAttrs = rowIdAttrs.filter(_.nullable)
+ if (nullableRowIdAttrs.nonEmpty) {
+ throw QueryCompilationErrors.nullableRowIdError(nullableRowIdAttrs)
+ }
+
+ rowIdAttrs
+ }
+
+ protected def buildWriteDeltaProjections(
+ plan: LogicalPlan,
+ rowAttrs: Seq[Attribute],
+ rowIdAttrs: Seq[Attribute],
+ metadataAttrs: Seq[Attribute]): WriteDeltaProjections = {
+
+ val rowProjection = if (rowAttrs.nonEmpty) {
+ Some(newLazyProjection(plan, rowAttrs))
+ } else {
+ None
+ }
+
+ val rowIdProjection = newLazyProjection(plan, rowIdAttrs)
+
+ val metadataProjection = if (metadataAttrs.nonEmpty) {
+ Some(newLazyProjection(plan, metadataAttrs))
+ } else {
+ None
+ }
+
+ WriteDeltaProjections(rowProjection, rowIdProjection, metadataProjection)
+ }
+
+ private def newLazyProjection(
+ plan: LogicalPlan,
+ attrs: Seq[Attribute]): ProjectingInternalRow = {
+
+ val colOrdinals = attrs.map(attr => plan.output.indexWhere(_.exprId ==
attr.exprId))
+ val schema = StructType.fromAttributes(attrs)
+ ProjectingInternalRow(schema, colOrdinals)
+ }
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
index 3ffaca6f54c..9d81ba2fadb 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceNullWithFalseInPredicate.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.expressions.{And, ArrayExists,
ArrayFilter, CaseWhen, EqualNullSafe, Expression, If, In, InSet,
LambdaFunction, Literal, MapFilter, Not, Or}
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral,
TrueLiteral}
-import org.apache.spark.sql.catalyst.plans.logical.{DeleteAction,
DeleteFromTable, Filter, InsertAction, InsertStarAction, Join, LogicalPlan,
MergeAction, MergeIntoTable, ReplaceData, UpdateAction, UpdateStarAction,
UpdateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteAction,
DeleteFromTable, Filter, InsertAction, InsertStarAction, Join, LogicalPlan,
MergeAction, MergeIntoTable, ReplaceData, UpdateAction, UpdateStarAction,
UpdateTable, WriteDelta}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.{INSET, NULL_LITERAL,
TRUE_OR_FALSE_LITERAL}
import org.apache.spark.sql.types.BooleanType
@@ -55,6 +55,7 @@ object ReplaceNullWithFalseInPredicate extends
Rule[LogicalPlan] {
case f @ Filter(cond, _) => f.copy(condition = replaceNullWithFalse(cond))
case j @ Join(_, _, _, Some(cond), _) => j.copy(condition =
Some(replaceNullWithFalse(cond)))
case rd @ ReplaceData(_, cond, _, _, _) => rd.copy(condition =
replaceNullWithFalse(cond))
+ case wd @ WriteDelta(_, cond, _, _, _, _) => wd.copy(condition =
replaceNullWithFalse(cond))
case d @ DeleteFromTable(_, cond) => d.copy(condition =
replaceNullWithFalse(cond))
case u @ UpdateTable(_, _, Some(cond)) => u.copy(condition =
Some(replaceNullWithFalse(cond)))
case m: MergeIntoTable =>
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index eaf74324322..9508b2fb993 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -21,16 +21,16 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{AnalysisContext,
EliminateSubqueryAliases, FieldName, NamedRelation, PartitionSpec,
ResolvedIdentifier, UnresolvedException}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.FunctionResource
-import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, AttributeSet, Expression, MetadataAttribute, Unevaluable}
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, AttributeSet, Expression, MetadataAttribute,
NamedExpression, Unevaluable, V2ExpressionUtils}
import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
import org.apache.spark.sql.catalyst.trees.BinaryLike
-import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, RowDeltaUtils,
WriteDeltaProjections}
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.expressions.filter.Predicate
-import org.apache.spark.sql.connector.write.{RowLevelOperation,
RowLevelOperationTable, Write}
+import org.apache.spark.sql.connector.write.{DeltaWrite, RowLevelOperation,
RowLevelOperationTable, SupportsDelta, Write}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.types.{BooleanType, DataType, MetadataBuilder,
StringType, StructType}
+import org.apache.spark.sql.types.{BooleanType, DataType, IntegerType,
MetadataBuilder, StringType, StructField, StructType}
// For v2 DML commands, it may end up with the v1 fallback code path and need
to build a DataFrame
// which is required by the DS v1 API. We need to keep the analyzed input
query plan to build
@@ -274,6 +274,121 @@ case class ReplaceData(
}
}
+/**
+ * Writes a delta of rows to an existing table during a row-level operation.
+ *
+ * This node references a query that translates a logical DELETE, UPDATE,
MERGE operation into
+ * a set of row-level changes to be encoded in the table. Each row in the
query represents either
+ * a delete, update or insert and stores the operation type in a special
column.
+ *
+ * This node is constructed in rules that rewrite DELETE, UPDATE, MERGE
operations for data sources
+ * that can handle deltas of rows.
+ *
+ * @param table a plan that references a row-level operation table
+ * @param condition a condition that defines matching records
+ * @param query a query with a delta of records that should be written
+ * @param originalTable a plan for the original table for which the row-level
command was triggered
+ * @param projections projections for row ID, row, metadata attributes
+ * @param write a logical write, if already constructed
+ */
+case class WriteDelta(
+ table: NamedRelation,
+ condition: Expression,
+ query: LogicalPlan,
+ originalTable: NamedRelation,
+ projections: WriteDeltaProjections,
+ write: Option[DeltaWrite] = None) extends RowLevelWrite {
+
+ override val isByName: Boolean = false
+ override val stringArgs: Iterator[Any] = Iterator(table, query, write)
+
+ override lazy val references: AttributeSet = query.outputSet
+
+ lazy val operation: SupportsDelta = {
+ EliminateSubqueryAliases(table) match {
+ case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _,
_) =>
+ operation.asInstanceOf[SupportsDelta]
+ case _ =>
+ throw new AnalysisException(s"Cannot retrieve row-level operation from
$table")
+ }
+ }
+
+ override def outputResolved: Boolean = {
+ assert(table.resolved && query.resolved,
+ "`outputResolved` can only be called when `table` and `query` are both
resolved.")
+
+ operationResolved && rowAttrsResolved && rowIdAttrsResolved &&
metadataAttrsResolved
+ }
+
+ private def operationResolved: Boolean = {
+ val attr = query.output.head
+ attr.name == RowDeltaUtils.OPERATION_COLUMN && attr.dataType ==
IntegerType && !attr.nullable
+ }
+
+ // validates row projection output is compatible with table attributes
+ private def rowAttrsResolved: Boolean = {
+ table.skipSchemaResolution || (projections.rowProjection match {
+ case Some(projection) =>
+ table.output.size == projection.schema.size &&
+ projection.schema.zip(table.output).forall { case (field, outAttr) =>
+ isCompatible(field, outAttr)
+ }
+ case None =>
+ true
+ })
+ }
+
+ // validates row ID projection output is compatible with row ID attributes
+ private def rowIdAttrsResolved: Boolean = {
+ val rowIdAttrs = V2ExpressionUtils.resolveRefs[AttributeReference](
+ operation.rowId,
+ originalTable)
+
+ val projectionSchema = projections.rowIdProjection.schema
+ rowIdAttrs.size == projectionSchema.size && projectionSchema.forall {
field =>
+ rowIdAttrs.exists(rowIdAttr => isCompatible(field, rowIdAttr))
+ }
+ }
+
+ // validates metadata projection output is compatible with metadata
attributes
+ private def metadataAttrsResolved: Boolean = {
+ projections.metadataProjection match {
+ case Some(projection) =>
+ val metadataAttrs = V2ExpressionUtils.resolveRefs[AttributeReference](
+ operation.requiredMetadataAttributes,
+ originalTable)
+
+ val projectionSchema = projection.schema
+ metadataAttrs.size == projectionSchema.size && projectionSchema.forall
{ field =>
+ metadataAttrs.exists(metadataAttr => isCompatible(field,
metadataAttr))
+ }
+ case None =>
+ true
+ }
+ }
+
+ // checks if a projection field is compatible with a table attribute
+ private def isCompatible(inField: StructField, outAttr: NamedExpression):
Boolean = {
+ val inType =
CharVarcharUtils.getRawType(inField.metadata).getOrElse(inField.dataType)
+ val outType =
CharVarcharUtils.getRawType(outAttr.metadata).getOrElse(outAttr.dataType)
+ // names and types must match, nullability must be compatible
+ inField.name == outAttr.name &&
+ DataType.equalsIgnoreCompatibleNullability(inType, outType) &&
+ (outAttr.nullable || !inField.nullable)
+ }
+
+ override def withNewQuery(newQuery: LogicalPlan): V2WriteCommand =
copy(query = newQuery)
+
+ override def withNewTable(newTable: NamedRelation): V2WriteCommand =
copy(table = newTable)
+
+ // WriteDelta has no v1 fallback
+ override def storeAnalyzedQuery(): Command = this
+
+ override protected def withNewChildInternal(newChild: LogicalPlan):
WriteDelta = {
+ copy(query = newChild)
+ }
+}
+
/** A trait used for logical plan nodes that create or replace V2 table
definitions. */
trait V2CreateTablePlan extends LogicalPlan {
def tableName: Identifier
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RowDeltaUtils.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RowDeltaUtils.scala
new file mode 100644
index 00000000000..57f2a91092e
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RowDeltaUtils.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+/**
+ * A utility that holds constants for handling deltas of rows.
+ */
+object RowDeltaUtils {
+ final val OPERATION_COLUMN: String = "__row_operation"
+ final val DELETE_OPERATION: Int = 1
+ final val UPDATE_OPERATION: Int = 2
+ final val INSERT_OPERATION: Int = 3
+}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/WriteDeltaProjections.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/WriteDeltaProjections.scala
new file mode 100644
index 00000000000..90f0be60c53
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/WriteDeltaProjections.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.catalyst.ProjectingInternalRow
+
+case class WriteDeltaProjections(
+ rowProjection: Option[ProjectingInternalRow],
+ rowIdProjection: ProjectingInternalRow,
+ metadataProjection: Option[ProjectingInternalRow])
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 5fe141e7286..c415fb91c5d 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3418,4 +3418,10 @@ private[sql] object QueryCompilationErrors extends
QueryErrorsBase {
SparkException.internalError(
"The operation `dataType` is not supported.")
}
+
+ def nullableRowIdError(nullableRowIdAttrs: Seq[AttributeReference]):
Throwable = {
+ new AnalysisException(
+ errorClass = "NULLABLE_ROW_ID_ATTRIBUTES",
+ messageParameters = Map("nullableRowIdAttrs" ->
nullableRowIdAttrs.mkString(", ")))
+ }
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index 12653cae402..28b68a71a47 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -232,6 +232,17 @@ abstract class InMemoryBaseTable(
dataMap(key).clear()
}
+ def withDeletes(data: Array[BufferedRows]): InMemoryBaseTable = {
+ data.foreach { p =>
+ dataMap ++= dataMap.map { case (key, currentRows) =>
+ val newRows = new BufferedRows(currentRows.key)
+ newRows.rows ++= currentRows.rows.filter(r =>
!p.deletes.contains(r.getInt(0)))
+ key -> newRows
+ }
+ }
+ this
+ }
+
def withData(data: Array[BufferedRows]): InMemoryBaseTable = {
withData(data, schema)
}
@@ -521,6 +532,7 @@ object InMemoryBaseTable {
class BufferedRows(val key: Seq[Any] = Seq.empty) extends WriterCommitMessage
with InputPartition with HasPartitionKey with Serializable {
val rows = new mutable.ArrayBuffer[InternalRow]()
+ val deletes = new mutable.ArrayBuffer[Int]()
def withRow(row: InternalRow): BufferedRows = {
rows.append(row)
@@ -615,7 +627,7 @@ private object BufferedRowsWriterFactory extends
DataWriterFactory with Streamin
}
private class BufferWriter extends DataWriter[InternalRow] {
- private val buffer = new BufferedRows
+ protected val buffer = new BufferedRows
override def write(row: InternalRow): Unit = buffer.rows.append(row.copy())
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
index 08c22a02b85..4db15efe9eb 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryRowLevelOperationTable.scala
@@ -19,10 +19,11 @@ package org.apache.spark.sql.connector.catalog
import java.util
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.distributions.{Distribution,
Distributions}
import org.apache.spark.sql.connector.expressions.{FieldReference,
LogicalExpressions, NamedReference, SortDirection, SortOrder, Transform}
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
-import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo,
RequiresDistributionAndOrdering, RowLevelOperation, RowLevelOperationBuilder,
RowLevelOperationInfo, Write, WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.{BatchWrite, DeltaBatchWrite,
DeltaWrite, DeltaWriteBuilder, DeltaWriter, DeltaWriterFactory,
LogicalWriteInfo, PhysicalWriteInfo, RequiresDistributionAndOrdering,
RowLevelOperation, RowLevelOperationBuilder, RowLevelOperationInfo,
SupportsDelta, Write, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -34,17 +35,22 @@ class InMemoryRowLevelOperationTable(
properties: util.Map[String, String])
extends InMemoryTable(name, schema, partitioning, properties) with
SupportsRowLevelOperations {
+ private final val PARTITION_COLUMN_REF =
FieldReference(PartitionKeyColumn.name)
+ private final val SUPPORTS_DELTAS = "supports-deltas"
+
// used in row-level operation tests to verify replaced partitions
var replacedPartitions: Seq[Seq[Any]] = Seq.empty
override def newRowLevelOperationBuilder(
info: RowLevelOperationInfo): RowLevelOperationBuilder = {
- () => PartitionBasedOperation(info.command)
+ if (properties.getOrDefault(SUPPORTS_DELTAS, "false") == "true") {
+ () => DeltaBasedOperation(info.command)
+ } else {
+ () => PartitionBasedOperation(info.command)
+ }
}
case class PartitionBasedOperation(command: Command) extends
RowLevelOperation {
- private final val PARTITION_COLUMN_REF =
FieldReference(PartitionKeyColumn.name)
-
var configuredScan: InMemoryBatchScan = _
override def requiredMetadataAttributes(): Array[NamedReference] = {
@@ -97,4 +103,74 @@ class InMemoryRowLevelOperationTable(
withData(newData, schema)
}
}
+
+ case class DeltaBasedOperation(command: Command) extends RowLevelOperation
with SupportsDelta {
+ private final val PK_COLUMN_REF = FieldReference("pk")
+
+ override def requiredMetadataAttributes(): Array[NamedReference] = {
+ Array(PARTITION_COLUMN_REF)
+ }
+
+ override def rowId(): Array[NamedReference] = Array(PK_COLUMN_REF)
+
+ override def newScanBuilder(options: CaseInsensitiveStringMap):
ScanBuilder = {
+ new InMemoryScanBuilder(schema)
+ }
+
+ override def newWriteBuilder(info: LogicalWriteInfo): DeltaWriteBuilder =
+ new DeltaWriteBuilder {
+ override def build(): DeltaWrite = new DeltaWrite with
RequiresDistributionAndOrdering {
+
+ override def requiredDistribution(): Distribution = {
+ Distributions.clustered(Array(PARTITION_COLUMN_REF))
+ }
+
+ override def requiredOrdering(): Array[SortOrder] = {
+ Array[SortOrder](
+ LogicalExpressions.sort(
+ PARTITION_COLUMN_REF,
+ SortDirection.ASCENDING,
+ SortDirection.ASCENDING.defaultNullOrdering())
+ )
+ }
+
+ override def toBatch: DeltaBatchWrite = TestDeltaBatchWrite
+ }
+ }
+ }
+
+ private object TestDeltaBatchWrite extends DeltaBatchWrite {
+ override def createBatchWriterFactory(info: PhysicalWriteInfo):
DeltaWriterFactory = {
+ DeltaBufferedRowsWriterFactory
+ }
+
+ override def commit(messages: Array[WriterCommitMessage]): Unit = {
+ withDeletes(messages.map(_.asInstanceOf[BufferedRows]))
+ withData(messages.map(_.asInstanceOf[BufferedRows]))
+ }
+
+ override def abort(messages: Array[WriterCommitMessage]): Unit = {}
+ }
+}
+
+private object DeltaBufferedRowsWriterFactory extends DeltaWriterFactory {
+ override def createWriter(partitionId: Int, taskId: Long):
DeltaWriter[InternalRow] = {
+ new DeltaBufferWriter
+ }
+}
+
+private class DeltaBufferWriter extends BufferWriter with
DeltaWriter[InternalRow] {
+
+ override def delete(meta: InternalRow, id: InternalRow): Unit =
buffer.deletes += id.getInt(0)
+
+ override def update(meta: InternalRow, id: InternalRow, row: InternalRow):
Unit = {
+ buffer.deletes += id.getInt(0)
+ write(row)
+ }
+
+ override def insert(row: InternalRow): Unit = write(row)
+
+ override def write(row: InternalRow): Unit = super[BufferWriter].write(row)
+
+ override def commit(): WriterCommitMessage = buffer
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 1e2a65d9ec2..29f0da1158f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -312,6 +312,11 @@ class DataSourceV2Strategy(session: SparkSession) extends
Strategy with Predicat
// use the original relation to refresh the cache
ReplaceDataExec(planLater(query), refreshCache(r), write) :: Nil
+ case WriteDelta(_: DataSourceV2Relation, _, query, r:
DataSourceV2Relation, projections,
+ Some(write)) =>
+ // use the original relation to refresh the cache
+ WriteDeltaExec(planLater(query), refreshCache(r), projections, write) ::
Nil
+
case WriteToContinuousDataSource(writer, query, customMetrics) =>
WriteToContinuousDataSourceExec(writer, planLater(query), customMetrics)
:: Nil
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/OptimizeMetadataOnlyDeleteFromTable.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/OptimizeMetadataOnlyDeleteFromTable.scala
index 0302928229f..abb9d728c78 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/OptimizeMetadataOnlyDeleteFromTable.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/OptimizeMetadataOnlyDeleteFromTable.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2
import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper,
SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable,
DeleteFromTableWithFilters, LogicalPlan, ReplaceData, RowLevelWrite}
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable,
DeleteFromTableWithFilters, LogicalPlan, ReplaceData, RowLevelWrite, WriteDelta}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{SupportsDeleteV2,
TruncatableTable}
import org.apache.spark.sql.connector.expressions.filter.Predicate
@@ -77,6 +77,10 @@ object OptimizeMetadataOnlyDeleteFromTable extends
Rule[LogicalPlan] with Predic
val command = rd.operation.command
Some(rd, command, cond, originalTable)
+ case wd @ WriteDelta(_, cond, _, originalTable, _, _) =>
+ val command = wd.operation.command
+ Some(wd, command, cond, originalTable)
+
case _ =>
None
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala
index 5e0ade2474a..8f7fed561c0 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2Writes.scala
@@ -17,15 +17,16 @@
package org.apache.spark.sql.execution.datasources.v2
-import java.util.UUID
+import java.util.{Optional, UUID}
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan,
OverwriteByExpression, OverwritePartitionsDynamic, Project, ReplaceData}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan,
OverwriteByExpression, OverwritePartitionsDynamic, Project, ReplaceData,
WriteDelta}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
+import org.apache.spark.sql.catalyst.util.WriteDeltaProjections
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table}
import org.apache.spark.sql.connector.expressions.filter.Predicate
-import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl,
SupportsDynamicOverwrite, SupportsOverwriteV2, SupportsTruncate, Write,
WriteBuilder}
+import org.apache.spark.sql.connector.write.{DeltaWriteBuilder,
LogicalWriteInfoImpl, SupportsDynamicOverwrite, SupportsOverwriteV2,
SupportsTruncate, Write, WriteBuilder}
import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryExecutionErrors}
import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWrite,
WriteToMicroBatchDataSource}
import org.apache.spark.sql.internal.connector.SupportsStreamingUpdateAsAppend
@@ -102,6 +103,11 @@ object V2Writes extends Rule[LogicalPlan] with
PredicateHelper {
// project away any metadata columns that could be used for distribution
and ordering
rd.copy(write = Some(write), query = Project(rd.dataInput, newQuery))
+ case wd @ WriteDelta(r: DataSourceV2Relation, _, query, _, projections,
None) =>
+ val deltaWriteBuilder = newDeltaWriteBuilder(r.table, Map.empty,
projections)
+ val deltaWrite = deltaWriteBuilder.build()
+ val newQuery = DistributionAndOrderingUtils.prepareQuery(deltaWrite,
query, r.funCatalog)
+ wd.copy(write = Some(deltaWrite), query = newQuery)
}
private def buildWriteForMicroBatch(
@@ -137,4 +143,26 @@ object V2Writes extends Rule[LogicalPlan] with
PredicateHelper {
val info = LogicalWriteInfoImpl(queryId, rowSchema, writeOptions.asOptions)
table.asWritable.newWriteBuilder(info)
}
+
+ private def newDeltaWriteBuilder(
+ table: Table,
+ writeOptions: Map[String, String],
+ projections: WriteDeltaProjections,
+ queryId: String = UUID.randomUUID().toString): DeltaWriteBuilder = {
+
+ val rowSchema =
projections.rowProjection.map(_.schema).getOrElse(StructType(Nil))
+ val rowIdSchema = projections.rowIdProjection.schema
+ val metadataSchema = projections.metadataProjection.map(_.schema)
+
+ val info = LogicalWriteInfoImpl(
+ queryId,
+ rowSchema,
+ writeOptions.asOptions,
+ Optional.of(rowIdSchema),
+ Optional.ofNullable(metadataSchema.orNull))
+
+ val writeBuilder = table.asWritable.newWriteBuilder(info)
+ assert(writeBuilder.isInstanceOf[DeltaWriteBuilder], s"$writeBuilder must
be DeltaWriteBuilder")
+ writeBuilder.asInstanceOf[DeltaWriteBuilder]
+ }
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index ee820fd597d..490b7082223 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -21,18 +21,19 @@ import java.util.UUID
import scala.collection.JavaConverters._
-import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TableSpec,
UnaryNode}
-import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils,
WriteDeltaProjections}
+import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION,
INSERT_OPERATION, UPDATE_OPERATION}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier,
StagedTable, StagingTableCatalog, SupportsWrite, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.metric.CustomMetric
-import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory,
LogicalWriteInfoImpl, PhysicalWriteInfoImpl, V1Write, Write,
WriterCommitMessage}
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter,
DataWriterFactory, DeltaWrite, DeltaWriter, LogicalWriteInfoImpl,
PhysicalWriteInfoImpl, V1Write, Write, WriterCommitMessage}
import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryExecutionErrors}
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric,
SQLMetrics}
@@ -298,6 +299,30 @@ case class ReplaceDataExec(
}
}
+/**
+ * Physical plan node to write a delta of rows to an existing table.
+ */
+case class WriteDeltaExec(
+ query: SparkPlan,
+ refreshCache: () => Unit,
+ projections: WriteDeltaProjections,
+ write: DeltaWrite) extends V2ExistingTableWriteExec {
+
+ override lazy val stringArgs: Iterator[Any] = Iterator(query, write)
+
+ override lazy val writingTask: WritingSparkTask[_] = {
+ if (projections.metadataProjection.isDefined) {
+ DeltaWithMetadataWritingSparkTask(projections)
+ } else {
+ DeltaWritingSparkTask(projections)
+ }
+ }
+
+ override protected def withNewChildInternal(newChild: SparkPlan):
WriteDeltaExec = {
+ copy(query = newChild)
+ }
+}
+
case class WriteToDataSourceV2Exec(
batchWrite: BatchWrite,
refreshCache: () => Unit,
@@ -339,6 +364,7 @@ trait V2ExistingTableWriteExec extends V2TableWriteExec {
*/
trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {
def query: SparkPlan
+ def writingTask: WritingSparkTask[_] = DataWritingSparkTask
var commitProgress: Option[StreamWriterCommitProgress] = None
@@ -360,6 +386,8 @@ trait V2TableWriteExec extends V2CommandExec with
UnaryExecNode {
tempRdd
}
}
+ // introduce a local var to avoid serializing the whole class
+ val task = writingTask
val writerFactory = batchWrite.createBatchWriterFactory(
PhysicalWriteInfoImpl(rdd.getNumPartitions))
val useCommitCoordinator = batchWrite.useCommitCoordinator
@@ -376,8 +404,7 @@ trait V2TableWriteExec extends V2CommandExec with
UnaryExecNode {
sparkContext.runJob(
rdd,
(context: TaskContext, iter: Iterator[InternalRow]) =>
- DataWritingSparkTask.run(writerFactory, context, iter,
useCommitCoordinator,
- writeMetrics),
+ task.run(writerFactory, context, iter, useCommitCoordinator,
writeMetrics),
rdd.partitions.indices,
(index, result: DataWritingSparkTaskResult) => {
val commitMessage = result.writerCommitMessage
@@ -410,7 +437,10 @@ trait V2TableWriteExec extends V2CommandExec with
UnaryExecNode {
}
}
-object DataWritingSparkTask extends Logging {
+trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with
Serializable {
+
+ protected def write(writer: W, row: InternalRow): Unit
+
def run(
writerFactory: DataWriterFactory,
context: TaskContext,
@@ -422,7 +452,7 @@ object DataWritingSparkTask extends Logging {
val partId = context.partitionId()
val taskId = context.taskAttemptId()
val attemptId = context.attemptNumber()
- val dataWriter = writerFactory.createWriter(partId, taskId)
+ val dataWriter = writerFactory.createWriter(partId, taskId).asInstanceOf[W]
var count = 0L
// write the data and commit this writer.
@@ -434,7 +464,7 @@ object DataWritingSparkTask extends Logging {
// Count is here.
count += 1
- dataWriter.write(iter.next())
+ write(dataWriter, iter.next())
}
CustomMetrics.updateMetrics(dataWriter.currentMetricsValues,
customMetrics)
@@ -477,6 +507,73 @@ object DataWritingSparkTask extends Logging {
}
}
+object DataWritingSparkTask extends WritingSparkTask[DataWriter[InternalRow]] {
+ override protected def write(writer: DataWriter[InternalRow], row:
InternalRow): Unit = {
+ writer.write(row)
+ }
+}
+
+case class DeltaWritingSparkTask(
+ projections: WriteDeltaProjections) extends
WritingSparkTask[DeltaWriter[InternalRow]] {
+
+ private lazy val rowProjection = projections.rowProjection.orNull
+ private lazy val rowIdProjection = projections.rowIdProjection
+
+ override protected def write(writer: DeltaWriter[InternalRow], row:
InternalRow): Unit = {
+ val operation = row.getInt(0)
+
+ operation match {
+ case DELETE_OPERATION =>
+ rowIdProjection.project(row)
+ writer.delete(null, rowIdProjection)
+
+ case UPDATE_OPERATION =>
+ rowProjection.project(row)
+ rowIdProjection.project(row)
+ writer.update(null, rowIdProjection, rowProjection)
+
+ case INSERT_OPERATION =>
+ rowProjection.project(row)
+ writer.insert(rowProjection)
+
+ case other =>
+ throw new SparkException(s"Unexpected operation ID: $other")
+ }
+ }
+}
+
+case class DeltaWithMetadataWritingSparkTask(
+ projections: WriteDeltaProjections) extends
WritingSparkTask[DeltaWriter[InternalRow]] {
+
+ private lazy val rowProjection = projections.rowProjection.orNull
+ private lazy val rowIdProjection = projections.rowIdProjection
+ private lazy val metadataProjection = projections.metadataProjection.orNull
+
+ override protected def write(writer: DeltaWriter[InternalRow], row:
InternalRow): Unit = {
+ val operation = row.getInt(0)
+
+ operation match {
+ case DELETE_OPERATION =>
+ rowIdProjection.project(row)
+ metadataProjection.project(row)
+ writer.delete(metadataProjection, rowIdProjection)
+
+ case UPDATE_OPERATION =>
+ rowProjection.project(row)
+ rowIdProjection.project(row)
+ metadataProjection.project(row)
+ writer.update(metadataProjection, rowIdProjection, rowProjection)
+
+ case INSERT_OPERATION =>
+ rowProjection.project(row)
+ writer.insert(rowProjection)
+
+ case other =>
+ throw new SparkException(s"Unexpected operation ID: $other")
+ }
+ }
+}
+
private[v2] trait TableWriteExecHelper extends V2TableWriteExec with
SupportsV1Write {
protected def writeToTable(
catalog: TableCatalog,
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
index d9a12b47ec2..14b951e66db 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
@@ -26,7 +26,8 @@ import org.apache.spark.sql.connector.catalog.{Identifier,
InMemoryRowLevelOpera
import org.apache.spark.sql.connector.expressions.LogicalExpressions._
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.execution.datasources.v2.{DeleteFromTableExec,
ReplaceDataExec}
+import org.apache.spark.sql.execution.datasources.v2.{DeleteFromTableExec,
ReplaceDataExec, WriteDeltaExec}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.QueryExecutionListener
@@ -50,6 +51,10 @@ abstract class DeleteFromTableSuiteBase
protected val ident: Identifier = Identifier.of(namespace, "test_table")
protected val tableNameAsString: String = "cat." + ident.toString
+ protected def extraTableProps: java.util.Map[String, String] = {
+ Collections.emptyMap[String, String]
+ }
+
protected def catalog: InMemoryRowLevelOperationTableCatalog = {
val catalog = spark.sessionState.catalogManager.catalog("cat")
catalog.asTableCatalog.asInstanceOf[InMemoryRowLevelOperationTableCatalog]
@@ -70,7 +75,7 @@ abstract class DeleteFromTableSuiteBase
}
test("delete from empty tables") {
- createTable("id INT, dep STRING")
+ createTable("pk INT NOT NULL, id INT, dep STRING")
sql(s"DELETE FROM $tableNameAsString WHERE id <= 1")
@@ -78,76 +83,76 @@ abstract class DeleteFromTableSuiteBase
}
test("delete with basic filters") {
- createAndInitTable("id INT, dep STRING",
- """{ "id": 1, "dep": "hr" }
- |{ "id": 2, "dep": "software" }
- |{ "id": 3, "dep": "hr" }
+ createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+ """{ "pk": 1, "id": 1, "dep": "hr" }
+ |{ "pk": 2, "id": 2, "dep": "software" }
+ |{ "pk": 3, "id": 3, "dep": "hr" }
|""".stripMargin)
sql(s"DELETE FROM $tableNameAsString WHERE id <= 1")
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(2, "software") :: Row(3, "hr") :: Nil)
+ Row(2, 2, "software") :: Row(3, 3, "hr") :: Nil)
}
test("delete with aliases") {
- createAndInitTable("id INT, dep STRING",
- """{ "id": 1, "dep": "hr" }
- |{ "id": 2, "dep": "software" }
- |{ "id": 3, "dep": "hr" }
+ createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+ """{ "pk": 1, "id": 1, "dep": "hr" }
+ |{ "pk": 2, "id": 2, "dep": "software" }
+ |{ "pk": 3, "id": 3, "dep": "hr" }
|""".stripMargin)
sql(s"DELETE FROM $tableNameAsString AS t WHERE t.id <= 1 OR t.dep = 'hr'")
- checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(2, "software")
:: Nil)
+ checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2,
"software") :: Nil)
}
test("delete with IN predicates") {
- createAndInitTable("id INT, dep STRING",
- """{ "id": 1, "dep": "hr" }
- |{ "id": 2, "dep": "software" }
- |{ "id": null, "dep": "hr" }
+ createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+ """{ "pk": 1, "id": 1, "dep": "hr" }
+ |{ "pk": 2, "id": 2, "dep": "software" }
+ |{ "pk": 3, "id": null, "dep": "hr" }
|""".stripMargin)
sql(s"DELETE FROM $tableNameAsString WHERE id IN (1, null)")
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(2, "software") :: Row(null, "hr") :: Nil)
+ Row(2, 2, "software") :: Row(3, null, "hr") :: Nil)
}
test("delete with NOT IN predicates") {
- createAndInitTable("id INT, dep STRING",
- """{ "id": 1, "dep": "hr" }
- |{ "id": 2, "dep": "software" }
- |{ "id": null, "dep": "hr" }
+ createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+ """{ "pk": 1, "id": 1, "dep": "hr" }
+ |{ "pk": 2, "id": 2, "dep": "software" }
+ |{ "pk": 3, "id": null, "dep": "hr" }
|""".stripMargin)
sql(s"DELETE FROM $tableNameAsString WHERE id NOT IN (null, 1)")
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(1, "hr") :: Row(2, "software") :: Row(null, "hr") :: Nil)
+ Row(1, 1, "hr") :: Row(2, 2, "software") :: Row(3, null, "hr") :: Nil)
sql(s"DELETE FROM $tableNameAsString WHERE id NOT IN (1, 10)")
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(1, "hr") :: Row(null, "hr") :: Nil)
+ Row(1, 1, "hr") :: Row(3, null, "hr") :: Nil)
}
test("delete with conditions on nested columns") {
- createAndInitTable("id INT, complex STRUCT<c1:INT,c2:STRING>, dep STRING",
- """{ "id": 1, "complex": { "c1": 3, "c2": "v1" }, "dep": "hr" }
- |{ "id": 2, "complex": { "c1": 2, "c2": "v2" }, "dep": "software" }
+ createAndInitTable("pk INT NOT NULL, id INT, complex
STRUCT<c1:INT,c2:STRING>, dep STRING",
+ """{ "pk": 1, "id": 1, "complex": { "c1": 3, "c2": "v1" }, "dep": "hr" }
+ |{ "pk": 2, "id": 2, "complex": { "c1": 2, "c2": "v2" }, "dep":
"software" }
|""".stripMargin)
sql(s"DELETE FROM $tableNameAsString WHERE complex.c1 = id + 2")
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(2, Row(2, "v2"), "software") :: Nil)
+ Row(2, 2, Row(2, "v2"), "software") :: Nil)
sql(s"DELETE FROM $tableNameAsString t WHERE t.complex.c1 = id")
@@ -156,10 +161,10 @@ abstract class DeleteFromTableSuiteBase
test("delete with IN subqueries") {
withTempView("deleted_id", "deleted_dep") {
- createAndInitTable("id INT, dep STRING",
- """{ "id": 1, "dep": "hr" }
- |{ "id": 2, "dep": "hardware" }
- |{ "id": null, "dep": "hr" }
+ createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+ """{ "pk": 1, "id": 1, "dep": "hr" }
+ |{ "pk": 2, "id": 2, "dep": "hardware" }
+ |{ "pk": 3, "id": null, "dep": "hr" }
|""".stripMargin)
val deletedIdDF = Seq(Some(0), Some(1), None).toDF()
@@ -178,16 +183,16 @@ abstract class DeleteFromTableSuiteBase
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(2, "hardware") :: Row(null, "hr") :: Nil)
+ Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil)
- append("id INT, dep STRING",
- """{ "id": 1, "dep": "hr" }
- |{ "id": -1, "dep": "hr" }
+ append("pk INT NOT NULL, id INT, dep STRING",
+ """{ "pk": 4, "id": 1, "dep": "hr" }
+ |{ "pk": 5, "id": -1, "dep": "hr" }
|""".stripMargin)
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(-1, "hr") :: Row(1, "hr") :: Row(2, "hardware") :: Row(null, "hr")
:: Nil)
+ Row(5, -1, "hr") :: Row(4, 1, "hr") :: Row(2, 2, "hardware") :: Row(3,
null, "hr") :: Nil)
sql(
s"""DELETE FROM $tableNameAsString
@@ -199,16 +204,16 @@ abstract class DeleteFromTableSuiteBase
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(-1, "hr") :: Row(1, "hr") :: Nil)
+ Row(5, -1, "hr") :: Row(4, 1, "hr") :: Nil)
- append("id INT, dep STRING",
- """{ "id": null, "dep": "hr" }
- |{ "id": 2, "dep": "hr" }
+ append("pk INT NOT NULL, id INT, dep STRING",
+ """{ "pk": 6, "id": null, "dep": "hr" }
+ |{ "pk": 7, "id": 2, "dep": "hr" }
|""".stripMargin)
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(-1, "hr") :: Row(1, "hr") :: Row(2, "hr") :: Row(null, "hr") ::
Nil)
+ Row(5, -1, "hr") :: Row(4, 1, "hr") :: Row(7, 2, "hr") :: Row(6, null,
"hr") :: Nil)
sql(
s"""DELETE FROM $tableNameAsString
@@ -220,16 +225,16 @@ abstract class DeleteFromTableSuiteBase
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(-1, "hr") :: Row(1, "hr") :: Row(null, "hr") :: Nil)
+ Row(5, -1, "hr") :: Row(4, 1, "hr") :: Row(6, null, "hr") :: Nil)
}
}
test("delete with multi-column IN subqueries") {
withTempView("deleted_employee") {
- createAndInitTable("id INT, dep STRING",
- """{ "id": 1, "dep": "hr" }
- |{ "id": 2, "dep": "hardware" }
- |{ "id": null, "dep": "hr" }
+ createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+ """{ "pk": 1, "id": 1, "dep": "hr" }
+ |{ "pk": 2, "id": 2, "dep": "hardware" }
+ |{ "pk": 3, "id": null, "dep": "hr" }
|""".stripMargin)
val deletedEmployeeDF = Seq((None, "hr"), (Some(1), "hr")).toDF()
@@ -243,16 +248,16 @@ abstract class DeleteFromTableSuiteBase
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(2, "hardware") :: Row(null, "hr") :: Nil)
+ Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil)
}
}
test("delete with NOT IN subqueries") {
withTempView("deleted_id", "deleted_dep") {
- createAndInitTable("id INT, dep STRING",
- """{ "id": 1, "dep": "hr" }
- |{ "id": 2, "dep": "hardware" }
- |{ "id": null, "dep": "hr" }
+ createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+ """{ "pk": 1, "id": 1, "dep": "hr" }
+ |{ "pk": 2, "id": 2, "dep": "hardware" }
+ |{ "pk": 3, "id": null, "dep": "hr" }
|""".stripMargin)
val deletedIdDF = Seq(Some(-1), Some(-2), None).toDF()
@@ -269,7 +274,7 @@ abstract class DeleteFromTableSuiteBase
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(1, "hr") :: Row(2, "hardware") :: Row(null, "hr") :: Nil)
+ Row(1, 1, "hr") :: Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil)
sql(
s"""DELETE FROM $tableNameAsString
@@ -277,17 +282,17 @@ abstract class DeleteFromTableSuiteBase
| id NOT IN (SELECT * FROM deleted_id WHERE value IS NOT NULL)
|""".stripMargin)
- checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(null, "hr") :: Nil)
+ checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(3, null, "hr") :: Nil)
- append("id INT, dep STRING",
- """{ "id": 1, "dep": "hr" }
- |{ "id": 2, "dep": "hardware" }
- |{ "id": null, "dep": "hr" }
+ append("pk INT NOT NULL, id INT, dep STRING",
+ """{ "pk": 4, "id": 1, "dep": "hr" }
+ |{ "pk": 5, "id": 2, "dep": "hardware" }
+ |{ "pk": 6, "id": null, "dep": "hr" }
|""".stripMargin)
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(1, "hr") :: Row(2, "hardware") :: Row(null, "hr") :: Row(null, "hr") :: Nil)
+ Row(4, 1, "hr") :: Row(5, 2, "hardware") :: Row(3, null, "hr") :: Row(6, null, "hr") :: Nil)
sql(
s"""DELETE FROM $tableNameAsString
@@ -297,7 +302,7 @@ abstract class DeleteFromTableSuiteBase
| dep IN ('software', 'hr')
|""".stripMargin)
- checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(2, "hardware") :: Nil)
+ checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(5, 2, "hardware") :: Nil)
sql(
s"""DELETE FROM $tableNameAsString
@@ -307,7 +312,7 @@ abstract class DeleteFromTableSuiteBase
| EXISTS (SELECT 1 FROM deleted_dep WHERE dep = deleted_dep.value)
|""".stripMargin)
- checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(2, "hardware") :: Nil)
+ checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(5, 2, "hardware") :: Nil)
sql(
s"""DELETE FROM $tableNameAsString t
@@ -323,10 +328,10 @@ abstract class DeleteFromTableSuiteBase
test("delete with EXISTS subquery") {
withTempView("deleted_id", "deleted_dep") {
- createAndInitTable("id INT, dep STRING",
- """{ "id": 1, "dep": "hr" }
- |{ "id": 2, "dep": "hardware" }
- |{ "id": null, "dep": "hr" }
+ createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+ """{ "pk": 1, "id": 1, "dep": "hr" }
+ |{ "pk": 2, "id": 2, "dep": "hardware" }
+ |{ "pk": 3, "id": null, "dep": "hr" }
|""".stripMargin)
val deletedIdDF = Seq(Some(-1), Some(-2), None).toDF()
@@ -343,7 +348,7 @@ abstract class DeleteFromTableSuiteBase
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(1, "hr") :: Row(2, "hardware") :: Row(null, "hr") :: Nil)
+ Row(1, 1, "hr") :: Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil)
sql(
s"""DELETE FROM $tableNameAsString t
@@ -353,7 +358,7 @@ abstract class DeleteFromTableSuiteBase
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(2, "hardware") :: Row(null, "hr") :: Nil)
+ Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil)
sql(
s"""DELETE FROM $tableNameAsString t
@@ -363,7 +368,7 @@ abstract class DeleteFromTableSuiteBase
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(2, "hardware") :: Nil)
+ Row(2, 2, "hardware") :: Nil)
sql(
s"""DELETE FROM $tableNameAsString t
@@ -375,16 +380,16 @@ abstract class DeleteFromTableSuiteBase
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(2, "hardware") :: Nil)
+ Row(2, 2, "hardware") :: Nil)
}
}
test("delete with NOT EXISTS subquery") {
withTempView("deleted_id", "deleted_dep") {
- createAndInitTable("id INT, dep STRING",
- """{ "id": 1, "dep": "hr" }
- |{ "id": 2, "dep": "hardware" }
- |{ "id": null, "dep": "hr" }
+ createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+ """{ "pk": 1, "id": 1, "dep": "hr" }
+ |{ "pk": 2, "id": 2, "dep": "hardware" }
+ |{ "pk": 3, "id": null, "dep": "hr" }
|""".stripMargin)
val deletedIdDF = Seq(Some(-1), Some(-2), None).toDF()
@@ -403,7 +408,7 @@ abstract class DeleteFromTableSuiteBase
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(1, "hr") :: Row(null, "hr") :: Nil)
+ Row(1, 1, "hr") :: Row(3, null, "hr") :: Nil)
sql(
s"""DELETE FROM $tableNameAsString t
@@ -411,7 +416,7 @@ abstract class DeleteFromTableSuiteBase
| NOT EXISTS (SELECT 1 FROM deleted_id d WHERE t.id = d.value + 2)
|""".stripMargin)
- checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(1, "hr") :: Nil)
+ checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "hr") :: Nil)
sql(
s"""DELETE FROM $tableNameAsString t
@@ -427,10 +432,10 @@ abstract class DeleteFromTableSuiteBase
test("delete with a scalar subquery") {
withTempView("deleted_id") {
- createAndInitTable("id INT, dep STRING",
- """{ "id": 1, "dep": "hr" }
- |{ "id": 2, "dep": "hardware" }
- |{ "id": null, "dep": "hr" }
+ createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+ """{ "pk": 1, "id": 1, "dep": "hr" }
+ |{ "pk": 2, "id": 2, "dep": "hardware" }
+ |{ "pk": 3, "id": null, "dep": "hr" }
|""".stripMargin)
val deletedIdDF = Seq(Some(1), Some(100), None).toDF()
@@ -444,18 +449,18 @@ abstract class DeleteFromTableSuiteBase
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(2, "hardware") :: Row(null, "hr") :: Nil)
+ Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil)
}
}
test("delete refreshes relation cache") {
withTempView("temp") {
withCache("temp") {
- createAndInitTable("id INT, dep STRING",
- """{ "id": 1, "dep": "hr" }
- |{ "id": 1, "dep": "hardware" }
- |{ "id": 2, "dep": "hardware" }
- |{ "id": 3, "dep": "hr" }
+ createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+ """{ "pk": 1, "id": 1, "dep": "hr" }
+ |{ "pk": 2, "id": 1, "dep": "hardware" }
+ |{ "pk": 3, "id": 2, "dep": "hardware" }
+ |{ "pk": 4, "id": 3, "dep": "hr" }
|""".stripMargin)
// define a view on top of the table
@@ -468,7 +473,7 @@ abstract class DeleteFromTableSuiteBase
// verify the view returns expected results
checkAnswer(
sql("SELECT * FROM temp"),
- Row(1, "hr") :: Row(1, "hardware") :: Nil)
+ Row(1, 1, "hr") :: Row(2, 1, "hardware") :: Nil)
// delete some records from the table
sql(s"DELETE FROM $tableNameAsString WHERE id <= 1")
@@ -476,7 +481,7 @@ abstract class DeleteFromTableSuiteBase
// verify the delete was successful
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(2, "hardware") :: Row(3, "hr") :: Nil)
+ Row(3, 2, "hardware") :: Row(4, 3, "hr") :: Nil)
// verify the view reflects the changes in the table
checkAnswer(sql("SELECT * FROM temp"), Nil)
@@ -485,10 +490,10 @@ abstract class DeleteFromTableSuiteBase
}
test("delete with nondeterministic conditions") {
- createAndInitTable("id INT, dep STRING",
- """{ "id": 1, "dep": "hr" }
- |{ "id": 2, "dep": "software" }
- |{ "id": 3, "dep": "hr" }
+ createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+ """{ "pk": 1, "id": 1, "dep": "hr" }
+ |{ "pk": 2, "id": 2, "dep": "software" }
+ |{ "pk": 3, "id": 3, "dep": "hr" }
|""".stripMargin)
val e = intercept[AnalysisException] {
@@ -498,10 +503,10 @@ abstract class DeleteFromTableSuiteBase
}
test("delete without condition executed as delete with filters") {
- createAndInitTable("id INT, dep INT",
- """{ "id": 1, "dep": 100 }
- |{ "id": 2, "dep": 200 }
- |{ "id": 3, "dep": 100 }
+ createAndInitTable("pk INT NOT NULL, id INT, dep INT",
+ """{ "pk": 1, "id": 1, "dep": 100 }
+ |{ "pk": 2, "id": 2, "dep": 200 }
+ |{ "pk": 3, "id": 3, "dep": 100 }
|""".stripMargin)
executeDeleteWithFilters(s"DELETE FROM $tableNameAsString")
@@ -510,39 +515,39 @@ abstract class DeleteFromTableSuiteBase
}
test("delete with supported predicates gets converted into delete with
filters") {
- createAndInitTable("id INT, dep INT",
- """{ "id": 1, "dep": 100 }
- |{ "id": 2, "dep": 200 }
- |{ "id": 3, "dep": 100 }
+ createAndInitTable("pk INT NOT NULL, id INT, dep INT",
+ """{ "pk": 1, "id": 1, "dep": 100 }
+ |{ "pk": 2, "id": 2, "dep": 200 }
+ |{ "pk": 3, "id": 3, "dep": 100 }
|""".stripMargin)
executeDeleteWithFilters(s"DELETE FROM $tableNameAsString WHERE dep = 100")
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(2, 200) :: Nil)
+ Row(2, 2, 200) :: Nil)
}
test("delete with unsupported predicates cannot be converted into delete
with filters") {
- createAndInitTable("id INT, dep INT",
- """{ "id": 1, "dep": 100 }
- |{ "id": 2, "dep": 200 }
- |{ "id": 3, "dep": 100 }
+ createAndInitTable("pk INT NOT NULL, id INT, dep INT",
+ """{ "pk": 1, "id": 1, "dep": 100 }
+ |{ "pk": 2, "id": 2, "dep": 200 }
+ |{ "pk": 3, "id": 3, "dep": 100 }
|""".stripMargin)
executeDeleteWithRewrite(s"DELETE FROM $tableNameAsString WHERE dep = 100 OR dep < 200")
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(2, 200) :: Nil)
+ Row(2, 2, 200) :: Nil)
}
test("delete with subquery cannot be converted into delete with filters") {
withTempView("deleted_id") {
- createAndInitTable("id INT, dep INT",
- """{ "id": 1, "dep": 100 }
- |{ "id": 2, "dep": 200 }
- |{ "id": 3, "dep": 100 }
+ createAndInitTable("pk INT NOT NULL, id INT, dep INT",
+ """{ "pk": 1, "id": 1, "dep": 100 }
+ |{ "pk": 2, "id": 2, "dep": 200 }
+ |{ "pk": 3, "id": 3, "dep": 100 }
|""".stripMargin)
val deletedIdDF = Seq(Some(1), Some(100), None).toDF()
@@ -553,14 +558,13 @@ abstract class DeleteFromTableSuiteBase
checkAnswer(
sql(s"SELECT * FROM $tableNameAsString"),
- Row(2, 200) :: Row(3, 100) :: Nil)
+ Row(2, 2, 200) :: Row(3, 3, 100) :: Nil)
}
}
protected def createTable(schemaString: String): Unit = {
val schema = StructType.fromDDL(schemaString)
- val tableProps = Collections.emptyMap[String, String]
- catalog.createTable(ident, schema, Array(identity(reference(Seq("dep")))), tableProps)
+ catalog.createTable(ident, schema, Array(identity(reference(Seq("dep")))), extraTableProps)
}
protected def createAndInitTable(schemaString: String, jsonData: String): Unit = {
@@ -569,8 +573,10 @@ abstract class DeleteFromTableSuiteBase
}
private def append(schemaString: String, jsonData: String): Unit = {
- val df = toDF(jsonData, schemaString)
- df.coalesce(1).writeTo(tableNameAsString).append()
+ withSQLConf(SQLConf.LEGACY_RESPECT_NULLABILITY_IN_TEXT_DATASET_CONVERSION.key -> "true") {
+ val df = toDF(jsonData, schemaString)
+ df.coalesce(1).writeTo(tableNameAsString).append()
+ }
}
private def toDF(jsonData: String, schemaString: String = null): DataFrame = {
@@ -604,6 +610,8 @@ abstract class DeleteFromTableSuiteBase
executedPlan match {
case _: ReplaceDataExec =>
// OK
+ case _: WriteDeltaExec =>
+ // OK
case other =>
fail("unexpected executed plan: " + other)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala
new file mode 100644
index 00000000000..fd7a04ea926
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector
+
+import org.apache.spark.sql.AnalysisException
+
+class DeltaBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase {
+
+ override protected lazy val extraTableProps: java.util.Map[String, String] = {
+ val props = new java.util.HashMap[String, String]()
+ props.put("supports-deltas", "true")
+ props
+ }
+
+ test("nullable row ID attrs") {
+ createAndInitTable("pk INT, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 300, "dep": 'hr' }
+ |{ "pk": 2, "salary": 150, "dep": 'software' }
+ |{ "pk": 3, "salary": 120, "dep": 'hr' }
+ |""".stripMargin)
+
+ val exception = intercept[AnalysisException] {
+ sql(s"DELETE FROM $tableNameAsString WHERE pk = 1")
+ }
+ assert(exception.message.contains("Row ID attributes cannot be nullable"))
+ }
+}