jiayuasu commented on code in PR #2783:
URL: https://github.com/apache/sedona/pull/2783#discussion_r2993141708
##########
spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala:
##########
@@ -46,16 +46,30 @@ case class OsmPartitionReader(
val path = new Path(new URI(file.filePath.toString()))
val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
val status = fs.getFileStatus(path)
- val f = fs.open(status.getPath)
val offset = findOffset(fs, status, file.start)
+ if (offset < 0) {
+ return Iterator.empty
+ }
Review Comment:
Fixed in 551fb9f. Replaced `return` with an `if/else` expression.
##########
spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala:
##########
@@ -46,16 +46,30 @@ case class OsmPartitionReader(
val path = new Path(new URI(file.filePath.toString()))
val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
val status = fs.getFileStatus(path)
- val f = fs.open(status.getPath)
val offset = findOffset(fs, status, file.start)
+ if (offset < 0) {
+ return Iterator.empty
+ }
+
+ val f = fs.open(status.getPath)
f.seek(file.start + offset)
- new PbfIterator(new StartEndStream(f, (file.length - offset) + HEADER_SIZE_LENGTH)).map(
- record => {
- resolveEntity(record, requiredSchema)
- })
+ val iter =
+ new PbfIterator(new StartEndStream(f, (file.length - offset) + HEADER_SIZE_LENGTH)).map(
+ record => {
+ resolveEntity(record, requiredSchema)
+ })
+
+ new Iterator[InternalRow] {
+ override def hasNext: Boolean = {
+ val has = iter.hasNext
+ if (!has) f.close()
+ has
+ }
+ override def next(): InternalRow = iter.next()
+ }
Review Comment:
Fixed in 551fb9f. Added a `TaskCompletionListener` to close the stream when
the task ends, covering cancellation, speculative execution, and LIMIT
scenarios. A `closeIfNeeded` guard prevents double-close since both iterator
exhaustion and task completion can trigger it.
##########
spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala:
##########
@@ -46,16 +46,30 @@ case class OsmPartitionReader(
val path = new Path(new URI(file.filePath.toString()))
val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
val status = fs.getFileStatus(path)
- val f = fs.open(status.getPath)
val offset = findOffset(fs, status, file.start)
+ if (offset < 0) {
+ return Iterator.empty
+ }
+
+ val f = fs.open(status.getPath)
f.seek(file.start + offset)
- new PbfIterator(new StartEndStream(f, (file.length - offset) + HEADER_SIZE_LENGTH)).map(
- record => {
- resolveEntity(record, requiredSchema)
- })
+ val iter =
+ new PbfIterator(new StartEndStream(f, (file.length - offset) + HEADER_SIZE_LENGTH)).map(
+ record => {
+ resolveEntity(record, requiredSchema)
+ })
+
+ new Iterator[InternalRow] {
+ override def hasNext: Boolean = {
+ val has = iter.hasNext
+ if (!has) f.close()
+ has
Review Comment:
The `TaskCompletionListener` added for comment #2 already covers this — it
fires on task completion regardless of success or failure, so exception paths
are handled.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]