aokolnychyi commented on code in PR #7646:
URL: https://github.com/apache/iceberg/pull/7646#discussion_r1198250429


##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -66,115 +67,219 @@ case class MergeRowsExec(
     child.execute().mapPartitions(processPartition)
   }
 
-  private def createProjection(exprs: Seq[Expression], attrs: Seq[Attribute]): 
UnsafeProjection = {
-    UnsafeProjection.create(exprs, attrs)
+  private def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
+    UnsafeProjection.create(exprs, child.output)
   }
 
-  private def createPredicate(expr: Expression, attrs: Seq[Attribute]): 
BasePredicate = {
-    GeneratePredicate.generate(expr, attrs)
+  private def createPredicate(expr: Expression): BasePredicate = {
+    GeneratePredicate.generate(expr, child.output)
   }
 
-  private def applyProjection(
-      actions: Seq[(BasePredicate, Option[UnsafeProjection])],
-      inputRow: InternalRow): InternalRow = {
+  // This method is responsible for processing an input row to emit the 
resultant row with an
+  // additional column that indicates whether the row is going to be included 
in the final
+  // output of merge or not.
+  // 1. Found a target row for which there is no corresponding source row 
(join condition not met)
+  //    - Only project the target columns if we need to output unchanged rows 
(group-based commands)
+  // 2. Found a source row for which there is no corresponding target row 
(join condition not met)
+  //    - Apply the not matched actions (i.e. INSERT actions) if the not matched 
conditions are met.
+  // 3. Found a source row for which there is a corresponding target row (join 
condition met)
+  //    - Apply the matched actions (i.e. DELETE or UPDATE actions) if match 
conditions are met.
+  private def processPartition(rowIterator: Iterator[InternalRow]): 
Iterator[InternalRow] = {
+    val isSourceRowPresentPred = createPredicate(isSourceRowPresent)
+    val isTargetRowPresentPred = createPredicate(isTargetRowPresent)
 
-    // find the first action where the predicate evaluates to true
-    // if there are overlapping conditions in actions, use the first matching 
action
-    // in the example below, when id = 5, both actions match but the first one 
is applied
-    //   WHEN MATCHED AND id > 1 AND id < 10 UPDATE *
-    //   WHEN MATCHED AND id = 5 OR id = 21 DELETE
+    val matchedActions = matchedConditions.zip(matchedOutputs).map { case 
(cond, outputs) =>
+      outputs match {
+        case Seq(output1, output2) =>
+          Split(createPredicate(cond), createProjection(output1), 
createProjection(output2))
+        case Seq(output) =>
+          Project(createPredicate(cond), createProjection(output))
+        case Nil =>
+          Project(createPredicate(cond), EmptyProjection)
+      }
+    }
 
-    val pair = actions.find {
-      case (predicate, _) => predicate.eval(inputRow)
+    val notMatchedActions = notMatchedConditions.zip(notMatchedOutputs).map { 
case (cond, output) =>
+      Project(createPredicate(cond), createProjection(output))
     }
 
-    // apply the projection to produce an output row, or return null to 
suppress this row
-    pair match {
-      case Some((_, Some(projection))) =>
-        projection.apply(inputRow)
-      case _ =>
-        null
+    val projectTargetCols = createProjection(targetOutput)
+
+    val cardinalityCheck = if (performCardinalityCheck) {

Review Comment:
   I considered using `Option`, but I was a bit concerned about what it would look like 
in bytecode. I'd need to use `foreach` on it, which has a nested `if`. I hope the JVM 
would be smart enough to detect the empty method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to