Copilot commented on code in PR #2783:
URL: https://github.com/apache/sedona/pull/2783#discussion_r2986677288


##########
spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala:
##########
@@ -50,6 +50,11 @@ case class OsmPartitionReader(
 
     val offset = findOffset(fs, status, file.start)
 
+    if (offset < 0) {
+      f.close()
+      return Iterator.empty
+    }
+
     f.seek(file.start + offset)
 
     new PbfIterator(new StartEndStream(f, (file.length - offset) + 
HEADER_SIZE_LENGTH)).map(

Review Comment:
   `f` is only closed when `offset < 0`, but in the normal path the 
`FSDataInputStream` opened here is never closed after iteration completes. This 
can leak file descriptors / remote connections across partitions. Consider 
wrapping the returned iterator so the stream is closed on task completion / 
iterator exhaustion (e.g., via a completion listener) and also on exceptions 
during iterator construction.



##########
spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala:
##########
@@ -50,6 +50,11 @@ case class OsmPartitionReader(
 
     val offset = findOffset(fs, status, file.start)
 
+    if (offset < 0) {
+      f.close()
+      return Iterator.empty

Review Comment:
   `apply` opens `f` before computing `offset`, but `findOffset` opens its own 
stream as well. In the `offset < 0` case (and generally), this results in two 
opens per partition. Consider computing the offset first (or reusing the same 
seekable stream) and only opening the main stream if you’ll actually read data, 
to reduce filesystem/S3 overhead.



##########
spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala:
##########
@@ -339,6 +339,24 @@ class OsmReaderTest extends TestBaseScala with Matchers {
       fieldNames should contain("visible")
     }
 
+    it("should handle file splits where last partition has no block boundary 
(GH-2781)") {
+      val originalMaxPartBytes = 
sparkSession.conf.get("spark.sql.files.maxPartitionBytes")
+      try {
+        // Force small splits so the last partition starts inside the final 
PBF block,
+        // where no OSMData header exists. Without the fix, this causes 
EOFException.
+        sparkSession.conf.set("spark.sql.files.maxPartitionBytes", "100000")
+
+        val cnt = sparkSession.read
+          .format("osmpbf")
+          .load(monacoPath)
+          .count()
+

Review Comment:
   The test currently doesn’t assert that the file was actually split into 
multiple partitions under the reduced `maxPartitionBytes`. Adding an assertion 
on the resulting DataFrame/RDD partition count (e.g., `> 1`) would make the 
regression test more robust and ensure it exercises the split-boundary scenario.
   ```suggestion
           val df = sparkSession.read
             .format("osmpbf")
             .load(monacoPath)
   
           // Ensure the file was actually split into multiple partitions under 
the reduced
           // maxPartitionBytes so that we exercise the split-boundary scenario.
           val partitionCount = df.rdd.getNumPartitions
           assert(partitionCount > 1)
   
           val cnt = df.count()
   ```



##########
spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala:
##########
@@ -339,6 +339,24 @@ class OsmReaderTest extends TestBaseScala with Matchers {
       fieldNames should contain("visible")
     }
 
+    it("should handle file splits where last partition has no block boundary 
(GH-2781)") {
+      val originalMaxPartBytes = 
sparkSession.conf.get("spark.sql.files.maxPartitionBytes")
+      try {
+        // Force small splits so the last partition starts inside the final 
PBF block,
+        // where no OSMData header exists. Without the fix, this causes 
EOFException.
+        sparkSession.conf.set("spark.sql.files.maxPartitionBytes", "100000")
+
+        val cnt = sparkSession.read
+          .format("osmpbf")
+          .load(monacoPath)
+          .count()
+
+        assert(cnt > 0)
+      } finally {
+        sparkSession.conf.set("spark.sql.files.maxPartitionBytes", 
originalMaxPartBytes)

Review Comment:
   This test manually saves/restores Spark conf via `get`/`set`. The test base 
already provides `withConf(...)`, which also handles the case where the conf 
key was originally unset. Using it here would reduce boilerplate and avoid 
leaving mutated conf behind if additional keys are added later.
   ```suggestion
         withConf("spark.sql.files.maxPartitionBytes" -> "100000") {
           // Force small splits so the last partition starts inside the final 
PBF block,
           // where no OSMData header exists. Without the fix, this causes 
EOFException.
           val cnt = sparkSession.read
             .format("osmpbf")
             .load(monacoPath)
             .count()
   
           assert(cnt > 0)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to