Copilot commented on code in PR #2783:
URL: https://github.com/apache/sedona/pull/2783#discussion_r2986677288
##########
spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala:
##########
@@ -50,6 +50,11 @@ case class OsmPartitionReader(
val offset = findOffset(fs, status, file.start)
+ if (offset < 0) {
+ f.close()
+ return Iterator.empty
+ }
+
f.seek(file.start + offset)
new PbfIterator(new StartEndStream(f, (file.length - offset) +
HEADER_SIZE_LENGTH)).map(
Review Comment:
`f` is only closed when `offset < 0`, but in the normal path the
`FSDataInputStream` opened here is never closed after iteration completes. This
can leak file descriptors / remote connections across partitions. Consider
wrapping the returned iterator so the stream is closed on task completion /
iterator exhaustion (e.g., via a completion listener) and also on exceptions
during iterator construction.
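A minimal sketch of the wrapping described above. It uses a hypothetical `ClosingIterator`, which is not a Sedona or Spark class. The wrapper closes the underlying resource exactly once: when iteration is exhausted, when `hasNext` throws, or when `close()` is called explicitly. In the reader, `close()` would presumably also be registered through Spark's `TaskContext.get().addTaskCompletionListener[Unit](_ => iter.close())`, so that partially consumed iterators (for example under a `LIMIT`) still release the stream.

```scala
// Hypothetical wrapper (not part of Sedona or Spark): delegates to `underlying`
// and closes `resource` exactly once, on exhaustion, on failure, or on close().
final class ClosingIterator[A](underlying: Iterator[A], resource: AutoCloseable)
    extends Iterator[A] with AutoCloseable {
  private var closed = false

  override def hasNext: Boolean = {
    if (closed) false
    else {
      val more =
        try underlying.hasNext
        catch { case e: Throwable => close(); throw e } // release on read errors too
      if (!more) close() // release the stream as soon as iteration completes
      more
    }
  }

  override def next(): A = {
    if (!hasNext) throw new NoSuchElementException("iterator exhausted")
    underlying.next()
  }

  // Idempotent, so it is safe to call from both exhaustion and a completion listener.
  override def close(): Unit = if (!closed) { closed = true; resource.close() }
}
```

Making `close()` idempotent is what allows it to be wired to both exhaustion and task completion without double-closing the `FSDataInputStream`.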
##########
spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala:
##########
@@ -50,6 +50,11 @@ case class OsmPartitionReader(
val offset = findOffset(fs, status, file.start)
+ if (offset < 0) {
+ f.close()
+ return Iterator.empty
Review Comment:
`apply` opens `f` before computing `offset`, but `findOffset` opens its own
stream as well. In the `offset < 0` case (and generally), this results in two
opens per partition. Consider computing the offset first (or reusing the same
seekable stream) and only opening the main stream if you’ll actually read data,
to reduce filesystem/S3 overhead.
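A minimal sketch of the offset-first ordering, under simplifying assumptions. `CountingSource` is a hypothetical stand-in for the filesystem that counts opens. The `findOffset` here only searches for a marker byte sequence (such as `OSMData`) after the split start and returns -1 if there is none, which is the situation the new `offset < 0` branch handles. Sedona's actual `findOffset` likely differs in how it locates and reports the block boundary.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for a filesystem handle: counts how often a stream is opened.
final class CountingSource(bytes: Array[Byte]) {
  var opens: Int = 0
  def open(): InputStream = { opens += 1; new ByteArrayInputStream(bytes) }
}

object OffsetFirstReader {
  // Simplified, hypothetical analogue of findOffset: position of `marker` relative
  // to `start`, or -1 if the split contains no block header at all.
  // Like the real one (per the review), it opens and closes its own stream.
  def findOffset(src: CountingSource, start: Long, marker: Seq[Byte]): Long = {
    val in = src.open()
    try {
      in.skip(start)
      val rest = Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toVector
      rest.indexOfSlice(marker).toLong
    } finally in.close()
  }

  // Compute the offset first and open the main stream only when there is data to read.
  def read(src: CountingSource, start: Long, marker: Seq[Byte]): Vector[Byte] = {
    val offset = findOffset(src, start, marker)
    if (offset < 0) Vector.empty
    else {
      val in = src.open()
      try {
        in.skip(start + offset)
        Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toVector
      } finally in.close()
    }
  }
}
```

With this ordering, a split with no block boundary costs one open instead of two. Reusing a single seekable stream for both the search and the read would also remove the second open in the normal path.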
##########
spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala:
##########
@@ -339,6 +339,24 @@ class OsmReaderTest extends TestBaseScala with Matchers {
fieldNames should contain("visible")
}
+ it("should handle file splits where last partition has no block boundary (GH-2781)") {
+ val originalMaxPartBytes = sparkSession.conf.get("spark.sql.files.maxPartitionBytes")
+ try {
+ // Force small splits so the last partition starts inside the final PBF block,
+ // where no OSMData header exists. Without the fix, this causes EOFException.
+ sparkSession.conf.set("spark.sql.files.maxPartitionBytes", "100000")
+
+ val cnt = sparkSession.read
+ .format("osmpbf")
+ .load(monacoPath)
+ .count()
+
Review Comment:
The test currently doesn’t assert that the file was actually split into
multiple partitions under the reduced `maxPartitionBytes`. Adding an assertion
on the resulting DataFrame/RDD partition count (e.g., `> 1`) would make the
regression test more robust and ensure it exercises the split-boundary scenario.
```suggestion
val df = sparkSession.read
.format("osmpbf")
.load(monacoPath)
// Ensure the file was actually split into multiple partitions under the reduced
// maxPartitionBytes so that we exercise the split-boundary scenario.
val partitionCount = df.rdd.getNumPartitions
assert(partitionCount > 1)
val cnt = df.count()
```
##########
spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala:
##########
@@ -339,6 +339,24 @@ class OsmReaderTest extends TestBaseScala with Matchers {
fieldNames should contain("visible")
}
+ it("should handle file splits where last partition has no block boundary (GH-2781)") {
+ val originalMaxPartBytes = sparkSession.conf.get("spark.sql.files.maxPartitionBytes")
+ try {
+ // Force small splits so the last partition starts inside the final PBF block,
+ // where no OSMData header exists. Without the fix, this causes EOFException.
+ sparkSession.conf.set("spark.sql.files.maxPartitionBytes", "100000")
+
+ val cnt = sparkSession.read
+ .format("osmpbf")
+ .load(monacoPath)
+ .count()
+
+ assert(cnt > 0)
+ } finally {
+ sparkSession.conf.set("spark.sql.files.maxPartitionBytes", originalMaxPartBytes)
Review Comment:
This test manually saves/restores Spark conf via `get`/`set`. The test base
already provides `withConf(...)`, which also handles the case where the conf
key was originally unset. Using it here would reduce boilerplate and avoid
leaving mutated conf behind if additional keys are added later.
```suggestion
withConf("spark.sql.files.maxPartitionBytes" -> "100000") {
// Force small splits so the last partition starts inside the final PBF block,
// where no OSMData header exists. Without the fix, this causes EOFException.
val cnt = sparkSession.read
.format("osmpbf")
.load(monacoPath)
.count()
assert(cnt > 0)
```
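To show why a helper like `withConf` is safer than a manual `get`/`set`, here is a minimal sketch of the save/restore logic such a helper typically performs. Sedona's actual `withConf` in the test base may be implemented differently. In this sketch, a mutable `Map` stands in for Spark's `RuntimeConfig`, which offers `getOption`, `set`, and `unset`.

```scala
import scala.collection.mutable

object ConfScope {
  // Hypothetical withConf-style helper; `conf` models Spark's RuntimeConfig.
  def withConf[T](conf: mutable.Map[String, String])(pairs: (String, String)*)(body: => T): T = {
    // Remember each key's previous value, or None if it was originally unset.
    val saved = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => conf(k) = v }
    try body
    finally saved.foreach {
      case (k, Some(old)) => conf(k) = old
      case (k, None)      => conf.remove(k) // originally unset: remove the override
    }
  }
}
```

Because restoration happens in `finally` and covers every key passed in, adding more keys later cannot leave a mutated conf behind, even if the body throws.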
--
This is an automated message from the Apache Git Service.