This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new d1a157d1bc7c [SPARK-57083][SQL] Preserve geography SRID across 
encoders, Parquet readers, and Python conversion
d1a157d1bc7c is described below

commit d1a157d1bc7cd4242cb81fd2d6a90ff43be74c48
Author: Uros Bojanic <[email protected]>
AuthorDate: Wed May 27 18:45:11 2026 +0800

    [SPARK-57083][SQL] Preserve geography SRID across encoders, Parquet 
readers, and Python conversion
    
    ### What changes were proposed in this pull request?
    Now that `GeographyType` supports per-row SRIDs, this PR fixes several code 
paths that silently dropped the SRID when materializing a `GeographyVal` from 
WKB and substituted the default value (`4326`) instead.
    
    ### Why are the changes needed?
    Without these fixes, geographies read from Arrow, Parquet, Python, or 
constructed via `CatalystTypeConverters.convertToCatalyst` had their SRID 
rewritten to the default 4326, regardless of the column's declared SRID or the 
value's actual SRID. This produced silent data corruption for any geography 
stored with a non-default geographic SRID.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes — bug fix. Reading a Parquet column declared as 
`GeographyType(<non-default SRID>)` now yields values with the column's 
declared SRID, and materializing geographies from Arrow/Python or converting 
Geography values via `CatalystTypeConverters.convertToCatalyst` now preserves 
each value's own SRID. Previously the SRID was always 4326.
    
    ### How was this patch tested?
    Updated `CatalystTypeConvertersSuite`, `STUtilsSuite`, `ArrowWriterSuite`, 
and `ParquetGeoSuite` with non-default SRID cases, and added a new 
`EvaluatePythonSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Generated-by: Cursor (Claude Opus 4.7)
    
    Closes #56127 from uros-db/geography-srids.
    
    Authored-by: Uros Bojanic <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 4a61083f22e605fa0a4c73915499514200902b79)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../apache/spark/sql/catalyst/util/STUtils.java    |   8 ++
 .../spark/sql/vectorized/ArrowColumnVector.java    |   4 +-
 .../sql/catalyst/CatalystTypeConverters.scala      |   2 +-
 .../sql/catalyst/CatalystTypeConvertersSuite.scala |  25 ++++
 .../spark/sql/catalyst/util/StUtilsSuite.java      |  34 ++++-
 .../datasources/parquet/WKBConverterStrategy.java  |   2 +-
 .../datasources/parquet/ParquetRowConverter.scala  |  20 +--
 .../sql/execution/python/EvaluatePython.scala      |   3 +-
 .../sql/execution/arrow/ArrowWriterSuite.scala     |   8 ++
 .../datasources/parquet/ParquetGeoSuite.scala      |  61 ++++++++-
 .../sql/execution/python/EvaluatePythonSuite.scala | 139 +++++++++++++++++++++
 11 files changed, 288 insertions(+), 18 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/STUtils.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/STUtils.java
index c5d57fd08fed..c9035569770c 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/STUtils.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/STUtils.java
@@ -152,6 +152,14 @@ public final class STUtils {
     return toPhysVal(Geography.fromWkb(wkb));
   }
 
+  public static GeographyVal stGeogFromWKB(byte[] wkb, int srid) {
+    // We only allow setting the SRID to geographic values.
+    if(!GeographyType.isSridSupported(srid)) {
+      throw QueryExecutionErrors.stInvalidSridValueError(srid);
+    }
+    return toPhysVal(Geography.fromWkb(wkb, srid));
+  }
+
   // ST_GeomFromWKB
   public static GeometryVal stGeomFromWKB(byte[] wkb) {
     return toPhysVal(Geometry.fromWkb(wkb));
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
index 019bc258579a..8cacfcda0b0e 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
@@ -157,9 +157,7 @@ public class ArrowColumnVector extends ColumnVector {
     int srid = getChild(0).getInt(rowId);
     byte[] bytes = getChild(1).getBinary(rowId);
     gt.assertSridAllowedForType(srid);
-    // TODO(GEO-602): Geog still does not support different SRIDs, once it 
does,
-    // we need to update this.
-    return (bytes == null) ? null : STUtils.stGeogFromWKB(bytes);
+    return (bytes == null) ? null : STUtils.stGeogFromWKB(bytes, srid);
   }
 
   @Override
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index d51007e7d336..be66f851e361 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -666,7 +666,7 @@ object CatalystTypeConverters {
     case r: Row => InternalRow(r.toSeq.map(convertToCatalyst): _*)
     case arr: Array[Byte] => arr
     case g: org.apache.spark.sql.types.Geometry => 
STUtils.stGeomFromWKB(g.getBytes, g.getSrid)
-    case g: org.apache.spark.sql.types.Geography => 
STUtils.stGeogFromWKB(g.getBytes)
+    case g: org.apache.spark.sql.types.Geography => 
STUtils.stGeogFromWKB(g.getBytes, g.getSrid)
     case arr: Array[Char] => StringConverter.toCatalyst(arr)
     case arr: Array[_] => new GenericArrayData(arr.map(convertToCatalyst))
     case map: Map[_, _] =>
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
index b730a6c27a3b..222465d82c02 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystTypeConvertersSuite.scala
@@ -488,6 +488,18 @@ class CatalystTypeConvertersSuite extends SparkFunSuite 
with SQLHelper {
     assert(STUtils.stSrid(resultVal) === 4326)
   }
 
+  test("converting Geography with non-default SRID via convertToCatalyst") {
+    // Geography supports a variety of geographic SRIDs beyond the default 
4326.
+    Seq(4267, 4269, 4612, 37001, 104030).foreach { srid =>
+      val geog = Geography.fromWKB(pointWkb, srid)
+      val result = CatalystTypeConverters.convertToCatalyst(geog)
+      assert(result.isInstanceOf[GeographyVal])
+      val resultVal = result.asInstanceOf[GeographyVal]
+      assert(java.util.Arrays.equals(STUtils.stAsBinary(resultVal, NDR), 
pointWkb))
+      assert(STUtils.stSrid(resultVal) === srid)
+    }
+  }
+
   test("convertToCatalyst null handling for geospatial types") {
     assert(CatalystTypeConverters.convertToCatalyst(null: Geometry) === null)
     assert(CatalystTypeConverters.convertToCatalyst(null: Geography) === null)
@@ -503,6 +515,19 @@ class CatalystTypeConvertersSuite extends SparkFunSuite 
with SQLHelper {
       parameters = Map("srid" -> "1"))
   }
 
+  test("convertToCatalyst with Geography with invalid SRID") {
+    // Geography only accepts geographic SRIDs (e.g. 0 and 3857 are not 
geographic).
+    Seq(0, 1, 3857).foreach { invalidSrid =>
+      val geog = Geography.fromWKB(pointWkb, invalidSrid)
+      checkError(
+        exception = intercept[SparkIllegalArgumentException] {
+          CatalystTypeConverters.convertToCatalyst(geog)
+        },
+        condition = "ST_INVALID_SRID_VALUE",
+        parameters = Map("srid" -> invalidSrid.toString))
+    }
+  }
+
   test("createToCatalystConverter for GeometryType") {
     val gt = GeometryType(0)
     val converter = CatalystTypeConverters.createToCatalystConverter(gt)
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StUtilsSuite.java
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StUtilsSuite.java
index c3a806d897dd..7aaa3a8013c1 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StUtilsSuite.java
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StUtilsSuite.java
@@ -142,12 +142,44 @@ class STUtilsSuite {
 
   // ST_GeogFromWKB
   @Test
-  void testStGeogFromWKB() {
+  void testStGeogFromWKBNoSrid() {
     GeographyVal geographyVal = STUtils.stGeogFromWKB(testWkb);
     assertNotNull(geographyVal);
     assertArrayEquals(testGeographyBytes, geographyVal.getBytes());
   }
 
+  @Test
+  void testStGeogFromWKBWithDefaultSrid() {
+    GeographyVal geographyVal = STUtils.stGeogFromWKB(testWkb, 
testGeographySrid);
+    assertNotNull(geographyVal);
+    assertArrayEquals(testGeographyBytes, geographyVal.getBytes());
+  }
+
+  @Test
+  void testStGeogFromWKBWithValidSrid() {
+    // Geography supports a variety of geographic SRIDs (not just the default 
4326).
+    for (int validGeographySrid : new int[]{4267, 4269, 4326, 4612, 37001, 
104030}) {
+      GeographyVal geographyVal = STUtils.stGeogFromWKB(testWkb, 
validGeographySrid);
+      assertNotNull(geographyVal);
+      byte[] expectedBytes = new byte[testWkb.length + sridLen];
+      byte[] geogSrid = 
ByteBuffer.allocate(sridLen).order(end).putInt(validGeographySrid).array();
+      System.arraycopy(geogSrid, 0, expectedBytes, 0, sridLen);
+      System.arraycopy(testWkb, 0, expectedBytes, sridLen, testWkb.length);
+      assertArrayEquals(expectedBytes, geographyVal.getBytes());
+    }
+  }
+
+  @Test
+  void testStGeogFromWKBWithInvalidSrid() {
+    // SRIDs that are either out of range or correspond to non-geographic 
SRSes (e.g. 0, 3857).
+    for (int invalidGeographySrid : new int[]{-9999, -2, -1, 0, 1, 2, 3857, 
9999}) {
+      SparkIllegalArgumentException exception = 
assertThrows(SparkIllegalArgumentException.class,
+              () -> STUtils.stGeogFromWKB(testWkb, invalidGeographySrid));
+      assertEquals("ST_INVALID_SRID_VALUE", exception.getCondition());
+      assertTrue(exception.getMessage().contains("value: " + 
invalidGeographySrid + "."));
+    }
+  }
+
   // ST_GeomFromWKB
   @Test
   void testStGeomFromWKBNoSrid() {
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/WKBConverterStrategy.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/WKBConverterStrategy.java
index a8be90951289..dc821f6c5710 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/WKBConverterStrategy.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/WKBConverterStrategy.java
@@ -46,6 +46,6 @@ enum WKBToGeographyConverter implements WKBConverterStrategy {
 
   @Override
   public byte[] convert(byte[] wkb, int srid) {
-    return STUtils.stGeogFromWKB(wkb).getBytes();
+    return STUtils.stGeogFromWKB(wkb, srid).getBytes();
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index f253cbd0d0d3..50cc104eb8f6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -415,8 +415,8 @@ private[parquet] class ParquetRowConverter(
       case geom: GeometryType =>
         new ParquetGeometryConverter(geom.srid, updater)
 
-      case _: GeographyType =>
-        new ParquetGeographyConverter(updater)
+      case geog: GeographyType =>
+        new ParquetGeographyConverter(geog.srid, updater)
 
       // As long as the parquet type is INT64 timestamp, whether logical 
annotation
       // `isAdjustedToUTC` is false or true, it will be read as Spark's 
TimestampLTZ type
@@ -619,7 +619,7 @@ private[parquet] class ParquetRowConverter(
   }
 
   /**
-   * Parquet converter for strings. A dictionary is used to minimize string 
decoding cost.
+   * Parquet converter for geometries. A dictionary is used to minimize WKB 
decoding cost.
    */
   private final class ParquetGeometryConverter(srid: Int, updater: 
ParentContainerUpdater)
       extends ParquetPrimitiveConverter(updater) {
@@ -655,9 +655,9 @@ private[parquet] class ParquetRowConverter(
   }
 
   /**
-   * Parquet converter for strings. A dictionary is used to minimize string 
decoding cost.
+   * Parquet converter for geographies. A dictionary is used to minimize WKB 
decoding cost.
    */
-  private final class ParquetGeographyConverter(updater: 
ParentContainerUpdater)
+  private final class ParquetGeographyConverter(srid: Int, updater: 
ParentContainerUpdater)
       extends ParquetPrimitiveConverter(updater) {
 
     private var expandedDictionary: Array[GeographyVal] = null
@@ -666,7 +666,7 @@ private[parquet] class ParquetRowConverter(
 
     override def setDictionary(dictionary: Dictionary): Unit = {
       this.expandedDictionary = Array.tabulate(dictionary.getMaxId + 1) { i =>
-        STUtils.stGeogFromWKB(dictionary.decodeToBinary(i).getBytesUnsafe)
+        STUtils.stGeogFromWKB(dictionary.decodeToBinary(i).getBytesUnsafe, 
srid)
       }
     }
 
@@ -678,15 +678,15 @@ private[parquet] class ParquetRowConverter(
       val buffer = value.toByteBuffer
       val numBytes = buffer.remaining()
 
-      val geometry = if (buffer.hasArray) {
+      val geography = if (buffer.hasArray) {
         val array = buffer.array()
         val offset = buffer.arrayOffset() + buffer.position()
-        STUtils.stGeogFromWKB(array.slice(offset, offset + numBytes))
+        STUtils.stGeogFromWKB(array.slice(offset, offset + numBytes), srid)
       } else {
-        STUtils.stGeogFromWKB(value.getBytesUnsafe)
+        STUtils.stGeogFromWKB(value.getBytesUnsafe, srid)
       }
 
-      updater.set(geometry)
+      updater.set(geography)
     }
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
index 6a9b4978e27b..973a767144b7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
@@ -245,7 +245,8 @@ object EvaluatePython {
         val geographySrid = s.get("srid").asInstanceOf[Int]
         g.assertSridAllowedForType(geographySrid)
         STUtils.stGeogFromWKB(
-          s.get("wkb").asInstanceOf[Array[Byte]])
+          s.get("wkb").asInstanceOf[Array[Byte]],
+          geographySrid)
     }
 
     case g: GeometryType => (obj: Any) => nullSafeConvert(obj) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala
index c3e9af54d431..52001c2a021d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala
@@ -105,15 +105,23 @@ class ArrowWriterSuite extends SparkFunSuite {
     }
 
     val geographies = wkbs.map(x => InternalGeography.fromWkb(x, 
4326).getValue)
+    val geographies4267 = wkbs.map(x => InternalGeography.fromWkb(x, 
4267).getValue)
+    val geographies4269 = wkbs.map(x => InternalGeography.fromWkb(x, 
4269).getValue)
     val geometries = wkbs.map(x => InternalGeometry.fromWkb(x, 0).getValue)
     val mixedGeometries = wkbs.zip(Seq(0, 4326)).map {
       case (g, srid) => InternalGeometry.fromWkb(g, srid).getValue
     }
+    val mixedGeographies = wkbs.zip(Seq(4267, 4269)).map {
+      case (g, srid) => InternalGeography.fromWkb(g, srid).getValue
+    }
 
     check(GeometryType(0), geometries)
     check(GeographyType(4326), geographies)
+    check(GeographyType(4267), geographies4267)
+    check(GeographyType(4269), geographies4269)
     check(GeometryType("ANY"), mixedGeometries)
     check(GeographyType("ANY"), geographies)
+    check(GeographyType("ANY"), mixedGeographies)
     check(BooleanType, Seq(true, null, false))
     check(ByteType, Seq(1.toByte, 2.toByte, null, 4.toByte))
     check(ShortType, Seq(1.toShort, 2.toShort, null, 4.toShort))
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetGeoSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetGeoSuite.scala
index 107b5b7675b1..057f722f215c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetGeoSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetGeoSuite.scala
@@ -22,8 +22,9 @@ import java.io.File
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.functions.{st_asbinary, st_geogfromwkb, 
st_geomfromwkb}
+import org.apache.spark.sql.functions.{st_asbinary, st_geogfromwkb, 
st_geomfromwkb, st_srid}
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{Geography, GeographyType, StructField, 
StructType}
 
 class ParquetGeoSuite
     extends ParquetCompatibilityTest
@@ -98,6 +99,32 @@ class ParquetGeoSuite
     testReadWrite(Seq(point1Wkb, line1Wkb, makePolygonWkb()))
   }
 
+  test("geography preserves non-default SRID through Parquet round-trip") {
+    // Geography supports a variety of geographic SRIDs beyond the default 
4326. Verify that the
+    // SRID is preserved when written to and read back from Parquet, across 
the row-based reader
+    // (ParquetGeographyConverter) and the vectorized reader 
(WKBToGeographyConverter).
+    Seq(4267, 4269, 4326).foreach { srid =>
+      withTempDir { dir =>
+        val schema = StructType(Seq(
+          StructField("geog", GeographyType(srid), nullable = true)))
+        val wkbValues = Seq(point0Wkb, point1Wkb, line0Wkb)
+        val rdd = sparkContext.parallelize(
+          wkbValues.map(wkb => Row(Geography.fromWKB(wkb, srid))))
+        val df = spark.createDataFrame(rdd, schema)
+        withAllParquetWriters {
+          df.write.mode("overwrite").parquet(dir.getAbsolutePath)
+          withAllParquetReaders {
+            // Verify both the WKB payload and the SRID round-trip correctly.
+            checkAnswer(
+              spark.read.parquet(dir.getAbsolutePath)
+                .select(st_asbinary($"geog"), st_srid($"geog")),
+              wkbValues.map(wkb => Row(wkb, srid)))
+          }
+        }
+      }
+    }
+  }
+
   test("dictionary encoding") {
     val wkbValues = Seq(
       point0Wkb,
@@ -119,4 +146,36 @@ class ParquetGeoSuite
       }
     }
   }
+
+  test("geography preserves non-default SRID with dictionary encoding") {
+    // Force dictionary encoding by repeating a small number of values many 
times. This exercises
+    // the setDictionary path of ParquetGeographyConverter and the dictionary 
path of
+    // WKBToGeographyConverter, both of which must materialize geographies 
with the column's SRID.
+    val srid = 4267
+    val wkbValues = Seq(point0Wkb, point1Wkb, line0Wkb)
+    val repeatedWkbs = List.fill(10000)(wkbValues).flatten
+    val schema = StructType(Seq(
+      StructField("geog", GeographyType(srid), nullable = true)))
+    val rdd = sparkContext.parallelize(
+      repeatedWkbs.map(wkb => Row(Geography.fromWKB(wkb, srid))))
+    val df = spark.createDataFrame(rdd, schema)
+
+    Seq(true, false).foreach { useDictionary =>
+      withSQLConf(ParquetOutputFormat.ENABLE_DICTIONARY -> 
useDictionary.toString) {
+        withTempDir { dir =>
+          withAllParquetWriters {
+            df.write.mode("overwrite").parquet(dir.getAbsolutePath)
+            withAllParquetReaders {
+              // Aggregate-style assertion to keep the comparison cheap on 30K 
rows: every row
+              // should round-trip with the original SRID.
+              val readBack = spark.read.parquet(dir.getAbsolutePath)
+                .select(st_srid($"geog").as("srid"))
+              assert(readBack.count() === repeatedWkbs.length)
+              assert(readBack.where($"srid" =!= srid).count() === 0)
+            }
+          }
+        }
+      }
+    }
+  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/EvaluatePythonSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/EvaluatePythonSuite.scala
new file mode 100644
index 000000000000..aee19587dcaa
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/EvaluatePythonSuite.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException, 
SparkRuntimeException}
+import org.apache.spark.sql.catalyst.util.STUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.{GeographyVal, GeometryVal}
+
+class EvaluatePythonSuite extends SparkFunSuite {
+
+  // POINT(17 7) in WKB, little-endian.
+  private val pointWkb: Array[Byte] = 
"010100000000000000000031400000000000001C40"
+    .grouped(2).map(Integer.parseInt(_, 16).toByte).toArray
+
+  private def pyGeo(srid: Int, wkb: Array[Byte]): java.util.HashMap[String, 
Any] = {
+    val m = new java.util.HashMap[String, Any]()
+    m.put("srid", srid)
+    m.put("wkb", wkb)
+    m
+  }
+
+  // ----- GeographyType -----
+
+  test("makeFromJava(GeographyType): preserves per-row SRID for fixed-SRID 
columns") {
+    // Geography supports a variety of geographic SRIDs beyond the default 
4326. Ensure that the
+    // SRID is preserved on the Python -> Catalyst conversion path.
+    Seq(4267, 4269, 4326, 4612, 37001, 104030).foreach { srid =>
+      val convert = EvaluatePython.makeFromJava(GeographyType(srid))
+      val result = convert(pyGeo(srid, pointWkb))
+      assert(result.isInstanceOf[GeographyVal])
+      assert(STUtils.stSrid(result.asInstanceOf[GeographyVal]) === srid)
+    }
+  }
+
+  test("makeFromJava(GeographyType ANY): preserves per-row SRID for mixed-SRID 
columns") {
+    val convert = EvaluatePython.makeFromJava(GeographyType("ANY"))
+    Seq(4267, 4269, 4326).foreach { srid =>
+      val result = convert(pyGeo(srid, pointWkb))
+      assert(result.isInstanceOf[GeographyVal])
+      assert(STUtils.stSrid(result.asInstanceOf[GeographyVal]) === srid)
+    }
+  }
+
+  test("makeFromJava(GeographyType): rejects SRID mismatch on a fixed-SRID 
column") {
+    val convert = EvaluatePython.makeFromJava(GeographyType(4326))
+    checkError(
+      exception = intercept[SparkRuntimeException] {
+        convert(pyGeo(4267, pointWkb))
+      },
+      condition = "GEO_ENCODER_SRID_MISMATCH_ERROR",
+      parameters = Map("type" -> "GEOGRAPHY", "valueSrid" -> "4267", 
"typeSrid" -> "4326"))
+  }
+
+  test("makeFromJava(GeographyType ANY): rejects non-geographic SRID") {
+    val convert = EvaluatePython.makeFromJava(GeographyType("ANY"))
+    // SRID 0 is not a geographic SRID; even mixed-SRID columns must reject it.
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        convert(pyGeo(0, pointWkb))
+      },
+      condition = "ST_INVALID_SRID_VALUE",
+      parameters = Map("srid" -> "0"))
+    // SRID 3857 is a valid Cartesian SRID but not geographic.
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        convert(pyGeo(3857, pointWkb))
+      },
+      condition = "ST_INVALID_SRID_VALUE",
+      parameters = Map("srid" -> "3857"))
+  }
+
+  test("makeFromJava(GeographyType): null is preserved") {
+    val convert = EvaluatePython.makeFromJava(GeographyType(4326))
+    assert(convert(null) === null)
+  }
+
+  // ----- GeometryType -----
+
+  test("makeFromJava(GeometryType): preserves per-row SRID for fixed-SRID 
columns") {
+    // Geometry supports both the default SRID 0 and a variety of 
Cartesian/geographic SRIDs.
+    Seq(0, 3857, 4267, 4269, 4326, 32601, 102964).foreach { srid =>
+      val convert = EvaluatePython.makeFromJava(GeometryType(srid))
+      val result = convert(pyGeo(srid, pointWkb))
+      assert(result.isInstanceOf[GeometryVal])
+      assert(STUtils.stSrid(result.asInstanceOf[GeometryVal]) === srid)
+    }
+  }
+
+  test("makeFromJava(GeometryType ANY): preserves per-row SRID for mixed-SRID 
columns") {
+    val convert = EvaluatePython.makeFromJava(GeometryType("ANY"))
+    Seq(0, 3857, 4267, 4269, 4326).foreach { srid =>
+      val result = convert(pyGeo(srid, pointWkb))
+      assert(result.isInstanceOf[GeometryVal])
+      assert(STUtils.stSrid(result.asInstanceOf[GeometryVal]) === srid)
+    }
+  }
+
+  test("makeFromJava(GeometryType): rejects SRID mismatch on a fixed-SRID 
column") {
+    val convert = EvaluatePython.makeFromJava(GeometryType(0))
+    checkError(
+      exception = intercept[SparkRuntimeException] {
+        convert(pyGeo(4326, pointWkb))
+      },
+      condition = "GEO_ENCODER_SRID_MISMATCH_ERROR",
+      parameters = Map("type" -> "GEOMETRY", "valueSrid" -> "4326", "typeSrid" 
-> "0"))
+  }
+
+  test("makeFromJava(GeometryType ANY): rejects unsupported SRID") {
+    val convert = EvaluatePython.makeFromJava(GeometryType("ANY"))
+    // SRID 1 is not a registered SRID, so even mixed-SRID columns must reject 
it.
+    checkError(
+      exception = intercept[SparkIllegalArgumentException] {
+        convert(pyGeo(1, pointWkb))
+      },
+      condition = "ST_INVALID_SRID_VALUE",
+      parameters = Map("srid" -> "1"))
+  }
+
+  test("makeFromJava(GeometryType): null is preserved") {
+    val convert = EvaluatePython.makeFromJava(GeometryType(0))
+    assert(convert(null) === null)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to