rdblue commented on code in PR #9556: URL: https://github.com/apache/iceberg/pull/9556#discussion_r1468925515
##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala:
##########

```diff
@@ -214,6 +214,8 @@ object RewriteMergeIntoTable extends RewriteRowLevelIcebergCommand with Predicat
     val rowFromSourceAttr = resolveAttrRef(ROW_FROM_SOURCE_REF, joinPlan)
     val rowFromTargetAttr = resolveAttrRef(ROW_FROM_TARGET_REF, joinPlan)

+    // The output expression should retain read attributes for correctly determining nullability
+    val matchedOutputsWithAttrs = matchedActions.map(matchedActionOutput(_, metadataAttrs) :+ readAttrs)
```

Review Comment:
This doesn't make sense to me. While this change does have the intended effect of correctly setting the nullability in `buildMergeRowsOutput`, it seems to me that there is a better way than creating a different (fake) version of `matchedOutputs`.

I think this works because it includes the original attributes as fake outputs (_for every action output list_). A more direct way of doing the same thing is to make an attribute nullable if it is already nullable OR if an output is nullable. In other words, I think the actual problem is in `RewriteRowLevelIcebergCommand.buildMergingOutput`:

```scala
protected def buildMergingOutput(
    outputs: Seq[Seq[Expression]],
    attrs: Seq[Attribute]): Seq[Attribute] = {
  // build a correct nullability map for output attributes
  // an attribute is nullable if at least one output may produce null
  val nullabilityMap = attrs.indices.map { index =>
    index -> outputs.exists(output => output(index).nullable)
  }.toMap

  attrs.zipWithIndex.map { case (attr, index) =>
    AttributeReference(attr.name, attr.dataType, nullabilityMap(index))()
  }
}
```

The change in the current PR updates the `outputs` arg to that method to include a copy of the original `attrs` for every matched expression. What ends up happening is that `outputs.exists` checks whether the corresponding original attr is nullable.
You could do the same without this PR's change like this:

```scala
protected def buildMergingOutput(
    outputs: Seq[Seq[Expression]],
    attrs: Seq[Attribute]): Seq[Attribute] = {
  // build a correct nullability map for output attributes
  // an attribute is nullable if at least one output may produce null
  val nullabilityMap = attrs.indices.map { index =>
    index -> outputs.exists(output => output(index).nullable)
  }.toMap

  attrs.zipWithIndex.map { case (attr, index) =>
    AttributeReference(attr.name, attr.dataType, attr.nullable || nullabilityMap(index))()
  }
}
```

Looking at this code also raises a larger question: what is `buildMergingOutput` trying to do? It basically copies the input attrs to be the `MergeRows` output attrs, but then bases nullability on whether unrelated expressions are nullable. The problem is that the expression coming from an output comes from an `Assignment` and has nothing to do with the position in the input attrs.

@aokolnychyi do you know what's happening here? Why would the output of assignment expressions affect the nullability of the incoming data?
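To make the equivalence concrete, here is a minimal sketch of the two approaches side by side. It uses hypothetical, simplified stand-in types (`Attr`, `Expr`) rather than Spark's actual `Attribute`/`Expression` classes, and the function names are invented for illustration:

```scala
// Hypothetical stand-ins for Spark's Attribute and Expression,
// modeling only the nullability flag.
case class Attr(name: String, nullable: Boolean)
case class Expr(nullable: Boolean)

// Explicit OR: an attribute stays nullable if it was nullable on the way in,
// or becomes nullable if any action output can produce null at its position.
def unionNullability(outputs: Seq[Seq[Expr]], attrs: Seq[Attr]): Seq[Attr] =
  attrs.zipWithIndex.map { case (attr, index) =>
    attr.copy(nullable = attr.nullable || outputs.exists(output => output(index).nullable))
  }

// The PR's trick: append the original attributes as one extra "fake" output
// row, then derive nullability from `outputs.exists` alone.
def viaFakeOutputRow(outputs: Seq[Seq[Expr]], attrs: Seq[Attr]): Seq[Attr] = {
  val fakeRow = attrs.map(attr => Expr(attr.nullable))
  attrs.zipWithIndex.map { case (attr, index) =>
    attr.copy(nullable = (outputs :+ fakeRow).exists(output => output(index).nullable))
  }
}
```

Both produce the same per-position nullability, which is why the PR's change works; the explicit OR just states the intent directly instead of padding `outputs`.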