This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8430dbfd9482 [SPARK-53961][SQL][TESTS] Fix `FileStreamSinkSuite` flakiness by using `walkFileTree` instead of `walk`
8430dbfd9482 is described below
commit 8430dbfd948212c5b86703e9bce0ab2013ef5d01
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Mon Oct 20 20:25:18 2025 -0700
[SPARK-53961][SQL][TESTS] Fix `FileStreamSinkSuite` flakiness by using `walkFileTree` instead of `walk`
### What changes were proposed in this pull request?
This PR aims to fix `FileStreamSinkSuite` flakiness by using `walkFileTree`
instead of `walk`.
### Why are the changes needed?
`Files.walk` is flaky in the following way when the directory is modified concurrently (for example, while cleanup of an aborted job is still deleting files): the traversal is lazy, so a file that disappears mid-walk surfaces as an `UncheckedIOException` thrown from the returned stream. `Files.walkFileTree` allows more robust error handling, since each failure is reported to the visitor's `visitFileFailed` callback, where a `NoSuchFileException` can be tolerated.
https://github.com/apache/spark/blob/2bb73fbdeb19f0a972786d3ea33d3263bf84ab66/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala#L543-L547
```
[info] - cleanup complete but invalid output for aborted job *** FAILED *** (438 milliseconds)
[info] java.io.UncheckedIOException: java.nio.file.NoSuchFileException: ***/spark-4c7ad10b-5848-45d7-ba43-dae4020ad011/output#output/part-00007-e582f3e3-87e3-40fa-8269-7fac9b545775-c000.snappy.parquet
[info] at java.base/java.nio.file.FileTreeIterator.fetchNextIfNeeded(FileTreeIterator.java:87)
[info] at java.base/java.nio.file.FileTreeIterator.hasNext(FileTreeIterator.java:103)
[info] at java.base/java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1855)
[info] at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:292)
[info] at java.base/java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
[info] at java.base/java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:169)
[info] at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:298)
[info] at java.base/java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
[info] at scala.collection.convert.JavaCollectionWrappers$JIteratorWrapper.hasNext(JavaCollectionWrappers.scala:46)
[info] at scala.collection.Iterator$$anon$6.hasNext(Iterator.scala:480)
[info] at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
[info] at scala.collection.mutable.Growable.addAll(Growable.scala:61)
[info] at scala.collection.mutable.Growable.addAll$(Growable.scala:57)
[info] at scala.collection.immutable.SetBuilderImpl.addAll(Set.scala:405)
[info] at scala.collection.immutable.Set$.from(Set.scala:362)
[info] at scala.collection.IterableOnceOps.toSet(IterableOnce.scala:1469)
[info] at scala.collection.IterableOnceOps.toSet$(IterableOnce.scala:1469)
[info] at scala.collection.AbstractIterator.toSet(Iterator.scala:1306)
[info] at org.apache.spark.sql.streaming.FileStreamSinkSuite.$anonfun$new$52(FileStreamSinkSuite.scala:537)
```
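For reference, here is a minimal standalone sketch of the same pattern (the helper name `collectParquetFileNames`, the `WalkExample` object, and the `dir` parameter are illustrative, not part of the patch): unlike `Files.walk`, which surfaces a vanished file as an `UncheckedIOException` thrown from the stream, `Files.walkFileTree` routes the failure to `visitFileFailed`, where a `NoSuchFileException` can simply be skipped.
```
import java.io.IOException
import java.nio.file.{FileVisitResult, Files, NoSuchFileException, Path, Paths, SimpleFileVisitor}
import java.nio.file.attribute.BasicFileAttributes
import scala.collection.mutable

object WalkExample {
  // Illustrative helper: collect the names of all .parquet files under `dir`,
  // tolerating entries that are deleted concurrently while the walk is running.
  def collectParquetFileNames(dir: Path): Set[String] = {
    val names = mutable.Set.empty[String]
    Files.walkFileTree(dir, new SimpleFileVisitor[Path] {
      override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
        val fileName = file.getFileName.toString
        if (fileName.endsWith(".parquet")) names += fileName
        FileVisitResult.CONTINUE
      }
      // Files.walk would wrap this IOException in an UncheckedIOException and
      // abort the traversal; here a concurrently deleted file is skipped instead.
      override def visitFileFailed(file: Path, exc: IOException): FileVisitResult = exc match {
        case _: NoSuchFileException => FileVisitResult.CONTINUE
        case _ => FileVisitResult.TERMINATE
      }
    })
    names.toSet
  }

  def main(args: Array[String]): Unit =
    println(collectParquetFileNames(Paths.get(args(0))))
}
```
Note that returning `TERMINATE` for other failures ends the walk without rethrowing, so an unexpected I/O error would show up as a mismatch in the collected set rather than as an exception from the traversal itself.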
### Does this PR introduce _any_ user-facing change?
No, this is a test case change.
### How was this patch tested?
Pass the CIs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52671 from dongjoon-hyun/SPARK-53961.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/streaming/FileStreamSinkSuite.scala | 24 ++++++++++++++++++----
1 file changed, 20 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index b0aa71a7e1b3..4c06a0109e34 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.streaming
 
 import java.io.{File, IOException}
 import java.nio.file.{Files, Paths}
+import java.nio.file.attribute.BasicFileAttributes
 import java.util.Locale
 
 import scala.collection.mutable.ArrayBuffer
@@ -534,10 +535,25 @@ abstract class FileStreamSinkSuite extends StreamTest {
     }
 
     import PendingCommitFilesTrackingManifestFileCommitProtocol._
-    val outputFileNames = Files.walk(outputDir.toPath).iterator().asScala
-      .filter(_.toString.endsWith(".parquet"))
-      .map(_.getFileName.toString)
-      .toSet
+    import java.nio.file.{Path, _}
+    val outputFileNames = scala.collection.mutable.Set.empty[String]
+    Files.walkFileTree(
+      outputDir.toPath,
+      new SimpleFileVisitor[Path] {
+        override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
+          val fileName = file.getFileName.toString
+          if (fileName.endsWith(".parquet")) outputFileNames += fileName
+          FileVisitResult.CONTINUE
+        }
+        override def visitFileFailed(file: Path, exc: IOException): FileVisitResult = {
+          exc match {
+            case _: NoSuchFileException =>
+              FileVisitResult.CONTINUE
+            case _ =>
+              FileVisitResult.TERMINATE
+          }
+        }
+      })
     val trackingFileNames = tracking.map(SparkPath.fromUrlString(_).toPath.getName).toSet
 
     // there would be possible to have race condition:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]