amogh-jahagirdar commented on code in PR #12736: URL: https://github.com/apache/iceberg/pull/12736#discussion_r2056312051
########## core/src/main/java/org/apache/iceberg/TableUtil.java: ########## @@ -60,4 +61,28 @@ public static String metadataFileLocation(Table table) { "%s does not have a metadata file location", table.getClass().getSimpleName())); } } + + public static boolean supportsRowLineage(Table table) { + Preconditions.checkArgument(null != table, "Invalid table: null"); + if (table instanceof BaseMetadataTable) { + return false; + } + + return formatVersion(table) >= TableMetadata.MIN_FORMAT_VERSION_ROW_LINEAGE; + } + + public static Schema schemaWithRowLineage(Table table) { Review Comment: I agree TableUtil probably isn't the right place to keep this method; MetadataColumns.withRowLineageSchema(Table table) is better. ########## spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTableForRowLineage.scala: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.plans.logical.Assignment +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.UpdateTable +import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation + +object RewriteUpdateTableForRowLineage extends RewriteOperationForRowLineage { + + override def apply(plan: LogicalPlan): LogicalPlan = { + plan resolveOperators { + case updateTable@UpdateTable(_, _, _) if shouldUpdatePlan(updateTable) => + updatePlanWithRowLineage(updateTable) + } + } + + // The plan should only be updated if row lineage metadata attributes are present + // in the target table AND lineage attributes are not already + // on the output of operation which indicates the rule already ran + private def shouldUpdatePlan(updateTable: UpdateTable): Boolean = { + val rowLineageAttrs = findRowLineageAttributes(updateTable.metadataOutput) + val allLineageAttrsPresent = rowLineageAttrs.nonEmpty && rowLineageAttrs.forall(updateTable.metadataOutput.contains) + val rowIdAbsentFromOutput = !updateTable.output.exists(_.name == ROW_ID_ATTRIBUTE_NAME) + + allLineageAttrsPresent && rowIdAbsentFromOutput + } + + + private def updatePlanWithRowLineage(updateTable: UpdateTable): LogicalPlan = { + EliminateSubqueryAliases(updateTable.table) match { + case r @ DataSourceV2Relation(_: SupportsRowLevelOperations, _, _, _, _) => + val rowLineageAttributes = findRowLineageAttributes(updateTable.metadataOutput) + val lastUpdatedSequence = rowLineageAttributes.filter( + attr => attr.name == LAST_UPDATED_SEQUENCE_NUMBER_ATTRIBUTE_NAME).head + + val lineageAssignments = updateTable.assignments ++ + Seq(Assignment(lastUpdatedSequence, Literal(null))) Review Comment: I originally only explicitly assigned the last updated sequence 
number because the row ID would just be carried over as part of the projection, but it's probably safer to just be very explicit here (just like the merge rule is). ########## spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java: ########## @@ -0,0 +1,443 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.extensions; + +import static org.apache.iceberg.TableUtil.schemaWithRowLineage; +import static org.apache.iceberg.spark.Spark3Util.loadIcebergTable; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; + +import java.io.IOException; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotRef; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.PartitionMap; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.parser.ParseException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; + +public abstract class TestRowLevelOperationsWithLineage extends SparkRowLevelOperationsTestBase { + static final Function<Record, StructLike> UNPARTITIONED_GENERATOR = record -> null; + // Use the first field in the row as identity partition + static final Function<Record, 
StructLike> IDENTITY_PARTITIONED_GENERATOR = + record -> TestHelpers.Row.of(record.get(0, Integer.class)); + + @BeforeAll + public static void setupSparkConf() { + spark.conf().set("spark.sql.shuffle.partitions", "4"); + } + + @AfterEach + public void removeTables() { + sql("DROP TABLE IF EXISTS %s", tableName); + sql("DROP TABLE IF EXISTS source"); + } + + @TestTemplate + public void testMergeIntoWithBothMatchedAndNonMatched() + throws NoSuchTableException, ParseException, IOException { + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + + createAndInitTable("data INT", null); + createBranchIfNeeded(); + Table table = loadIcebergTable(spark, tableName); + PartitionMap<List<Record>> recordsByPartition = + createRecordsWithRowLineage( Review Comment: >numRecords is always 5 and there are only 2 partition functions. I think it may be better to just have constants up at the top, rather than generating the records. I still have some trouble reading these tests because I have to remember that the IDs are assigned 0-4. For this, would it help if we pass in an explicit range of values (since it's always 5, this range is constant)? I think the fact that it's a range may help a reader explicitly see that the IDs are always 0-4. >Also, a constant sequence number would be helpful to make this match reality a bit more closely. Sounds good, I'll update to make the seq number constant. I originally made them different so that the values would be easier to distinguish, but arguably that doesn't matter for these tests. What we care about is if they're nulled out or preserved (the actual value is inconsequential). ########## spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTableForRowLineage.scala: ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.plans.logical.Assignment +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.MergeIntoTable +import org.apache.spark.sql.catalyst.plans.logical.UpdateAction +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation + +object RewriteMergeIntoTableForRowLineage extends RewriteOperationForRowLineage { + + override def apply(plan: LogicalPlan): LogicalPlan = { + plan.resolveOperators { + case m @ MergeIntoTable(_, _, _, matchedActions, _, notMatchedBySourceActions) + if m.resolved && m.rewritable && m.aligned && + (matchedActions.nonEmpty || notMatchedBySourceActions.nonEmpty) && + shouldUpdatePlan(m) => + updateMergeIntoForRowLineage(m) + } + } + + protected def updateMergeIntoForRowLineage(mergeIntoTable: MergeIntoTable): LogicalPlan = { + EliminateSubqueryAliases(mergeIntoTable.targetTable) match { + case r: DataSourceV2Relation => + val matchedActions = mergeIntoTable.matchedActions + val notMatchedBySourceActions = mergeIntoTable.notMatchedBySourceActions + val rowLineageAttributes = findRowLineageAttributes(r.metadataOutput) + val rowId = 
rowLineageAttributes.filter( + attr => attr.name == ROW_ID_ATTRIBUTE_NAME).head + val lastUpdatedSequence = rowLineageAttributes.filter( + attr => attr.name == LAST_UPDATED_SEQUENCE_NUMBER_ATTRIBUTE_NAME).head + + val matchedAssignmentsForLineage = matchedActions.map { + case UpdateAction(cond, actions) => + UpdateAction(cond, actions ++ Seq(Assignment(rowId, rowId), + Assignment(lastUpdatedSequence, Literal(null)))) + + case p => p + } + + val notMatchedBySourceActionsForLineage = notMatchedBySourceActions.map { + case UpdateAction(cond, actions) => + UpdateAction(cond, actions ++ Seq(Assignment(rowId, rowId), + Assignment(lastUpdatedSequence, Literal(null)))) + + case p => p + } + + // Treat row lineage columns as data columns by removing the metadata attribute + // This works around the logic in + // ExposesMetadataColumns, used later in metadata attribute resolution, + // which prevents surfacing other metadata columns when a single metadata column is in the output + val rowLineageAsDataColumns = rowLineageAttributes.map(removeMetadataColumnAttribute) + + val tableWithLineage = r.copy(output = + r.output ++ rowLineageAsDataColumns) + + mergeIntoTable.copy( + targetTable = tableWithLineage, + matchedActions = matchedAssignmentsForLineage, + notMatchedBySourceActions = notMatchedBySourceActionsForLineage) + } + } + + // The plan should only be updated if row lineage metadata attributes are present + // in the target table AND lineage attributes are not already + // on the output of operation which indicates the rule already ran + private def shouldUpdatePlan(mergeIntoTable: MergeIntoTable): Boolean = { + val metadataOutput = mergeIntoTable.targetTable.metadataOutput + val rowLineageAttrs = findRowLineageAttributes(metadataOutput) + val allLineageAttrsPresent = rowLineageAttrs.nonEmpty && rowLineageAttrs.forall(metadataOutput.contains) Review Comment: Hm if `rowLineageAttrs` is empty, the `forall` returns true which we don't want since that means the lineage 
attributes are not present (in tables not supporting row lineage). Maybe with the suggestion above to always include _row_id this would get reworked anyways. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org