aokolnychyi commented on code in PR #7646:
URL: https://github.com/apache/iceberg/pull/7646#discussion_r1198250429
##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -66,115 +67,219 @@ case class MergeRowsExec(
child.execute().mapPartitions(processPartition)
}
- private def createProjection(exprs: Seq[Expression], attrs: Seq[Attribute]):
UnsafeProjection = {
- UnsafeProjection.create(exprs, attrs)
+ private def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
+ UnsafeProjection.create(exprs, child.output)
}
- private def createPredicate(expr: Expression, attrs: Seq[Attribute]):
BasePredicate = {
- GeneratePredicate.generate(expr, attrs)
+ private def createPredicate(expr: Expression): BasePredicate = {
+ GeneratePredicate.generate(expr, child.output)
}
- private def applyProjection(
- actions: Seq[(BasePredicate, Option[UnsafeProjection])],
- inputRow: InternalRow): InternalRow = {
+ // This method is responsible for processing an input row to emit the
resultant row with an
+ // additional column that indicates whether the row is going to be included
in the final
+ // output of merge or not.
+ // 1. Found a target row for which there is no corresponding source row
(join condition not met)
+ // - Only project the target columns if we need to output unchanged rows
(group-based commands)
+ // 2. Found a source row for which there is no corresponding target row
(join condition not met)
+ // - Apply the not matched actions (i.e. INSERT actions) if not matched
conditions are met.
+ // 3. Found a source row for which there is a corresponding target row (join
condition met)
+ // - Apply the matched actions (i.e DELETE or UPDATE actions) if match
conditions are met.
+ private def processPartition(rowIterator: Iterator[InternalRow]):
Iterator[InternalRow] = {
+ val isSourceRowPresentPred = createPredicate(isSourceRowPresent)
+ val isTargetRowPresentPred = createPredicate(isTargetRowPresent)
- // find the first action where the predicate evaluates to true
- // if there are overlapping conditions in actions, use the first matching
action
- // in the example below, when id = 5, both actions match but the first one
is applied
- // WHEN MATCHED AND id > 1 AND id < 10 UPDATE *
- // WHEN MATCHED AND id = 5 OR id = 21 DELETE
+ val matchedActions = matchedConditions.zip(matchedOutputs).map { case
(cond, outputs) =>
+ outputs match {
+ case Seq(output1, output2) =>
+ Split(createPredicate(cond), createProjection(output1),
createProjection(output2))
+ case Seq(output) =>
+ Project(createPredicate(cond), createProjection(output))
+ case Nil =>
+ Project(createPredicate(cond), EmptyProjection)
+ }
+ }
- val pair = actions.find {
- case (predicate, _) => predicate.eval(inputRow)
+ val notMatchedActions = notMatchedConditions.zip(notMatchedOutputs).map {
case (cond, output) =>
+ Project(createPredicate(cond), createProjection(output))
}
- // apply the projection to produce an output row, or return null to
suppress this row
- pair match {
- case Some((_, Some(projection))) =>
- projection.apply(inputRow)
- case _ =>
- null
+ val projectTargetCols = createProjection(targetOutput)
+
+ val cardinalityCheck = if (performCardinalityCheck) {
Review Comment:
I considered using `Option`, but I was a bit concerned about how it would look
in bytecode. I'd need to use `foreach` on it, which has a nested `if`. I hope the
JVM would be smart enough to detect the empty method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]