Copilot commented on code in PR #2783:
URL: https://github.com/apache/sedona/pull/2783#discussion_r2993071074
##########
spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala:
##########
@@ -46,16 +46,30 @@ case class OsmPartitionReader(
val path = new Path(new URI(file.filePath.toString()))
val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
val status = fs.getFileStatus(path)
- val f = fs.open(status.getPath)
val offset = findOffset(fs, status, file.start)
+ if (offset < 0) {
+ return Iterator.empty
+ }
Review Comment:
`return` in Scala is generally discouraged (non-idiomatic and can be harder
to reason about). This can be expressed as an expression-based branch (e.g.,
`if/else`) returning either `Iterator.empty` or the iterator constructed from
the stream.
##########
spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala:
##########
@@ -46,16 +46,30 @@ case class OsmPartitionReader(
val path = new Path(new URI(file.filePath.toString()))
val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
val status = fs.getFileStatus(path)
- val f = fs.open(status.getPath)
val offset = findOffset(fs, status, file.start)
+ if (offset < 0) {
+ return Iterator.empty
+ }
+
+ val f = fs.open(status.getPath)
f.seek(file.start + offset)
- new PbfIterator(new StartEndStream(f, (file.length - offset) +
HEADER_SIZE_LENGTH)).map(
- record => {
- resolveEntity(record, requiredSchema)
- })
+ val iter =
+ new PbfIterator(new StartEndStream(f, (file.length - offset) +
HEADER_SIZE_LENGTH)).map(
+ record => {
+ resolveEntity(record, requiredSchema)
+ })
+
+ new Iterator[InternalRow] {
+ override def hasNext: Boolean = {
+ val has = iter.hasNext
+ if (!has) f.close()
+ has
+ }
+ override def next(): InternalRow = iter.next()
+ }
Review Comment:
The FS stream is only closed when the iterator is fully exhausted (`hasNext`
becomes false). In Spark, partitions may be abandoned early (task cancellation,
speculative execution, downstream limit/take-like behavior), which can leak
file descriptors until GC. Prefer registering a `TaskContext` completion
listener to close `f`, or use a managed/closeable iterator pattern that
guarantees closure on task completion.
##########
spark/common/src/main/scala/org/apache/sedona/sql/datasources/osm/OsmPartitionReader.scala:
##########
@@ -46,16 +46,30 @@ case class OsmPartitionReader(
val path = new Path(new URI(file.filePath.toString()))
val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
val status = fs.getFileStatus(path)
- val f = fs.open(status.getPath)
val offset = findOffset(fs, status, file.start)
+ if (offset < 0) {
+ return Iterator.empty
+ }
+
+ val f = fs.open(status.getPath)
f.seek(file.start + offset)
- new PbfIterator(new StartEndStream(f, (file.length - offset) +
HEADER_SIZE_LENGTH)).map(
- record => {
- resolveEntity(record, requiredSchema)
- })
+ val iter =
+ new PbfIterator(new StartEndStream(f, (file.length - offset) +
HEADER_SIZE_LENGTH)).map(
+ record => {
+ resolveEntity(record, requiredSchema)
+ })
+
+ new Iterator[InternalRow] {
+ override def hasNext: Boolean = {
+ val has = iter.hasNext
+ if (!has) f.close()
+ has
Review Comment:
If `iter.hasNext` throws (I/O error, parsing error), `f.close()` will not
run, which can leak the stream in exception paths. Wrap `iter.hasNext` in a
`try`/`catch` that closes `f` and rethrows (or use an equivalent managed-resource
pattern), so `f` is closed when iteration fails.
```suggestion
try {
val has = iter.hasNext
if (!has) f.close()
has
} catch {
case t: Throwable =>
try {
f.close()
} catch {
case _: Throwable => // suppress close exceptions
}
throw t
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org