amogh-jahagirdar commented on code in PR #12736: URL: https://github.com/apache/iceberg/pull/12736#discussion_r2053163626
########## spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTableForRowLineage.scala: ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.plans.logical.Assignment +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.MergeIntoTable +import org.apache.spark.sql.catalyst.plans.logical.UpdateAction +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation + +object RewriteMergeIntoTableForRowLineage extends RewriteOperationForRowLineage { + + override def apply(plan: LogicalPlan): LogicalPlan = { + plan.resolveOperators { + case m @ MergeIntoTable(_, _, _, matchedActions, _, notMatchedBySourceActions) + if m.resolved && m.rewritable && m.aligned && + (matchedActions.nonEmpty || notMatchedBySourceActions.nonEmpty) && + shouldUpdatePlan(m) => + updateMergeIntoForRowLineage(m) + } + } + + protected def updateMergeIntoForRowLineage(mergeIntoTable: MergeIntoTable): LogicalPlan = { + EliminateSubqueryAliases(mergeIntoTable.targetTable) match { + case r: DataSourceV2Relation => + val matchedActions = mergeIntoTable.matchedActions + val notMatchedBySourceActions = mergeIntoTable.notMatchedBySourceActions + val rowLineageAttributes = findRowLineageAttributes(r.metadataOutput) + val rowId = rowLineageAttributes.filter( + attr => attr.name == ROW_ID_ATTRIBUTE_NAME).head + val lastUpdatedSequence = rowLineageAttributes.filter( + attr => attr.name == LAST_UPDATED_SEQUENCE_NUMBER_ATTRIBUTE_NAME).head + + val matchedAssignmentsForLineage = matchedActions.map { + case UpdateAction(cond, actions) => + UpdateAction(cond, actions ++ Seq(Assignment(rowId, rowId), + Assignment(lastUpdatedSequence, Literal(null)))) + + case p => p + } + + val notMatchedBySourceActionsForLineage = notMatchedBySourceActions.map { + case UpdateAction(cond, actions) => + UpdateAction(cond, actions ++ Seq(Assignment(rowId, rowId), + Assignment(lastUpdatedSequence, Literal(null)))) + + case p => p + } + + // Treat row lineage columns as data columns by removing the metadata attribute + // This works around the logic in + // ExposesMetadataColumns, used later in metadata attribute resolution, + // which prevents surfacing other metadata columns when a single metadata column is in the output + val rowLineageAsDataColumns = rowLineageAttributes.map(removeMetadataColumnAttribute) + + val tableWithLineage = r.copy(output = + r.output ++ rowLineageAsDataColumns) + + mergeIntoTable.copy( + targetTable = tableWithLineage, + matchedActions = matchedAssignmentsForLineage, + notMatchedBySourceActions = notMatchedBySourceActionsForLineage) + } + } + + // The plan should only be updated if row lineage metadata attributes are present + // in the target table AND lineage attributes are not already Review Comment: >Don't we always want to have _row_id available but read null if the table isn't v3? Hm I looked at it as we only want to have _row_id available for v3+ since the existence of that column in the projection could be potentially misleading, though arguably since it would just be null means that projecting it is fine since that implicitly indicates there is no row lineage. >That said, if metadataOutput always has _row_id then you'd need to look at the table in the relation and see if it is a v3 table, probably using the format-version table property. Yeah, we'd have to extract the Iceberg table from the DSV2 table. I don't think that's too bad, I just was concerned that merely indicating _row_id in older tables was problematic -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org