Copilot commented on code in PR #2864:
URL: https://github.com/apache/sedona/pull/2864#discussion_r3157573406
##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/BroadcastIndexJoinExec.scala:
##########
@@ -169,14 +177,8 @@ case class BroadcastIndexJoinExec(
.iterator
.asScala
.asInstanceOf[Iterator[Geometry]]
- .filter(candidate =>
- evaluator.eval(
- preparedGeometries.getOrElseUpdate(
- candidate, {
- factory.create(candidate)
- }),
- geom))
- .map(candidate => joinedRow.withRight(candidate.getUserData.asInstanceOf[UnsafeRow]))
+ .filter(candidate => refiner.matches(candidate, geom))
+ .map(candidate => joinedRow.withRight(refiner.unpackRow(candidate)))
.exists(boundCondition)
Review Comment:
`semiJoin` also assumes `geom` is non-null and calls
`geom.getEnvelopeInternal`. Since `createStreamShapes` can return null shapes
when the join key is null, this can throw an NPE. Align behavior with
`antiJoin`/`outerJoin` by treating null stream shapes as having zero candidates
(thus producing no output for LeftSemi).
##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryBase.scala:
##########
@@ -57,6 +58,30 @@ trait TraitJoinQueryBase {
spatialRdd
}
+ /**
+ * Builds a SpatialRDD from a column of GeographyUDT bytes. Each row becomes a JTS geometry
+ * whose envelope is the Geography's lat/lng bounding rectangle (full-longitude when the
+ * rectangle wraps the antimeridian). The Geography object is carried alongside the original row
+ * in `userData` via [[GeographyJoinShape]] so the join executor can perform S2-based predicate
+ * refinement and emit the row.
+ */
+ def toGeographySpatialRDD(
+ rdd: RDD[UnsafeRow],
+ shapeExpression: Expression): SpatialRDD[Geometry] = {
+ val spatialRdd = new SpatialRDD[Geometry]
+ spatialRdd.setRawSpatialRDD(
+ rdd
+ .map { x =>
+ val geog =
+ GeographyWKBSerializer.deserialize(shapeExpression.eval(x).asInstanceOf[Array[Byte]])
+ val shape = JoinedGeometry.geographyToEnvelopeGeometry(geog)
+ shape.setUserData(GeographyJoinShape(geog, x.copy))
+ shape
Review Comment:
`toGeographySpatialRDD` deserializes `shapeExpression.eval(x)`
unconditionally. If the Geography join key column is nullable, a null value
will cause `GeographyWKBSerializer.deserialize(null)` to throw and fail the
broadcast-index build. Consider adding a null check (skip the row or create a
null/empty envelope entry consistent with how nulls are handled on the stream
side).
```suggestion
.flatMap { x =>
val geogBytes = shapeExpression.eval(x).asInstanceOf[Array[Byte]]
if (geogBytes == null) {
None
} else {
val geog = GeographyWKBSerializer.deserialize(geogBytes)
val shape = JoinedGeometry.geographyToEnvelopeGeometry(geog)
shape.setUserData(GeographyJoinShape(geog, x.copy))
Some(shape)
}
```
##########
common/src/main/java/org/apache/sedona/common/S2Geography/WkbS2Shape.java:
##########
@@ -229,6 +243,18 @@ private int findChain(int edgeId) {
return 0;
}
+ /**
+ * Returns true when the ring's first and last vertex coordinates are byte-identical (i.e. the
+ * ring is closed in the standard WKB sense). Sedona's own WKBWriter produces open rings, so a
+ * cheap byte-level comparison lets us distinguish the two cases without running through the
+ * S2Point conversion.
+ */
+ private static boolean firstAndLastEqual(ByteBuffer buf, int byteOffset, int numCoords) {
+ int lastOffset = byteOffset + (numCoords - 1) * 16;
+ return buf.getDouble(byteOffset) == buf.getDouble(lastOffset)
+ && buf.getDouble(byteOffset + 8) == buf.getDouble(lastOffset + 8);
+ }
Review Comment:
The comment for `firstAndLastEqual` says it checks whether coordinates are
"byte-identical" and does a "byte-level comparison", but the implementation
compares doubles using `==` (numeric equality). Either update the comment to
match the behavior, or implement an actual byte-wise comparison if that
distinction matters for correctness.
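To make the distinction concrete, here is a minimal Java sketch, not code from the PR, that contrasts numeric `==` on doubles with a true bit-level comparison of the same 8-byte slots. The class name `RingClosureCheck` and the helper `ring` are hypothetical, and the buffer layout (consecutive x/y doubles, 16 bytes per coordinate, the buffer's default byte order) is an assumption for illustration only. The two checks disagree for `0.0` versus `-0.0` (numerically equal, different bits) and for identical NaN payloads (same bits, numerically unequal).

```java
import java.nio.ByteBuffer;

final class RingClosureCheck {
    /** Numeric comparison, mirroring the `==` on doubles used in the reviewed method. */
    static boolean firstAndLastEqualNumeric(ByteBuffer buf, int byteOffset, int numCoords) {
        int lastOffset = byteOffset + (numCoords - 1) * 16;
        return buf.getDouble(byteOffset) == buf.getDouble(lastOffset)
            && buf.getDouble(byteOffset + 8) == buf.getDouble(lastOffset + 8);
    }

    /** Bit-level comparison: reads each 8-byte slot as a long, so no floating-point rules apply. */
    static boolean firstAndLastEqualBits(ByteBuffer buf, int byteOffset, int numCoords) {
        int lastOffset = byteOffset + (numCoords - 1) * 16;
        return buf.getLong(byteOffset) == buf.getLong(lastOffset)
            && buf.getLong(byteOffset + 8) == buf.getLong(lastOffset + 8);
    }

    /** Hypothetical helper: packs x0, y0, x1, y1, ... into a buffer of consecutive doubles. */
    static ByteBuffer ring(double... xy) {
        ByteBuffer buf = ByteBuffer.allocate(xy.length * 8);
        for (double v : xy) {
            buf.putDouble(v);
        }
        return buf;
    }

    public static void main(String[] args) {
        // First vertex (0.0, 1.0), last vertex (-0.0, 1.0): the two checks disagree.
        ByteBuffer signedZero = ring(0.0, 1.0, 5.0, 5.0, -0.0, 1.0);
        System.out.println(firstAndLastEqualNumeric(signedZero, 0, 3));
        System.out.println(firstAndLastEqualBits(signedZero, 0, 3));
    }
}
```

Whether the difference matters depends on whether signed zeros or NaN coordinates can reach this path; if they cannot, updating the comment to say "numerically equal" is probably the smaller change.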
##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/BroadcastIndexJoinExec.scala:
##########
@@ -146,20 +158,16 @@ case class BroadcastIndexJoinExec(
.iterator
.asScala
.asInstanceOf[Iterator[Geometry]]
- .filter(candidate =>
- evaluator.eval(
- preparedGeometries.getOrElseUpdate(candidate, { factory.create(candidate) }),
- geom))
- .map(candidate => joinedRow.withRight(candidate.getUserData.asInstanceOf[UnsafeRow]))
+ .filter(candidate => refiner.matches(candidate, geom))
+ .map(candidate => joinedRow.withRight(refiner.unpackRow(candidate)))
.filter(boundCondition)
}
}
Review Comment:
`innerJoin` calls `geom.getEnvelopeInternal` without guarding for `geom ==
null`, but `createStreamShapes` can emit `(null, row)` when the join key
evaluates to null. This will throw a NullPointerException instead of treating
null join keys as non-matching rows. Consider short-circuiting null stream
shapes (e.g., return `Iterator.empty` for inner joins) before querying the
index.
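
The short-circuit described above can be sketched independently of Sedona's classes. The following Java sketch is illustrative only: `NullSafeProbeSketch`, `Interval`, and the list-backed "index" are hypothetical stand-ins for the stream shape and the broadcast index, not the PR's types. It assumes the intended semantics are that a null stream shape yields zero candidates, so an inner join emits nothing for that row and a semi join reports no match.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

final class NullSafeProbeSketch {
    /** Hypothetical stand-in for a shape: a closed 1-D interval [lo, hi]. */
    static final class Interval {
        final double lo;
        final double hi;

        Interval(double lo, double hi) {
            this.lo = lo;
            this.hi = hi;
        }

        boolean intersects(Interval other) {
            return lo <= other.hi && other.lo <= hi;
        }
    }

    /** Inner-join probe: a null stream shape short-circuits before the index is touched. */
    static Iterator<Interval> innerCandidates(List<Interval> index, Interval streamShape) {
        if (streamShape == null) {
            return Collections.emptyIterator();
        }
        return index.stream().filter(c -> c.intersects(streamShape)).iterator();
    }

    /** Semi-join probe: true only if at least one candidate survives; null never matches. */
    static boolean semiMatches(List<Interval> index, Interval streamShape) {
        return innerCandidates(index, streamShape).hasNext();
    }
}
```

Placing the guard in the shared probe keeps the inner and semi paths consistent, which is the alignment the comments on `semiJoin` and `innerJoin` both ask for.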