mbutrovich commented on PR #4073:
URL: https://github.com/apache/datafusion-comet/pull/4073#issuecomment-4353674517
Thanks @viirya for picking this up! Plan coverage jumping that much on q69
and q87 is also nice to see.
A few questions below. None feel blocking.
## Questions
Any appetite for a defensive guard in `CometHashJoin` before forwarding
`bhj.isNullAwareAntiJoin`? Spark's `BroadcastHashJoinExec.scala:51-57`
currently enforces `leftKeys.length == 1`, `rightKeys.length == 1`, `joinType
== LeftAnti`, `buildSide == BuildRight`, and `condition.isEmpty` via `require`
at construction. DataFusion enforces the first three at `HashJoinExec::try_new`
(`exec.rs:405-420`). If Spark ever loosened any of those, the serde would
forward `null_aware_anti_join = true` and DataFusion would reject the plan,
which fails the task rather than falling back. Something like this in
`operators.scala`:
```scala
val isNullAwareAntiJoin = join match {
  case bhj: BroadcastHashJoinExec => bhj.isNullAwareAntiJoin
  case _ => false
}
if (isNullAwareAntiJoin &&
    (join.leftKeys.length != 1 || join.rightKeys.length != 1 ||
      join.joinType != LeftAnti || join.buildSide != BuildRight ||
      join.condition.isDefined)) {
  withInfo(join, "null-aware anti join requires single-column LeftAnti BuildRight with no condition")
  return None
}
```
Probably overkill given how stable those invariants are, but the cost is
tiny and it means Comet correctness stops depending on a Spark contract we
don't own. What do you think?
A matching `plan_err!` before `HashJoinExec::try_new` in `planner.rs` would
also make the failure mode clearer if we ever did hit it. DataFusion's own
message is accurate but fairly generic.
## Suggested tests
Two cases I did not see covered here or in DataFusion's
`test_null_aware_anti_join_*` suite.
Empty subquery. `x NOT IN (SELECT y FROM empty)` should return every left
row, including rows with a NULL probe key. That exercises a distinct code path
from "no NULLs and no match".
```scala
withParquetTable(Seq[(Int, Integer)]((1, 1), (2, null), (3, 3)), "tbl_a") {
  withParquetTable(Seq.empty[(Integer, Int)], "tbl_b") {
    val df = sql("SELECT * FROM tbl_a WHERE _2 NOT IN (SELECT _1 FROM tbl_b)")
    checkSparkAnswerAndOperator(df)
  }
}
```
Both sides with NULL keys. The existing tests isolate NULL to one side at a
time. Since the algorithm handles build-side NULL and probe-side NULL on
separate branches, might be worth one case that combines them.
```scala
withParquetTable(Seq[(Int, Integer)]((1, 1), (2, null), (3, 3)), "tbl_a") {
  withParquetTable(Seq[(Integer, Int)]((1, 100), (null, 200)), "tbl_b") {
    val df = sql("SELECT * FROM tbl_a WHERE _2 NOT IN (SELECT _1 FROM tbl_b)")
    checkSparkAnswerAndOperator(df)
  }
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]