stevenzwu commented on code in PR #12667:
URL: https://github.com/apache/iceberg/pull/12667#discussion_r2357525445


##########
api/src/main/java/org/apache/iceberg/geospatial/GeospatialBound.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.geospatial;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Comparator;
+import java.util.Objects;
+
+/**
+ * Represents a geospatial bound (minimum or maximum) for Iceberg tables.
+ *
+ * <p>According to the <a 
href="https://iceberg.apache.org/spec/#bound-serialization">Bound
+ * serialization section of Iceberg Table spec</a>, geospatial bounds are 
serialized differently
+ * from the regular WKB representation. Geometry and geography bounds are 
single point encoded as a
+ * concatenation of 8-byte little-endian IEEE 754 coordinate values in the 
order X, Y, Z (optional),
+ * M (optional).
+ *
+ * <p>The encoding varies based on which coordinates are present:
+ *
+ * <ul>
+ *   <li>x:y (2 doubles) when both z and m are unset
+ *   <li>x:y:z (3 doubles) when only m is unset
+ *   <li>x:y:NaN:m (4 doubles) when only z is unset
+ *   <li>x:y:z:m (4 doubles) when all coordinates are set
+ * </ul>
+ *
+ * <p>This class represents a lower or upper geospatial bound and handles 
serialization and
+ * deserialization of these bounds to/from byte arrays, conforming to the 
Iceberg specification.
+ */
+public class GeospatialBound implements Serializable, 
Comparable<GeospatialBound> {
+  /**
+   * Parses a geospatial bound from a byte buffer according to Iceberg spec.
+   *
+   * <p>Based on the buffer size, this method determines which coordinates are 
present: - 16 bytes
+   * (2 doubles): x and y only - 24 bytes (3 doubles): x, y, and z - 32 bytes 
(4 doubles): x, y, z
+   * (might be NaN), and m
+   *
+   * <p>The ordinates are encoded as 8-byte little-endian IEEE 754 values.
+   *
+   * @param buffer the ByteBuffer containing the serialized geospatial bound
+   * @return a GeospatialBound object representing the parsed bound
+   * @throws IllegalArgumentException if the buffer has an invalid size
+   */
+  public static GeospatialBound fromByteBuffer(ByteBuffer buffer) {
+    // Save original position and byte order to restore them later
+    int originalPosition = buffer.position();
+    ByteOrder originalOrder = buffer.order();
+
+    try {
+      buffer.order(ByteOrder.LITTLE_ENDIAN);
+      int size = buffer.remaining();
+
+      if (size == 2 * Double.BYTES) {
+        // x:y format (2 doubles)
+        double coordX = buffer.getDouble();
+        double coordY = buffer.getDouble();
+        return createXY(coordX, coordY);
+      } else if (size == 3 * Double.BYTES) {
+        // x:y:z format (3 doubles)
+        double coordX = buffer.getDouble();
+        double coordY = buffer.getDouble();
+        double coordZ = buffer.getDouble();
+        return createXYZ(coordX, coordY, coordZ);
+      } else if (size == 4 * Double.BYTES) {
+        // x:y:z:m format (4 doubles) - z might be NaN
+        double coordX = buffer.getDouble();
+        double coordY = buffer.getDouble();
+        double coordZ = buffer.getDouble();
+        double coordM = buffer.getDouble();
+        return new GeospatialBound(coordX, coordY, coordZ, coordM);
+      } else {
+        throw new IllegalArgumentException(
+            "Invalid buffer size for GeospatialBound: expected 16, 24, or 32 
bytes, got " + size);

Review Comment:
   nit: Iceberg style typically doesn't use the class name in error messages. Maybe:
   ```
   Invalid geospatial bound buffer size: %d. Valid sizes are 16, 24, or 32 bytes.
   ```



##########
api/src/main/java/org/apache/iceberg/geospatial/BoundingBox.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.geospatial;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Objects;
+
+/**
+ * Represents a geospatial bounding box composed of minimum and maximum bounds.
+ *
+ * <p>A bounding box (also called a Minimum Bounding Rectangle or MBR) is 
defined by two points: the
+ * minimum and maximum coordinates that define the box's corners. This 
provides a simple
+ * approximation of a more complex geometry for efficient filtering and data 
skipping.
+ */
+public class BoundingBox {
+  /**
+   * Create a {@link BoundingBox} object from buffers containing min and max 
bounds
+   *
+   * @param min the serialized minimum bound
+   * @param max the serialized maximum bound
+   * @return a BoundingBox instance
+   */
+  public static BoundingBox fromByteBuffers(ByteBuffer min, ByteBuffer max) {

Review Comment:
   Do we envision a scenario where this API would be used?



##########
api/src/main/java/org/apache/iceberg/geospatial/BoundingBox.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.geospatial;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Objects;
+
+/**
+ * Represents a geospatial bounding box composed of minimum and maximum bounds.
+ *
+ * <p>A bounding box (also called a Minimum Bounding Rectangle or MBR) is 
defined by two points: the
+ * minimum and maximum coordinates that define the box's corners. This 
provides a simple
+ * approximation of a more complex geometry for efficient filtering and data 
skipping.
+ */
+public class BoundingBox {
+  /**
+   * Create a {@link BoundingBox} object from buffers containing min and max 
bounds
+   *
+   * @param min the serialized minimum bound
+   * @param max the serialized maximum bound
+   * @return a BoundingBox instance
+   */
+  public static BoundingBox fromByteBuffers(ByteBuffer min, ByteBuffer max) {
+    return new BoundingBox(
+        GeospatialBound.fromByteBuffer(min), 
GeospatialBound.fromByteBuffer(max));
+  }
+
+  /**
+   * Deserialize a byte buffer as a {@link BoundingBox} object
+   *
+   * @param buffer the serialized bounding box
+   * @return a BoundingBox instance
+   */
+  public static BoundingBox fromByteBuffer(ByteBuffer buffer) {
+    int originalPosition = buffer.position();

Review Comment:
   What is the reason for restoring the original position and byte order? We don't
   usually do that for deserialization in Iceberg.



##########
api/src/main/java/org/apache/iceberg/geospatial/BoundingBox.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.geospatial;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Objects;
+
+/**
+ * Represents a geospatial bounding box composed of minimum and maximum bounds.
+ *
+ * <p>A bounding box (also called a Minimum Bounding Rectangle or MBR) is 
defined by two points: the
+ * minimum and maximum coordinates that define the box's corners. This 
provides a simple
+ * approximation of a more complex geometry for efficient filtering and data 
skipping.
+ */
+public class BoundingBox {
+  /**
+   * Create a {@link BoundingBox} object from buffers containing min and max 
bounds
+   *
+   * @param min the serialized minimum bound
+   * @param max the serialized maximum bound
+   * @return a BoundingBox instance
+   */
+  public static BoundingBox fromByteBuffers(ByteBuffer min, ByteBuffer max) {
+    return new BoundingBox(
+        GeospatialBound.fromByteBuffer(min), 
GeospatialBound.fromByteBuffer(max));
+  }
+
+  /**
+   * Deserialize a byte buffer as a {@link BoundingBox} object
+   *
+   * @param buffer the serialized bounding box
+   * @return a BoundingBox instance
+   */
+  public static BoundingBox fromByteBuffer(ByteBuffer buffer) {
+    int originalPosition = buffer.position();
+    ByteOrder originalOrder = buffer.order();
+
+    try {
+      buffer.order(ByteOrder.LITTLE_ENDIAN);
+
+      int minLen = buffer.getInt();
+      ByteBuffer min = buffer.slice();
+      min.limit(minLen);
+      buffer.position(buffer.position() + minLen);
+
+      int maxLen = buffer.getInt();
+      ByteBuffer max = buffer.slice();
+      max.limit(maxLen);
+
+      return fromByteBuffers(min, max);
+    } finally {
+      // Restore original position and byte order
+      buffer.position(originalPosition);
+      buffer.order(originalOrder);
+    }
+  }
+
+  /**
+   * Create an empty bounding box
+   *
+   * @return an empty bounding box
+   */
+  public static BoundingBox empty() {

Review Comment:
   We don't provide setter methods for this class. Is there any value in creating
   an empty bounding box object?



##########
api/src/main/java/org/apache/iceberg/geospatial/GeospatialPredicateEvaluators.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.geospatial;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+
+public class GeospatialPredicateEvaluators {
+  private GeospatialPredicateEvaluators() {}
+
+  public interface GeospatialPredicateEvaluator {
+    /**
+     * Test whether this bounding box intersects with another.
+     *
+     * @param bbox1 the first bounding box
+     * @param bbox2 the second bounding box
+     * @return true if this box intersects the other box
+     */
+    boolean intersects(BoundingBox bbox1, BoundingBox bbox2);
+  }
+
+  public static GeospatialPredicateEvaluator create(Type type) {
+    switch (type.typeId()) {
+      case GEOMETRY:
+        return new GeometryEvaluator();
+      case GEOGRAPHY:
+        return new GeographyEvaluator();
+      default:
+        throw new UnsupportedOperationException("Unsupported type for 
BoundingBox: " + type);
+    }
+  }
+
+  static class GeometryEvaluator implements GeospatialPredicateEvaluator {
+    @Override
+    public boolean intersects(BoundingBox bbox1, BoundingBox bbox2) {
+      return intersectsWithWrapAround(bbox1, bbox2);
+    }
+
+    /**
+     * Check if two bounding boxes intersect, taking wrap-around into account.
+     *
+     * <p>Wraparound (or antimeridian crossing) occurs when a geography 
crosses the 180°/-180°
+     * longitude line on a map. In these cases, the minimum X value is greater 
than the maximum X
+     * value (xmin > xmax). This represents a bounding box that wraps around 
the globe.
+     *
+     * <p>For example, a bounding box with xmin=170° and xmax=-170° represents 
an area that spans
+     * from 170° east to 190° east (or equivalently, -170° west). This is 
important for geometries
+     * that cross the antimeridian, like a path from Japan to Alaska.
+     *
+     * <p>When xmin > xmax, a point matches if its X coordinate is either X ≥ 
xmin OR X ≤ xmax,
+     * rather than the usual X ≥ xmin AND X ≤ xmax. In geographic terms, if 
the westernmost
+     * longitude is greater than the easternmost longitude, this indicates an 
antimeridian crossing.
+     *
+     * <p>The Iceberg specification does not explicitly rule out the use of 
wrap-around in bounding
+     * boxes for geometry types, so we handle wrap-around for both geography 
and geometry bounding
+     * boxes.
+     *
+     * @param bbox1 the first bounding box
+     * @param bbox2 the second bounding box
+     * @return true if the bounding boxes intersect
+     */
+    static boolean intersectsWithWrapAround(BoundingBox bbox1, BoundingBox 
bbox2) {
+      // Let's check y first, and if y does not intersect, we can return false
+      if (bbox1.min().y() > bbox2.max().y() || bbox1.max().y() < 
bbox2.min().y()) {
+        return false;
+      }
+
+      // Now check x, need to take wrap-around into account
+      if (bbox1.min().x() <= bbox1.max().x() && bbox2.min().x() <= 
bbox2.max().x()) {
+        // No wrap-around
+        return bbox1.min().x() <= bbox2.max().x() && bbox1.max().x() >= 
bbox2.min().x();
+      } else if (bbox1.min().x() > bbox1.max().x() && bbox2.min().x() <= 
bbox2.max().x()) {
+        // bbox1 wraps around the antimeridian, bbox2 does not
+        return bbox1.min().x() <= bbox2.max().x() || bbox1.max().x() >= 
bbox2.min().x();
+      } else if (bbox1.min().x() <= bbox1.max().x() && bbox2.min().x() > 
bbox2.max().x()) {
+        // bbox2 wraps around the antimeridian, bbox1 does not
+        return intersectsWithWrapAround(bbox2, bbox1);

Review Comment:
   nit: instead of recursion, can we just replicate the similar logic from line 90,
   with the two boxes swapped?
   ```
   return bbox2.min().x() <= bbox1.max().x() || bbox2.max().x() >= 
bbox1.min().x();
   ```



##########
api/src/main/java/org/apache/iceberg/geospatial/GeospatialPredicateEvaluators.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.geospatial;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+
+public class GeospatialPredicateEvaluators {
+  private GeospatialPredicateEvaluators() {}
+
+  public interface GeospatialPredicateEvaluator {
+    /**
+     * Test whether this bounding box intersects with another.
+     *
+     * @param bbox1 the first bounding box
+     * @param bbox2 the second bounding box
+     * @return true if this box intersects the other box
+     */
+    boolean intersects(GeospatialBoundingBox bbox1, GeospatialBoundingBox 
bbox2);
+  }
+
+  public static GeospatialPredicateEvaluator create(Type type) {
+    switch (type.typeId()) {
+      case GEOMETRY:
+        return new GeometryEvaluator();
+      case GEOGRAPHY:
+        return new GeographyEvaluator();
+      default:
+        throw new UnsupportedOperationException(
+            "Unsupported type for GeospatialBoundingBox: " + type);
+    }
+  }
+
+  static class GeometryEvaluator implements GeospatialPredicateEvaluator {
+    @Override
+    public boolean intersects(GeospatialBoundingBox bbox1, 
GeospatialBoundingBox bbox2) {
+      return intersectsWithWrapAround(bbox1, bbox2);
+    }
+
+    static boolean intersectsWithWrapAround(
+        GeospatialBoundingBox bbox1, GeospatialBoundingBox bbox2) {
+      // Let's check y first, and if y does not intersect, we can return false
+      if (bbox1.min().y() > bbox2.max().y() || bbox1.max().y() < 
bbox2.min().y()) {
+        return false;
+      }
+
+      // Now check x, need to take wrap-around into account
+      if (bbox1.min().x() <= bbox1.max().x() && bbox2.min().x() <= 
bbox2.max().x()) {
+        // No wrap-around
+        return bbox1.min().x() <= bbox2.max().x() && bbox1.max().x() >= 
bbox2.min().x();
+      } else if (bbox1.min().x() > bbox1.max().x() && bbox2.min().x() <= 
bbox2.max().x()) {
+        // bbox1 wraps around the antimeridian, bbox2 does not
+        return bbox1.min().x() <= bbox2.max().x() || bbox1.max().x() >= 
bbox2.min().x();
+      } else if (bbox1.min().x() <= bbox1.max().x() && bbox2.min().x() > 
bbox2.max().x()) {
+        // bbox2 wraps around the antimeridian, bbox1 does not
+        return intersectsWithWrapAround(bbox2, bbox1);
+      } else {
+        // Both wrap around the antimeridian, they must intersect
+        return true;
+      }
+    }
+  }
+
+  static class GeographyEvaluator implements GeospatialPredicateEvaluator {
+    @Override
+    public boolean intersects(GeospatialBoundingBox bbox1, 
GeospatialBoundingBox bbox2) {
+      validateBoundingBox(bbox1);
+      validateBoundingBox(bbox2);
+      return GeometryEvaluator.intersectsWithWrapAround(bbox1, bbox2);

Review Comment:
   A geo newbie question, as I am ramping up to review the geo PRs.
   
   Does the geography bounding box intersection evaluation also look only at the
   X and Y dimensions?



##########
api/src/main/java/org/apache/iceberg/geospatial/GeospatialPredicateEvaluators.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.geospatial;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+
+public class GeospatialPredicateEvaluators {
+  private GeospatialPredicateEvaluators() {}
+
+  public interface GeospatialPredicateEvaluator {
+    /**
+     * Test whether this bounding box intersects with another.

Review Comment:
   nit: "this/another" is a bit unclear. Maybe `whether the two bounding boxes
   intersect`?



##########
api/src/main/java/org/apache/iceberg/types/Types.java:
##########
@@ -599,7 +598,7 @@ public TypeID typeId() {
     }
 
     public String crs() {
-      return crs;
+      return crs != null ? crs : DEFAULT_CRS;

Review Comment:
   nit: why don't we handle a null `crs` parameter in the constructor instead?



##########
api/src/main/java/org/apache/iceberg/geospatial/GeospatialBound.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.geospatial;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Comparator;
+import java.util.Objects;
+
+/**
+ * Represents a geospatial bound (minimum or maximum) for Iceberg tables.
+ *
+ * <p>According to the <a 
href="https://iceberg.apache.org/spec/#bound-serialization">Bound
+ * serialization section of Iceberg Table spec</a>, geospatial bounds are 
serialized differently
+ * from the regular WKB representation. Geometry and geography bounds are 
single point encoded as a
+ * concatenation of 8-byte little-endian IEEE 754 coordinate values in the 
order X, Y, Z (optional),
+ * M (optional).
+ *
+ * <p>The encoding varies based on which coordinates are present:
+ *
+ * <ul>
+ *   <li>x:y (2 doubles) when both z and m are unset
+ *   <li>x:y:z (3 doubles) when only m is unset
+ *   <li>x:y:NaN:m (4 doubles) when only z is unset
+ *   <li>x:y:z:m (4 doubles) when all coordinates are set
+ * </ul>
+ *
+ * <p>This class represents a lower or upper geospatial bound and handles 
serialization and
+ * deserialization of these bounds to/from byte arrays, conforming to the 
Iceberg specification.
+ */
+public class GeospatialBound implements Serializable, 
Comparable<GeospatialBound> {
+  /**
+   * Parses a geospatial bound from a byte buffer according to Iceberg spec.
+   *
+   * <p>Based on the buffer size, this method determines which coordinates are 
present: - 16 bytes
+   * (2 doubles): x and y only - 24 bytes (3 doubles): x, y, and z - 32 bytes 
(4 doubles): x, y, z
+   * (might be NaN), and m
+   *
+   * <p>The ordinates are encoded as 8-byte little-endian IEEE 754 values.
+   *
+   * @param buffer the ByteBuffer containing the serialized geospatial bound
+   * @return a GeospatialBound object representing the parsed bound
+   * @throws IllegalArgumentException if the buffer has an invalid size
+   */
+  public static GeospatialBound fromByteBuffer(ByteBuffer buffer) {
+    // Save original position and byte order to restore them later

Review Comment:
   What's the reason that we need to restore the original position and byte order?



##########
api/src/main/java/org/apache/iceberg/geospatial/GeospatialBound.java:
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.geospatial;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Objects;
+
+/**
+ * Represents a geospatial bound (minimum or maximum) for Iceberg tables.
+ *
+ * <p>According to the <a 
href="https://iceberg.apache.org/spec/#bound-serialization">Bound
+ * serialization section of Iceberg Table spec</a>, geospatial bounds are 
serialized differently
+ * from the regular WKB representation. Geometry and geography bounds are 
single point encoded as a
+ * concatenation of 8-byte little-endian IEEE 754 coordinate values in the 
order X, Y, Z (optional),
+ * M (optional).
+ *
+ * <p>The encoding varies based on which coordinates are present:
+ *
+ * <ul>
+ *   <li>x:y (2 doubles) when both z and m are unset
+ *   <li>x:y:z (3 doubles) when only m is unset
+ *   <li>x:y:NaN:m (4 doubles) when only z is unset
+ *   <li>x:y:z:m (4 doubles) when all coordinates are set
+ * </ul>
+ *
+ * <p>This class represents a lower or upper geospatial bound and handles 
serialization and
+ * deserialization of these bounds to/from byte arrays, conforming to the 
Iceberg specification.
+ */
+public class GeospatialBound {
+  /**
+   * Parses a geospatial bound from a byte buffer according to Iceberg spec.
+   *
+   * <p>Based on the buffer size, this method determines which coordinates are 
present: - 16 bytes
+   * (2 doubles): x and y only - 24 bytes (3 doubles): x, y, and z - 32 bytes 
(4 doubles): x, y, z
+   * (might be NaN), and m
+   *
+   * <p>The ordinates are encoded as 8-byte little-endian IEEE 754 values.
+   *
+   * @param buffer the ByteBuffer containing the serialized geospatial bound
+   * @return a GeospatialBound object representing the parsed bound
+   * @throws IllegalArgumentException if the buffer has an invalid size
+   */
+  public static GeospatialBound fromByteBuffer(ByteBuffer buffer) {
+    // Save original position and byte order to restore them later
+    int originalPosition = buffer.position();
+    ByteOrder originalOrder = buffer.order();
+
+    try {
+      buffer.order(ByteOrder.LITTLE_ENDIAN);
+      int size = buffer.remaining();
+
+      if (size == 2 * Double.BYTES) {
+        // x:y format (2 doubles)
+        double coordX = buffer.getDouble();
+        double coordY = buffer.getDouble();
+        return createXY(coordX, coordY);
+      } else if (size == 3 * Double.BYTES) {
+        // x:y:z format (3 doubles)
+        double coordX = buffer.getDouble();
+        double coordY = buffer.getDouble();
+        double coordZ = buffer.getDouble();
+        return createXYZ(coordX, coordY, coordZ);
+      } else if (size == 4 * Double.BYTES) {
+        // x:y:z:m format (4 doubles) - z might be NaN
+        double coordX = buffer.getDouble();
+        double coordY = buffer.getDouble();
+        double coordZ = buffer.getDouble();
+        double coordM = buffer.getDouble();
+        return new GeospatialBound(coordX, coordY, coordZ, coordM);
+      } else {
+        throw new IllegalArgumentException(
+            "Invalid buffer size for GeospatialBound: expected 16, 24, or 32 
bytes, got " + size);
+      }
+    } finally {
+      // Restore original position and byte order
+      buffer.position(originalPosition);
+      buffer.order(originalOrder);
+    }
+  }
+
+  /**
+   * Parses a geospatial bound from a byte array according to Iceberg spec.
+   *
+   * @param bytes the byte array containing the serialized geospatial bound
+   * @return a GeospatialBound object representing the parsed bound
+   * @throws IllegalArgumentException if the byte array has an invalid length
+   */
+  public static GeospatialBound fromByteArray(byte[] bytes) {
+    int length = bytes.length;
+    if (length != 2 * Double.BYTES && length != 3 * Double.BYTES && length != 
4 * Double.BYTES) {
+      throw new IllegalArgumentException(
+          "Invalid byte array length for GeospatialBound: expected 16, 24, or 
32 bytes, got "
+              + length);
+    }
+
+    return fromByteBuffer(ByteBuffer.wrap(bytes));
+  }
+
+  /**
+   * Creates a GeospatialBound with X and Y coordinates only.
+   *
+   * @param x the X coordinate (longitude/easting)
+   * @param y the Y coordinate (latitude/northing)
+   * @return a GeospatialBound with XY coordinates
+   */
+  @SuppressWarnings("ParameterName")
+  public static GeospatialBound createXY(double x, double y) {
+    return new GeospatialBound(x, y, Double.NaN, Double.NaN);
+  }
+
+  /**
+   * Creates a GeospatialBound with X, Y, and Z coordinates, with no M value.
+   *
+   * @param x the X coordinate (longitude/easting)
+   * @param y the Y coordinate (latitude/northing)
+   * @param z the Z coordinate (elevation)
+   * @return a GeospatialBound with XYZ coordinates
+   */
+  @SuppressWarnings("ParameterName")
+  public static GeospatialBound createXYZ(double x, double y, double z) {
+    return new GeospatialBound(x, y, z, Double.NaN);
+  }
+
+  /**
+   * Creates a GeospatialBound with X, Y, Z, and M coordinates.
+   *
+   * @param x the X coordinate (longitude/easting)
+   * @param y the Y coordinate (latitude/northing)
+   * @param z the Z coordinate (elevation)
+   * @param m the M value (measure)
+   * @return a GeospatialBound with XYZM coordinates
+   */
+  @SuppressWarnings("ParameterName")
+  public static GeospatialBound createXYZM(double x, double y, double z, 
double m) {
+    return new GeospatialBound(x, y, z, m);
+  }
+
+  /**
+   * Creates a GeospatialBound with X, Y, and M values, with no Z coordinate.
+   *
+   * @param x the X coordinate (longitude/easting)
+   * @param y the Y coordinate (latitude/northing)
+   * @param m the M value (measure)
+   * @return a GeospatialBound with XYM coordinates
+   */
+  @SuppressWarnings("ParameterName")
+  public static GeospatialBound createXYM(double x, double y, double m) {
+    return new GeospatialBound(x, y, Double.NaN, m);
+  }
+
+  @SuppressWarnings("MemberName")
+  private final double x;
+
+  @SuppressWarnings("MemberName")
+  private final double y;
+
+  @SuppressWarnings("MemberName")
+  private final double z;
+
+  @SuppressWarnings("MemberName")
+  private final double m;
+
+  /** Private constructor - use factory methods instead. */
+  @SuppressWarnings("ParameterName")
+  private GeospatialBound(double x, double y, double z, double m) {
+    this.x = x;
+    this.y = y;
+    this.z = z;
+    this.m = m;
+  }
+
+  /**
+   * Get the X coordinate (longitude/easting).
+   *
+   * @return X coordinate value
+   */
+  @SuppressWarnings("MethodName")
+  public double x() {
+    return x;
+  }
+
+  /**
+   * Get the Y coordinate (latitude/northing).
+   *
+   * @return Y coordinate value
+   */
+  @SuppressWarnings("MethodName")
+  public double y() {
+    return y;
+  }
+
+  /**
+   * Get the Z coordinate (typically elevation).
+   *
+   * @return Z coordinate value or NaN if not set
+   */
+  @SuppressWarnings("MethodName")
+  public double z() {
+    return z;
+  }
+
+  /**
+   * Get the M value (measure).
+   *
+   * @return M value or NaN if not set
+   */
+  @SuppressWarnings("MethodName")
+  public double m() {
+    return m;
+  }
+
+  /**
+   * Check if this bound has a defined Z coordinate.
+   *
+   * @return true if Z is not NaN
+   */
+  public boolean hasZ() {
+    return !Double.isNaN(z);
+  }
+
+  /**
+   * Check if this bound has a defined M value.
+   *
+   * @return true if M is not NaN
+   */
+  public boolean hasM() {
+    return !Double.isNaN(m);
+  }
+
+  /**
+   * Serializes this geospatial bound to a byte buffer according to Iceberg 
spec.
+   *
+   * <p>Following the Iceberg spec, the bound is serialized based on which 
coordinates are set: -
+   * x:y (2 doubles) when both z and m are unset - x:y:z (3 doubles) when only 
m is unset -
+   * x:y:NaN:m (4 doubles) when only z is unset - x:y:z:m (4 doubles) when all 
coordinates are set
+   *
+   * @return A ByteBuffer containing the serialized geospatial bound
+   */
+  public ByteBuffer toByteBuffer() {
+    // Calculate size based on which coordinates are present
+    int size;
+    if (!hasZ() && !hasM()) {
+      // Just x and y
+      size = 2 * Double.BYTES;
+    } else if (hasZ() && !hasM()) {
+      // x, y, and z (no m)
+      size = 3 * Double.BYTES;
+    } else {
+      // x, y, z (or NaN), and m
+      size = 4 * Double.BYTES;
+    }
+
+    ByteBuffer buffer = 
ByteBuffer.allocate(size).order(ByteOrder.LITTLE_ENDIAN);
+    buffer.putDouble(x);
+    buffer.putDouble(y);
+
+    if (hasZ() || hasM()) {
+      // If we have z or m or both, we need to include z (could be NaN)
+      buffer.putDouble(z);
+    }
+
+    if (hasM()) {
+      // If we have m, include it
+      buffer.putDouble(m);
+    }
+
+    buffer.flip();
+    return buffer;
+  }
+
+  @Override
+  public String toString() {
+    return "GeospatialBound(" + simpleString() + ")";
+  }
+
+  public String simpleString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("x=").append(x).append(", y=").append(y);
+
+    if (hasZ()) {
+      sb.append(", z=").append(z);
+    }
+
+    if (hasM()) {
+      sb.append(", m=").append(m);
+    }
+
+    return sb.toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    } else if (!(other instanceof GeospatialBound)) {
+      return false;
+    }
+
+    GeospatialBound that = (GeospatialBound) other;
+    return Double.compare(that.x, x) == 0
+        && Double.compare(that.y, y) == 0
+        && Double.compare(that.z, z) == 0
+        && Double.compare(that.m, m) == 0;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(GeospatialBound.class, x, y, z, m);

Review Comment:
   I see that we include the class object in the hash code of the type 
classes, which makes sense. But I am not sure it is necessary here.



##########
api/src/main/java/org/apache/iceberg/geospatial/GeospatialBound.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.geospatial;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Comparator;
+import java.util.Objects;
+
+/**
+ * Represents a geospatial bound (minimum or maximum) for Iceberg tables.
+ *
+ * <p>According to the <a 
href="https://iceberg.apache.org/spec/#bound-serialization">Bound
+ * serialization section of Iceberg Table spec</a>, geospatial bounds are 
serialized differently
+ * from the regular WKB representation. Geometry and geography bounds are 
single point encoded as a
+ * concatenation of 8-byte little-endian IEEE 754 coordinate values in the 
order X, Y, Z (optional),
+ * M (optional).
+ *
+ * <p>The encoding varies based on which coordinates are present:
+ *
+ * <ul>
+ *   <li>x:y (2 doubles) when both z and m are unset
+ *   <li>x:y:z (3 doubles) when only m is unset
+ *   <li>x:y:NaN:m (4 doubles) when only z is unset
+ *   <li>x:y:z:m (4 doubles) when all coordinates are set
+ * </ul>
+ *
+ * <p>This class represents a lower or upper geospatial bound and handles 
serialization and
+ * deserialization of these bounds to/from byte arrays, conforming to the 
Iceberg specification.
+ */
+public class GeospatialBound implements Serializable, 
Comparable<GeospatialBound> {
+  /**
+   * Parses a geospatial bound from a byte buffer according to Iceberg spec.
+   *
+   * <p>Based on the buffer size, this method determines which coordinates are 
present: - 16 bytes
+   * (2 doubles): x and y only - 24 bytes (3 doubles): x, y, and z - 32 bytes 
(4 doubles): x, y, z
+   * (might be NaN), and m
+   *
+   * <p>The ordinates are encoded as 8-byte little-endian IEEE 754 values.
+   *
+   * @param buffer the ByteBuffer containing the serialized geospatial bound
+   * @return a GeospatialBound object representing the parsed bound
+   * @throws IllegalArgumentException if the buffer has an invalid size
+   */
+  public static GeospatialBound fromByteBuffer(ByteBuffer buffer) {
+    // Save original position and byte order to restore them later
+    int originalPosition = buffer.position();
+    ByteOrder originalOrder = buffer.order();
+
+    try {
+      buffer.order(ByteOrder.LITTLE_ENDIAN);
+      int size = buffer.remaining();
+
+      if (size == 2 * Double.BYTES) {
+        // x:y format (2 doubles)
+        double coordX = buffer.getDouble();
+        double coordY = buffer.getDouble();
+        return createXY(coordX, coordY);
+      } else if (size == 3 * Double.BYTES) {
+        // x:y:z format (3 doubles)
+        double coordX = buffer.getDouble();
+        double coordY = buffer.getDouble();
+        double coordZ = buffer.getDouble();
+        return createXYZ(coordX, coordY, coordZ);
+      } else if (size == 4 * Double.BYTES) {
+        // x:y:z:m format (4 doubles) - z might be NaN
+        double coordX = buffer.getDouble();
+        double coordY = buffer.getDouble();
+        double coordZ = buffer.getDouble();
+        double coordM = buffer.getDouble();
+        return new GeospatialBound(coordX, coordY, coordZ, coordM);
+      } else {
+        throw new IllegalArgumentException(
+            "Invalid buffer size for GeospatialBound: expected 16, 24, or 32 
bytes, got " + size);
+      }
+    } finally {
+      // Restore original position and byte order
+      buffer.position(originalPosition);
+      buffer.order(originalOrder);
+    }
+  }
+
+  /**
+   * Parses a geospatial bound from a byte array according to Iceberg spec.
+   *
+   * @param bytes the byte array containing the serialized geospatial bound
+   * @return a GeospatialBound object representing the parsed bound
+   * @throws IllegalArgumentException if the byte array has an invalid length
+   */
+  public static GeospatialBound fromByteArray(byte[] bytes) {
+    int length = bytes.length;
+    if (length != 2 * Double.BYTES && length != 3 * Double.BYTES && length != 
4 * Double.BYTES) {

Review Comment:
   nit: Iceberg style uses `Preconditions.checkArgument` usually



##########
api/src/main/java/org/apache/iceberg/geospatial/BoundingBox.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.geospatial;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Objects;
+
+/**
+ * Represents a geospatial bounding box composed of minimum and maximum bounds.
+ *
+ * <p>A bounding box (also called a Minimum Bounding Rectangle or MBR) is 
defined by two points: the
+ * minimum and maximum coordinates that define the box's corners. This 
provides a simple
+ * approximation of a more complex geometry for efficient filtering and data 
skipping.
+ */
+public class BoundingBox {
+  /**
+   * Create a {@link BoundingBox} object from buffers containing min and max 
bounds
+   *
+   * @param min the serialized minimum bound
+   * @param max the serialized maximum bound
+   * @return a BoundingBox instance
+   */
+  public static BoundingBox fromByteBuffers(ByteBuffer min, ByteBuffer max) {
+    return new BoundingBox(
+        GeospatialBound.fromByteBuffer(min), 
GeospatialBound.fromByteBuffer(max));
+  }
+
+  /**
+   * Deserialize a byte buffer as a {@link BoundingBox} object
+   *
+   * @param buffer the serialized bounding box
+   * @return a BoundingBox instance
+   */
+  public static BoundingBox fromByteBuffer(ByteBuffer buffer) {
+    int originalPosition = buffer.position();
+    ByteOrder originalOrder = buffer.order();
+
+    try {
+      buffer.order(ByteOrder.LITTLE_ENDIAN);
+
+      int minLen = buffer.getInt();
+      ByteBuffer min = buffer.slice();
+      min.limit(minLen);
+      buffer.position(buffer.position() + minLen);
+
+      int maxLen = buffer.getInt();
+      ByteBuffer max = buffer.slice();
+      max.limit(maxLen);
+
+      return fromByteBuffers(min, max);
+    } finally {
+      // Restore original position and byte order
+      buffer.position(originalPosition);
+      buffer.order(originalOrder);
+    }
+  }
+
+  /**
+   * Create an empty bounding box
+   *
+   * @return an empty bounding box
+   */
+  public static BoundingBox empty() {
+    return new BoundingBox(
+        GeospatialBound.createXY(Double.NaN, Double.NaN),
+        GeospatialBound.createXY(Double.NaN, Double.NaN));
+  }
+
+  public BoundingBox(GeospatialBound min, GeospatialBound max) {
+    this.min = min;
+    this.max = max;
+  }
+
+  private final GeospatialBound min;
+  private final GeospatialBound max;
+
+  /**
+   * Get the minimum corner of the bounding box.
+   *
+   * @return the minimum bound
+   */
+  public GeospatialBound min() {
+    return min;
+  }
+
+  /**
+   * Get the maximum corner of the bounding box.
+   *
+   * @return the maximum bound
+   */
+  public GeospatialBound max() {
+    return max;
+  }
+
+  /**
+   * Serializes this bounding box to a byte buffer. The serialized byte buffer 
could be deserialized
+   * using {@link #fromByteBuffer(ByteBuffer)}.
+   *
+   * @return a byte buffer containing the serialized bounding box
+   */
+  public ByteBuffer toByteBuffer() {
+    ByteBuffer minBuffer = min.toByteBuffer();
+    ByteBuffer maxBuffer = max.toByteBuffer();
+
+    int totalSize = Integer.BYTES + minBuffer.remaining() + Integer.BYTES + 
maxBuffer.remaining();

Review Comment:
   should `minBuffer.remaining()` be `minBuffer.limit()`?



##########
api/src/main/java/org/apache/iceberg/geospatial/BoundingBox.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.geospatial;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Objects;
+
+/**
+ * Represents a geospatial bounding box composed of minimum and maximum bounds.
+ *
+ * <p>A bounding box (also called a Minimum Bounding Rectangle or MBR) is 
defined by two points: the
+ * minimum and maximum coordinates that define the box's corners. This 
provides a simple
+ * approximation of a more complex geometry for efficient filtering and data 
skipping.
+ */
+public class BoundingBox {
+  /**
+   * Create a {@link BoundingBox} object from buffers containing min and max 
bounds
+   *
+   * @param min the serialized minimum bound
+   * @param max the serialized maximum bound
+   * @return a BoundingBox instance
+   */
+  public static BoundingBox fromByteBuffers(ByteBuffer min, ByteBuffer max) {
+    return new BoundingBox(
+        GeospatialBound.fromByteBuffer(min), 
GeospatialBound.fromByteBuffer(max));
+  }
+
+  /**
+   * Deserialize a byte buffer as a {@link BoundingBox} object
+   *
+   * @param buffer the serialized bounding box
+   * @return a BoundingBox instance
+   */
+  public static BoundingBox fromByteBuffer(ByteBuffer buffer) {
+    int originalPosition = buffer.position();
+    ByteOrder originalOrder = buffer.order();
+
+    try {
+      buffer.order(ByteOrder.LITTLE_ENDIAN);
+
+      int minLen = buffer.getInt();
+      ByteBuffer min = buffer.slice();
+      min.limit(minLen);
+      buffer.position(buffer.position() + minLen);
+
+      int maxLen = buffer.getInt();
+      ByteBuffer max = buffer.slice();
+      max.limit(maxLen);
+
+      return fromByteBuffers(min, max);
+    } finally {
+      // Restore original position and byte order
+      buffer.position(originalPosition);
+      buffer.order(originalOrder);
+    }
+  }
+
+  /**
+   * Create an empty bounding box
+   *
+   * @return an empty bounding box
+   */
+  public static BoundingBox empty() {
+    return new BoundingBox(
+        GeospatialBound.createXY(Double.NaN, Double.NaN),
+        GeospatialBound.createXY(Double.NaN, Double.NaN));
+  }
+
+  public BoundingBox(GeospatialBound min, GeospatialBound max) {
+    this.min = min;
+    this.max = max;
+  }
+
+  private final GeospatialBound min;
+  private final GeospatialBound max;
+
+  /**
+   * Get the minimum corner of the bounding box.
+   *
+   * @return the minimum bound
+   */
+  public GeospatialBound min() {
+    return min;
+  }
+
+  /**
+   * Get the maximum corner of the bounding box.
+   *
+   * @return the maximum bound
+   */
+  public GeospatialBound max() {
+    return max;
+  }
+
+  /**
+   * Serializes this bounding box to a byte buffer. The serialized byte buffer 
could be deserialized
+   * using {@link #fromByteBuffer(ByteBuffer)}.
+   *
+   * @return a byte buffer containing the serialized bounding box
+   */
+  public ByteBuffer toByteBuffer() {
+    ByteBuffer minBuffer = min.toByteBuffer();
+    ByteBuffer maxBuffer = max.toByteBuffer();
+
+    int totalSize = Integer.BYTES + minBuffer.remaining() + Integer.BYTES + 
maxBuffer.remaining();
+    ByteBuffer buffer = 
ByteBuffer.allocate(totalSize).order(ByteOrder.LITTLE_ENDIAN);
+
+    buffer.putInt(minBuffer.remaining());
+    buffer.put(minBuffer);
+    buffer.putInt(maxBuffer.remaining());
+    buffer.put(maxBuffer);
+    buffer.flip();
+    return buffer;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    } else if (!(other instanceof BoundingBox)) {
+      return false;
+    }
+
+    BoundingBox that = (BoundingBox) other;
+    return Objects.equals(min, that.min) && Objects.equals(max, that.max);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(min, max);
+  }
+
+  @Override
+  public String toString() {
+    return "BoundingBox{min={" + min.simpleString() + "}, max={" + 
max.simpleString() + "}}";

Review Comment:
   Maybe use Guava's `MoreObjects`, as we do in other places.
   ```
       return MoreObjects.toStringHelper(BoundingBox.class)
           .add("min", min)
           .add("max", max)
           .toString();
   ```



##########
api/src/main/java/org/apache/iceberg/geospatial/GeospatialPredicateEvaluators.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.geospatial;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+
+public class GeospatialPredicateEvaluators {
+  private GeospatialPredicateEvaluators() {}
+
+  public interface GeospatialPredicateEvaluator {
+    /**
+     * Test whether this bounding box intersects with another.
+     *
+     * @param bbox1 the first bounding box
+     * @param bbox2 the second bounding box
+     * @return true if this box intersects the other box
+     */
+    boolean intersects(BoundingBox bbox1, BoundingBox bbox2);
+  }
+
+  public static GeospatialPredicateEvaluator create(Type type) {

Review Comment:
   We can have two overloaded `create` methods with more specific types: 
GEOMETRY, GEOGRAPHY. 
   
   They can also return more specific evaluator types, which is generally 
preferred.



##########
api/src/main/java/org/apache/iceberg/geospatial/GeospatialPredicateEvaluators.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.geospatial;
+
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+
+public class GeospatialPredicateEvaluators {
+  private GeospatialPredicateEvaluators() {}
+
+  public interface GeospatialPredicateEvaluator {
+    /**
+     * Test whether this bounding box intersects with another.
+     *
+     * @param bbox1 the first bounding box
+     * @param bbox2 the second bounding box
+     * @return true if this box intersects the other box
+     */
+    boolean intersects(BoundingBox bbox1, BoundingBox bbox2);
+  }
+
+  public static GeospatialPredicateEvaluator create(Type type) {
+    switch (type.typeId()) {
+      case GEOMETRY:
+        return new GeometryEvaluator();
+      case GEOGRAPHY:
+        return new GeographyEvaluator();
+      default:
+        throw new UnsupportedOperationException("Unsupported type for 
BoundingBox: " + type);
+    }
+  }
+
+  static class GeometryEvaluator implements GeospatialPredicateEvaluator {
+    @Override
+    public boolean intersects(BoundingBox bbox1, BoundingBox bbox2) {
+      return intersectsWithWrapAround(bbox1, bbox2);
+    }
+
+    /**
+     * Check if two bounding boxes intersect, taking wrap-around into account.
+     *
+     * <p>Wraparound (or antimeridian crossing) occurs when a geography 
crosses the 180°/-180°
+     * longitude line on a map. In these cases, the minimum X value is greater 
than the maximum X
+     * value (xmin > xmax). This represents a bounding box that wraps around 
the globe.
+     *
+     * <p>For example, a bounding box with xmin=170° and xmax=-170° represents 
an area that spans
+     * from 170° east to 190° east (or equivalently, -170° west). This is 
important for geometries
+     * that cross the antimeridian, like a path from Japan to Alaska.
+     *
+     * <p>When xmin > xmax, a point matches if its X coordinate is either X ≥ 
xmin OR X ≤ xmax,
+     * rather than the usual X ≥ xmin AND X ≤ xmax. In geographic terms, if 
the westernmost
+     * longitude is greater than the easternmost longitude, this indicates an 
antimeridian crossing.
+     *
+     * <p>The Iceberg specification does not explicitly rule out the use of 
wrap-around in bounding
+     * boxes for geometry types, so we handle wrap-around for both geography 
and geometry bounding
+     * boxes.
+     *
+     * @param bbox1 the first bounding box
+     * @param bbox2 the second bounding box
+     * @return true if the bounding boxes intersect
+     */
+    static boolean intersectsWithWrapAround(BoundingBox bbox1, BoundingBox 
bbox2) {
+      // Let's check y first, and if y does not intersect, we can return false
+      if (bbox1.min().y() > bbox2.max().y() || bbox1.max().y() < 
bbox2.min().y()) {
+        return false;
+      }
+
+      // Now check x, need to take wrap-around into account
+      if (bbox1.min().x() <= bbox1.max().x() && bbox2.min().x() <= 
bbox2.max().x()) {
+        // No wrap-around
+        return bbox1.min().x() <= bbox2.max().x() && bbox1.max().x() >= 
bbox2.min().x();
+      } else if (bbox1.min().x() > bbox1.max().x() && bbox2.min().x() <= 
bbox2.max().x()) {
+        // bbox1 wraps around the antimeridian, bbox2 does not
+        return bbox1.min().x() <= bbox2.max().x() || bbox1.max().x() >= 
bbox2.min().x();
+      } else if (bbox1.min().x() <= bbox1.max().x() && bbox2.min().x() > 
bbox2.max().x()) {
+        // bbox2 wraps around the antimeridian, bbox1 does not
+        return intersectsWithWrapAround(bbox2, bbox1);
+      } else {
+        // Both wrap around the antimeridian, they must intersect
+        return true;
+      }
+    }
+  }
+
+  static class GeographyEvaluator implements GeospatialPredicateEvaluator {
+    @Override
+    public boolean intersects(BoundingBox bbox1, BoundingBox bbox2) {
+      validateBoundingBox(bbox1);
+      validateBoundingBox(bbox2);
+      return GeometryEvaluator.intersectsWithWrapAround(bbox1, bbox2);
+    }
+
+    /**
+     * For geography types, coordinates are restricted to the canonical ranges 
of [-180°, 180°] for
+     * longitude (X) and [-90°, 90°] for latitude (Y).
+     *
+     * @param bbox the bounding box to validate
+     * @throws IllegalArgumentException if the bounding box is invalid
+     */
+    private void validateBoundingBox(BoundingBox bbox) {
+      Preconditions.checkArgument(
+          bbox.min().y() >= -90 && bbox.max().y() <= 90, "Latitude out of 
range: %s", bbox);

Review Comment:
   nit: use explicit double literals, like `-90.0d`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to