jiayuasu commented on code in PR #2783:
URL: https://github.com/apache/sedona/pull/2783#discussion_r2993000410
##########
spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala:
##########
@@ -50,6 +50,11 @@ case class OsmPartitionReader(
val offset = findOffset(fs, status, file.start)
+ if (offset < 0) {
+ f.close()
+ return Iterator.empty
+ }
+
f.seek(file.start + offset)
new PbfIterator(new StartEndStream(f, (file.length - offset) + HEADER_SIZE_LENGTH)).map(
Review Comment:
Fixed in e73a76a. The returned iterator now wraps the inner PbfIterator and
closes the FSDataInputStream when `hasNext` returns false.
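For readers following along, here is a minimal sketch of the close-on-exhaustion pattern described above. It is not the code from e73a76a: `ClosingIterator` and `CountingResource` are hypothetical names used only for illustration, and any `java.io.Closeable` stands in for the `FSDataInputStream`.

```scala
import java.io.Closeable

// Hypothetical wrapper (not the actual Sedona class): delegates to the
// underlying iterator and closes `resource` the first time hasNext is false.
final class ClosingIterator[A](underlying: Iterator[A], resource: Closeable)
    extends Iterator[A] {
  private var closed = false

  override def hasNext: Boolean = {
    val more = !closed && underlying.hasNext
    if (!more && !closed) {
      closed = true
      resource.close() // runs exactly once, on exhaustion
    }
    more
  }

  override def next(): A = {
    if (!hasNext) throw new NoSuchElementException("next on empty iterator")
    underlying.next()
  }
}

// Hypothetical resource that records how often close() was called.
final class CountingResource extends Closeable {
  var closeCount = 0
  override def close(): Unit = closeCount += 1
}
```

One caveat with this pattern in general: if the consumer stops before exhausting the iterator, `hasNext` never returns false and the resource stays open, so a separate cleanup hook may still be needed.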
##########
spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala:
##########
@@ -339,6 +339,24 @@ class OsmReaderTest extends TestBaseScala with Matchers {
fieldNames should contain("visible")
}
+ it("should handle file splits where last partition has no block boundary (GH-2781)") {
+ val originalMaxPartBytes = sparkSession.conf.get("spark.sql.files.maxPartitionBytes")
+ try {
+ // Force small splits so the last partition starts inside the final PBF block,
+ // where no OSMData header exists. Without the fix, this causes EOFException.
+ sparkSession.conf.set("spark.sql.files.maxPartitionBytes", "100000")
+
+ val cnt = sparkSession.read
+ .format("osmpbf")
+ .load(monacoPath)
+ .count()
+
+ assert(cnt > 0)
+ } finally {
+ sparkSession.conf.set("spark.sql.files.maxPartitionBytes", originalMaxPartBytes)
Review Comment:
Fixed in e73a76a. Switched to `withConf`.
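As a rough illustration of what a `withConf`-style helper does, the sketch below saves the old values, applies the overrides, runs the body, and restores the old values in `finally`. The signature here is an assumption and may differ from the helper in the test base class; a plain mutable map stands in for `sparkSession.conf` so the example is self-contained.

```scala
import scala.collection.mutable

object ConfScope {
  // Hypothetical helper: `conf` stands in for the session configuration.
  def withConf[T](conf: mutable.Map[String, String])(
      overrides: Map[String, String])(body: => T): T = {
    // Remember the previous value (or absence) of every key we override.
    val previous = overrides.keys.map(k => k -> conf.get(k)).toMap
    overrides.foreach { case (k, v) => conf(k) = v }
    try body
    finally previous.foreach {
      case (k, Some(v)) => conf(k) = v   // restore the old value
      case (k, None)    => conf.remove(k) // key was unset before
    }
  }
}
```

Compared with the manual `try`/`finally` in the hunk above, this also handles a key that was unset beforehand, where a bare `conf.get` would have nothing to restore.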
##########
spark/common/src/test/scala/org/apache/sedona/sql/OsmReaderTest.scala:
##########
@@ -339,6 +339,24 @@ class OsmReaderTest extends TestBaseScala with Matchers {
fieldNames should contain("visible")
}
+ it("should handle file splits where last partition has no block boundary (GH-2781)") {
+ val originalMaxPartBytes = sparkSession.conf.get("spark.sql.files.maxPartitionBytes")
+ try {
+ // Force small splits so the last partition starts inside the final PBF block,
+ // where no OSMData header exists. Without the fix, this causes EOFException.
+ sparkSession.conf.set("spark.sql.files.maxPartitionBytes", "100000")
+
+ val cnt = sparkSession.read
+ .format("osmpbf")
+ .load(monacoPath)
+ .count()
+
Review Comment:
Fixed in e73a76a. Added `assert(df.rdd.getNumPartitions > 1)` before the
count assertion.
##########
spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala:
##########
@@ -50,6 +50,11 @@ case class OsmPartitionReader(
val offset = findOffset(fs, status, file.start)
+ if (offset < 0) {
+ f.close()
+ return Iterator.empty
Review Comment:
Fixed in e73a76a. Moved `fs.open()` after the offset check so the stream is
only opened when there is data to read. The early return for `offset < 0` no
longer needs to close anything.
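The reordering can be sketched roughly as follows. This is an illustration under stated assumptions, not the code from e73a76a: `DeferredOpen`, its `open`, and its `findOffset` are hypothetical stand-ins (a byte array replaces the file, and `findOffset` returns an absolute index rather than an offset relative to `file.start`).

```scala
import java.io.{ByteArrayInputStream, InputStream}

object DeferredOpen {
  // Counts opened streams, for illustration only.
  var opened = 0

  // Hypothetical stand-in for fs.open(path).
  def open(data: Array[Byte]): InputStream = {
    opened += 1
    new ByteArrayInputStream(data)
  }

  // Hypothetical stand-in for findOffset: index of the first `marker`
  // byte at or after `start`, or -1 when there is none.
  def findOffset(data: Array[Byte], start: Int, marker: Byte): Int =
    data.indexOf(marker, start)

  def read(data: Array[Byte], start: Int, marker: Byte): Iterator[Int] = {
    val offset = findOffset(data, start, marker)
    if (offset < 0) {
      Iterator.empty // nothing was opened, so nothing to close
    } else {
      val in = open(data) // open only once we know there is data to read
      in.skip(offset.toLong)
      Iterator.continually(in.read()).takeWhile(_ != -1)
    }
  }
}
```

The point of the ordering is that the early-return path never acquires a resource, which removes one place where a close could be forgotten.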