Copilot commented on code in PR #2946:
URL: https://github.com/apache/sedona/pull/2946#discussion_r3216323776


##########
spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSpatialFilter.scala:
##########
@@ -88,4 +89,57 @@ object GeoParquetSpatialFilter {
     }
     override def simpleString: String = s"$columnName ${predicateType.name} $queryWindow"
   }
+
+  /**
+   * Pushdown filter for predicates that operate on a Box2D-typed column (e.g.
+   * `ST_BoxIntersects(box_col, lit_box)` or `ST_BoxContains(box_col, lit_box)`).
+   *
+   * Per-file evaluation: walks the file's GeoParquet column metadata to find the geometry column
+   * whose covering metadata points at `box2dColumnName`, then prunes using that geometry column's
+   * recorded bbox.
+   *
+   * Both intersects and contains map to a file-level INTERSECTS check: per-row containment
+   * implies per-row intersection, which implies the file's union envelope must intersect the
+   * query box for any row to match. If no geometry column references this Box2D column as its
+   * covering, the file is kept (cannot prune safely).
+   *
+   * @param box2dColumnName
+   *   the Box2D column referenced by the predicate
+   * @param queryBox
+   *   the literal Box2D from the predicate's RHS
+   */
+  case class Box2DLeafFilter(box2dColumnName: String, queryBox: Box2D)
+      extends GeoParquetSpatialFilter {
+
+    override def evaluate(columns: Map[String, GeometryFieldMetaData]): Boolean = {
+      // Find the geometry column whose covering metadata points at this Box2D column.
+      val matchingGeomEntry = columns.find { case (_, field) =>
+        field.covering.exists(_.bbox.xmin.headOption.contains(box2dColumnName))
+      }
+
+      matchingGeomEntry match {
+        case Some((_, field)) =>
+          // Use the geometry column's recorded bbox to prune. The union of per-row Box2D values
+          // is a superset of the geometry column's bbox (covering boxes are at least as wide as
+          // their geometries), so if the geom-column bbox does not intersect the query box, no
+          // row's Box2D can intersect either. May leave some files unpruned when Box2D values
+          // are conservatively wider than geometries, but never produces false negatives.

Review Comment:
   `Box2DLeafFilter` currently prunes files using the *geometry column*'s `bbox`
(`field.bbox`) after resolving which geometry column is covered by the Box2D
column. This is not sound for `ST_BoxIntersects(box_col, lit_box)` or
`ST_BoxContains(box_col, lit_box)`: the predicate is evaluated on `box_col`,
and covering boxes are allowed to be conservatively wider than the geometry
extent. In that case the geometry bbox can be strictly smaller than the union
of the Box2D values, so this pruning can drop files that actually contain
matching rows (false negatives). To keep pushdown sound, use file-level bounds
of the Box2D column itself (e.g., derived from Parquet statistics for
xmin/ymin/xmax/ymax), or fall back to "keep file" unless you can prove the
covering column is an exact per-row geometry envelope (e.g., an auto-generated
`ST_Box2D(geom)` covering).
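
A minimal sketch of the sound variant described above, under stated assumptions:
`Box` and `SoundBox2DPruning` are hypothetical stand-ins (not Sedona classes),
and `boxColumnBounds` is assumed to be supplied by the caller, for example from
Parquet min/max statistics of the Box2D column's fields. How those statistics
would actually be read is not shown here.

```scala
// Simplified stand-in for illustration only; NOT Sedona's Box2D type.
final case class Box(xmin: Double, ymin: Double, xmax: Double, ymax: Double) {
  // Closed-interval intersection test, the same shape as the check in the PR.
  def intersects(o: Box): Boolean =
    !(xmax < o.xmin || xmin > o.xmax || ymax < o.ymin || ymin > o.ymax)
}

object SoundBox2DPruning {

  /**
   * Returns true when the file must be kept.
   *
   * @param boxColumnBounds
   *   file-level bounds of the Box2D column itself (an assumption: the caller
   *   derives them, e.g. from Parquet statistics); None when unavailable
   * @param query
   *   the literal query box from the predicate
   */
  def keepFile(boxColumnBounds: Option[Box], query: Box): Boolean =
    boxColumnBounds match {
      // Bounds of the filtered column are known: prune only on a proven miss.
      case Some(bounds) => bounds.intersects(query)
      // No trustworthy bounds: keep the file rather than risk a false negative.
      case None => true
    }
}
```

For instance, with a geometry bbox of `Box(0, 0, 1, 1)`, padded covering boxes
whose union is `Box(-5, -5, 6, 6)`, and a query of `Box(4, 4, 5, 5)`, the
geometry bbox misses the query while the Box2D bounds do not, so only the
second check keeps the file.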



##########
spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSpatialFilter.scala:
##########
@@ -88,4 +89,57 @@ object GeoParquetSpatialFilter {
     }
     override def simpleString: String = s"$columnName ${predicateType.name} $queryWindow"
   }
+
+  /**
+   * Pushdown filter for predicates that operate on a Box2D-typed column (e.g.
+   * `ST_BoxIntersects(box_col, lit_box)` or `ST_BoxContains(box_col, lit_box)`).
+   *
+   * Per-file evaluation: walks the file's GeoParquet column metadata to find the geometry column
+   * whose covering metadata points at `box2dColumnName`, then prunes using that geometry column's
+   * recorded bbox.
+   *
+   * Both intersects and contains map to a file-level INTERSECTS check: per-row containment
+   * implies per-row intersection, which implies the file's union envelope must intersect the
+   * query box for any row to match. If no geometry column references this Box2D column as its
+   * covering, the file is kept (cannot prune safely).
+   *
+   * @param box2dColumnName
+   *   the Box2D column referenced by the predicate
+   * @param queryBox
+   *   the literal Box2D from the predicate's RHS
+   */
+  case class Box2DLeafFilter(box2dColumnName: String, queryBox: Box2D)
+      extends GeoParquetSpatialFilter {
+
+    override def evaluate(columns: Map[String, GeometryFieldMetaData]): Boolean = {
+      // Find the geometry column whose covering metadata points at this Box2D column.
+      val matchingGeomEntry = columns.find { case (_, field) =>
+        field.covering.exists(_.bbox.xmin.headOption.contains(box2dColumnName))
+      }
+
+      matchingGeomEntry match {
+        case Some((_, field)) =>
+          // Use the geometry column's recorded bbox to prune. The union of per-row Box2D values
+          // is a superset of the geometry column's bbox (covering boxes are at least as wide as
+          // their geometries), so if the geom-column bbox does not intersect the query box, no
+          // row's Box2D can intersect either. May leave some files unpruned when Box2D values
+          // are conservatively wider than geometries, but never produces false negatives.
+          val bbox = field.bbox.getOrElse(return true)
+          if (bbox.isEmpty) return true
+          val fileXMin = bbox(0)
+          val fileYMin = bbox(1)
+          val fileXMax = bbox(2)
+          val fileYMax = bbox(3)
+          !(fileXMax < queryBox.getXMin || fileXMin > queryBox.getXMax
+            || fileYMax < queryBox.getYMin || fileYMin > queryBox.getYMax)
+        case None =>
+          // No geometry column references this Box2D column as covering — cannot prune safely.
+          true

Review Comment:
   `columns.find` returns an arbitrary geometry column when multiple columns
reference the same covering Box2D column (Map iteration order is not
guaranteed). This can make pruning nondeterministic and potentially incorrect.
Consider collecting all matching geometry columns; if the match is not unique,
skip pruning for that file (return `true`) instead of picking the first.
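
One way the unique-match rule above could look, as a sketch only: `FieldMeta`
and `CoveringResolution` are hypothetical, simplified stand-ins for the real
metadata classes, and `coveringColumn` is an assumed flattening of the covering
reference that the PR reads via `covering.bbox.xmin.headOption`.

```scala
// Hypothetical, simplified model of per-column metadata; not Sedona's
// GeometryFieldMetaData.
final case class FieldMeta(coveringColumn: Option[String], bbox: Option[Seq[Double]])

object CoveringResolution {

  /**
   * Returns the single geometry column covered by `boxColumn`, or None when
   * there is no match or more than one. On None the caller is expected to
   * keep the file (skip pruning).
   */
  def uniqueGeometryFor(
      columns: Map[String, FieldMeta],
      boxColumn: String): Option[(String, FieldMeta)] = {
    // Collect every match instead of stopping at the first one found.
    val matches =
      columns.filter { case (_, field) => field.coveringColumn.contains(boxColumn) }.toList
    matches match {
      case single :: Nil => Some(single)
      case _ => None // zero or several matches: do not guess
    }
  }
}
```

Because the result no longer depends on which entry the map happens to yield
first, the pruning decision is the same on every run.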
   



##########
spark/common/src/test/scala/org/apache/sedona/sql/GeoParquetSpatialFilterPushDownSuite.scala:
##########
@@ -317,6 +318,73 @@ class GeoParquetSpatialFilterPushDownSuite extends TestBaseScala with TableDrive
         assert(getPushedDownSpatialFilter(dfFiltered).isEmpty)
       }
     }
+
+    it("Push down ST_BoxIntersects against a Box2D covering column") {
+      val (box2dDf, box2dDir, box2dMetaMap) = setupBox2DCoveringFixture()
+      try {
+        // Q1 region only (region 1, center +10/+10)
+        val q1Filter =
+          "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(5.0, 5.0), ST_Point(15.0, 15.0)))"
+        verifyBox2DFilter(box2dDf, box2dMetaMap, q1Filter, Seq(1))
+
+        // Window covering Q2 and Q4 (negative X) — should preserve regions 0 and 2
+        val leftHalfFilter =
+          "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(-20.0, -20.0), ST_Point(-1.0, 20.0)))"
+        verifyBox2DFilter(box2dDf, box2dMetaMap, leftHalfFilter, Seq(0, 2))
+
+        // Disjoint window prunes everything
+        val disjointFilter =
+          "ST_BoxIntersects(geom_bbox, ST_MakeBox2D(ST_Point(100.0, 100.0), ST_Point(200.0, 200.0)))"
+        verifyBox2DFilter(box2dDf, box2dMetaMap, disjointFilter, Seq.empty)
+      } finally {
+        FileUtils.deleteDirectory(new File(box2dDir).getParentFile)
+      }
+    }
+
+    it("Push down ST_BoxContains against a Box2D covering column") {
+      val (box2dDf, box2dDir, box2dMetaMap) = setupBox2DCoveringFixture()
+      try {
+        // ST_BoxContains(box_col, lit_box) pushes down as INTERSECTS at the file level. A tiny
+        // query box inside Q1 prunes everything except region 1.
+        val containsFilter =
+          "ST_BoxContains(geom_bbox, ST_MakeBox2D(ST_Point(9.0, 9.0), ST_Point(10.0, 10.0)))"
+        verifyBox2DFilter(box2dDf, box2dMetaMap, containsFilter, Seq(1))
+      } finally {
+        FileUtils.deleteDirectory(new File(box2dDir).getParentFile)
+      }
+    }
+  }
+
+  private def setupBox2DCoveringFixture()
+      : (DataFrame, String, Map[Int, Seq[GeoParquetMetaData]]) = {
+    val box2dParent =
+      Files.createTempDirectory("sedona_geoparquet_box2d_").toFile.getAbsolutePath
+    val box2dDir = box2dParent + "/data"
+    val withBox = df.withColumn("geom_bbox", expr("ST_Box2D(geom)"))
+    withBox.coalesce(1).write.partitionBy("region").format("geoparquet").save(box2dDir)
+    val box2dDf = sparkSession.read.format("geoparquet").load(box2dDir)
+    val box2dMetaMap = readGeoParquetMetaDataMap(box2dDir)
+    (box2dDf, box2dDir, box2dMetaMap)
+  }
+
+  private def verifyBox2DFilter(
+      box2dDf: DataFrame,
+      box2dMetaMap: Map[Int, Seq[GeoParquetMetaData]],
+      condition: String,
+      expectedPreservedRegions: Seq[Int]): Unit = {
+    val dfFiltered = box2dDf.where(condition)
+    val pushed = getPushedDownSpatialFilter(dfFiltered)
+    assert(pushed.isDefined, s"Expected filter push-down for: $condition")
+    val preserved = box2dMetaMap
+      .filter { case (_, metaDataList) =>
+        metaDataList.exists(metadata => pushed.get.evaluate(metadata.columns))
+      }

Review Comment:
   These new Box2D pushdown tests verify that the filter is recognized and that
the pushed-down filter prunes the expected regions via metadata evaluation, but
they do not assert query-result correctness (unlike `testFilter`, which
compares results against the non-GeoParquet DF). Adding an `expectedResult` vs.
`actualResult` assertion for the Box2D conditions would help catch cases where
the pushdown is applied but overly aggressive pruning produces incorrect
results.
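
The property such an assertion would check can be illustrated without Spark. In
the suite itself the comparison would presumably follow `testFilter` and compare
DataFrame results; the model below is only a sketch, and `Rect`, `DataFile`, and
`PruningCheck` are hypothetical names introduced for illustration.

```scala
// Spark-free model for illustration; none of these types exist in Sedona.
final case class Rect(xmin: Double, ymin: Double, xmax: Double, ymax: Double) {
  def intersects(o: Rect): Boolean =
    !(xmax < o.xmin || xmin > o.xmax || ymax < o.ymin || ymin > o.ymax)
}

// A "file" is just the per-row Box2D values it stores.
final case class DataFile(rows: Seq[Rect])

object PruningCheck {

  /** Rows matching the predicate when every file is scanned (no pruning). */
  def expectedResult(files: Seq[DataFile], query: Rect): Seq[Rect] =
    files.flatMap(_.rows).filter(_.intersects(query))

  /** Rows matching when files are first pruned by `keep`, then filtered per row. */
  def actualResult(files: Seq[DataFile], query: Rect, keep: DataFile => Boolean): Seq[Rect] =
    files.filter(keep).flatMap(_.rows).filter(_.intersects(query))
}
```

A sound `keep` function always yields `actualResult == expectedResult`; a
pruner that drops a file containing a matching row makes the two differ, which
is the failure a metadata-only assertion cannot see.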



##########
spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/optimization/SpatialFilterPushDownForGeoParquet.scala:
##########
@@ -42,10 +43,12 @@ import org.apache.spark.sql.execution.datasources.PushableColumnBase
 import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetFileFormatBase
 import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter
 import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.AndFilter
+import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.Box2DLeafFilter
 import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.LeafFilter
 import org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetSpatialFilter.OrFilter
-import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
-import org.apache.spark.sql.sedona_sql.expressions.{ST_AsEWKT, ST_Buffer, ST_Contains, ST_CoveredBy, ST_Covers, ST_Crosses, ST_DWithin, ST_Distance, ST_DistanceSphere, ST_DistanceSpheroid, ST_Equals, ST_Intersects, ST_OrderingEquals, ST_Overlaps, ST_Touches, ST_Within}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.sedona_sql.UDT.{Box2DUDT, GeometryUDT}

Review Comment:
   `Box2DUDT` is imported here but not used in this file. Please remove the
unused import to avoid build or style failures under stricter compiler or
linter settings.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
