aokolnychyi commented on code in PR #7646:
URL: https://github.com/apache/iceberg/pull/7646#discussion_r1198358267
##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -66,115 +67,219 @@ case class MergeRowsExec(
child.execute().mapPartitions(processPartition)
}
- private def createProjection(exprs: Seq[Expression], attrs: Seq[Attribute]):
UnsafeProjection = {
- UnsafeProjection.create(exprs, attrs)
+ private def createProjection(exprs: Seq[Expression]): UnsafeProjection = {
+ UnsafeProjection.create(exprs, child.output)
}
- private def createPredicate(expr: Expression, attrs: Seq[Attribute]):
BasePredicate = {
- GeneratePredicate.generate(expr, attrs)
+ private def createPredicate(expr: Expression): BasePredicate = {
+ GeneratePredicate.generate(expr, child.output)
}
- private def applyProjection(
- actions: Seq[(BasePredicate, Option[UnsafeProjection])],
- inputRow: InternalRow): InternalRow = {
+ // This method is responsible for processing a input row to emit the
resultant row with an
+ // additional column that indicates whether the row is going to be included
in the final
+ // output of merge or not.
+ // 1. Found a target row for which there is no corresponding source row
(join condition not met)
+ // - Only project the target columns if we need to output unchanged rows
(group-based commands)
+ // 2. Found a source row for which there is no corresponding target row
(join condition not met)
+ // - Apply the not matched actions (i.e INSERT actions) if non match
conditions are met.
+ // 3. Found a source row for which there is a corresponding target row (join
condition met)
+ // - Apply the matched actions (i.e DELETE or UPDATE actions) if match
conditions are met.
+ private def processPartition(rowIterator: Iterator[InternalRow]):
Iterator[InternalRow] = {
+ val isSourceRowPresentPred = createPredicate(isSourceRowPresent)
+ val isTargetRowPresentPred = createPredicate(isTargetRowPresent)
- // find the first action where the predicate evaluates to true
- // if there are overlapping conditions in actions, use the first matching
action
- // in the example below, when id = 5, both actions match but the first one
is applied
- // WHEN MATCHED AND id > 1 AND id < 10 UPDATE *
- // WHEN MATCHED AND id = 5 OR id = 21 DELETE
+ val matchedActions = matchedConditions.zip(matchedOutputs).map { case
(cond, outputs) =>
+ outputs match {
+ case Seq(output1, output2) =>
+ Split(createPredicate(cond), createProjection(output1),
createProjection(output2))
+ case Seq(output) =>
+ Project(createPredicate(cond), createProjection(output))
+ case Nil =>
+ Project(createPredicate(cond), EmptyProjection)
+ }
+ }
- val pair = actions.find {
- case (predicate, _) => predicate.eval(inputRow)
+ val notMatchedActions = notMatchedConditions.zip(notMatchedOutputs).map {
case (cond, output) =>
+ Project(createPredicate(cond), createProjection(output))
}
- // apply the projection to produce an output row, or return null to
suppress this row
- pair match {
- case Some((_, Some(projection))) =>
- projection.apply(inputRow)
- case _ =>
- null
+ val projectTargetCols = createProjection(targetOutput)
+
+ val cardinalityCheck = if (performCardinalityCheck) {
+ val rowIdOrdinal = child.output.indexWhere(attr =>
conf.resolver(attr.name, ROW_ID))
+ assert(rowIdOrdinal != -1, "Cannot find row ID attr")
+ BitmapCardinalityCheck(rowIdOrdinal)
+ } else {
+ EmptyCardinalityCheck
}
+
+ val mergeIterator = if (matchedActions.exists(_.isInstanceOf[Split])) {
+ new SplittingMergeRowIterator(
+ rowIterator, cardinalityCheck, isTargetRowPresentPred,
+ matchedActions, notMatchedActions)
+ } else {
+ new MergeRowIterator(
+ rowIterator, cardinalityCheck, isTargetRowPresentPred,
isSourceRowPresentPred,
+ projectTargetCols, matchedActions.asInstanceOf[Seq[Project]],
notMatchedActions)
+ }
+
+ // null indicates a record must be discarded
+ mergeIterator.filter(_ != null)
}
- private def processPartition(rowIterator: Iterator[InternalRow]):
Iterator[InternalRow] = {
- val inputAttrs = child.output
+ trait Action {
+ def cond: BasePredicate
+ }
+
+ case class Project(cond: BasePredicate, proj: Projection) extends Action {
+ def apply(row: InternalRow): InternalRow = proj.apply(row)
+ }
- val isSourceRowPresentPred = createPredicate(isSourceRowPresent,
inputAttrs)
- val isTargetRowPresentPred = createPredicate(isTargetRowPresent,
inputAttrs)
+ case class Split(cond: BasePredicate, proj: Projection, otherProj:
Projection) extends Action {
+ def projectRow(row: InternalRow): InternalRow = proj.apply(row)
+ def projectExtraRow(row: InternalRow): InternalRow = otherProj.apply(row)
+ }
- val matchedPreds = matchedConditions.map(createPredicate(_, inputAttrs))
- val matchedProjs = matchedOutputs.map {
- case output if output.nonEmpty => Some(createProjection(output,
inputAttrs))
- case _ => None
- }
- val matchedPairs = matchedPreds zip matchedProjs
+ object EmptyProjection extends Projection {
+ override def apply(row: InternalRow): InternalRow = null
+ }
- val notMatchedPreds = notMatchedConditions.map(createPredicate(_,
inputAttrs))
- val notMatchedProjs = notMatchedOutputs.map {
- case output if output.nonEmpty => Some(createProjection(output,
inputAttrs))
- case _ => None
- }
- val nonMatchedPairs = notMatchedPreds zip notMatchedProjs
-
- val projectTargetCols = createProjection(targetOutput, inputAttrs)
-
- // This method is responsible for processing a input row to emit the
resultant row with an
- // additional column that indicates whether the row is going to be
included in the final
- // output of merge or not.
- // 1. Found a target row for which there is no corresponding source row
(join condition not met)
- // - Only project the target columns if we need to output unchanged rows
- // 2. Found a source row for which there is no corresponding target row
(join condition not met)
- // - Apply the not matched actions (i.e INSERT actions) if non match
conditions are met.
- // 3. Found a source row for which there is a corresponding target row
(join condition met)
- // - Apply the matched actions (i.e DELETE or UPDATE actions) if match
conditions are met.
- def processRow(inputRow: InternalRow): InternalRow = {
- if (emitNotMatchedTargetRows && !isSourceRowPresentPred.eval(inputRow)) {
- projectTargetCols.apply(inputRow)
- } else if (!isTargetRowPresentPred.eval(inputRow)) {
- applyProjection(nonMatchedPairs, inputRow)
+ class MergeRowIterator(
+ private val rowIterator: Iterator[InternalRow],
+ private val cardinalityCheck: CardinalityCheck,
+ private val isTargetRowPresentPred: BasePredicate,
+ private val isSourceRowPresentPred: BasePredicate,
+ private val targetTableProj: Projection,
+ private val matchedActions: Seq[Project],
+ private val notMatchedActions: Seq[Project])
+ extends Iterator[InternalRow] {
+
+ override def hasNext: Boolean = rowIterator.hasNext
+
+ override def next(): InternalRow = {
+ val row = rowIterator.next()
+
+ val isSourceRowPresent = isSourceRowPresentPred.eval(row)
+ val isTargetRowPresent = isTargetRowPresentPred.eval(row)
+
+ if (isTargetRowPresent && isSourceRowPresent) {
+ cardinalityCheck.execute(row)
+ applyMatchedActions(row)
+ } else if (isSourceRowPresent) {
+ applyNotMatchedActions(row)
+ } else if (emitNotMatchedTargetRows && isTargetRowPresent) {
+ targetTableProj.apply(row)
} else {
- applyProjection(matchedPairs, inputRow)
+ null
}
}
- val matchedRowIds = new Roaring64Bitmap()
+ private def applyMatchedActions(row: InternalRow): InternalRow = {
+ for (action <- matchedActions) {
+ if (action.cond.eval(row)) {
+ return action.apply(row)
+ }
+ }
- def processRowWithCardinalityCheck(rowIdOrdinal: Int)(inputRow:
InternalRow): InternalRow = {
- val isSourceRowPresent = isSourceRowPresentPred.eval(inputRow)
- val isTargetRowPresent = isTargetRowPresentPred.eval(inputRow)
+ if (emitNotMatchedTargetRows) targetTableProj.apply(row) else null
+ }
- if (isSourceRowPresent && isTargetRowPresent) {
- val currentRowId = inputRow.getLong(rowIdOrdinal)
- if (matchedRowIds.contains(currentRowId)) {
- throw new SparkException(
- "The ON search condition of the MERGE statement matched a single
row from " +
- "the target table with multiple rows of the source table. This
could result " +
- "in the target row being operated on more than once with an update
or delete " +
- "operation and is not allowed.")
+ private def applyNotMatchedActions(row: InternalRow): InternalRow = {
+ for (action <- notMatchedActions) {
+ if (action.cond.eval(row)) {
+ return action.apply(row)
}
- matchedRowIds.add(currentRowId)
}
- if (emitNotMatchedTargetRows && !isSourceRowPresent) {
- projectTargetCols.apply(inputRow)
- } else if (!isTargetRowPresent) {
- applyProjection(nonMatchedPairs, inputRow)
+ null
+ }
+ }
+
+ class SplittingMergeRowIterator(
+ private val rowIterator: Iterator[InternalRow],
+ private val cardinalityCheck: CardinalityCheck,
+ private val isTargetRowPresentPred: BasePredicate,
+ private val matchedActions: Seq[Action],
+ private val notMatchedActions: Seq[Project])
+ extends Iterator[InternalRow] {
+
+ var cachedExtraRow: InternalRow = _
+
+ override def hasNext: Boolean = cachedExtraRow != null ||
rowIterator.hasNext
+
+ override def next(): InternalRow = {
+ if (cachedExtraRow != null) {
+ val extraRow = cachedExtraRow
+ cachedExtraRow = null
+ return extraRow
+ }
+
+ val row = rowIterator.next()
+
+ // it should be OK to just check if the target row exists
+ // as this iterator is only used for delta-based row-level plans
+ // that are rewritten using an inner or right outer join
+ if (isTargetRowPresentPred.eval(row)) {
+ cardinalityCheck.execute(row)
+ applyMatchedActions(row)
} else {
- applyProjection(matchedPairs, inputRow)
+ applyNotMatchedActions(row)
}
}
- val processFunc: InternalRow => InternalRow = if (performCardinalityCheck)
{
- val rowIdOrdinal = child.output.indexWhere(attr =>
conf.resolver(attr.name, ROW_ID))
- assert(rowIdOrdinal != -1, "Cannot find row ID attr")
- processRowWithCardinalityCheck(rowIdOrdinal)
- } else {
- processRow
+ private def applyMatchedActions(row: InternalRow): InternalRow = {
+ for (action <- matchedActions) {
Review Comment:
This is invoked per record, so I wanted it to produce as few objects as
possible and bytecode that is as simple as possible, hoping the JIT would
then make smart choices.
I do like `None` and `find`, but I am paranoid they would add more calls.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]