amogh-jahagirdar commented on code in PR #12736:
URL: https://github.com/apache/iceberg/pull/12736#discussion_r2056312051


##########
core/src/main/java/org/apache/iceberg/TableUtil.java:
##########
@@ -60,4 +61,28 @@ public static String metadataFileLocation(Table table) {
               "%s does not have a metadata file location", 
table.getClass().getSimpleName()));
     }
   }
+
+  public static boolean supportsRowLineage(Table table) {
+    Preconditions.checkArgument(null != table, "Invalid table: null");
+    if (table instanceof BaseMetadataTable) {
+      return false;
+    }
+
+    return formatVersion(table) >= TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE;
+  }
+
+  public static Schema schemaWithRowLineage(Table table) {

Review Comment:
   I agree TableUtil probably isn't the right place to keep this method; `MetadataColumns.withRowLineageSchema(Table table)` is better.



##########
spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTableForRowLineage.scala:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans.logical.Assignment
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.UpdateTable
+import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+
+object RewriteUpdateTableForRowLineage extends RewriteOperationForRowLineage {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan resolveOperators {
+      case updateTable@UpdateTable(_, _, _) if shouldUpdatePlan(updateTable) =>
+        updatePlanWithRowLineage(updateTable)
+    }
+  }
+
+  // The plan should only be updated if row lineage metadata attributes are present
+  // in the target table AND lineage attributes are not already on the output of the
+  // operation, which would indicate the rule already ran.
+  private def shouldUpdatePlan(updateTable: UpdateTable): Boolean = {
+    val rowLineageAttrs = findRowLineageAttributes(updateTable.metadataOutput)
+    val allLineageAttrsPresent = rowLineageAttrs.nonEmpty &&
+      rowLineageAttrs.forall(updateTable.metadataOutput.contains)
+    val rowIdAbsentFromOutput = !updateTable.output.exists(_.name == ROW_ID_ATTRIBUTE_NAME)
+
+    allLineageAttrsPresent && rowIdAbsentFromOutput
+  }
+
+
+  private def updatePlanWithRowLineage(updateTable: UpdateTable): LogicalPlan = {
+    EliminateSubqueryAliases(updateTable.table) match {
+      case r @ DataSourceV2Relation(_: SupportsRowLevelOperations, _, _, _, _) =>
+        val rowLineageAttributes = findRowLineageAttributes(updateTable.metadataOutput)
+        val lastUpdatedSequence = rowLineageAttributes.filter(
+          attr => attr.name == LAST_UPDATED_SEQUENCE_NUMBER_ATTRIBUTE_NAME).head
+
+        val lineageAssignments = updateTable.assignments ++
+          Seq(Assignment(lastUpdatedSequence, Literal(null)))

Review Comment:
   I originally only explicitly assigned the last updated sequence number because the row ID would just be carried over as part of the projection, but it's probably safer to be fully explicit here, just like the merge rule is.
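   For reference, a rough sketch of the more explicit version. It assumes `rowId` would be resolved the same way `lastUpdatedSequence` already is in this rule, and mirrors the assignments in `RewriteMergeIntoTableForRowLineage`:
   
   ```scala
   // Sketch only: carry the row ID over explicitly and null out the last updated
   // sequence number, matching what the merge rewrite rule does.
   val rowId = rowLineageAttributes.filter(attr => attr.name == ROW_ID_ATTRIBUTE_NAME).head
   val lineageAssignments = updateTable.assignments ++
     Seq(Assignment(rowId, rowId), Assignment(lastUpdatedSequence, Literal(null)))
   ```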



##########
spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.TableUtil.schemaWithRowLineage;
+import static org.apache.iceberg.spark.Spark3Util.loadIcebergTable;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.encryption.EncryptionUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PartitionMap;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+
+public abstract class TestRowLevelOperationsWithLineage extends SparkRowLevelOperationsTestBase {
+  static final Function<Record, StructLike> UNPARTITIONED_GENERATOR = record -> null;
+  // Use the first field in the row as identity partition
+  static final Function<Record, StructLike> IDENTITY_PARTITIONED_GENERATOR =
+      record -> TestHelpers.Row.of(record.get(0, Integer.class));
+
+  @BeforeAll
+  public static void setupSparkConf() {
+    spark.conf().set("spark.sql.shuffle.partitions", "4");
+  }
+
+  @AfterEach
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS source");
+  }
+
+  @TestTemplate
+  public void testMergeIntoWithBothMatchedAndNonMatched()
+      throws NoSuchTableException, ParseException, IOException {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+    createAndInitTable("data INT", null);
+    createBranchIfNeeded();
+    Table table = loadIcebergTable(spark, tableName);
+    PartitionMap<List<Record>> recordsByPartition =
+        createRecordsWithRowLineage(

Review Comment:
   > numRecords is always 5 and there are only 2 partition functions. I think it may be better to just have constants up at the top, rather than generating the records. I still have some trouble reading these tests because I have to remember that the IDs are assigned 0-4.

   For this, would it help if we pass in an explicit range of values? Since it's always 5, the range is constant, but the fact that it's a range may help a reader see explicitly that the IDs are always 0-4.

   > Also, a constant sequence number would be helpful to make this match reality a bit more closely.

   Sounds good, I'll update to make the sequence number constant. I originally made them different so the values would be easier to distinguish, but arguably that doesn't matter for these tests. What we care about is whether they're nulled out or preserved (the actual value is inconsequential).
   
   



##########
spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTableForRowLineage.scala:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans.logical.Assignment
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.MergeIntoTable
+import org.apache.spark.sql.catalyst.plans.logical.UpdateAction
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+
+object RewriteMergeIntoTableForRowLineage extends RewriteOperationForRowLineage {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.resolveOperators {
+      case m @ MergeIntoTable(_, _, _, matchedActions, _, notMatchedBySourceActions)
+        if m.resolved && m.rewritable && m.aligned &&
+          (matchedActions.nonEmpty || notMatchedBySourceActions.nonEmpty) &&
+          shouldUpdatePlan(m) =>
+        updateMergeIntoForRowLineage(m)
+    }
+  }
+
+  protected def updateMergeIntoForRowLineage(mergeIntoTable: MergeIntoTable): LogicalPlan = {
+    EliminateSubqueryAliases(mergeIntoTable.targetTable) match {
+      case r: DataSourceV2Relation =>
+        val matchedActions = mergeIntoTable.matchedActions
+        val notMatchedBySourceActions = mergeIntoTable.notMatchedBySourceActions
+        val rowLineageAttributes = findRowLineageAttributes(r.metadataOutput)
+        val rowId = rowLineageAttributes.filter(
+          attr => attr.name == ROW_ID_ATTRIBUTE_NAME).head
+        val lastUpdatedSequence = rowLineageAttributes.filter(
+          attr => attr.name == LAST_UPDATED_SEQUENCE_NUMBER_ATTRIBUTE_NAME).head
+
+        val matchedAssignmentsForLineage = matchedActions.map {
+          case UpdateAction(cond, actions) =>
+            UpdateAction(cond, actions ++ Seq(Assignment(rowId, rowId),
+              Assignment(lastUpdatedSequence, Literal(null))))
+
+          case p => p
+        }
+
+        val notMatchedBySourceActionsForLineage = notMatchedBySourceActions.map {
+          case UpdateAction(cond, actions) =>
+            UpdateAction(cond, actions ++ Seq(Assignment(rowId, rowId),
+              Assignment(lastUpdatedSequence, Literal(null))))
+
+          case p => p
+        }
+
+        // Treat row lineage columns as data columns by removing the metadata attribute.
+        // This works around the logic in ExposesMetadataColumns, used later in metadata
+        // attribute resolution, which prevents surfacing other metadata columns when a
+        // single metadata column is in the output.
+        val rowLineageAsDataColumns = rowLineageAttributes.map(removeMetadataColumnAttribute)
+
+        val tableWithLineage = r.copy(output =
+          r.output ++ rowLineageAsDataColumns)
+
+        mergeIntoTable.copy(
+          targetTable = tableWithLineage,
+          matchedActions = matchedAssignmentsForLineage,
+          notMatchedBySourceActions = notMatchedBySourceActionsForLineage)
+    }
+  }
+
+  // The plan should only be updated if row lineage metadata attributes are present
+  // in the target table AND lineage attributes are not already on the output of the
+  // operation, which would indicate the rule already ran.
+  private def shouldUpdatePlan(mergeIntoTable: MergeIntoTable): Boolean = {
+    val metadataOutput = mergeIntoTable.targetTable.metadataOutput
+    val rowLineageAttrs = findRowLineageAttributes(metadataOutput)
+    val allLineageAttrsPresent = rowLineageAttrs.nonEmpty &&
+      rowLineageAttrs.forall(metadataOutput.contains)

Review Comment:
   Hm, if `rowLineageAttrs` is empty, the `forall` returns true, which we don't want since that means the lineage attributes are not present (in tables not supporting row lineage). Maybe with the suggestion above to always include `_row_id` this would get reworked anyway.
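   A tiny standalone Scala illustration of the vacuous-truth behaviour being discussed (nothing PR-specific assumed here):
   
   ```scala
   // forall over an empty collection is vacuously true
   Seq.empty[String].forall(_ == "_row_id")            // true
   // the nonEmpty guard keeps "no lineage attrs" from looking like "all present"
   val attrs = Seq.empty[String]
   attrs.nonEmpty && attrs.forall(_ == "_row_id")      // false
   ```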



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

