rdblue commented on code in PR #9556:
URL: https://github.com/apache/iceberg/pull/9556#discussion_r1468925515


##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala:
##########
@@ -214,6 +214,8 @@ object RewriteMergeIntoTable extends 
RewriteRowLevelIcebergCommand with Predicat
 
     val rowFromSourceAttr = resolveAttrRef(ROW_FROM_SOURCE_REF, joinPlan)
     val rowFromTargetAttr = resolveAttrRef(ROW_FROM_TARGET_REF, joinPlan)
+    // The output expression should retain read attributes for correctly 
determining nullability
+    val matchedOutputsWithAttrs = matchedActions.map(matchedActionOutput(_, 
metadataAttrs) :+ readAttrs)

Review Comment:
   This doesn't make sense to me. This does have the intended effect to 
correctly set the nullability in `buildMergeRowsOutput`, it seems to me like 
there is a better way than creating a different (fake) version of 
`matchedOutputs`. I think this works because it is including the original 
attributes as fake outputs (_for every action output list_). A more direct way 
of doing the same thing is to make an attribute nullable if it is nullable OR 
if an output is nullable.
   
   In other words, I think the actual problem is in 
`RewriteRowLevelIcebergCommand.buildMergingOutput`:
   
   ```scala
     protected def buildMergingOutput(
         outputs: Seq[Seq[Expression]],
         attrs: Seq[Attribute]): Seq[Attribute] = {
   
       // build a correct nullability map for output attributes
       // an attribute is nullable if at least one output may produce null
       val nullabilityMap = attrs.indices.map { index =>
         index -> outputs.exists(output => output(index).nullable)
       }.toMap
   
       attrs.zipWithIndex.map { case (attr, index) =>
         AttributeReference(attr.name, attr.dataType, nullabilityMap(index))()
       }
     }
   ```
   
   The change in the current PR updates the `outputs` arg to that method to 
include a copy of the original `attrs` for every matched expression. What ends 
up happening is that `outputs.exists` checks whether the corresponding original 
attr is nullable. You could do the same without this PR's change like this:
   
   ```scala
     protected def buildMergingOutput(
         outputs: Seq[Seq[Expression]],
         attrs: Seq[Attribute]): Seq[Attribute] = {
   
       // build a correct nullability map for output attributes
       // an attribute is nullable if at least one output may produce null
       val nullabilityMap = attrs.indices.map { index =>
         index -> outputs.exists(output => output(index).nullable)
       }.toMap
   
       attrs.zipWithIndex.map { case (attr, index) =>
         AttributeReference(attr.name, attr.dataType, attr.nullable || 
nullabilityMap(index))()
       }
     }
   ```
   
   Looking at this code also raises a larger question: what is 
`buildMergingOutput` trying to do? It basically copies the input attrs to be 
the `MergeRows` output attrs, but then bases nullability on whether unrelated 
expressions are nullable. The problem is the expression coming from an output 
comes from an `Assignment` and has nothing to do with the position in the input 
attrs.
   
   @aokolnychyi do you know what's happening here? Why would the output of 
assignment expressions affect the nullability of the incoming data?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to