Copilot commented on code in PR #2946:
URL: https://github.com/apache/sedona/pull/2946#discussion_r3216323776
##########
spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSpatialFilter.scala:
##########
@@ -88,4 +89,57 @@ object GeoParquetSpatialFilter {
}
override def simpleString: String = s"$columnName ${predicateType.name} $queryWindow"
}
+
+ /**
+ * Pushdown filter for predicates that operate on a Box2D-typed column (e.g.
+ * `ST_BoxIntersects(box_col, lit_box)` or `ST_BoxContains(box_col, lit_box)`).
+ *
+ * Per-file evaluation: walks the file's GeoParquet column metadata to find the geometry column
+ * whose covering metadata points at `box2dColumnName`, then prunes using that geometry column's
+ * recorded bbox.
+ *
+ * Both intersects and contains map to a file-level INTERSECTS check: per-row containment
+ * implies per-row intersection, which implies the file's union envelope must intersect the
+ * query box for any row to match. If no geometry column references this Box2D column as its
+ * covering, the file is kept (cannot prune safely).
+ *
+ * @param box2dColumnName
+ * the Box2D column referenced by the predicate
+ * @param queryBox
+ * the literal Box2D from the predicate's RHS
+ */
+ case class Box2DLeafFilter(box2dColumnName: String, queryBox: Box2D)
+ extends GeoParquetSpatialFilter {
+
+ override def evaluate(columns: Map[String, GeometryFieldMetaData]): Boolean = {
+ // Find the geometry column whose covering metadata points at this Box2D column.
+ val matchingGeomEntry = columns.find { case (_, field) =>
+ field.covering.exists(_.bbox.xmin.headOption.contains(box2dColumnName))
+ }
+
+ matchingGeomEntry match {
+ case Some((_, field)) =>
+ // Use the geometry column's recorded bbox to prune. The union of per-row Box2D values
+ // is a superset of the geometry column's bbox (covering boxes are at least as wide as
+ // their geometries), so if the geom-column bbox does not intersect the query box, no
+ // row's Box2D can intersect either. May leave some files unpruned when Box2D values
+ // are conservatively wider than geometries, but never produces false negatives.
Review Comment:
`Box2DLeafFilter` currently prunes files using the *geometry column*'s `bbox`
(`field.bbox`) after resolving which geometry column is covered by the Box2D
column. This is not sound for `ST_BoxIntersects(box_col, lit_box)` or
`ST_BoxContains(box_col, lit_box)`, because the filter semantics are on
`box_col`, and covering boxes are allowed to be conservatively wider than the
geometry extent. In that case the geometry bbox can be strictly smaller than
the union of the Box2D values, and this pruning can drop files that actually
contain matching rows (false negatives). To keep pushdown sound, this needs to
use file-level bounds of the Box2D column itself (e.g., the minimum of the
xmin/ymin fields and the maximum of the xmax/ymax fields, taken from Parquet
column statistics), or fall back to "keep file" unless you can prove the
covering column is an exact per-row geometry envelope (e.g., an auto-generated
`ST_Box2D(geom)` covering).
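To make the suggestion concrete, here is a minimal, Spark-free sketch of pruning on the Box2D column's own file-level bounds. `Box`, `Box2DColumnStats`, and `Box2DPruning` are hypothetical stand-ins, not Sedona's `Box2D` or `GeometryFieldMetaData`; the stats are assumed to be the smallest xmin/ymin and largest xmax/ymax over the file's rows, which is the kind of bound Parquet min/max column statistics could supply. A file is skipped only when even the most extreme row box cannot reach the query window.

```scala
// Hypothetical, simplified stand-ins; these are NOT Sedona's Box2D or
// GeometryFieldMetaData classes.
case class Box(xmin: Double, ymin: Double, xmax: Double, ymax: Double) {
  // Closed-interval overlap test, same shape as the check in the PR.
  def intersects(o: Box): Boolean =
    !(xmax < o.xmin || xmin > o.xmax || ymax < o.ymin || ymin > o.ymax)
}

// Assumed file-level statistics of the Box2D column itself: the smallest
// xmin/ymin and the largest xmax/ymax over all rows in the file.
case class Box2DColumnStats(minXMin: Double, minYMin: Double, maxXMax: Double, maxYMax: Double)

object Box2DPruning {
  // True if some row's Box2D in the file MAY intersect `query`.
  // A row can intersect only if row.xmin <= query.xmax, which is impossible
  // when even the smallest xmin exceeds query.xmax (likewise for the others).
  def mayMatch(stats: Option[Box2DColumnStats], query: Box): Boolean = stats match {
    case None => true // no statistics: keep the file
    case Some(s) =>
      !(s.minXMin > query.xmax || s.maxXMax < query.xmin ||
        s.minYMin > query.ymax || s.maxYMax < query.ymin)
  }

  // Computes the stats from row values, for illustration only; a real reader
  // would take them from per-file column statistics instead.
  def statsOf(rows: Seq[Box]): Option[Box2DColumnStats] =
    if (rows.isEmpty) None
    else Some(Box2DColumnStats(
      rows.map(_.xmin).min, rows.map(_.ymin).min,
      rows.map(_.xmax).max, rows.map(_.ymax).max))
}
```

For example, if a geometry's bbox is (0, 0, 1, 1) but its conservatively wider covering box is (0, 0, 5, 5), a check against the geometry bbox drops the file for the query (3, 3, 4, 4) even though the row's Box2D intersects it, while `mayMatch` over the Box2D stats keeps the file.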
##########
spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSpatialFilter.scala:
##########
+ val bbox = field.bbox.getOrElse(return true)
+ if (bbox.isEmpty) return true
+ val fileXMin = bbox(0)
+ val fileYMin = bbox(1)
+ val fileXMax = bbox(2)
+ val fileYMax = bbox(3)
+ !(fileXMax < queryBox.getXMin || fileXMin > queryBox.getXMax
+ || fileYMax < queryBox.getYMin || fileYMin > queryBox.getYMax)
+ case None =>
+ // No geometry column references this Box2D column as covering; cannot prune safely.
+ true
Review Comment:
`columns.find` will return an arbitrary geometry column if multiple columns
reference the same covering Box2D column (`Map` iteration order is unspecified).
That can make pruning nondeterministic and potentially incorrect. Consider
collecting all matching geometry columns and pruning only when exactly one
matches; if the match is ambiguous, skip pruning for that file (return `true`)
instead of picking the first.
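A minimal sketch of that disambiguation, under simplifying assumptions: `FieldMeta` is a hypothetical reduction of `GeometryFieldMetaData` in which `coveringXMinColumn` stands for `covering.bbox.xmin.headOption`, the value the PR's lookup compares against. It only decides which geometry column to use; it does not by itself address the soundness concern about pruning on the geometry bbox.

```scala
// Hypothetical simplified metadata; not Sedona's GeometryFieldMetaData.
// `coveringXMinColumn` models `covering.bbox.xmin.headOption` from the PR.
case class FieldMeta(coveringXMinColumn: Option[String], bbox: Seq[Double])

object CoveringLookup {
  // Returns the geometry column covered by `box2dColumn` only when exactly one
  // column references it. Zero or several matches yield None, and the caller
  // should keep the file (return true) instead of pruning.
  def uniqueCoveredGeometry(
      columns: Map[String, FieldMeta],
      box2dColumn: String): Option[(String, FieldMeta)] =
    columns.toList.filter { case (_, f) => f.coveringXMinColumn.contains(box2dColumn) } match {
      case single :: Nil => Some(single)
      case _             => None
    }
}
```

Because the result no longer depends on which matching entry `Map` iteration yields first, the pruning decision is the same on every run.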
##########
spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala:
##########
@@ -317,6 +318,73 @@ class GeoParquetSpatialFilterPushDownSuite extends TestBaseScala with TableDrive
assert(getPushedDownSpatialFilter(dfFiltered).isEmpty)
}
}
+
+ it("Push down ST_BoxIntersects against a Box2D covering column") {
+ val (box2dDf, box2dDir, box2dMetaMap) = setupBox2DCoveringFixture()
+ try {
+ // Q1 region only (region 1, center +10/+10)
+ val q1Filter =
+ "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(5.0, 5.0), ST_Point(15.0, 15.0)))"
+ verifyBox2DFilter(box2dDf, box2dMetaMap, q1Filter, Seq(1))
+
+ // Window covering Q2 and Q4 (negative X): should preserve regions 0 and 2
+ val leftHalfFilter =
+ "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(-20.0, -20.0), ST_Point(-1.0, 20.0)))"
+ verifyBox2DFilter(box2dDf, box2dMetaMap, leftHalfFilter, Seq(0, 2))
+
+ // Disjoint window prunes everything
+ val disjointFilter =
+ "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(100.0, 100.0), ST_Point(200.0, 200.0)))"
+ verifyBox2DFilter(box2dDf, box2dMetaMap, disjointFilter, Seq.empty)
+ } finally {
+ FileUtils.deleteDirectory(new File(box2dDir).getParentFile)
+ }
+ }
+
+ it("Push down ST_BoxContains against a Box2D covering column") {
+ val (box2dDf, box2dDir, box2dMetaMap) = setupBox2DCoveringFixture()
+ try {
+ // ST_BoxContains(box_col, lit_box) pushes down as INTERSECTS at the file level. A tiny
+ // query box inside Q1 prunes everything except region 1.
+ val containsFilter =
+ "ST_BoxContains(geom_bbox, ST_MakeBox2D(ST_Point(9.0, 9.0), ST_Point(10.0, 10.0)))"
+ verifyBox2DFilter(box2dDf, box2dMetaMap, containsFilter, Seq(1))
+ } finally {
+ FileUtils.deleteDirectory(new File(box2dDir).getParentFile)
+ }
+ }
+ }
+
+ private def setupBox2DCoveringFixture()
+ : (DataFrame, String, Map[Int, Seq[GeoParquetMetaData]]) = {
+ val box2dParent =
+ Files.createTempDirectory("sedona_geoparquet_box2d_").toFile.getAbsolutePath
+ val box2dDir = box2dParent + "/data"
+ val withBox = df.withColumn("geom_bbox", expr("ST_Box2D(geom)"))
+ withBox.coalesce(1).write.partitionBy("region").format("geoparquet").save(box2dDir)
+ val box2dDf = sparkSession.read.format("geoparquet").load(box2dDir)
+ val box2dMetaMap = readGeoParquetMetaDataMap(box2dDir)
+ (box2dDf, box2dDir, box2dMetaMap)
+ }
+
+ private def verifyBox2DFilter(
+ box2dDf: DataFrame,
+ box2dMetaMap: Map[Int, Seq[GeoParquetMetaData]],
+ condition: String,
+ expectedPreservedRegions: Seq[Int]): Unit = {
+ val dfFiltered = box2dDf.where(condition)
+ val pushed = getPushedDownSpatialFilter(dfFiltered)
+ assert(pushed.isDefined, s"Expected filter push-down for: $condition")
+ val preserved = box2dMetaMap
+ .filter { case (_, metaDataList) =>
+ metaDataList.exists(metadata => pushed.get.evaluate(metadata.columns))
+ }
Review Comment:
These new Box2D pushdown tests validate that a filter is recognized and that
the pushed-down filter prunes the expected regions via metadata evaluation, but
they do not assert query-result correctness (unlike `testFilter`, which
compares results against the non-GeoParquet DF). Adding an `expectedResult vs
actualResult` assertion for the Box2D conditions would help catch cases where
the pushdown is applied but produces incorrect results due to overly aggressive
pruning.
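The property such an assertion protects can also be stated without Spark: every region that holds a row satisfying the predicate must survive pruning. The sketch below is a hypothetical helper (`Rect` and `PruningCheck` are illustrative names, not part of the suite) and is a stand-in for, not a replacement of, the full result comparison suggested above; it checks that the preserved regions are a superset of the regions with matching rows.

```scala
// Hypothetical box type for illustration; not Sedona's Box2D.
case class Rect(xmin: Double, ymin: Double, xmax: Double, ymax: Double) {
  def intersects(o: Rect): Boolean =
    !(xmax < o.xmin || xmin > o.xmax || ymax < o.ymin || ymin > o.ymax)
}

object PruningCheck {
  // Regions that hold at least one row satisfying the predicate, i.e. the
  // regions a full scan without pushdown would return rows from.
  def regionsWithMatches(rows: Seq[(Int, Rect)], pred: Rect => Boolean): Set[Int] =
    rows.collect { case (region, box) if pred(box) => region }.toSet

  // Pruning is sound iff no region containing a matching row was dropped.
  def isSound(rows: Seq[(Int, Rect)], preserved: Set[Int], pred: Rect => Boolean): Boolean =
    regionsWithMatches(rows, pred).subsetOf(preserved)
}
```

A pruning result that keeps extra regions still passes; one that drops a region with a matching row fails, which is exactly the overly aggressive case the comment is concerned about.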
##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala:
##########
@@ -42,10 +43,12 @@ import org.apache.spark.sql.execution.datasources.PushableColumnBase
import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetFileFormatBase
import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter
import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.AndFilter
+import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.Box2DLeafFilter
import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.LeafFilter
import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.OrFilter
-import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
-import org.apache.spark.sql.sedona_sql.expressions.{ST_AsEWKT, ST_Buffer, ST_Contains, ST_CoveredBy, ST_Covers, ST_Crosses, ST_DWithin, ST_Distance, ST_DistanceSphere, ST_DistanceSpheroid, ST_Equals, ST_Intersects, ST_OrderingEquals, ST_Overlaps, ST_Touches, ST_Within}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sedona_sql.UDT.{Box2DUDT, GeometryUDT}
Review Comment:
`Box2DUDT` is imported here but not used in this file. Please remove the
unused import to avoid build/style failures under stricter compiler/linter
settings.